diff --git a/src/workspace/worker.rs b/src/workspace/worker.rs index ccd6ddc..389b960 100644 --- a/src/workspace/worker.rs +++ b/src/workspace/worker.rs @@ -27,7 +27,6 @@ use uuid::Uuid; #[cfg(feature = "js")] use napi_derive::napi; - #[derive(Debug, Clone)] #[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "js", napi)] @@ -39,10 +38,11 @@ struct WorkspaceInner { user_id: Uuid, // reference to global user id cursor: cursor::Controller, buffers: DashMap, - event: InternallyMutable, filetree: DashSet, users: DashMap, - services: Services + services: Services, + // TODO can we drop the mutex? + events: tokio::sync::Mutex>, } impl Workspace { @@ -60,6 +60,7 @@ impl Workspace { .into_inner(); let (tx, rx) = mpsc::channel(256); + let (ev_tx, ev_rx) = mpsc::unbounded_channel(); let cur_stream = services.cur() .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .await? @@ -77,21 +78,22 @@ impl Workspace { id, user_id, cursor: controller, - event: InternallyMutable::new(WorkspaceEventInner::Join(UserJoin { user: Identity { id: user_id.to_string() } })), buffers: DashMap::default(), filetree: DashSet::default(), users: DashMap::default(), + events: tokio::sync::Mutex::new(ev_rx), services, })); ws.fetch_users().await?; ws.fetch_buffers().await?; - ws.run_actor(ws_stream); + ws.run_actor(ws_stream, ev_tx); Ok(ws) } - pub(crate) fn run_actor(&self, mut stream: Streaming) { + pub(crate) fn run_actor(&self, mut stream: Streaming, tx: mpsc::UnboundedSender) { + // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? let inner = self.0.clone(); let name = self.id(); tokio::spawn(async move { @@ -103,14 +105,16 @@ impl Workspace { tracing::warn!("workspace {} received empty event", name) } Ok(Some(WorkspaceEvent { event: Some(ev) })) => { - let _ev = ev.clone(); + let update = crate::api::Event::from(&ev); match ev { + // user WorkspaceEventInner::Join(UserJoin { user }) => { inner.users.insert(user.clone().into(), User { id: user.into() }); } WorkspaceEventInner::Leave(UserLeave { user }) => { inner.users.remove(&user.into()); } + // buffer WorkspaceEventInner::Create(FileCreate { path }) => { inner.filetree.insert(path); } @@ -125,7 +129,9 @@ impl Workspace { } } } - inner.event.set(_ev); + if tx.send(update).is_err() { + tracing::warn!("no active controller to receive workspace event"); + } }, } } @@ -208,14 +214,10 @@ impl Workspace { } } - /// await and return next workspace event - /// - /// TODO dont use inner proto type - /// TODO can it fail? are we just masking? - /// TODO tiemout i guess - pub async fn poll(&self) -> WorkspaceEventInner { - self.0.event.wait().await; - self.0.event.get() + /// await next workspace [crate::api::Event] and return it + pub async fn event(&self) -> crate::Result { + self.0.events.lock().await.recv().await + .ok_or(crate::Error::Channel { send: false }) } /// fetch a list of all buffers in a workspace