fix: improved workspace events channel

Co-authored-by: zaaarf <me@zaaarf.foo>
This commit is contained in:
əlemi 2024-08-09 00:39:58 +02:00
parent ad881ab067
commit 07dd964466
Signed by: alemi
GPG key ID: A4895B84D311642C

View file

@ -27,7 +27,6 @@ use uuid::Uuid;
#[cfg(feature = "js")] #[cfg(feature = "js")]
use napi_derive::napi; use napi_derive::napi;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(feature = "js", napi)] #[cfg_attr(feature = "js", napi)]
@ -39,10 +38,11 @@ struct WorkspaceInner {
user_id: Uuid, // reference to global user id user_id: Uuid, // reference to global user id
cursor: cursor::Controller, cursor: cursor::Controller,
buffers: DashMap<String, buffer::Controller>, buffers: DashMap<String, buffer::Controller>,
event: InternallyMutable<WorkspaceEventInner>,
filetree: DashSet<String>, filetree: DashSet<String>,
users: DashMap<Uuid, User>, users: DashMap<Uuid, User>,
services: Services services: Services,
// TODO can we drop the mutex?
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
} }
impl Workspace { impl Workspace {
@ -60,6 +60,7 @@ impl Workspace {
.into_inner(); .into_inner();
let (tx, rx) = mpsc::channel(256); let (tx, rx) = mpsc::channel(256);
let (ev_tx, ev_rx) = mpsc::unbounded_channel();
let cur_stream = services.cur() let cur_stream = services.cur()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await? .await?
@ -77,21 +78,22 @@ impl Workspace {
id, id,
user_id, user_id,
cursor: controller, cursor: controller,
event: InternallyMutable::new(WorkspaceEventInner::Join(UserJoin { user: Identity { id: user_id.to_string() } })),
buffers: DashMap::default(), buffers: DashMap::default(),
filetree: DashSet::default(), filetree: DashSet::default(),
users: DashMap::default(), users: DashMap::default(),
events: tokio::sync::Mutex::new(ev_rx),
services, services,
})); }));
ws.fetch_users().await?; ws.fetch_users().await?;
ws.fetch_buffers().await?; ws.fetch_buffers().await?;
ws.run_actor(ws_stream); ws.run_actor(ws_stream, ev_tx);
Ok(ws) Ok(ws)
} }
pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>) { pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>, tx: mpsc::UnboundedSender<crate::api::Event>) {
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
let inner = self.0.clone(); let inner = self.0.clone();
let name = self.id(); let name = self.id();
tokio::spawn(async move { tokio::spawn(async move {
@ -103,14 +105,16 @@ impl Workspace {
tracing::warn!("workspace {} received empty event", name) tracing::warn!("workspace {} received empty event", name)
} }
Ok(Some(WorkspaceEvent { event: Some(ev) })) => { Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
let _ev = ev.clone(); let update = crate::api::Event::from(&ev);
match ev { match ev {
// user
WorkspaceEventInner::Join(UserJoin { user }) => { WorkspaceEventInner::Join(UserJoin { user }) => {
inner.users.insert(user.clone().into(), User { id: user.into() }); inner.users.insert(user.clone().into(), User { id: user.into() });
} }
WorkspaceEventInner::Leave(UserLeave { user }) => { WorkspaceEventInner::Leave(UserLeave { user }) => {
inner.users.remove(&user.into()); inner.users.remove(&user.into());
} }
// buffer
WorkspaceEventInner::Create(FileCreate { path }) => { WorkspaceEventInner::Create(FileCreate { path }) => {
inner.filetree.insert(path); inner.filetree.insert(path);
} }
@ -125,7 +129,9 @@ impl Workspace {
} }
} }
} }
inner.event.set(_ev); if tx.send(update).is_err() {
tracing::warn!("no active controller to receive workspace event");
}
}, },
} }
} }
@ -208,14 +214,10 @@ impl Workspace {
} }
} }
/// await and return next workspace event /// await next workspace [crate::api::Event] and return it
/// pub async fn event(&self) -> crate::Result<crate::api::Event> {
/// TODO dont use inner proto type self.0.events.lock().await.recv().await
/// TODO can it fail? are we just masking? .ok_or(crate::Error::Channel { send: false })
/// TODO tiemout i guess
pub async fn poll(&self) -> WorkspaceEventInner {
self.0.event.wait().await;
self.0.event.get()
} }
/// fetch a list of all buffers in a workspace /// fetch a list of all buffers in a workspace