Merge pull request #26 from hexedtech/fix/lifetimes

fix: lifetimes
This commit is contained in:
zaaarf 2024-09-26 02:46:38 +02:00 committed by GitHub
commit 39f88587e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 136 additions and 276 deletions

View file

@ -5,7 +5,7 @@
[![Crates.io Version](https://img.shields.io/crates/v/codemp)](https://crates.io/crates/codemp) [![Crates.io Version](https://img.shields.io/crates/v/codemp)](https://crates.io/crates/codemp)
[![Gitter Chat](https://img.shields.io/gitter/room/hexedtech/codemp)](https://gitter.im/hexedtech/codemp) [![Gitter Chat](https://img.shields.io/gitter/room/hexedtech/codemp)](https://gitter.im/hexedtech/codemp)
[![GitHub last commit](https://img.shields.io/github/last-commit/hexedtech/codemp)](https://github.com/hexedtech/codemp/commits/dev/) [![GitHub last commit](https://img.shields.io/github/last-commit/hexedtech/codemp)](https://github.com/hexedtech/codemp/commits/dev/)
[![GitHub commits since tagged version](https://img.shields.io/github/commits-since/hexedtech/codemp/v0.7.0)](https://github.com/hexedtech/codemp/releases/tag/v0.7.0) [![GitHub commits since tagged version](https://img.shields.io/github/commits-since/hexedtech/codemp/v0.7.1)](https://github.com/hexedtech/codemp/releases/tag/v0.7.1)
> `codemp` is a **collaborative** text editing solution to work remotely. > `codemp` is a **collaborative** text editing solution to work remotely.

View file

@ -106,16 +106,6 @@ public final class BufferController {
poll(this.ptr); poll(this.ptr);
} }
private static native boolean stop(long self);
/**
* Stops the controller. Any further calls to it will fail.
* @return true if it was stopped successfully
*/
public boolean stop() {
return stop(this.ptr);
}
private static native void free(long self); private static native void free(long self);
static { static {

View file

@ -83,16 +83,6 @@ public final class CursorController {
poll(this.ptr); poll(this.ptr);
} }
private static native boolean stop(long self);
/**
* Stops the controller. Any further calls to it will fail.
* @return true if it was stopped successfully
*/
public boolean stop() {
return stop(this.ptr);
}
private static native void free(long self); private static native void free(long self);
static { static {

View file

@ -113,14 +113,14 @@ public final class Workspace {
return attach_to_buffer(ptr, path); return attach_to_buffer(ptr, path);
} }
private static native DetachResult detach_from_buffer(long self, String path); private static native boolean detach_from_buffer(long self, String path);
/** /**
* Detaches from a given buffer. * Detaches from a given buffer.
* @param path the path of the buffer to detach from * @param path the path of the buffer to detach from
* @return a {@link DetachResult} representing the outcome of the operation * @return a boolean, true only if there are still dangling references preventing controller from stopping
*/ */
public DetachResult detachFromBuffer(String path) { public boolean detachFromBuffer(String path) {
return detach_from_buffer(this.ptr, path); return detach_from_buffer(this.ptr, path);
} }

View file

@ -223,7 +223,7 @@ function Workspace:attach(path) end
---@param path string relative path ("name") of buffer to detach from ---@param path string relative path ("name") of buffer to detach from
---@return boolean success ---@return boolean success
---detach from an active buffer, closing all streams. returns false if buffer was no longer active ---detach from an active buffer, closing all streams. returns false if there are still dangling references
function Workspace:detach(path) end function Workspace:detach(path) end
---@param filter? string apply a filter to the return elements ---@param filter? string apply a filter to the return elements
@ -232,6 +232,10 @@ function Workspace:detach(path) end
---return the list of available buffers in this workspace, as relative paths from workspace root ---return the list of available buffers in this workspace, as relative paths from workspace root
function Workspace:filetree(filter, strict) end function Workspace:filetree(filter, strict) end
---@return string[]
---return all names of users currently in this workspace
function Workspace:user_list() end
---@return NilPromise ---@return NilPromise
---@async ---@async
---@nodiscard ---@nodiscard
@ -297,10 +301,6 @@ function BufferController:recv() end
---block until next text change without returning it ---block until next text change without returning it
function BufferController:poll() end function BufferController:poll() end
---@return boolean success
---stop buffer worker and disconnect, returns false if was already stopped
function BufferController:stop() end
---clears any previously registered buffer callback ---clears any previously registered buffer callback
function BufferController:clear_callback() end function BufferController:clear_callback() end
@ -354,10 +354,6 @@ function CursorController:recv() end
---block until next cursor event without returning it ---block until next cursor event without returning it
function CursorController:poll() end function CursorController:poll() end
---@return boolean success
---stop cursor worker and disconnect, returns false if was already stopped
function CursorController:stop() end
---clears any previously registered cursor callback ---clears any previously registered cursor callback
function CursorController:clear_callback() end function CursorController:clear_callback() end

View file

@ -102,7 +102,6 @@ class BufferController:
def callback(self, def callback(self,
cb: Callable[[BufferController], None]) -> None: ... cb: Callable[[BufferController], None]) -> None: ...
def clear_callback(self) -> None: ... def clear_callback(self) -> None: ...
def stop(self) -> bool: ...
@ -131,5 +130,4 @@ class CursorController:
def callback(self, def callback(self,
cb: Callable[[CursorController], None]) -> None: ... cb: Callable[[CursorController], None]) -> None: ...
def clear_callback(self) -> None: ... def clear_callback(self) -> None: ...
def stop(self) -> bool: ...

View file

@ -5,21 +5,13 @@
use crate::errors::ControllerResult; use crate::errors::ControllerResult;
pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
type Controller : Controller<T>;
type Tx;
type Rx;
fn controller(&self) -> Self::Controller;
async fn work(self, tx: Self::Tx, rx: Self::Rx);
}
// note that we don't use thiserror's #[from] because we don't want the error structs to contain // note that we don't use thiserror's #[from] because we don't want the error structs to contain
// these foreign types, and also we want these to be easily constructable // these foreign types, and also we want these to be easily constructable
/// Asynchronous and thread-safe handle to a generic bidirectional stream. /// Asynchronous and thread-safe handle to a generic bidirectional stream.
/// ///
/// This generic trait is implemented by actors managing stream procedures. /// This generic trait is implemented by actors managing stream procedures, and will generally
/// imply a background worker.
/// ///
/// Events can be enqueued for dispatching without blocking with [`Controller::send`]. /// Events can be enqueued for dispatching without blocking with [`Controller::send`].
/// ///
@ -27,6 +19,9 @@ pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
/// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively, /// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively,
/// [`Controller::poll`] combined with [`Controller::try_recv`]. /// [`Controller::poll`] combined with [`Controller::try_recv`].
/// ///
/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have
/// been dropped.
///
/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
#[allow(async_fn_in_trait)] #[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
@ -57,15 +52,6 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// Attempt to receive a value, return None if nothing is currently available. /// Attempt to receive a value, return None if nothing is currently available.
async fn try_recv(&self) -> ControllerResult<Option<T>>; async fn try_recv(&self) -> ControllerResult<Option<T>>;
/// Stop underlying worker.
///
/// After this is called, nothing can be received or sent anymore; however, existing
/// controllers will still be accessible until all handles are dropped.
///
/// Returns true if the stop signal was successfully sent, false if channel was
/// closed (probably because worker had already been stopped).
fn stop(&self) -> bool;
} }

View file

@ -47,7 +47,6 @@ pub(crate) struct BufferControllerInner {
pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>, pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>,
pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender<LocalVersion>)>, pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender<LocalVersion>)>,
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>, pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: mpsc::Sender<DeltaRequest>, pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>, pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
@ -101,8 +100,4 @@ impl Controller<TextChange> for BufferController {
tracing::warn!("no active buffer worker to clear callback"); tracing::warn!("no active buffer worker to clear callback");
} }
} }
fn stop(&self) -> bool {
self.0.stopper.send(()).is_ok()
}
} }

View file

@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch};
use tonic::Streaming; use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
use crate::api::controller::{ControllerCallback, ControllerWorker}; use crate::api::controller::ControllerCallback;
use crate::api::TextChange; use crate::api::TextChange;
use crate::ext::{IgnorableError, InternallyMutable}; use crate::ext::{IgnorableError, InternallyMutable};
@ -16,21 +16,21 @@ use super::controller::{BufferController, BufferControllerInner};
pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>); pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>);
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>); pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);
pub(crate) struct BufferWorker { struct BufferWorker {
user_id: Uuid, user_id: Uuid,
path: String,
latest_version: watch::Sender<diamond_types::LocalVersion>, latest_version: watch::Sender<diamond_types::LocalVersion>,
ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>, ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>,
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
content_checkout: mpsc::Receiver<oneshot::Sender<String>>, content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
delta_req: mpsc::Receiver<DeltaRequest>, delta_req: mpsc::Receiver<DeltaRequest>,
stop: mpsc::UnboundedReceiver<()>, controller: std::sync::Weak<BufferControllerInner>,
controller: BufferController,
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>, callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
} }
impl BufferWorker { impl BufferController {
pub fn new(user_id: Uuid, path: &str) -> Self { pub(crate) fn spawn(user_id: Uuid, path: &str, tx: mpsc::Sender<Operation>, rx: Streaming<BufferEvent>) -> Self {
let init = diamond_types::LocalVersion::default(); let init = diamond_types::LocalVersion::default();
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
@ -42,67 +42,63 @@ impl BufferWorker {
let (poller_tx, poller_rx) = mpsc::unbounded_channel(); let (poller_tx, poller_rx) = mpsc::unbounded_channel();
let (end_tx, end_rx) = mpsc::unbounded_channel(); let controller = Arc::new(BufferControllerInner {
let controller = BufferControllerInner {
name: path.to_string(), name: path.to_string(),
latest_version: latest_version_rx, latest_version: latest_version_rx,
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
ops_in: opin_tx, ops_in: opin_tx,
poller: poller_tx, poller: poller_tx,
stopper: end_tx,
content_request: req_tx, content_request: req_tx,
delta_request: recv_tx, delta_request: recv_tx,
callback: cb_tx, callback: cb_tx,
}; });
BufferWorker { let weak = Arc::downgrade(&controller);
let worker = BufferWorker {
user_id, user_id,
path: path.to_string(),
latest_version: latest_version_tx, latest_version: latest_version_tx,
ops_in: opin_rx, ops_in: opin_rx,
poller: poller_rx, poller: poller_rx,
pollers: Vec::new(), pollers: Vec::new(),
stop: end_rx, controller: weak,
controller: BufferController(Arc::new(controller)),
content_checkout: req_rx, content_checkout: req_rx,
delta_req: recv_rx, delta_req: recv_rx,
callback: cb_rx, callback: cb_rx,
} };
}
}
impl ControllerWorker<TextChange> for BufferWorker { tokio::spawn(async move {
type Controller = BufferController; BufferController::work(worker, tx, rx).await
type Tx = mpsc::Sender<Operation>; });
type Rx = Streaming<BufferEvent>;
fn controller(&self) -> BufferController { BufferController(controller)
self.controller.clone()
} }
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut worker: BufferWorker, tx: mpsc::Sender<Operation>, mut rx: Streaming<BufferEvent>) {
let mut branch = diamond_types::list::Branch::new(); let mut branch = diamond_types::list::Branch::new();
let mut oplog = diamond_types::list::OpLog::new(); let mut oplog = diamond_types::list::OpLog::new();
let mut timer = Timer::new(10); // TODO configurable!! let mut timer = Timer::new(10); // TODO configurable!!
tracing::debug!("controller worker started");
loop { loop {
if worker.controller.upgrade().is_none() { break };
// block until one of these is ready // block until one of these is ready
tokio::select! { tokio::select! {
biased; biased;
// received stop signal
_ = self.stop.recv() => break,
// received a new poller, add it to collection // received a new poller, add it to collection
res = self.poller.recv() => match res { res = worker.poller.recv() => match res {
None => break tracing::error!("poller channel closed"), None => break tracing::error!("poller channel closed"),
Some(tx) => self.pollers.push(tx), Some(tx) => worker.pollers.push(tx),
}, },
// received a text change from editor // received a text change from editor
res = self.ops_in.recv() => match res { res = worker.ops_in.recv() => match res {
None => break tracing::debug!("stopping: editor closed channel"), None => break tracing::debug!("stopping: editor closed channel"),
Some((change, ack)) => { Some((change, ack)) => {
let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string()); let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string());
let last_ver = oplog.local_version(); let last_ver = oplog.local_version();
// clip to buffer extents // clip to buffer extents
let clip_end = std::cmp::min(branch.len(), change.end as usize); let clip_end = std::cmp::min(branch.len(), change.end as usize);
@ -120,7 +116,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
if change.is_delete() || change.is_insert() { if change.is_delete() || change.is_insert() {
tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await
.unwrap_or_warn("failed to send change!"); .unwrap_or_warn("failed to send change!");
self.latest_version.send(oplog.local_version()) worker.latest_version.send(oplog.local_version())
.unwrap_or_warn("failed to update latest version!"); .unwrap_or_warn("failed to update latest version!");
} }
ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack");
@ -129,18 +125,19 @@ impl ControllerWorker<TextChange> for BufferWorker {
// received a message from server: add to oplog and update latest version (+unlock pollers) // received a message from server: add to oplog and update latest version (+unlock pollers)
res = rx.message() => match res { res = rx.message() => match res {
Err(_e) => break, Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path),
Ok(None) => break, Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path),
Ok(Some(change)) => { Ok(Some(change)) => match worker.controller.upgrade() {
match oplog.decode_and_add(&change.op.data) { None => break, // clean exit actually, just weird we caught it here
Some(controller) => match oplog.decode_and_add(&change.op.data) {
Ok(local_version) => { Ok(local_version) => {
self.latest_version.send(local_version) worker.latest_version.send(local_version)
.unwrap_or_warn("failed to update latest version!"); .unwrap_or_warn("failed to update latest version!");
for tx in self.pollers.drain(..) { for tx in worker.pollers.drain(..) {
tx.send(()).unwrap_or_warn("could not wake up poller"); tx.send(()).unwrap_or_warn("could not wake up poller");
} }
if let Some(cb) = self.callback.borrow().as_ref() { if let Some(cb) = worker.callback.borrow().as_ref() {
cb.call(self.controller.clone()); // TODO should we run this on another task/thread? cb.call(BufferController(controller)); // TODO should we run this on another task/thread?
} }
}, },
Err(e) => tracing::error!("could not deserialize operation from server: {}", e), Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
@ -149,7 +146,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
}, },
// controller is ready to apply change and recv(), calculate it and send it back // controller is ready to apply change and recv(), calculate it and send it back
res = self.delta_req.recv() => match res { res = worker.delta_req.recv() => match res {
None => break tracing::error!("no more active controllers: can't send changes"), None => break tracing::error!("no more active controllers: can't send changes"),
Some((last_ver, tx)) => { Some((last_ver, tx)) => {
if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() { if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() {
@ -194,7 +191,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
}, },
// received a request for full CRDT content // received a request for full CRDT content
res = self.content_checkout.recv() => match res { res = worker.content_checkout.recv() => match res {
None => break tracing::error!("no more active controllers: can't update content"), None => break tracing::error!("no more active controllers: can't update content"),
Some(tx) => { Some(tx) => {
branch.merge(&oplog, oplog.local_version_ref()); branch.merge(&oplog, oplog.local_version_ref());
@ -204,6 +201,8 @@ impl ControllerWorker<TextChange> for BufferWorker {
} }
} }
} }
tracing::debug!("controller worker stopped");
} }
} }

View file

@ -28,7 +28,6 @@ pub(crate) struct CursorControllerInner {
pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>, pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>,
pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>, pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
pub(crate) stop: mpsc::UnboundedSender<()>,
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
@ -81,8 +80,4 @@ impl Controller<Cursor> for CursorController {
tracing::warn!("no active cursor worker to clear callback"); tracing::warn!("no active cursor worker to clear callback");
} }
} }
fn stop(&self) -> bool {
self.0.stop.send(()).is_ok()
}
} }

View file

@ -4,104 +4,98 @@ use tokio::sync::{mpsc, oneshot, watch};
use tonic::Streaming; use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor, User}, ext::IgnorableError}; use crate::{api::{controller::ControllerCallback, Cursor, User}, ext::IgnorableError};
use codemp_proto::cursor::{CursorPosition, CursorEvent}; use codemp_proto::cursor::{CursorPosition, CursorEvent};
use super::controller::{CursorController, CursorControllerInner}; use super::controller::{CursorController, CursorControllerInner};
pub(crate) struct CursorWorker { struct CursorWorker {
op: mpsc::Receiver<CursorPosition>, op: mpsc::Receiver<CursorPosition>,
map: Arc<dashmap::DashMap<Uuid, User>>, map: Arc<dashmap::DashMap<Uuid, User>>,
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>, stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
store: std::collections::VecDeque<Cursor>, store: std::collections::VecDeque<Cursor>,
stop: mpsc::UnboundedReceiver<()>, controller: std::sync::Weak<CursorControllerInner>,
controller: CursorController,
callback: watch::Receiver<Option<ControllerCallback<CursorController>>>, callback: watch::Receiver<Option<ControllerCallback<CursorController>>>,
} }
impl CursorWorker { impl CursorController {
pub fn new(user_map: Arc<dashmap::DashMap<Uuid, User>>) -> Self { pub(crate) fn spawn(user_map: Arc<dashmap::DashMap<Uuid, User>>, tx: mpsc::Sender<CursorPosition>, rx: Streaming<CursorEvent>) -> Self {
// TODO we should tweak the channel buffer size to better propagate backpressure // TODO we should tweak the channel buffer size to better propagate backpressure
let (op_tx, op_rx) = mpsc::channel(64); let (op_tx, op_rx) = mpsc::channel(64);
let (stream_tx, stream_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1);
let (end_tx, end_rx) = mpsc::unbounded_channel();
let (cb_tx, cb_rx) = watch::channel(None); let (cb_tx, cb_rx) = watch::channel(None);
let (poll_tx, poll_rx) = mpsc::unbounded_channel(); let (poll_tx, poll_rx) = mpsc::unbounded_channel();
let controller = CursorControllerInner { let controller = Arc::new(CursorControllerInner {
op: op_tx, op: op_tx,
stream: stream_tx, stream: stream_tx,
stop: end_tx,
callback: cb_tx, callback: cb_tx,
poll: poll_tx, poll: poll_tx,
}; });
Self {
let weak = Arc::downgrade(&controller);
let worker = CursorWorker {
op: op_rx, op: op_rx,
map: user_map, map: user_map,
stream: stream_rx, stream: stream_rx,
store: std::collections::VecDeque::default(), store: std::collections::VecDeque::default(),
stop: end_rx, controller: weak,
controller: CursorController(Arc::new(controller)),
callback: cb_rx, callback: cb_rx,
poll: poll_rx, poll: poll_rx,
pollers: Vec::new(), pollers: Vec::new(),
} };
}
}
impl ControllerWorker<Cursor> for CursorWorker { tokio::spawn(async move { CursorController::work(worker, tx, rx).await });
type Controller = CursorController;
type Tx = mpsc::Sender<CursorPosition>;
type Rx = Streaming<CursorEvent>;
fn controller(&self) -> CursorController { CursorController(controller)
self.controller.clone()
} }
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut worker: CursorWorker, tx: mpsc::Sender<CursorPosition>, mut rx: Streaming<CursorEvent>) {
loop { loop {
tracing::debug!("cursor worker polling"); tracing::debug!("cursor worker polling");
if worker.controller.upgrade().is_none() { break }; // clean exit: all controllers dropped
tokio::select!{ tokio::select!{
biased; biased;
// received stop signal
Some(()) = self.stop.recv() => { break; },
// new poller // new poller
Some(poller) = self.poll.recv() => self.pollers.push(poller), Some(poller) = worker.poll.recv() => worker.pollers.push(poller),
// client moved their cursor // client moved their cursor
Some(op) = self.op.recv() => { Some(op) = worker.op.recv() => {
tracing::debug!("received cursor from editor"); tracing::debug!("received cursor from editor");
tx.send(op).await.unwrap_or_warn("could not update cursor"); tx.send(op).await.unwrap_or_warn("could not update cursor");
}, },
// server sents us a cursor // server sents us a cursor
Ok(Some(cur)) = rx.message() => { Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() {
tracing::debug!("received cursor from server"); None => break, // clean exit, just weird that we got it here
let mut cursor = Cursor { Some(controller) => {
buffer: cur.position.buffer.path, tracing::debug!("received cursor from server");
start: (cur.position.start.row, cur.position.start.col), let mut cursor = Cursor {
end: (cur.position.end.row, cur.position.end.col), buffer: cur.position.buffer.path,
user: None, start: (cur.position.start.row, cur.position.start.col),
}; end: (cur.position.end.row, cur.position.end.col),
let user_id = Uuid::from(cur.user); user: None,
if let Some(user) = self.map.get(&user_id) { };
cursor.user = Some(user.name.clone()); let user_id = Uuid::from(cur.user);
} if let Some(user) = worker.map.get(&user_id) {
self.store.push_back(cursor); cursor.user = Some(user.name.clone());
for tx in self.pollers.drain(..) { }
tx.send(()).unwrap_or_warn("poller dropped before unblocking"); worker.store.push_back(cursor);
} for tx in worker.pollers.drain(..) {
if let Some(cb) = self.callback.borrow().as_ref() { tx.send(()).unwrap_or_warn("poller dropped before unblocking");
tracing::debug!("running cursor callback"); }
cb.call(self.controller.clone()); // TODO should this run in its own task/thread? if let Some(cb) = worker.callback.borrow().as_ref() {
} tracing::debug!("running cursor callback");
cb.call(CursorController(controller)); // TODO should this run in its own task/thread?
}
},
}, },
// client wants to get next cursor event // client wants to get next cursor event
Some(tx) = self.stream.recv() => tx.send(self.store.pop_front()) Some(tx) = worker.stream.recv() => tx.send(worker.store.pop_front())
.unwrap_or_warn("client gave up receiving"), .unwrap_or_warn("client gave up receiving"),
else => break, else => break,

View file

@ -80,12 +80,6 @@ fn poll(controller: &mut crate::buffer::Controller) -> Result<(), ControllerErro
super::tokio().block_on(controller.poll()) super::tokio().block_on(controller.poll())
} }
/// Stop the controller.
#[jni(package = "mp.code", class = "BufferController")]
fn stop(controller: &mut crate::buffer::Controller) -> bool {
controller.stop()
}
/// Called by the Java GC to drop a [crate::buffer::Controller]. /// Called by the Java GC to drop a [crate::buffer::Controller].
#[jni(package = "mp.code", class = "BufferController")] #[jni(package = "mp.code", class = "BufferController")]
fn free(input: jni::sys::jlong) { fn free(input: jni::sys::jlong) {

View file

@ -67,12 +67,6 @@ fn poll(controller: &mut crate::cursor::Controller) -> Result<(), ControllerErro
super::tokio().block_on(controller.poll()) super::tokio().block_on(controller.poll())
} }
/// Stop the controller.
#[jni(package = "mp.code", class = "CursorController")]
fn stop(controller: &mut crate::cursor::Controller) -> bool {
controller.stop()
}
/// Called by the Java GC to drop a [crate::cursor::Controller]. /// Called by the Java GC to drop a [crate::cursor::Controller].
#[jni(package = "mp.code", class = "CursorController")] #[jni(package = "mp.code", class = "CursorController")]
fn free(input: jni::sys::jlong) { fn free(input: jni::sys::jlong) {

View file

@ -170,26 +170,6 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event {
} }
} }
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::workspace::DetachResult {
const CLASS: &'static str = "mp/code/data/DetachResult";
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let ordinal = match self {
crate::workspace::DetachResult::NotAttached => 0,
crate::workspace::DetachResult::Detaching => 1,
crate::workspace::DetachResult::AlreadyDetached => 2
};
let class = env.find_class(Self::CLASS)?;
let variants: jni::objects::JObjectArray = env.call_method(
class,
"getEnumConstants",
"()[Ljava/lang/Object;",
&[]
)?.l()?.into();
env.get_object_array_element(variants, ordinal)
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
const CLASS: &'static str = "mp/code/data/TextChange"; const CLASS: &'static str = "mp/code/data/TextChange";
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> { fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {

View file

@ -51,7 +51,7 @@ fn attach_to_buffer(workspace: &mut Workspace, path: String) -> Result<crate::bu
/// Detach from a buffer. /// Detach from a buffer.
#[jni(package = "mp.code", class = "Workspace")] #[jni(package = "mp.code", class = "Workspace")]
fn detach_from_buffer(workspace: &mut Workspace, path: String) -> crate::workspace::DetachResult { fn detach_from_buffer(workspace: &mut Workspace, path: String) -> bool {
workspace.detach(&path) workspace.detach(&path)
} }

View file

@ -27,7 +27,7 @@ impl BufferController {
} }
#[napi(js_name = "clear_callback")] #[napi(js_name = "clear_callback")]
pub fn js_clear_callback(&self) -> () { pub fn js_clear_callback(&self) {
self.clear_callback(); self.clear_callback();
} }

View file

@ -18,8 +18,6 @@ impl LuaUserData for CodempBufferController {
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
methods.add_method("stop", |_, this, ()| Ok(this.stop()));
methods.add_method("content", |_, this, ()| a_sync! { this => this.content().await? }); methods.add_method("content", |_, this, ()| a_sync! { this => this.content().await? });
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) }); methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });

View file

@ -19,8 +19,6 @@ impl LuaUserData for CodempCursorController {
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
methods.add_method("stop", |_, this, ()| Ok(this.stop()));
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) }); methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempCursorController| super::ext::callback().invoke(cb.clone(), controller)); this.callback(move |controller: CodempCursorController| super::ext::callback().invoke(cb.clone(), controller));

View file

@ -1,7 +1,6 @@
use mlua_codemp_patch as mlua; use mlua_codemp_patch as mlua;
use mlua::prelude::*; use mlua::prelude::*;
use crate::prelude::*; use crate::prelude::*;
use crate::workspace::DetachResult;
use super::ext::a_sync::a_sync; use super::ext::a_sync::a_sync;
use super::ext::from_lua_serde; use super::ext::from_lua_serde;
@ -18,7 +17,7 @@ impl LuaUserData for CodempWorkspace {
); );
methods.add_method("detach", |_, this, (name,):(String,)| methods.add_method("detach", |_, this, (name,):(String,)|
Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached)) Ok(this.detach(&name))
); );
methods.add_method("delete", |_, this, (name,):(String,)| methods.add_method("delete", |_, this, (name,):(String,)|

View file

@ -67,11 +67,6 @@ impl CursorController {
fn pyclear_callback(&self) { fn pyclear_callback(&self) {
self.clear_callback(); self.clear_callback();
} }
#[pyo3(name = "stop")]
fn pystop(&self) -> bool {
self.stop()
}
} }
// need to do manually since Controller is a trait implementation // need to do manually since Controller is a trait implementation
@ -137,11 +132,6 @@ impl BufferController {
fn pyclear_callback(&self) { fn pyclear_callback(&self) {
self.clear_callback(); self.clear_callback();
} }
#[pyo3(name = "stop")]
fn pystop(&self) -> bool {
self.stop()
}
} }
// We have to write this manually since // We have to write this manually since

View file

@ -172,9 +172,9 @@ impl Config {
kwds: Option<Bound<'_, PyDict>>, kwds: Option<Bound<'_, PyDict>>,
) -> PyResult<Self> { ) -> PyResult<Self> {
if let Some(kwgs) = kwds { if let Some(kwgs) = kwds {
let host = kwgs.get_item("host")?.map(|e| e.extract().ok()).flatten(); let host = kwgs.get_item("host")?.and_then(|e| e.extract().ok());
let port = kwgs.get_item("port")?.map(|e| e.extract().ok()).flatten(); let port = kwgs.get_item("port")?.and_then(|e| e.extract().ok());
let tls = kwgs.get_item("tls")?.map(|e| e.extract().ok()).flatten(); let tls = kwgs.get_item("tls")?.and_then(|e| e.extract().ok());
Ok(Config { Ok(Config {
username, username,

View file

@ -23,11 +23,7 @@ impl Workspace {
#[pyo3(name = "detach")] #[pyo3(name = "detach")]
fn pydetach(&self, path: String) -> bool { fn pydetach(&self, path: String) -> bool {
match self.detach(path.as_str()) { self.detach(path.as_str())
crate::workspace::DetachResult::NotAttached => false,
crate::workspace::DetachResult::Detaching => true,
crate::workspace::DetachResult::AlreadyDetached => true,
}
} }
#[pyo3(name = "event")] #[pyo3(name = "event")]

View file

@ -4,9 +4,8 @@
//! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems.
use crate::{ use crate::{
api::{controller::ControllerWorker, Controller, Event, User}, api::{Event, User},
buffer::{self, worker::BufferWorker}, buffer, cursor,
cursor::{self, worker::CursorWorker},
errors::{ConnectionResult, ControllerResult, RemoteResult}, errors::{ConnectionResult, ControllerResult, RemoteResult},
ext::InternallyMutable, ext::InternallyMutable,
network::Services, network::Services,
@ -49,9 +48,11 @@ struct WorkspaceInner {
user: User, // TODO back-reference to global user id... needed for buffer controllers user: User, // TODO back-reference to global user id... needed for buffer controllers
cursor: cursor::Controller, cursor: cursor::Controller,
buffers: DashMap<String, buffer::Controller>, buffers: DashMap<String, buffer::Controller>,
services: Services,
// TODO these two are Arced so that the inner worker can hold them without holding the
// WorkspaceInner itself, otherwise its impossible to drop Workspace
filetree: DashSet<String>, filetree: DashSet<String>,
users: Arc<DashMap<Uuid, User>>, users: Arc<DashMap<Uuid, User>>,
services: Services,
// TODO can we drop the mutex? // TODO can we drop the mutex?
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>, events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
} }
@ -79,13 +80,7 @@ impl Workspace {
let users = Arc::new(DashMap::default()); let users = Arc::new(DashMap::default());
let worker = CursorWorker::new(users.clone()); let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream);
let controller = worker.controller();
tokio::spawn(async move {
tracing::debug!("controller worker started");
worker.work(tx, cur_stream).await;
tracing::debug!("controller worker stopped");
});
let ws = Self(Arc::new(WorkspaceInner { let ws = Self(Arc::new(WorkspaceInner {
name, name,
@ -141,14 +136,7 @@ impl Workspace {
); );
let stream = self.0.services.buf().attach(req).await?.into_inner(); let stream = self.0.services.buf().attach(req).await?.into_inner();
let worker = BufferWorker::new(self.0.user.id, path); let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream);
let controller = worker.controller();
tokio::spawn(async move {
tracing::debug!("controller worker started");
worker.work(tx, stream).await;
tracing::debug!("controller worker stopped");
});
self.0.buffers.insert(path.to_string(), controller.clone()); self.0.buffers.insert(path.to_string(), controller.clone());
Ok(controller) Ok(controller)
@ -156,18 +144,19 @@ impl Workspace {
/// Detach from an active buffer. /// Detach from an active buffer.
/// ///
/// This option will be carried in background. BufferWorker will be stopped and dropped. /// This will stop and drop its [`buffer::Controller`].
/// There may still be some events enqueued in buffers to poll, but the [buffer::Controller] itself won't be ///
/// accessible anymore from [`Workspace`]. /// Returns `true` if connectly dropped or wasn't present, `false` if dropped but wasn't last ref
pub fn detach(&self, path: &str) -> DetachResult { ///
/// If this method returns `false` you have a dangling ref, maybe just waiting for garbage
/// collection or maybe preventing the controller from being dropped completely
#[allow(clippy::redundant_pattern_matching)] // all cases are clearer this way
pub fn detach(&self, path: &str) -> bool {
match self.0.buffers.remove(path) { match self.0.buffers.remove(path) {
None => DetachResult::NotAttached, None => true, // noop: we werent attached in the first place
Some((_name, controller)) => { Some((_name, controller)) => match Arc::into_inner(controller.0) {
if controller.stop() { None => false, // dangling ref! we can't drop this
DetachResult::Detaching Some(_) => true, // dropping it now
} else {
DetachResult::AlreadyDetached
}
} }
} }
} }
@ -241,6 +230,8 @@ impl Workspace {
/// Delete a buffer. /// Delete a buffer.
pub async fn delete(&self, path: &str) -> RemoteResult<()> { pub async fn delete(&self, path: &str) -> RemoteResult<()> {
self.detach(path); // just in case
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
workspace_client workspace_client
.delete_buffer(tonic::Request::new(BufferNode { .delete_buffer(tonic::Request::new(BufferNode {
@ -248,9 +239,6 @@ impl Workspace {
})) }))
.await?; .await?;
if let Some((_name, controller)) = self.0.buffers.remove(path) {
controller.stop();
}
self.0.filetree.remove(path); self.0.filetree.remove(path);
@ -320,17 +308,24 @@ impl Workspace {
tx: mpsc::UnboundedSender<crate::api::Event>, tx: mpsc::UnboundedSender<crate::api::Event>,
) { ) {
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
let inner = self.0.clone(); let weak = Arc::downgrade(&self.0);
let name = self.id(); let name = self.id();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
match stream.message().await { // TODO can we stop responsively rather than poll for Arc being dropped?
if weak.upgrade().is_none() { break };
let Some(res) = tokio::select!(
x = stream.message() => Some(x),
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None,
) else { continue };
match res {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
Ok(None) => break tracing::info!("leaving workspace {}", name), Ok(None) => break tracing::info!("leaving workspace {}", name),
Ok(Some(WorkspaceEvent { event: None })) => { Ok(Some(WorkspaceEvent { event: None })) => {
tracing::warn!("workspace {} received empty event", name) tracing::warn!("workspace {} received empty event", name)
} }
Ok(Some(WorkspaceEvent { event: Some(ev) })) => { Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
let Some(inner) = weak.upgrade() else { break };
let update = crate::api::Event::from(&ev); let update = crate::api::Event::from(&ev);
match ev { match ev {
// user // user
@ -350,9 +345,7 @@ impl Workspace {
} }
WorkspaceEventInner::Delete(FileDelete { path }) => { WorkspaceEventInner::Delete(FileDelete { path }) => {
inner.filetree.remove(&path); inner.filetree.remove(&path);
if let Some((_name, controller)) = inner.buffers.remove(&path) { let _ = inner.buffers.remove(&path);
controller.stop();
}
} }
} }
if tx.send(update).is_err() { if tx.send(update).is_err() {
@ -364,28 +357,3 @@ impl Workspace {
}); });
} }
} }
impl Drop for WorkspaceInner {
fn drop(&mut self) {
for entry in self.buffers.iter() {
if !entry.value().stop() {
tracing::warn!(
"could not stop buffer worker {} for workspace {}",
entry.value().path(),
self.name
);
}
}
if !self.cursor.stop() {
tracing::warn!("could not stop cursor worker for workspace {}", self.name);
}
}
}
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(eq, eq_int))]
#[cfg_attr(any(feature = "py", feature = "py-noabi"), derive(PartialEq))]
pub enum DetachResult {
NotAttached,
Detaching,
AlreadyDetached,
}