diff --git a/dist/java/src/mp/code/BufferController.java b/dist/java/src/mp/code/BufferController.java index 0ae455f..66f4495 100644 --- a/dist/java/src/mp/code/BufferController.java +++ b/dist/java/src/mp/code/BufferController.java @@ -106,16 +106,6 @@ public final class BufferController { poll(this.ptr); } - private static native boolean stop(long self); - - /** - * Stops the controller. Any further calls to it will fail. - * @return true if it was stopped successfully - */ - public boolean stop() { - return stop(this.ptr); - } - private static native void free(long self); static { diff --git a/dist/java/src/mp/code/CursorController.java b/dist/java/src/mp/code/CursorController.java index ee3d238..159e548 100644 --- a/dist/java/src/mp/code/CursorController.java +++ b/dist/java/src/mp/code/CursorController.java @@ -83,16 +83,6 @@ public final class CursorController { poll(this.ptr); } - private static native boolean stop(long self); - - /** - * Stops the controller. Any further calls to it will fail. - * @return true if it was stopped successfully - */ - public boolean stop() { - return stop(this.ptr); - } - private static native void free(long self); static { diff --git a/dist/java/src/mp/code/Workspace.java b/dist/java/src/mp/code/Workspace.java index 62a6e49..31ee6e3 100644 --- a/dist/java/src/mp/code/Workspace.java +++ b/dist/java/src/mp/code/Workspace.java @@ -113,14 +113,14 @@ public final class Workspace { return attach_to_buffer(ptr, path); } - private static native DetachResult detach_from_buffer(long self, String path); + private static native boolean detach_from_buffer(long self, String path); /** * Detaches from a given buffer. * @param path the path of the buffer to detach from - * @return a {@link DetachResult} representing the outcome of the operation + * @return a boolean, true only if there are still dangling references preventing controller from stopping */ - public DetachResult detachFromBuffer(String path) { + public boolean detachFromBuffer(String path) { return detach_from_buffer(this.ptr, path); } diff --git a/dist/lua/annotations.lua b/dist/lua/annotations.lua index 9cce3a8..04055ba 100644 --- a/dist/lua/annotations.lua +++ b/dist/lua/annotations.lua @@ -223,7 +223,7 @@ function Workspace:attach(path) end ---@param path string relative path ("name") of buffer to detach from ---@return boolean success ----detach from an active buffer, closing all streams. returns false if buffer was no longer active +---detach from an active buffer, closing all streams. returns false if there are still dangling references function Workspace:detach(path) end ---@param filter? string apply a filter to the return elements @@ -232,6 +232,10 @@ function Workspace:detach(path) end ---return the list of available buffers in this workspace, as relative paths from workspace root function Workspace:filetree(filter, strict) end +---@return string[] +---return all names of users currently in this workspace +function Workspace:user_list() end + ---@return NilPromise ---@async ---@nodiscard @@ -297,10 +301,6 @@ function BufferController:recv() end ---block until next text change without returning it function BufferController:poll() end ----@return boolean success ----stop buffer worker and disconnect, returns false if was already stopped -function BufferController:stop() end - ---clears any previously registered buffer callback function BufferController:clear_callback() end @@ -354,10 +354,6 @@ function CursorController:recv() end ---block until next cursor event without returning it function CursorController:poll() end ----@return boolean success ----stop cursor worker and disconnect, returns false if was already stopped -function CursorController:stop() end - ---clears any previously registered cursor callback function CursorController:clear_callback() end diff --git a/dist/py/src/codemp/codemp.pyi b/dist/py/src/codemp/codemp.pyi index 3a93af0..4df875b 100644 --- a/dist/py/src/codemp/codemp.pyi +++ b/dist/py/src/codemp/codemp.pyi @@ -102,7 +102,6 @@ class BufferController: def callback(self, cb: Callable[[BufferController], None]) -> None: ... def clear_callback(self) -> None: ... - def stop(self) -> bool: ... @@ -131,5 +130,4 @@ class CursorController: def callback(self, cb: Callable[[CursorController], None]) -> None: ... def clear_callback(self) -> None: ... - def stop(self) -> bool: ... diff --git a/src/api/controller.rs b/src/api/controller.rs index f56933c..7b54b81 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -5,21 +5,13 @@ use crate::errors::ControllerResult; -pub(crate) trait ControllerWorker { - type Controller : Controller; - type Tx; - type Rx; - - fn controller(&self) -> Self::Controller; - async fn work(self, tx: Self::Tx, rx: Self::Rx); -} - // note that we don't use thiserror's #[from] because we don't want the error structs to contain // these foreign types, and also we want these to be easily constructable /// Asynchronous and thread-safe handle to a generic bidirectional stream. /// -/// This generic trait is implemented by actors managing stream procedures. +/// This generic trait is implemented by actors managing stream procedures, and will generally +/// imply a background worker. /// /// Events can be enqueued for dispatching without blocking with [`Controller::send`]. /// @@ -27,6 +19,9 @@ pub(crate) trait ControllerWorker { /// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively, /// [`Controller::poll`] combined with [`Controller::try_recv`]. /// +/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have +/// been dropped. +/// /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] @@ -57,15 +52,6 @@ pub trait Controller : Sized + Send + Sync { /// Attempt to receive a value, return None if nothing is currently available. async fn try_recv(&self) -> ControllerResult>; - - /// Stop underlying worker. - /// - /// After this is called, nothing can be received or sent anymore; however, existing - /// controllers will still be accessible until all handles are dropped. - /// - /// Returns true if the stop signal was successfully sent, false if channel was - /// closed (probably because worker had already been stopped). - fn stop(&self) -> bool; } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index f1fdbbc..73229be 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -47,7 +47,6 @@ pub(crate) struct BufferControllerInner { pub(crate) last_update: InternallyMutable, pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender)>, pub(crate) poller: mpsc::UnboundedSender>, - pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender, pub(crate) callback: watch::Sender>>, @@ -101,8 +100,4 @@ impl Controller for BufferController { tracing::warn!("no active buffer worker to clear callback"); } } - - fn stop(&self) -> bool { - self.0.stopper.send(()).is_ok() - } } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index a9353ba..9e6bfa7 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; -use crate::api::controller::{ControllerCallback, ControllerWorker}; +use crate::api::controller::ControllerCallback; use crate::api::TextChange; use crate::ext::{IgnorableError, InternallyMutable}; @@ -16,21 +16,21 @@ use super::controller::{BufferController, BufferControllerInner}; pub(crate) type DeltaOp = (LocalVersion, Option); pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); -pub(crate) struct BufferWorker { +struct BufferWorker { user_id: Uuid, + path: String, latest_version: watch::Sender, ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender)>, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, delta_req: mpsc::Receiver, - stop: mpsc::UnboundedReceiver<()>, - controller: BufferController, + controller: std::sync::Weak, callback: watch::Receiver>>, } -impl BufferWorker { - pub fn new(user_id: Uuid, path: &str) -> Self { +impl BufferController { + pub(crate) fn spawn(user_id: Uuid, path: &str, tx: mpsc::Sender, rx: Streaming) -> Self { let init = diamond_types::LocalVersion::default(); let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); @@ -42,67 +42,63 @@ impl BufferWorker { let (poller_tx, poller_rx) = mpsc::unbounded_channel(); - let (end_tx, end_rx) = mpsc::unbounded_channel(); - - let controller = BufferControllerInner { + let controller = Arc::new(BufferControllerInner { name: path.to_string(), latest_version: latest_version_rx, last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), ops_in: opin_tx, poller: poller_tx, - stopper: end_tx, content_request: req_tx, delta_request: recv_tx, callback: cb_tx, - }; + }); - BufferWorker { + let weak = Arc::downgrade(&controller); + + let worker = BufferWorker { user_id, + path: path.to_string(), latest_version: latest_version_tx, ops_in: opin_rx, poller: poller_rx, pollers: Vec::new(), - stop: end_rx, - controller: BufferController(Arc::new(controller)), + controller: weak, content_checkout: req_rx, delta_req: recv_rx, callback: cb_rx, - } - } -} + }; -impl ControllerWorker for BufferWorker { - type Controller = BufferController; - type Tx = mpsc::Sender; - type Rx = Streaming; + tokio::spawn(async move { + BufferController::work(worker, tx, rx).await + }); - fn controller(&self) -> BufferController { - self.controller.clone() + BufferController(controller) } - async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { + async fn work(mut worker: BufferWorker, tx: mpsc::Sender, mut rx: Streaming) { let mut branch = diamond_types::list::Branch::new(); let mut oplog = diamond_types::list::OpLog::new(); let mut timer = Timer::new(10); // TODO configurable!! + tracing::debug!("controller worker started"); + loop { + if worker.controller.upgrade().is_none() { break }; + // block until one of these is ready tokio::select! { biased; - // received stop signal - _ = self.stop.recv() => break, - // received a new poller, add it to collection - res = self.poller.recv() => match res { + res = worker.poller.recv() => match res { None => break tracing::error!("poller channel closed"), - Some(tx) => self.pollers.push(tx), + Some(tx) => worker.pollers.push(tx), }, // received a text change from editor - res = self.ops_in.recv() => match res { + res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), Some((change, ack)) => { - let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string()); + let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string()); let last_ver = oplog.local_version(); // clip to buffer extents let clip_end = std::cmp::min(branch.len(), change.end as usize); @@ -120,7 +116,7 @@ impl ControllerWorker for BufferWorker { if change.is_delete() || change.is_insert() { tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await .unwrap_or_warn("failed to send change!"); - self.latest_version.send(oplog.local_version()) + worker.latest_version.send(oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); } ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); @@ -129,18 +125,19 @@ impl ControllerWorker for BufferWorker { // received a message from server: add to oplog and update latest version (+unlock pollers) res = rx.message() => match res { - Err(_e) => break, - Ok(None) => break, - Ok(Some(change)) => { - match oplog.decode_and_add(&change.op.data) { + Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path), + Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path), + Ok(Some(change)) => match worker.controller.upgrade() { + None => break, // clean exit actually, just weird we caught it here + Some(controller) => match oplog.decode_and_add(&change.op.data) { Ok(local_version) => { - self.latest_version.send(local_version) + worker.latest_version.send(local_version) .unwrap_or_warn("failed to update latest version!"); - for tx in self.pollers.drain(..) { + for tx in worker.pollers.drain(..) { tx.send(()).unwrap_or_warn("could not wake up poller"); } - if let Some(cb) = self.callback.borrow().as_ref() { - cb.call(self.controller.clone()); // TODO should we run this on another task/thread? + if let Some(cb) = worker.callback.borrow().as_ref() { + cb.call(BufferController(controller)); // TODO should we run this on another task/thread? } }, Err(e) => tracing::error!("could not deserialize operation from server: {}", e), @@ -149,7 +146,7 @@ impl ControllerWorker for BufferWorker { }, // controller is ready to apply change and recv(), calculate it and send it back - res = self.delta_req.recv() => match res { + res = worker.delta_req.recv() => match res { None => break tracing::error!("no more active controllers: can't send changes"), Some((last_ver, tx)) => { if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() { @@ -194,7 +191,7 @@ impl ControllerWorker for BufferWorker { }, // received a request for full CRDT content - res = self.content_checkout.recv() => match res { + res = worker.content_checkout.recv() => match res { None => break tracing::error!("no more active controllers: can't update content"), Some(tx) => { branch.merge(&oplog, oplog.local_version_ref()); @@ -204,6 +201,8 @@ impl ControllerWorker for BufferWorker { } } } + + tracing::debug!("controller worker stopped"); } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 8c2c8a4..a8dc862 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -28,7 +28,6 @@ pub(crate) struct CursorControllerInner { pub(crate) stream: mpsc::Sender>>, pub(crate) poll: mpsc::UnboundedSender>, pub(crate) callback: watch::Sender>>, - pub(crate) stop: mpsc::UnboundedSender<()>, } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] @@ -81,8 +80,4 @@ impl Controller for CursorController { tracing::warn!("no active cursor worker to clear callback"); } } - - fn stop(&self) -> bool { - self.0.stop.send(()).is_ok() - } } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index ece4bf5..56e455f 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -4,104 +4,98 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; -use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor, User}, ext::IgnorableError}; +use crate::{api::{controller::ControllerCallback, Cursor, User}, ext::IgnorableError}; use codemp_proto::cursor::{CursorPosition, CursorEvent}; use super::controller::{CursorController, CursorControllerInner}; -pub(crate) struct CursorWorker { +struct CursorWorker { op: mpsc::Receiver, map: Arc>, stream: mpsc::Receiver>>, poll: mpsc::UnboundedReceiver>, pollers: Vec>, store: std::collections::VecDeque, - stop: mpsc::UnboundedReceiver<()>, - controller: CursorController, + controller: std::sync::Weak, callback: watch::Receiver>>, } -impl CursorWorker { - pub fn new(user_map: Arc>) -> Self { +impl CursorController { + pub(crate) fn spawn(user_map: Arc>, tx: mpsc::Sender, rx: Streaming) -> Self { // TODO we should tweak the channel buffer size to better propagate backpressure let (op_tx, op_rx) = mpsc::channel(64); let (stream_tx, stream_rx) = mpsc::channel(1); - let (end_tx, end_rx) = mpsc::unbounded_channel(); let (cb_tx, cb_rx) = watch::channel(None); let (poll_tx, poll_rx) = mpsc::unbounded_channel(); - let controller = CursorControllerInner { + let controller = Arc::new(CursorControllerInner { op: op_tx, stream: stream_tx, - stop: end_tx, callback: cb_tx, poll: poll_tx, - }; - Self { + }); + + let weak = Arc::downgrade(&controller); + + let worker = CursorWorker { op: op_rx, map: user_map, stream: stream_rx, store: std::collections::VecDeque::default(), - stop: end_rx, - controller: CursorController(Arc::new(controller)), + controller: weak, callback: cb_rx, poll: poll_rx, pollers: Vec::new(), - } - } -} + }; -impl ControllerWorker for CursorWorker { - type Controller = CursorController; - type Tx = mpsc::Sender; - type Rx = Streaming; + tokio::spawn(async move { CursorController::work(worker, tx, rx).await }); - fn controller(&self) -> CursorController { - self.controller.clone() + CursorController(controller) } - async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { + async fn work(mut worker: CursorWorker, tx: mpsc::Sender, mut rx: Streaming) { loop { tracing::debug!("cursor worker polling"); + if worker.controller.upgrade().is_none() { break }; // clean exit: all controllers dropped tokio::select!{ biased; - // received stop signal - Some(()) = self.stop.recv() => { break; }, - // new poller - Some(poller) = self.poll.recv() => self.pollers.push(poller), + Some(poller) = worker.poll.recv() => worker.pollers.push(poller), // client moved their cursor - Some(op) = self.op.recv() => { + Some(op) = worker.op.recv() => { tracing::debug!("received cursor from editor"); tx.send(op).await.unwrap_or_warn("could not update cursor"); }, // server sents us a cursor - Ok(Some(cur)) = rx.message() => { - tracing::debug!("received cursor from server"); - let mut cursor = Cursor { - buffer: cur.position.buffer.path, - start: (cur.position.start.row, cur.position.start.col), - end: (cur.position.end.row, cur.position.end.col), - user: None, - }; - let user_id = Uuid::from(cur.user); - if let Some(user) = self.map.get(&user_id) { - cursor.user = Some(user.name.clone()); - } - self.store.push_back(cursor); - for tx in self.pollers.drain(..) { - tx.send(()).unwrap_or_warn("poller dropped before unblocking"); - } - if let Some(cb) = self.callback.borrow().as_ref() { - tracing::debug!("running cursor callback"); - cb.call(self.controller.clone()); // TODO should this run in its own task/thread? - } + Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() { + None => break, // clean exit, just weird that we got it here + Some(controller) => { + tracing::debug!("received cursor from server"); + let mut cursor = Cursor { + buffer: cur.position.buffer.path, + start: (cur.position.start.row, cur.position.start.col), + end: (cur.position.end.row, cur.position.end.col), + user: None, + }; + let user_id = Uuid::from(cur.user); + if let Some(user) = worker.map.get(&user_id) { + cursor.user = Some(user.name.clone()); + } + worker.store.push_back(cursor); + for tx in worker.pollers.drain(..) { + tx.send(()).unwrap_or_warn("poller dropped before unblocking"); + } + if let Some(cb) = worker.callback.borrow().as_ref() { + tracing::debug!("running cursor callback"); + cb.call(CursorController(controller)); // TODO should this run in its own task/thread? + } + }, }, // client wants to get next cursor event - Some(tx) = self.stream.recv() => tx.send(self.store.pop_front()) + Some(tx) = worker.stream.recv() => tx.send(worker.store.pop_front()) .unwrap_or_warn("client gave up receiving"), else => break, diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 0857118..8cc1ce4 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -80,12 +80,6 @@ fn poll(controller: &mut crate::buffer::Controller) -> Result<(), ControllerErro super::tokio().block_on(controller.poll()) } -/// Stop the controller. -#[jni(package = "mp.code", class = "BufferController")] -fn stop(controller: &mut crate::buffer::Controller) -> bool { - controller.stop() -} - /// Called by the Java GC to drop a [crate::buffer::Controller]. #[jni(package = "mp.code", class = "BufferController")] fn free(input: jni::sys::jlong) { diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 359ea2b..4cc1a4b 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -67,12 +67,6 @@ fn poll(controller: &mut crate::cursor::Controller) -> Result<(), ControllerErro super::tokio().block_on(controller.poll()) } -/// Stop the controller. -#[jni(package = "mp.code", class = "CursorController")] -fn stop(controller: &mut crate::cursor::Controller) -> bool { - controller.stop() -} - /// Called by the Java GC to drop a [crate::cursor::Controller]. #[jni(package = "mp.code", class = "CursorController")] fn free(input: jni::sys::jlong) { diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index 6e0515a..1c7ec45 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -170,26 +170,6 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event { } } -impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::workspace::DetachResult { - const CLASS: &'static str = "mp/code/data/DetachResult"; - fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result, jni::errors::Error> { - let ordinal = match self { - crate::workspace::DetachResult::NotAttached => 0, - crate::workspace::DetachResult::Detaching => 1, - crate::workspace::DetachResult::AlreadyDetached => 2 - }; - - let class = env.find_class(Self::CLASS)?; - let variants: jni::objects::JObjectArray = env.call_method( - class, - "getEnumConstants", - "()[Ljava/lang/Object;", - &[] - )?.l()?.into(); - env.get_object_array_element(variants, ordinal) - } -} - impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { const CLASS: &'static str = "mp/code/data/TextChange"; fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result, jni::errors::Error> { diff --git a/src/ffi/java/workspace.rs b/src/ffi/java/workspace.rs index 5732397..6fc38cd 100644 --- a/src/ffi/java/workspace.rs +++ b/src/ffi/java/workspace.rs @@ -51,7 +51,7 @@ fn attach_to_buffer(workspace: &mut Workspace, path: String) -> Result crate::workspace::DetachResult { +fn detach_from_buffer(workspace: &mut Workspace, path: String) -> bool { workspace.detach(&path) } diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index b93f3c7..a1a3727 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -27,7 +27,7 @@ impl BufferController { } #[napi(js_name = "clear_callback")] - pub fn js_clear_callback(&self) -> () { + pub fn js_clear_callback(&self) { self.clear_callback(); } diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index acf3d04..d2733c1 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -18,8 +18,6 @@ impl LuaUserData for CodempBufferController { methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method("stop", |_, this, ()| Ok(this.stop())); - methods.add_method("content", |_, this, ()| a_sync! { this => this.content().await? }); methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) }); diff --git a/src/ffi/lua/cursor.rs b/src/ffi/lua/cursor.rs index 46d4e29..98eca44 100644 --- a/src/ffi/lua/cursor.rs +++ b/src/ffi/lua/cursor.rs @@ -19,8 +19,6 @@ impl LuaUserData for CodempCursorController { methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method("stop", |_, this, ()| Ok(this.stop())); - methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) }); methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { this.callback(move |controller: CodempCursorController| super::ext::callback().invoke(cb.clone(), controller)); diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index caed60b..7868d0f 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -1,7 +1,6 @@ use mlua_codemp_patch as mlua; use mlua::prelude::*; use crate::prelude::*; -use crate::workspace::DetachResult; use super::ext::a_sync::a_sync; use super::ext::from_lua_serde; @@ -18,7 +17,7 @@ impl LuaUserData for CodempWorkspace { ); methods.add_method("detach", |_, this, (name,):(String,)| - Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached)) + Ok(this.detach(&name)) ); methods.add_method("delete", |_, this, (name,):(String,)| diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index 1bc4e63..b1f8c21 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -67,11 +67,6 @@ impl CursorController { fn pyclear_callback(&self) { self.clear_callback(); } - - #[pyo3(name = "stop")] - fn pystop(&self) -> bool { - self.stop() - } } // need to do manually since Controller is a trait implementation @@ -137,11 +132,6 @@ impl BufferController { fn pyclear_callback(&self) { self.clear_callback(); } - - #[pyo3(name = "stop")] - fn pystop(&self) -> bool { - self.stop() - } } // We have to write this manually since diff --git a/src/ffi/python/mod.rs b/src/ffi/python/mod.rs index 0956b8b..f703051 100644 --- a/src/ffi/python/mod.rs +++ b/src/ffi/python/mod.rs @@ -172,9 +172,9 @@ impl Config { kwds: Option>, ) -> PyResult { if let Some(kwgs) = kwds { - let host = kwgs.get_item("host")?.map(|e| e.extract().ok()).flatten(); - let port = kwgs.get_item("port")?.map(|e| e.extract().ok()).flatten(); - let tls = kwgs.get_item("tls")?.map(|e| e.extract().ok()).flatten(); + let host = kwgs.get_item("host")?.and_then(|e| e.extract().ok()); + let port = kwgs.get_item("port")?.and_then(|e| e.extract().ok()); + let tls = kwgs.get_item("tls")?.and_then(|e| e.extract().ok()); Ok(Config { username, diff --git a/src/ffi/python/workspace.rs b/src/ffi/python/workspace.rs index 79574b8..4615be5 100644 --- a/src/ffi/python/workspace.rs +++ b/src/ffi/python/workspace.rs @@ -23,11 +23,7 @@ impl Workspace { #[pyo3(name = "detach")] fn pydetach(&self, path: String) -> bool { - match self.detach(path.as_str()) { - crate::workspace::DetachResult::NotAttached => false, - crate::workspace::DetachResult::Detaching => true, - crate::workspace::DetachResult::AlreadyDetached => true, - } + self.detach(path.as_str()) } #[pyo3(name = "event")] diff --git a/src/workspace.rs b/src/workspace.rs index d278ccd..c4c1d9e 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -4,9 +4,8 @@ //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. use crate::{ - api::{controller::ControllerWorker, Controller, Event, User}, - buffer::{self, worker::BufferWorker}, - cursor::{self, worker::CursorWorker}, + api::{Event, User}, + buffer, cursor, errors::{ConnectionResult, ControllerResult, RemoteResult}, ext::InternallyMutable, network::Services, @@ -49,9 +48,11 @@ struct WorkspaceInner { user: User, // TODO back-reference to global user id... needed for buffer controllers cursor: cursor::Controller, buffers: DashMap, + services: Services, + // TODO these two are Arced so that the inner worker can hold them without holding the + // WorkspaceInner itself, otherwise its impossible to drop Workspace filetree: DashSet, users: Arc>, - services: Services, // TODO can we drop the mutex? events: tokio::sync::Mutex>, } @@ -79,13 +80,7 @@ impl Workspace { let users = Arc::new(DashMap::default()); - let worker = CursorWorker::new(users.clone()); - let controller = worker.controller(); - tokio::spawn(async move { - tracing::debug!("controller worker started"); - worker.work(tx, cur_stream).await; - tracing::debug!("controller worker stopped"); - }); + let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream); let ws = Self(Arc::new(WorkspaceInner { name, @@ -141,14 +136,7 @@ impl Workspace { ); let stream = self.0.services.buf().attach(req).await?.into_inner(); - let worker = BufferWorker::new(self.0.user.id, path); - let controller = worker.controller(); - tokio::spawn(async move { - tracing::debug!("controller worker started"); - worker.work(tx, stream).await; - tracing::debug!("controller worker stopped"); - }); - + let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream); self.0.buffers.insert(path.to_string(), controller.clone()); Ok(controller) @@ -156,18 +144,19 @@ impl Workspace { /// Detach from an active buffer. /// - /// This option will be carried in background. BufferWorker will be stopped and dropped. - /// There may still be some events enqueued in buffers to poll, but the [buffer::Controller] itself won't be - /// accessible anymore from [`Workspace`]. - pub fn detach(&self, path: &str) -> DetachResult { + /// This will stop and drop its [`buffer::Controller`]. + /// + /// Returns `true` if connectly dropped or wasn't present, `false` if dropped but wasn't last ref + /// + /// If this method returns `false` you have a dangling ref, maybe just waiting for garbage + /// collection or maybe preventing the controller from being dropped completely + #[allow(clippy::redundant_pattern_matching)] // all cases are clearer this way + pub fn detach(&self, path: &str) -> bool { match self.0.buffers.remove(path) { - None => DetachResult::NotAttached, - Some((_name, controller)) => { - if controller.stop() { - DetachResult::Detaching - } else { - DetachResult::AlreadyDetached - } + None => true, // noop: we werent attached in the first place + Some((_name, controller)) => match Arc::into_inner(controller.0) { + None => false, // dangling ref! we can't drop this + Some(_) => true, // dropping it now } } } @@ -241,6 +230,8 @@ impl Workspace { /// Delete a buffer. pub async fn delete(&self, path: &str) -> RemoteResult<()> { + self.detach(path); // just in case + let mut workspace_client = self.0.services.ws(); workspace_client .delete_buffer(tonic::Request::new(BufferNode { @@ -248,9 +239,6 @@ impl Workspace { })) .await?; - if let Some((_name, controller)) = self.0.buffers.remove(path) { - controller.stop(); - } self.0.filetree.remove(path); @@ -320,17 +308,24 @@ impl Workspace { tx: mpsc::UnboundedSender, ) { // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? - let inner = self.0.clone(); + let weak = Arc::downgrade(&self.0); let name = self.id(); tokio::spawn(async move { loop { - match stream.message().await { + // TODO can we stop responsively rather than poll for Arc being dropped? + if weak.upgrade().is_none() { break }; + let Some(res) = tokio::select!( + x = stream.message() => Some(x), + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None, + ) else { continue }; + match res { Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), Ok(None) => break tracing::info!("leaving workspace {}", name), Ok(Some(WorkspaceEvent { event: None })) => { tracing::warn!("workspace {} received empty event", name) } Ok(Some(WorkspaceEvent { event: Some(ev) })) => { + let Some(inner) = weak.upgrade() else { break }; let update = crate::api::Event::from(&ev); match ev { // user @@ -350,9 +345,7 @@ impl Workspace { } WorkspaceEventInner::Delete(FileDelete { path }) => { inner.filetree.remove(&path); - if let Some((_name, controller)) = inner.buffers.remove(&path) { - controller.stop(); - } + let _ = inner.buffers.remove(&path); } } if tx.send(update).is_err() { @@ -364,28 +357,3 @@ impl Workspace { }); } } - -impl Drop for WorkspaceInner { - fn drop(&mut self) { - for entry in self.buffers.iter() { - if !entry.value().stop() { - tracing::warn!( - "could not stop buffer worker {} for workspace {}", - entry.value().path(), - self.name - ); - } - } - if !self.cursor.stop() { - tracing::warn!("could not stop cursor worker for workspace {}", self.name); - } - } -} - -#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(eq, eq_int))] -#[cfg_attr(any(feature = "py", feature = "py-noabi"), derive(PartialEq))] -pub enum DetachResult { - NotAttached, - Detaching, - AlreadyDetached, -}