feat: cursor+buffer controllers know their ws id

This commit is contained in:
əlemi 2024-11-16 16:42:58 +01:00
parent 4fcab00d34
commit 03a158e678
Signed by: alemi
GPG key ID: A4895B84D311642C
5 changed files with 22 additions and 5 deletions

View file

@ -22,9 +22,14 @@ use crate::ext::IgnorableError;
pub struct BufferController(pub(crate) Arc<BufferControllerInner>); pub struct BufferController(pub(crate) Arc<BufferControllerInner>);
impl BufferController { impl BufferController {
/// Get id of workspace containing this controller
pub fn workspace_id(&self) -> &str {
&self.0.workspace_id
}
/// Get the buffer path. /// Get the buffer path.
pub fn path(&self) -> &str { pub fn path(&self) -> &str {
&self.0.name &self.0.path
} }
/// Return buffer whole content, updating internal acknowledgement tracker. /// Return buffer whole content, updating internal acknowledgement tracker.
@ -50,7 +55,7 @@ impl BufferController {
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct BufferControllerInner { pub(crate) struct BufferControllerInner {
pub(crate) name: String, pub(crate) path: String,
pub(crate) latest_version: watch::Receiver<diamond_types::LocalVersion>, pub(crate) latest_version: watch::Receiver<diamond_types::LocalVersion>,
pub(crate) local_version: watch::Receiver<diamond_types::LocalVersion>, pub(crate) local_version: watch::Receiver<diamond_types::LocalVersion>,
pub(crate) ops_in: mpsc::UnboundedSender<TextChange>, pub(crate) ops_in: mpsc::UnboundedSender<TextChange>,
@ -59,6 +64,7 @@ pub(crate) struct BufferControllerInner {
pub(crate) delta_request: mpsc::Sender<oneshot::Sender<Option<BufferUpdate>>>, pub(crate) delta_request: mpsc::Sender<oneshot::Sender<Option<BufferUpdate>>>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>, pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
pub(crate) ack_tx: mpsc::UnboundedSender<LocalVersion>, pub(crate) ack_tx: mpsc::UnboundedSender<LocalVersion>,
pub(crate) workspace_id: String,
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]

View file

@ -40,6 +40,7 @@ impl BufferController {
path: &str, path: &str,
tx: mpsc::Sender<Operation>, tx: mpsc::Sender<Operation>,
rx: Streaming<BufferEvent>, rx: Streaming<BufferEvent>,
workspace_id: &str,
) -> Self { ) -> Self {
let init = diamond_types::LocalVersion::default(); let init = diamond_types::LocalVersion::default();
@ -57,7 +58,7 @@ impl BufferController {
let agent_id = oplog.get_or_create_agent_id(&user_id.to_string()); let agent_id = oplog.get_or_create_agent_id(&user_id.to_string());
let controller = Arc::new(BufferControllerInner { let controller = Arc::new(BufferControllerInner {
name: path.to_string(), path: path.to_string(),
latest_version: latest_version_rx, latest_version: latest_version_rx,
local_version: my_version_rx, local_version: my_version_rx,
ops_in: opin_tx, ops_in: opin_tx,
@ -66,6 +67,7 @@ impl BufferController {
delta_request: recv_tx, delta_request: recv_tx,
callback: cb_tx, callback: cb_tx,
ack_tx, ack_tx,
workspace_id: workspace_id.to_string(),
}); });
let weak = Arc::downgrade(&controller); let weak = Arc::downgrade(&controller);

View file

@ -25,12 +25,19 @@ use codemp_proto::{
#[cfg_attr(feature = "js", napi_derive::napi)] #[cfg_attr(feature = "js", napi_derive::napi)]
pub struct CursorController(pub(crate) Arc<CursorControllerInner>); pub struct CursorController(pub(crate) Arc<CursorControllerInner>);
impl CursorController {
pub fn workspace_id(&self) -> &str {
&self.0.workspace_id
}
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct CursorControllerInner { pub(crate) struct CursorControllerInner {
pub(crate) op: mpsc::UnboundedSender<CursorPosition>, pub(crate) op: mpsc::UnboundedSender<CursorPosition>,
pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>, pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>,
pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>, pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
pub(crate) workspace_id: String,
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]

View file

@ -28,6 +28,7 @@ impl CursorController {
user_map: Arc<dashmap::DashMap<Uuid, User>>, user_map: Arc<dashmap::DashMap<Uuid, User>>,
tx: mpsc::Sender<CursorPosition>, tx: mpsc::Sender<CursorPosition>,
rx: Streaming<CursorEvent>, rx: Streaming<CursorEvent>,
workspace_id: &str,
) -> Self { ) -> Self {
// TODO we should tweak the channel buffer size to better propagate backpressure // TODO we should tweak the channel buffer size to better propagate backpressure
let (op_tx, op_rx) = mpsc::unbounded_channel(); let (op_tx, op_rx) = mpsc::unbounded_channel();
@ -39,6 +40,7 @@ impl CursorController {
stream: stream_tx, stream: stream_tx,
callback: cb_tx, callback: cb_tx,
poll: poll_tx, poll: poll_tx,
workspace_id: workspace_id.to_string(),
}); });
let weak = Arc::downgrade(&controller); let weak = Arc::downgrade(&controller);

View file

@ -113,7 +113,7 @@ impl Workspace {
let users = Arc::new(DashMap::default()); let users = Arc::new(DashMap::default());
let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream); let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream, &name);
let ws = Self(Arc::new(WorkspaceInner { let ws = Self(Arc::new(WorkspaceInner {
name, name,
@ -175,7 +175,7 @@ impl Workspace {
); );
let stream = self.0.services.buf().attach(req).await?.into_inner(); let stream = self.0.services.buf().attach(req).await?.into_inner();
let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream); let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream, &self.0.name);
self.0.buffers.insert(path.to_string(), controller.clone()); self.0.buffers.insert(path.to_string(), controller.clone());
Ok(controller) Ok(controller)