diff --git a/src/client.rs b/src/client.rs index 21e4662..15b44c4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -37,7 +37,7 @@ pub struct Client(Arc); #[derive(Debug)] struct ClientInner { - user: User, + user: Arc, config: crate::api::Config, workspaces: DashMap, auth: AuthClient, @@ -67,7 +67,7 @@ impl Client { SessionClient::with_interceptor(channel, network::SessionInterceptor(claims.channel())); Ok(Client(Arc::new(ClientInner { - user: resp.user.into(), + user: Arc::new(resp.user.into()), workspaces: DashMap::default(), claims, auth, diff --git a/src/workspace.rs b/src/workspace.rs index bd46f5c..7de40dd 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -10,7 +10,7 @@ use crate::{ }, buffer, cursor, errors::{ConnectionResult, ControllerResult, RemoteResult}, - ext::InternallyMutable, + ext::{IgnorableError, InternallyMutable}, network::Services, }; @@ -26,8 +26,8 @@ use codemp_proto::{ }; use dashmap::{DashMap, DashSet}; -use std::sync::Arc; -use tokio::sync::{mpsc, mpsc::error::TryRecvError}; +use std::sync::{Arc, Weak}; +use tokio::sync::{mpsc::{self, error::TryRecvError}, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; @@ -48,17 +48,15 @@ pub struct Workspace(Arc); #[derive(Debug)] struct WorkspaceInner { name: String, - user: User, // TODO back-reference to global user id... needed for buffer controllers + current_user: Arc, cursor: cursor::Controller, buffers: DashMap, services: Services, - // TODO these two are Arced so that the inner worker can hold them without holding the - // WorkspaceInner itself, otherwise its impossible to drop Workspace filetree: DashSet, users: Arc>, - // TODO can we drop the mutex? events: tokio::sync::Mutex>, - callback: std::sync::Mutex>>, // TODO lmao another one + callback: watch::Sender>>, + poll_tx: mpsc::UnboundedSender>, } impl AsyncReceiver for Workspace { @@ -71,32 +69,27 @@ impl AsyncReceiver for Workspace { } async fn poll(&self) -> ControllerResult<()> { - loop { - if !self.0.events.lock().await.is_empty() { - break Ok(()); - } - // TODO disgusting, please send help - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - } + let (tx, rx) = oneshot::channel(); + self.0.poll_tx.send(tx)?; + Ok(rx.await?) } - // TODO please send HELP ASAP this is hurting me emotionally fn clear_callback(&self) { - *self.0.callback.lock().expect("mutex poisoned") = None; + self.0.callback.send_replace(None); } fn callback(&self, cb: impl Into>) { - *self.0.callback.lock().expect("mutex poisoned") = Some(cb.into()); + self.0.callback.send_replace(Some(cb.into())); } } impl Workspace { pub(crate) async fn connect( name: String, - user: User, + user: Arc, config: crate::api::Config, token: Token, - claims: tokio::sync::watch::Receiver, // TODO ughh receiving this + claims: tokio::sync::watch::Receiver, ) -> ConnectionResult { let workspace_claim = InternallyMutable::new(token); let services = @@ -105,6 +98,8 @@ impl Workspace { let (tx, rx) = mpsc::channel(128); let (ev_tx, ev_rx) = mpsc::unbounded_channel(); + let (poll_tx, poll_rx) = mpsc::unbounded_channel(); + let (cb_tx, cb_rx) = watch::channel(None); let cur_stream = services .cur() .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) @@ -112,24 +107,36 @@ impl Workspace { .into_inner(); let users = Arc::new(DashMap::default()); - let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream, &name); let ws = Self(Arc::new(WorkspaceInner { - name, - user, + name: name.clone(), + current_user: user, cursor: controller, buffers: DashMap::default(), filetree: DashSet::default(), users, events: tokio::sync::Mutex::new(ev_rx), services, - callback: std::sync::Mutex::new(None), + callback: cb_tx, + poll_tx, })); + let weak = Arc::downgrade(&ws.0); + + let worker = WorkspaceWorker { + callback: cb_rx, + pollers: Vec::new(), + poll_rx, + events: ev_tx, + }; + + let _t = tokio::spawn(async move { + worker.work(name, ws_stream, weak).await; + }); + ws.fetch_users().await?; ws.fetch_buffers().await?; - ws.run_actor(ws_stream, ev_tx); Ok(ws) } @@ -175,7 +182,7 @@ impl Workspace { ); let stream = self.0.services.buf().attach(req).await?.into_inner(); - let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream, &self.0.name); + let controller = buffer::Controller::spawn(self.0.current_user.id, path, tx, stream, &self.0.name); self.0.buffers.insert(path.to_string(), controller.clone()); Ok(controller) @@ -325,29 +332,26 @@ impl Workspace { tree.sort(); tree } +} - pub(crate) fn run_actor( - &self, - mut stream: Streaming, - tx: mpsc::UnboundedSender, - ) { - // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? - let weak = Arc::downgrade(&self.0); - let name = self.id(); - tokio::spawn(async move { - tracing::debug!("workspace worker starting"); - loop { - // TODO can we stop responsively rather than poll for Arc being dropped? - if weak.upgrade().is_none() { - break; - }; - let Some(res) = tokio::select!( - x = stream.message() => Some(x), - _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None, - ) else { - continue; - }; - match res { +struct WorkspaceWorker { + callback: watch::Receiver>>, + pollers: Vec>, + poll_rx: mpsc::UnboundedReceiver>, + events: mpsc::UnboundedSender, +} + +impl WorkspaceWorker { + pub(crate) async fn work(mut self, name: String, mut stream: Streaming, weak: Weak) { + tracing::debug!("workspace worker starting"); + loop { + tokio::select! { + res = self.poll_rx.recv() => match res { + None => break tracing::debug!("pollers channel closed: workspace has been dropped"), + Some(x) => self.pollers.push(x), + }, + + res = stream.message() => match res { Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), Ok(None) => break tracing::info!("leaving workspace {}", name), Ok(Some(WorkspaceEvent { event: None })) => { @@ -377,13 +381,23 @@ impl Workspace { let _ = inner.buffers.remove(&path); } } - if tx.send(update).is_err() { + if self.events.send(update).is_err() { tracing::warn!("no active controller to receive workspace event"); } + self.pollers.drain(..).for_each(|x| { + x.send(()).unwrap_or_warn("poller dropped before completion"); + }); + if let Some(cb) = self.callback.borrow().as_ref() { + if let Some(ws) = weak.upgrade() { + cb.call(Workspace(ws)); + } else { + break tracing::debug!("workspace worker clean exit"); + } + } } - } + }, } - tracing::debug!("workspace worker stopping"); - }); + } + tracing::debug!("workspace worker stopping"); } }