diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 16d5d26..1c1fda2 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -18,11 +18,36 @@ struct CursorWorker { stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>, poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>, - store: std::collections::VecDeque<Cursor>, + store: std::collections::VecDeque<codemp_proto::cursor::CursorEvent>, controller: std::sync::Weak<CursorControllerInner>, callback: watch::Receiver<Option<ControllerCallback<CursorController>>>, } +impl CursorWorker { + fn handle_recv(&mut self, tx: oneshot::Sender<Option<Cursor>>) { + tx.send( + self.store.pop_front().and_then(|event| { + let user_id = Uuid::from(event.user); + if let Some(user_name) = self.map.get(&user_id).map(|u| u.name.clone()) { + Some(Cursor { + user: user_name, + sel: Selection { + buffer: event.position.buffer.path, + start_row: event.position.start.row, + start_col: event.position.start.col, + end_row: event.position.end.row, + end_col: event.position.end.col + } + }) + } else { + tracing::warn!("received cursor for unknown user {user_id}"); + None + } + }) + ).unwrap_or_warn("client gave up receiving!"); + } +} + impl CursorController { pub(crate) fn spawn( user_map: Arc<dashmap::DashMap<Uuid, User>>, @@ -88,19 +113,7 @@ impl CursorController { None => break, // clean exit, just weird that we got it here Some(controller) => { tracing::debug!("received cursor from server"); - let user_id = Uuid::from(cur.user); - let cursor = Cursor { - user: worker.map.get(&user_id).map(|u| u.name.clone()).unwrap_or_default(), - sel: Selection { - buffer: cur.position.buffer.path, - start_row: cur.position.start.row, - start_col: cur.position.start.col, - end_row: cur.position.end.row, - end_col: cur.position.end.col - } - }; - - worker.store.push_back(cursor); + worker.store.push_back(cur); for tx in worker.pollers.drain(..) { tx.send(()).unwrap_or_warn("poller dropped before unblocking"); } @@ -112,8 +125,7 @@ impl CursorController { }, // client wants to get next cursor event - Some(tx) = worker.stream.recv() => tx.send(worker.store.pop_front()) - .unwrap_or_warn("client gave up receiving"), + Some(tx) = worker.stream.recv() => worker.handle_recv(tx), else => break, }