From 917a6b96c2818dd628e0a759b57f8316266f64e4 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 5 Sep 2024 23:59:05 +0200 Subject: [PATCH] feat: automatically map cursor uuids to names not a fan of passing an Arc down to the cursor worker, but it needs to access a mapping managed by the workspace so not sure if it can be better. Into and From protocol types and Cursor are gone: do things manually (since user is now a different thing, it can't be auto). Also api::Cursor got changed: user field is Option now --- src/api/cursor.rs | 62 +--------------------------------------- src/cursor/controller.rs | 8 ++++-- src/cursor/worker.rs | 28 +++++++++++------- src/workspace.rs | 8 ++++-- 4 files changed, 30 insertions(+), 76 deletions(-) diff --git a/src/api/cursor.rs b/src/api/cursor.rs index 44a8130..dc7467f 100644 --- a/src/api/cursor.rs +++ b/src/api/cursor.rs @@ -1,9 +1,6 @@ //! ### Cursor //! Represents the position of a remote user's cursor. -use codemp_proto as proto; -use uuid::Uuid; - #[cfg(feature = "python")] use pyo3::prelude::*; @@ -19,62 +16,5 @@ pub struct Cursor { /// Path of buffer this cursor is on. pub buffer: String, /// User display name, if provided. - pub user: Option, // TODO this should be a string, not the UUID! -} - -impl From for Cursor { - fn from(value: proto::cursor::CursorPosition) -> Self { - Self { - start: (value.start.row, value.start.col), - end: (value.end.row, value.end.col), - buffer: value.buffer.path, - user: None, - } - } -} - -impl From for proto::cursor::CursorPosition { - fn from(value: Cursor) -> Self { - Self { - buffer: proto::files::BufferNode { path: value.buffer }, - start: proto::cursor::RowCol { - row: value.start.0, - col: value.start.1, - }, - end: proto::cursor::RowCol { - row: value.end.0, - col: value.end.1, - }, - } - } -} - -impl From for Cursor { - fn from(value: proto::cursor::CursorEvent) -> Self { - Self { - start: (value.position.start.row, value.position.start.col), - end: (value.position.end.row, value.position.end.col), - buffer: value.position.buffer.path, - user: Some(value.user.uuid()), - } - } -} - -impl From for proto::cursor::CursorEvent { - fn from(value: Cursor) -> Self { - Self { - user: value.user.unwrap_or_default().into(), - position: proto::cursor::CursorPosition { - buffer: proto::files::BufferNode { path: value.buffer }, - start: proto::cursor::RowCol { - row: value.start.0, - col: value.start.1, - }, - end: proto::cursor::RowCol { - row: value.end.0, - col: value.end.1, - }, - }, - } - } + pub user: Option, } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index b4bf6a2..780590e 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, oneshot, watch}; use crate::{api::{controller::ControllerCallback, Controller, Cursor}, errors::ControllerResult}; -use codemp_proto::cursor::CursorPosition; +use codemp_proto::{cursor::{CursorPosition, RowCol}, files::BufferNode}; /// A [Controller] for asynchronously sending and receiving [Cursor] event. /// @@ -31,7 +31,11 @@ impl Controller for CursorController { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } - Ok(self.0.op.send(cursor.into()).await?) + Ok(self.0.op.send(CursorPosition { + buffer: BufferNode { path: cursor.buffer }, + start: RowCol { row: cursor.start.0, col: cursor.start.1 }, + end: RowCol { row: cursor.end.0, col: cursor.end.1 }, + }).await?) } async fn try_recv(&self) -> ControllerResult> { diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index bbe2a13..ece4bf5 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -2,14 +2,16 @@ use std::sync::Arc; use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; +use uuid::Uuid; -use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, ext::IgnorableError}; +use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor, User}, ext::IgnorableError}; use codemp_proto::cursor::{CursorPosition, CursorEvent}; use super::controller::{CursorController, CursorControllerInner}; pub(crate) struct CursorWorker { op: mpsc::Receiver, + map: Arc>, stream: mpsc::Receiver>>, poll: mpsc::UnboundedReceiver>, pollers: Vec>, @@ -19,15 +21,10 @@ pub(crate) struct CursorWorker { callback: watch::Receiver>>, } -impl Default for CursorWorker { - fn default() -> Self { - Self::new(64) - } -} - impl CursorWorker { - fn new(buffer_size: usize) -> Self { - let (op_tx, op_rx) = mpsc::channel(buffer_size); + pub fn new(user_map: Arc>) -> Self { + // TODO we should tweak the channel buffer size to better propagate backpressure + let (op_tx, op_rx) = mpsc::channel(64); let (stream_tx, stream_rx) = mpsc::channel(1); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (cb_tx, cb_rx) = watch::channel(None); @@ -41,6 +38,7 @@ impl CursorWorker { }; Self { op: op_rx, + map: user_map, stream: stream_rx, store: std::collections::VecDeque::default(), stop: end_rx, @@ -82,7 +80,17 @@ impl ControllerWorker for CursorWorker { // server sents us a cursor Ok(Some(cur)) = rx.message() => { tracing::debug!("received cursor from server"); - self.store.push_back(cur.into()); + let mut cursor = Cursor { + buffer: cur.position.buffer.path, + start: (cur.position.start.row, cur.position.start.col), + end: (cur.position.end.row, cur.position.end.col), + user: None, + }; + let user_id = Uuid::from(cur.user); + if let Some(user) = self.map.get(&user_id) { + cursor.user = Some(user.name.clone()); + } + self.store.push_back(cursor); for tx in self.pollers.drain(..) { tx.send(()).unwrap_or_warn("poller dropped before unblocking"); } diff --git a/src/workspace.rs b/src/workspace.rs index a768d53..6c4dc97 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -44,7 +44,7 @@ struct WorkspaceInner { cursor: cursor::Controller, buffers: DashMap, filetree: DashSet, - users: DashMap, + users: Arc>, services: Services, // TODO can we drop the mutex? events: tokio::sync::Mutex>, @@ -70,7 +70,9 @@ impl Workspace { .await? .into_inner(); - let worker = CursorWorker::default(); + let users = Arc::new(DashMap::default()); + + let worker = CursorWorker::new(users.clone()); let controller = worker.controller(); tokio::spawn(async move { tracing::debug!("controller worker started"); @@ -84,7 +86,7 @@ impl Workspace { cursor: controller, buffers: DashMap::default(), filetree: DashSet::default(), - users: DashMap::default(), + users, events: tokio::sync::Mutex::new(ev_rx), services, }));