From 4d418c814e2ebe47aba9b13abafd95983414a432 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 26 Sep 2024 02:29:13 +0200 Subject: [PATCH] fix: use Weak refs to prevent leaky cycles --- src/api/controller.rs | 24 +++------- src/buffer/controller.rs | 5 --- src/buffer/worker.rs | 83 ++++++++++++++++++----------------- src/cursor/controller.rs | 5 --- src/cursor/worker.rs | 92 ++++++++++++++++++--------------------- src/workspace.rs | 94 +++++++++++++--------------------------- 6 files changed, 120 insertions(+), 183 deletions(-) diff --git a/src/api/controller.rs b/src/api/controller.rs index f56933c..7b54b81 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -5,21 +5,13 @@ use crate::errors::ControllerResult; -pub(crate) trait ControllerWorker { - type Controller : Controller; - type Tx; - type Rx; - - fn controller(&self) -> Self::Controller; - async fn work(self, tx: Self::Tx, rx: Self::Rx); -} - // note that we don't use thiserror's #[from] because we don't want the error structs to contain // these foreign types, and also we want these to be easily constructable /// Asynchronous and thread-safe handle to a generic bidirectional stream. /// -/// This generic trait is implemented by actors managing stream procedures. +/// This generic trait is implemented by actors managing stream procedures, and will generally +/// imply a background worker. /// /// Events can be enqueued for dispatching without blocking with [`Controller::send`]. /// @@ -27,6 +19,9 @@ pub(crate) trait ControllerWorker { /// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively, /// [`Controller::poll`] combined with [`Controller::try_recv`]. /// +/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have +/// been dropped. +/// /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. 
#[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] @@ -57,15 +52,6 @@ pub trait Controller : Sized + Send + Sync { /// Attempt to receive a value, return None if nothing is currently available. async fn try_recv(&self) -> ControllerResult>; - - /// Stop underlying worker. - /// - /// After this is called, nothing can be received or sent anymore; however, existing - /// controllers will still be accessible until all handles are dropped. - /// - /// Returns true if the stop signal was successfully sent, false if channel was - /// closed (probably because worker had already been stopped). - fn stop(&self) -> bool; } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index f1fdbbc..73229be 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -47,7 +47,6 @@ pub(crate) struct BufferControllerInner { pub(crate) last_update: InternallyMutable, pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender)>, pub(crate) poller: mpsc::UnboundedSender>, - pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender, pub(crate) callback: watch::Sender>>, @@ -101,8 +100,4 @@ impl Controller for BufferController { tracing::warn!("no active buffer worker to clear callback"); } } - - fn stop(&self) -> bool { - self.0.stopper.send(()).is_ok() - } } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index a9353ba..9e6bfa7 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; -use crate::api::controller::{ControllerCallback, ControllerWorker}; +use crate::api::controller::ControllerCallback; use crate::api::TextChange; use crate::ext::{IgnorableError, InternallyMutable}; @@ -16,21 +16,21 @@ use super::controller::{BufferController, BufferControllerInner}; pub(crate) type DeltaOp = (LocalVersion, Option); pub(crate) 
type DeltaRequest = (LocalVersion, oneshot::Sender); -pub(crate) struct BufferWorker { +struct BufferWorker { user_id: Uuid, + path: String, latest_version: watch::Sender, ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender)>, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, delta_req: mpsc::Receiver, - stop: mpsc::UnboundedReceiver<()>, - controller: BufferController, + controller: std::sync::Weak, callback: watch::Receiver>>, } -impl BufferWorker { - pub fn new(user_id: Uuid, path: &str) -> Self { +impl BufferController { + pub(crate) fn spawn(user_id: Uuid, path: &str, tx: mpsc::Sender, rx: Streaming) -> Self { let init = diamond_types::LocalVersion::default(); let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); @@ -42,67 +42,63 @@ impl BufferWorker { let (poller_tx, poller_rx) = mpsc::unbounded_channel(); - let (end_tx, end_rx) = mpsc::unbounded_channel(); - - let controller = BufferControllerInner { + let controller = Arc::new(BufferControllerInner { name: path.to_string(), latest_version: latest_version_rx, last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), ops_in: opin_tx, poller: poller_tx, - stopper: end_tx, content_request: req_tx, delta_request: recv_tx, callback: cb_tx, - }; + }); - BufferWorker { + let weak = Arc::downgrade(&controller); + + let worker = BufferWorker { user_id, + path: path.to_string(), latest_version: latest_version_tx, ops_in: opin_rx, poller: poller_rx, pollers: Vec::new(), - stop: end_rx, - controller: BufferController(Arc::new(controller)), + controller: weak, content_checkout: req_rx, delta_req: recv_rx, callback: cb_rx, - } - } -} + }; -impl ControllerWorker for BufferWorker { - type Controller = BufferController; - type Tx = mpsc::Sender; - type Rx = Streaming; + tokio::spawn(async move { + BufferController::work(worker, tx, rx).await + }); - fn controller(&self) -> BufferController { - self.controller.clone() + 
BufferController(controller) } - async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { + async fn work(mut worker: BufferWorker, tx: mpsc::Sender, mut rx: Streaming) { let mut branch = diamond_types::list::Branch::new(); let mut oplog = diamond_types::list::OpLog::new(); let mut timer = Timer::new(10); // TODO configurable!! + tracing::debug!("controller worker started"); + loop { + if worker.controller.upgrade().is_none() { break }; + // block until one of these is ready tokio::select! { biased; - // received stop signal - _ = self.stop.recv() => break, - // received a new poller, add it to collection - res = self.poller.recv() => match res { + res = worker.poller.recv() => match res { None => break tracing::error!("poller channel closed"), - Some(tx) => self.pollers.push(tx), + Some(tx) => worker.pollers.push(tx), }, // received a text change from editor - res = self.ops_in.recv() => match res { + res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), Some((change, ack)) => { - let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string()); + let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string()); let last_ver = oplog.local_version(); // clip to buffer extents let clip_end = std::cmp::min(branch.len(), change.end as usize); @@ -120,7 +116,7 @@ impl ControllerWorker for BufferWorker { if change.is_delete() || change.is_insert() { tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await .unwrap_or_warn("failed to send change!"); - self.latest_version.send(oplog.local_version()) + worker.latest_version.send(oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); } ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); @@ -129,18 +125,19 @@ impl ControllerWorker for BufferWorker { // received a message from server: add to oplog and update latest version (+unlock pollers) res = rx.message() => match res { - 
Err(_e) => break, - Ok(None) => break, - Ok(Some(change)) => { - match oplog.decode_and_add(&change.op.data) { + Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path), + Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path), + Ok(Some(change)) => match worker.controller.upgrade() { + None => break, // clean exit actually, just weird we caught it here + Some(controller) => match oplog.decode_and_add(&change.op.data) { Ok(local_version) => { - self.latest_version.send(local_version) + worker.latest_version.send(local_version) .unwrap_or_warn("failed to update latest version!"); - for tx in self.pollers.drain(..) { + for tx in worker.pollers.drain(..) { tx.send(()).unwrap_or_warn("could not wake up poller"); } - if let Some(cb) = self.callback.borrow().as_ref() { - cb.call(self.controller.clone()); // TODO should we run this on another task/thread? + if let Some(cb) = worker.callback.borrow().as_ref() { + cb.call(BufferController(controller)); // TODO should we run this on another task/thread? 
} }, Err(e) => tracing::error!("could not deserialize operation from server: {}", e), @@ -149,7 +146,7 @@ impl ControllerWorker for BufferWorker { }, // controller is ready to apply change and recv(), calculate it and send it back - res = self.delta_req.recv() => match res { + res = worker.delta_req.recv() => match res { None => break tracing::error!("no more active controllers: can't send changes"), Some((last_ver, tx)) => { if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() { @@ -194,7 +191,7 @@ impl ControllerWorker for BufferWorker { }, // received a request for full CRDT content - res = self.content_checkout.recv() => match res { + res = worker.content_checkout.recv() => match res { None => break tracing::error!("no more active controllers: can't update content"), Some(tx) => { branch.merge(&oplog, oplog.local_version_ref()); @@ -204,6 +201,8 @@ impl ControllerWorker for BufferWorker { } } } + + tracing::debug!("controller worker stopped"); } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 8c2c8a4..a8dc862 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -28,7 +28,6 @@ pub(crate) struct CursorControllerInner { pub(crate) stream: mpsc::Sender>>, pub(crate) poll: mpsc::UnboundedSender>, pub(crate) callback: watch::Sender>>, - pub(crate) stop: mpsc::UnboundedSender<()>, } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] @@ -81,8 +80,4 @@ impl Controller for CursorController { tracing::warn!("no active cursor worker to clear callback"); } } - - fn stop(&self) -> bool { - self.0.stop.send(()).is_ok() - } } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index ece4bf5..56e455f 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -4,104 +4,98 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; -use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor, User}, ext::IgnorableError}; 
+use crate::{api::{controller::ControllerCallback, Cursor, User}, ext::IgnorableError}; use codemp_proto::cursor::{CursorPosition, CursorEvent}; use super::controller::{CursorController, CursorControllerInner}; -pub(crate) struct CursorWorker { +struct CursorWorker { op: mpsc::Receiver, map: Arc>, stream: mpsc::Receiver>>, poll: mpsc::UnboundedReceiver>, pollers: Vec>, store: std::collections::VecDeque, - stop: mpsc::UnboundedReceiver<()>, - controller: CursorController, + controller: std::sync::Weak, callback: watch::Receiver>>, } -impl CursorWorker { - pub fn new(user_map: Arc>) -> Self { +impl CursorController { + pub(crate) fn spawn(user_map: Arc>, tx: mpsc::Sender, rx: Streaming) -> Self { // TODO we should tweak the channel buffer size to better propagate backpressure let (op_tx, op_rx) = mpsc::channel(64); let (stream_tx, stream_rx) = mpsc::channel(1); - let (end_tx, end_rx) = mpsc::unbounded_channel(); let (cb_tx, cb_rx) = watch::channel(None); let (poll_tx, poll_rx) = mpsc::unbounded_channel(); - let controller = CursorControllerInner { + let controller = Arc::new(CursorControllerInner { op: op_tx, stream: stream_tx, - stop: end_tx, callback: cb_tx, poll: poll_tx, - }; - Self { + }); + + let weak = Arc::downgrade(&controller); + + let worker = CursorWorker { op: op_rx, map: user_map, stream: stream_rx, store: std::collections::VecDeque::default(), - stop: end_rx, - controller: CursorController(Arc::new(controller)), + controller: weak, callback: cb_rx, poll: poll_rx, pollers: Vec::new(), - } - } -} + }; -impl ControllerWorker for CursorWorker { - type Controller = CursorController; - type Tx = mpsc::Sender; - type Rx = Streaming; + tokio::spawn(async move { CursorController::work(worker, tx, rx).await }); - fn controller(&self) -> CursorController { - self.controller.clone() + CursorController(controller) } - async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { + async fn work(mut worker: CursorWorker, tx: mpsc::Sender, mut rx: Streaming) { loop { 
tracing::debug!("cursor worker polling"); + if worker.controller.upgrade().is_none() { break }; // clean exit: all controllers dropped tokio::select!{ biased; - // received stop signal - Some(()) = self.stop.recv() => { break; }, - // new poller - Some(poller) = self.poll.recv() => self.pollers.push(poller), + Some(poller) = worker.poll.recv() => worker.pollers.push(poller), // client moved their cursor - Some(op) = self.op.recv() => { + Some(op) = worker.op.recv() => { tracing::debug!("received cursor from editor"); tx.send(op).await.unwrap_or_warn("could not update cursor"); }, // server sents us a cursor - Ok(Some(cur)) = rx.message() => { - tracing::debug!("received cursor from server"); - let mut cursor = Cursor { - buffer: cur.position.buffer.path, - start: (cur.position.start.row, cur.position.start.col), - end: (cur.position.end.row, cur.position.end.col), - user: None, - }; - let user_id = Uuid::from(cur.user); - if let Some(user) = self.map.get(&user_id) { - cursor.user = Some(user.name.clone()); - } - self.store.push_back(cursor); - for tx in self.pollers.drain(..) { - tx.send(()).unwrap_or_warn("poller dropped before unblocking"); - } - if let Some(cb) = self.callback.borrow().as_ref() { - tracing::debug!("running cursor callback"); - cb.call(self.controller.clone()); // TODO should this run in its own task/thread? - } + Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() { + None => break, // clean exit, just weird that we got it here + Some(controller) => { + tracing::debug!("received cursor from server"); + let mut cursor = Cursor { + buffer: cur.position.buffer.path, + start: (cur.position.start.row, cur.position.start.col), + end: (cur.position.end.row, cur.position.end.col), + user: None, + }; + let user_id = Uuid::from(cur.user); + if let Some(user) = worker.map.get(&user_id) { + cursor.user = Some(user.name.clone()); + } + worker.store.push_back(cursor); + for tx in worker.pollers.drain(..) 
{ + tx.send(()).unwrap_or_warn("poller dropped before unblocking"); + } + if let Some(cb) = worker.callback.borrow().as_ref() { + tracing::debug!("running cursor callback"); + cb.call(CursorController(controller)); // TODO should this run in its own task/thread? + } + }, }, // client wants to get next cursor event - Some(tx) = self.stream.recv() => tx.send(self.store.pop_front()) + Some(tx) = worker.stream.recv() => tx.send(worker.store.pop_front()) .unwrap_or_warn("client gave up receiving"), else => break, diff --git a/src/workspace.rs b/src/workspace.rs index d278ccd..c4c1d9e 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -4,9 +4,8 @@ //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. use crate::{ - api::{controller::ControllerWorker, Controller, Event, User}, - buffer::{self, worker::BufferWorker}, - cursor::{self, worker::CursorWorker}, + api::{Event, User}, + buffer, cursor, errors::{ConnectionResult, ControllerResult, RemoteResult}, ext::InternallyMutable, network::Services, @@ -49,9 +48,11 @@ struct WorkspaceInner { user: User, // TODO back-reference to global user id... needed for buffer controllers cursor: cursor::Controller, buffers: DashMap, + services: Services, + // TODO these two are Arced so that the inner worker can hold them without holding the + // WorkspaceInner itself, otherwise it's impossible to drop Workspace filetree: DashSet, users: Arc>, - services: Services, // TODO can we drop the mutex? 
events: tokio::sync::Mutex>, } @@ -79,13 +80,7 @@ impl Workspace { let users = Arc::new(DashMap::default()); - let worker = CursorWorker::new(users.clone()); - let controller = worker.controller(); - tokio::spawn(async move { - tracing::debug!("controller worker started"); - worker.work(tx, cur_stream).await; - tracing::debug!("controller worker stopped"); - }); + let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream); let ws = Self(Arc::new(WorkspaceInner { name, @@ -141,14 +136,7 @@ impl Workspace { ); let stream = self.0.services.buf().attach(req).await?.into_inner(); - let worker = BufferWorker::new(self.0.user.id, path); - let controller = worker.controller(); - tokio::spawn(async move { - tracing::debug!("controller worker started"); - worker.work(tx, stream).await; - tracing::debug!("controller worker stopped"); - }); - + let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream); self.0.buffers.insert(path.to_string(), controller.clone()); Ok(controller) @@ -156,18 +144,19 @@ impl Workspace { /// Detach from an active buffer. /// - /// This option will be carried in background. BufferWorker will be stopped and dropped. - /// There may still be some events enqueued in buffers to poll, but the [buffer::Controller] itself won't be - /// accessible anymore from [`Workspace`]. - pub fn detach(&self, path: &str) -> DetachResult { + /// This will stop and drop its [`buffer::Controller`]. 
+ /// + /// Returns `true` if correctly dropped or wasn't present, `false` if dropped but wasn't last ref + /// + /// If this method returns `false` you have a dangling ref, maybe just waiting for garbage + /// collection or maybe preventing the controller from being dropped completely + #[allow(clippy::redundant_pattern_matching)] // all cases are clearer this way + pub fn detach(&self, path: &str) -> bool { match self.0.buffers.remove(path) { - None => DetachResult::NotAttached, - Some((_name, controller)) => { - if controller.stop() { - DetachResult::Detaching - } else { - DetachResult::AlreadyDetached - } + None => true, // noop: we weren't attached in the first place + Some((_name, controller)) => match Arc::into_inner(controller.0) { + None => false, // dangling ref! we can't drop this + Some(_) => true, // dropping it now } } } @@ -241,6 +230,8 @@ impl Workspace { /// Delete a buffer. pub async fn delete(&self, path: &str) -> RemoteResult<()> { + self.detach(path); // just in case + let mut workspace_client = self.0.services.ws(); workspace_client .delete_buffer(tonic::Request::new(BufferNode { @@ -248,9 +239,6 @@ impl Workspace { })) .await?; - if let Some((_name, controller)) = self.0.buffers.remove(path) { - controller.stop(); - } self.0.filetree.remove(path); @@ -320,17 +308,24 @@ impl Workspace { tx: mpsc::UnboundedSender, ) { // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? - let inner = self.0.clone(); + let weak = Arc::downgrade(&self.0); let name = self.id(); tokio::spawn(async move { loop { - match stream.message().await { + // TODO can we stop responsively rather than poll for Arc being dropped? 
+ if weak.upgrade().is_none() { break }; + let Some(res) = tokio::select!( + x = stream.message() => Some(x), + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None, + ) else { continue }; + match res { Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), Ok(None) => break tracing::info!("leaving workspace {}", name), Ok(Some(WorkspaceEvent { event: None })) => { tracing::warn!("workspace {} received empty event", name) } Ok(Some(WorkspaceEvent { event: Some(ev) })) => { + let Some(inner) = weak.upgrade() else { break }; let update = crate::api::Event::from(&ev); match ev { // user @@ -350,9 +345,7 @@ impl Workspace { } WorkspaceEventInner::Delete(FileDelete { path }) => { inner.filetree.remove(&path); - if let Some((_name, controller)) = inner.buffers.remove(&path) { - controller.stop(); - } + let _ = inner.buffers.remove(&path); } } if tx.send(update).is_err() { @@ -364,28 +357,3 @@ impl Workspace { }); } } - -impl Drop for WorkspaceInner { - fn drop(&mut self) { - for entry in self.buffers.iter() { - if !entry.value().stop() { - tracing::warn!( - "could not stop buffer worker {} for workspace {}", - entry.value().path(), - self.name - ); - } - } - if !self.cursor.stop() { - tracing::warn!("could not stop cursor worker for workspace {}", self.name); - } - } -} - -#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(eq, eq_int))] -#[cfg_attr(any(feature = "py", feature = "py-noabi"), derive(PartialEq))] -pub enum DetachResult { - NotAttached, - Detaching, - AlreadyDetached, -}