2023-08-16 23:09:47 +02:00
|
|
|
use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex};
|
|
|
|
use tonic::async_trait;
|
|
|
|
|
2023-08-17 02:58:55 +02:00
|
|
|
use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller};
|
2023-08-16 23:09:47 +02:00
|
|
|
|
|
|
|
pub struct CursorController {
|
|
|
|
uid: String,
|
|
|
|
op: mpsc::Sender<CursorEvent>,
|
|
|
|
stream: Mutex<broadcast::Receiver<CursorEvent>>,
|
|
|
|
}
|
|
|
|
|
2023-08-17 00:04:37 +02:00
|
|
|
impl CursorController {
|
|
|
|
pub(crate) fn new(
|
|
|
|
uid: String,
|
|
|
|
op: mpsc::Sender<CursorEvent>,
|
|
|
|
stream: Mutex<broadcast::Receiver<CursorEvent>>
|
|
|
|
) -> Self {
|
|
|
|
CursorController { uid, op, stream }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-16 23:09:47 +02:00
|
|
|
#[async_trait]
|
|
|
|
impl Controller<CursorEvent> for CursorController {
|
|
|
|
type Input = CursorPosition;
|
|
|
|
|
2023-08-17 02:58:55 +02:00
|
|
|
async fn send(&self, cursor: CursorPosition) -> Result<(), Error> {
|
2023-08-16 23:09:47 +02:00
|
|
|
Ok(self.op.send(CursorEvent {
|
|
|
|
user: self.uid.clone(),
|
|
|
|
position: Some(cursor),
|
|
|
|
}).await?)
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO is this cancelable? so it can be used in tokio::select!
|
|
|
|
// TODO is the result type overkill? should be an option?
|
2023-08-17 02:58:55 +02:00
|
|
|
async fn recv(&self) -> Result<CursorEvent, Error> {
|
2023-08-16 23:09:47 +02:00
|
|
|
let mut stream = self.stream.lock().await;
|
|
|
|
match stream.recv().await {
|
|
|
|
Ok(x) => Ok(x),
|
2023-08-17 02:58:55 +02:00
|
|
|
Err(RecvError::Closed) => Err(Error::Channel { send: false }),
|
2023-08-16 23:09:47 +02:00
|
|
|
Err(RecvError::Lagged(n)) => {
|
|
|
|
tracing::error!("cursor channel lagged behind, skipping {} events", n);
|
|
|
|
Ok(stream.recv().await.expect("could not receive after lagging"))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// fn try_poll(&self) -> Option<Option<CursorPosition>> {
|
|
|
|
// match self.stream.try_lock() {
|
|
|
|
// Err(_) => None,
|
|
|
|
// Ok(mut x) => match x.try_recv() {
|
|
|
|
// Ok(x) => Some(Some(x)),
|
|
|
|
// Err(TryRecvError::Empty) => None,
|
|
|
|
// Err(TryRecvError::Closed) => Some(None),
|
|
|
|
// Err(TryRecvError::Lagged(n)) => {
|
|
|
|
// tracing::error!("cursor channel lagged behind, skipping {} events", n);
|
|
|
|
// Some(Some(x.try_recv().expect("could not receive after lagging")))
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
}
|