diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 62b6f6d..9f1a9d1 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -22,9 +22,14 @@ use crate::ext::IgnorableError; pub struct BufferController(pub(crate) Arc); impl BufferController { + /// Get id of workspace containing this controller + pub fn workspace_id(&self) -> &str { + &self.0.workspace_id + } + /// Get the buffer path. pub fn path(&self) -> &str { - &self.0.name + &self.0.path } /// Return buffer whole content, updating internal acknowledgement tracker. @@ -50,7 +55,7 @@ impl BufferController { #[derive(Debug)] pub(crate) struct BufferControllerInner { - pub(crate) name: String, + pub(crate) path: String, pub(crate) latest_version: watch::Receiver, pub(crate) local_version: watch::Receiver, pub(crate) ops_in: mpsc::UnboundedSender, @@ -59,6 +64,7 @@ pub(crate) struct BufferControllerInner { pub(crate) delta_request: mpsc::Sender>>, pub(crate) callback: watch::Sender>>, pub(crate) ack_tx: mpsc::UnboundedSender, + pub(crate) workspace_id: String, } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 48de01b..8a9b405 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -40,6 +40,7 @@ impl BufferController { path: &str, tx: mpsc::Sender, rx: Streaming, + workspace_id: &str, ) -> Self { let init = diamond_types::LocalVersion::default(); @@ -57,7 +58,7 @@ impl BufferController { let agent_id = oplog.get_or_create_agent_id(&user_id.to_string()); let controller = Arc::new(BufferControllerInner { - name: path.to_string(), + path: path.to_string(), latest_version: latest_version_rx, local_version: my_version_rx, ops_in: opin_tx, @@ -66,6 +67,7 @@ impl BufferController { delta_request: recv_tx, callback: cb_tx, ack_tx, + workspace_id: workspace_id.to_string(), }); let weak = Arc::downgrade(&controller); diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index ec26069..d0c544c 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -25,12 +25,19 @@ use codemp_proto::{ #[cfg_attr(feature = "js", napi_derive::napi)] pub struct CursorController(pub(crate) Arc); +impl CursorController { + pub fn workspace_id(&self) -> &str { + &self.0.workspace_id + } +} + #[derive(Debug)] pub(crate) struct CursorControllerInner { pub(crate) op: mpsc::UnboundedSender, pub(crate) stream: mpsc::Sender>>, pub(crate) poll: mpsc::UnboundedSender>, pub(crate) callback: watch::Sender>>, + pub(crate) workspace_id: String, } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index a690a9a..16d5d26 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -28,6 +28,7 @@ impl CursorController { user_map: Arc>, tx: mpsc::Sender, rx: Streaming, + workspace_id: &str, ) -> Self { // TODO we should tweak the channel buffer size to better propagate backpressure let (op_tx, op_rx) = mpsc::unbounded_channel(); @@ -39,6 +40,7 @@ impl CursorController { stream: stream_tx, callback: cb_tx, poll: poll_tx, + workspace_id: workspace_id.to_string(), }); let weak = Arc::downgrade(&controller); diff --git a/src/workspace.rs b/src/workspace.rs index 4d0a788..bd46f5c 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -113,7 +113,7 @@ impl Workspace { let users = Arc::new(DashMap::default()); - let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream); + let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream, &name); let ws = Self(Arc::new(WorkspaceInner { name, @@ -175,7 +175,7 @@ impl Workspace { ); let stream = self.0.services.buf().attach(req).await?.into_inner(); - let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream); + let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream, &self.0.name); self.0.buffers.insert(path.to_string(), controller.clone()); Ok(controller)