feat: workspace worker

Co-authored-by: alemi <me@alemi.dev>
This commit is contained in:
zaaarf 2024-11-19 23:57:39 +01:00
parent f724215d5e
commit 34fd4bdf82
No known key found for this signature in database
GPG key ID: 102E445F4C3F829B
2 changed files with 68 additions and 54 deletions

View file

@ -37,7 +37,7 @@ pub struct Client(Arc<ClientInner>);
#[derive(Debug)] #[derive(Debug)]
struct ClientInner { struct ClientInner {
user: User, user: Arc<User>,
config: crate::api::Config, config: crate::api::Config,
workspaces: DashMap<String, Workspace>, workspaces: DashMap<String, Workspace>,
auth: AuthClient<Channel>, auth: AuthClient<Channel>,
@ -67,7 +67,7 @@ impl Client {
SessionClient::with_interceptor(channel, network::SessionInterceptor(claims.channel())); SessionClient::with_interceptor(channel, network::SessionInterceptor(claims.channel()));
Ok(Client(Arc::new(ClientInner { Ok(Client(Arc::new(ClientInner {
user: resp.user.into(), user: Arc::new(resp.user.into()),
workspaces: DashMap::default(), workspaces: DashMap::default(),
claims, claims,
auth, auth,

View file

@ -10,7 +10,7 @@ use crate::{
}, },
buffer, cursor, buffer, cursor,
errors::{ConnectionResult, ControllerResult, RemoteResult}, errors::{ConnectionResult, ControllerResult, RemoteResult},
ext::InternallyMutable, ext::{IgnorableError, InternallyMutable},
network::Services, network::Services,
}; };
@ -26,8 +26,8 @@ use codemp_proto::{
}; };
use dashmap::{DashMap, DashSet}; use dashmap::{DashMap, DashSet};
use std::sync::Arc; use std::sync::{Arc, Weak};
use tokio::sync::{mpsc, mpsc::error::TryRecvError}; use tokio::sync::{mpsc::{self, error::TryRecvError}, oneshot, watch};
use tonic::Streaming; use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
@ -48,17 +48,15 @@ pub struct Workspace(Arc<WorkspaceInner>);
#[derive(Debug)] #[derive(Debug)]
struct WorkspaceInner { struct WorkspaceInner {
name: String, name: String,
user: User, // TODO back-reference to global user id... needed for buffer controllers current_user: Arc<User>,
cursor: cursor::Controller, cursor: cursor::Controller,
buffers: DashMap<String, buffer::Controller>, buffers: DashMap<String, buffer::Controller>,
services: Services, services: Services,
// TODO these two are Arced so that the inner worker can hold them without holding the
// WorkspaceInner itself, otherwise its impossible to drop Workspace
filetree: DashSet<String>, filetree: DashSet<String>,
users: Arc<DashMap<Uuid, User>>, users: Arc<DashMap<Uuid, User>>,
// TODO can we drop the mutex?
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>, events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
callback: std::sync::Mutex<Option<ControllerCallback<Workspace>>>, // TODO lmao another one callback: watch::Sender<Option<ControllerCallback<Workspace>>>,
poll_tx: mpsc::UnboundedSender<oneshot::Sender<()>>,
} }
impl AsyncReceiver<Event> for Workspace { impl AsyncReceiver<Event> for Workspace {
@ -71,32 +69,27 @@ impl AsyncReceiver<Event> for Workspace {
} }
async fn poll(&self) -> ControllerResult<()> { async fn poll(&self) -> ControllerResult<()> {
loop { let (tx, rx) = oneshot::channel();
if !self.0.events.lock().await.is_empty() { self.0.poll_tx.send(tx)?;
break Ok(()); Ok(rx.await?)
}
// TODO disgusting, please send help
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
} }
// TODO please send HELP ASAP this is hurting me emotionally
fn clear_callback(&self) { fn clear_callback(&self) {
*self.0.callback.lock().expect("mutex poisoned") = None; self.0.callback.send_replace(None);
} }
fn callback(&self, cb: impl Into<ControllerCallback<Self>>) { fn callback(&self, cb: impl Into<ControllerCallback<Self>>) {
*self.0.callback.lock().expect("mutex poisoned") = Some(cb.into()); self.0.callback.send_replace(Some(cb.into()));
} }
} }
impl Workspace { impl Workspace {
pub(crate) async fn connect( pub(crate) async fn connect(
name: String, name: String,
user: User, user: Arc<User>,
config: crate::api::Config, config: crate::api::Config,
token: Token, token: Token,
claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>, // TODO ughh receiving this claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
) -> ConnectionResult<Self> { ) -> ConnectionResult<Self> {
let workspace_claim = InternallyMutable::new(token); let workspace_claim = InternallyMutable::new(token);
let services = let services =
@ -105,6 +98,8 @@ impl Workspace {
let (tx, rx) = mpsc::channel(128); let (tx, rx) = mpsc::channel(128);
let (ev_tx, ev_rx) = mpsc::unbounded_channel(); let (ev_tx, ev_rx) = mpsc::unbounded_channel();
let (poll_tx, poll_rx) = mpsc::unbounded_channel();
let (cb_tx, cb_rx) = watch::channel(None);
let cur_stream = services let cur_stream = services
.cur() .cur()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .attach(tokio_stream::wrappers::ReceiverStream::new(rx))
@ -112,24 +107,36 @@ impl Workspace {
.into_inner(); .into_inner();
let users = Arc::new(DashMap::default()); let users = Arc::new(DashMap::default());
let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream, &name); let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream, &name);
let ws = Self(Arc::new(WorkspaceInner { let ws = Self(Arc::new(WorkspaceInner {
name, name: name.clone(),
user, current_user: user,
cursor: controller, cursor: controller,
buffers: DashMap::default(), buffers: DashMap::default(),
filetree: DashSet::default(), filetree: DashSet::default(),
users, users,
events: tokio::sync::Mutex::new(ev_rx), events: tokio::sync::Mutex::new(ev_rx),
services, services,
callback: std::sync::Mutex::new(None), callback: cb_tx,
poll_tx,
})); }));
let weak = Arc::downgrade(&ws.0);
let worker = WorkspaceWorker {
callback: cb_rx,
pollers: Vec::new(),
poll_rx,
events: ev_tx,
};
let _t = tokio::spawn(async move {
worker.work(name, ws_stream, weak).await;
});
ws.fetch_users().await?; ws.fetch_users().await?;
ws.fetch_buffers().await?; ws.fetch_buffers().await?;
ws.run_actor(ws_stream, ev_tx);
Ok(ws) Ok(ws)
} }
@ -175,7 +182,7 @@ impl Workspace {
); );
let stream = self.0.services.buf().attach(req).await?.into_inner(); let stream = self.0.services.buf().attach(req).await?.into_inner();
let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream, &self.0.name); let controller = buffer::Controller::spawn(self.0.current_user.id, path, tx, stream, &self.0.name);
self.0.buffers.insert(path.to_string(), controller.clone()); self.0.buffers.insert(path.to_string(), controller.clone());
Ok(controller) Ok(controller)
@ -325,29 +332,26 @@ impl Workspace {
tree.sort(); tree.sort();
tree tree
} }
}
pub(crate) fn run_actor( struct WorkspaceWorker {
&self, callback: watch::Receiver<Option<ControllerCallback<Workspace>>>,
mut stream: Streaming<WorkspaceEvent>, pollers: Vec<oneshot::Sender<()>>,
tx: mpsc::UnboundedSender<crate::api::Event>, poll_rx: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
) { events: mpsc::UnboundedSender<crate::api::Event>,
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? }
let weak = Arc::downgrade(&self.0);
let name = self.id(); impl WorkspaceWorker {
tokio::spawn(async move { pub(crate) async fn work(mut self, name: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) {
tracing::debug!("workspace worker starting"); tracing::debug!("workspace worker starting");
loop { loop {
// TODO can we stop responsively rather than poll for Arc being dropped? tokio::select! {
if weak.upgrade().is_none() { res = self.poll_rx.recv() => match res {
break; None => break tracing::debug!("pollers channel closed: workspace has been dropped"),
}; Some(x) => self.pollers.push(x),
let Some(res) = tokio::select!( },
x = stream.message() => Some(x),
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None, res = stream.message() => match res {
) else {
continue;
};
match res {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
Ok(None) => break tracing::info!("leaving workspace {}", name), Ok(None) => break tracing::info!("leaving workspace {}", name),
Ok(Some(WorkspaceEvent { event: None })) => { Ok(Some(WorkspaceEvent { event: None })) => {
@ -377,13 +381,23 @@ impl Workspace {
let _ = inner.buffers.remove(&path); let _ = inner.buffers.remove(&path);
} }
} }
if tx.send(update).is_err() { if self.events.send(update).is_err() {
tracing::warn!("no active controller to receive workspace event"); tracing::warn!("no active controller to receive workspace event");
} }
self.pollers.drain(..).for_each(|x| {
x.send(()).unwrap_or_warn("poller dropped before completion");
});
if let Some(cb) = self.callback.borrow().as_ref() {
if let Some(ws) = weak.upgrade() {
cb.call(Workspace(ws));
} else {
break tracing::debug!("workspace worker clean exit");
} }
} }
} }
},
}
}
tracing::debug!("workspace worker stopping"); tracing::debug!("workspace worker stopping");
});
} }
} }