From 1b16d4af59594f4ced87506d4345c7a2da69c758 Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 28 Sep 2024 03:33:13 +0200 Subject: [PATCH 1/8] chore: split controller trait in 2 sides Co-authored-by: zaaarf --- src/api/controller.rs | 29 +++++++++++++++++++++++------ src/buffer/controller.rs | 26 ++++++++++++++++---------- src/cursor/controller.rs | 10 ++++++++-- src/ext.rs | 2 +- 4 files changed, 48 insertions(+), 19 deletions(-) diff --git a/src/api/controller.rs b/src/api/controller.rs index 7b54b81..691c672 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -8,16 +8,17 @@ use crate::errors::ControllerResult; // note that we don't use thiserror's #[from] because we don't want the error structs to contain // these foreign types, and also we want these to be easily constructable -/// Asynchronous and thread-safe handle to a generic bidirectional stream. +/// Asynchronous and thread-safe handle to a generic bidirectional stream. Exists as a combination +/// of [`AsyncSender`] and [`AsyncReceiver`]. /// /// This generic trait is implemented by actors managing stream procedures, and will generally /// imply a background worker. /// -/// Events can be enqueued for dispatching without blocking with [`Controller::send`]. +/// Events can be enqueued for dispatching without blocking with [`AsyncSender::send`]. /// -/// For receiving events from the server, an asynchronous API with [`Controller::recv`] is -/// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively, -/// [`Controller::poll`] combined with [`Controller::try_recv`]. +/// For receiving events from the server, an asynchronous API with [`AsyncReceiver::recv`] is +/// provided; if that is not feasible, consider using [`AsyncReceiver::callback`] or, alternatively, +/// [`AsyncReceiver::poll`] combined with [`AsyncReceiver::try_recv`]. /// /// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have /// been dropped. @@ -25,10 +26,26 @@ use crate::errors::ControllerResult; /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait Controller : Sized + Send + Sync { +pub trait Controller : AsyncSender + AsyncReceiver {} + +/// Asynchronous and thread-safe handle to send data over a stream. +/// See [`Controller`]'s documentation for details. +/// +/// Details about the receiving end are left to the implementor. +#[allow(async_fn_in_trait)] +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +pub trait AsyncSender : Sized + Send + Sync { /// Enqueue a new value to be sent to all other users. async fn send(&self, x: T) -> ControllerResult<()>; +} +/// Asynchronous and thread-safe handle to receive data from a stream. +/// See [`Controller`]'s documentation for details. +/// +/// Details about the sender are left to the implementor. +#[allow(async_fn_in_trait)] +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +pub trait AsyncReceiver : Sized + Send + Sync { /// Block until a value is available and returns it. async fn recv(&self) -> ControllerResult { loop { diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 73229be..bcf115e 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; -use crate::api::controller::{Controller, ControllerCallback}; +use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::TextChange; use crate::errors::ControllerResult; use crate::ext::InternallyMutable; @@ -53,7 +53,21 @@ pub(crate) struct BufferControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for BufferController { +impl Controller for BufferController {} + +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl AsyncSender for BufferController { + async fn send(&self, op: TextChange) -> ControllerResult<()> { + // we let the worker do the updating to the last version and send it back. + let (tx, rx) = oneshot::channel(); + self.0.ops_in.send((op, tx))?; + self.0.last_update.set(rx.await?); + Ok(()) + } +} + +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl AsyncReceiver for BufferController { async fn poll(&self) -> ControllerResult<()> { if self.0.last_update.get() != *self.0.latest_version.borrow() { return Ok(()); @@ -80,14 +94,6 @@ impl Controller for BufferController { Ok(change) } - async fn send(&self, op: TextChange) -> ControllerResult<()> { - // we let the worker do the updating to the last version and send it back. - let (tx, rx) = oneshot::channel(); - self.0.ops_in.send((op, tx))?; - self.0.last_update.set(rx.await?); - Ok(()) - } - fn callback(&self, cb: impl Into>) { if self.0.callback.send(Some(cb.into())).is_err() { // TODO should we panic? we failed what we were supposed to do diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index a8dc862..e16c1d1 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, oneshot, watch}; use crate::{ - api::{controller::ControllerCallback, Controller, Cursor}, + api::{controller::{AsyncReceiver, AsyncSender, ControllerCallback}, Controller, Cursor}, errors::ControllerResult, }; use codemp_proto::{ @@ -31,7 +31,10 @@ pub(crate) struct CursorControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for CursorController { +impl Controller for CursorController {} + +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl AsyncSender for CursorController { async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); @@ -54,7 +57,10 @@ impl Controller for CursorController { }) .await?) } +} +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl AsyncReceiver for CursorController { async fn try_recv(&self) -> ControllerResult> { let (tx, rx) = oneshot::channel(); self.0.stream.send(tx).await?; diff --git a/src/ext.rs b/src/ext.rs index 7daa3a5..7228793 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -1,7 +1,7 @@ //! ### Extensions //! Contains a number of utils used internally or that may be of general interest. -use crate::{api::Controller, errors::ControllerResult}; +use crate::{api::controller::AsyncReceiver, errors::ControllerResult}; use tokio::sync::mpsc; /// Poll all given buffer controllers and wait, returning the first one ready. From ddbad59ae2fa70982bd1f115c17aa1807b5cd2da Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 28 Sep 2024 03:56:57 +0200 Subject: [PATCH 2/8] feat: implemented AsyncReceiver for Workspace... ... its very bad tho, very very bad Co-authored-by: zaaarf --- src/workspace.rs | 50 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/src/workspace.rs b/src/workspace.rs index c4c1d9e..87d2f83 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -4,7 +4,7 @@ //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. use crate::{ - api::{Event, User}, + api::{controller::{AsyncReceiver, ControllerCallback}, Event, User}, buffer, cursor, errors::{ConnectionResult, ControllerResult, RemoteResult}, ext::InternallyMutable, @@ -24,7 +24,7 @@ use codemp_proto::{ use dashmap::{DashMap, DashSet}; use std::{collections::BTreeSet, sync::Arc}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc::error::TryRecvError, mpsc}; use tonic::Streaming; use uuid::Uuid; @@ -55,6 +55,39 @@ struct WorkspaceInner { users: Arc>, // TODO can we drop the mutex? events: tokio::sync::Mutex>, + callback: std::sync::Mutex>>, // TODO lmao another one +} + +impl AsyncReceiver for Workspace { + async fn try_recv(&self) -> ControllerResult> { + match self.0 + .events + .lock() + .await + .try_recv() + { + Ok(x) => Ok(Some(x)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(crate::errors::ControllerError::Stopped), + } + } + + async fn poll(&self) -> ControllerResult<()> { + loop { + if !self.0.events.lock().await.is_empty() { break Ok(()) } + // TODO disgusting, please send help + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + } + } + + // TODO please send HELP ASAP this is hurting me emotionally + fn clear_callback(&self) { + *self.0.callback.lock().expect("mutex poisoned") = None; + } + + fn callback(&self, cb: impl Into>) { + *self.0.callback.lock().expect("mutex poisoned") = Some(cb.into()); + } } impl Workspace { @@ -91,6 +124,7 @@ impl Workspace { users, events: tokio::sync::Mutex::new(ev_rx), services, + callback: std::sync::Mutex::new(None), })); ws.fetch_users().await?; @@ -161,18 +195,6 @@ impl Workspace { } } - /// Await next workspace [Event] and return it when it arrives. - // TODO this method is weird and ugly, can we make it more standard? - pub async fn event(&self) -> ControllerResult { - self.0 - .events - .lock() - .await - .recv() - .await - .ok_or(crate::errors::ControllerError::Unfulfilled) - } - /// Re-fetch the list of available buffers in the workspace. pub async fn fetch_buffers(&self) -> RemoteResult<()> { let mut workspace_client = self.0.services.ws(); From c0bc92e812d496971910f61b31c84fab679b9b5c Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 00:24:10 +0200 Subject: [PATCH 3/8] feat: split tx/rx generic in controller --- src/api/controller.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/api/controller.rs b/src/api/controller.rs index 691c672..d2e81b6 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -26,7 +26,11 @@ use crate::errors::ControllerResult; /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait Controller : AsyncSender + AsyncReceiver {} +pub trait Controller : AsyncSender + AsyncReceiver +where + Tx: Sized + Sync + Send, + Rx: Sized + Sync + Send, +{} /// Asynchronous and thread-safe handle to send data over a stream. /// See [`Controller`]'s documentation for details. From ca04601bea6d5af26c72c28f36e1e1da5c771bca Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 00:59:11 +0200 Subject: [PATCH 4/8] chore: refactor worker move stuff out of tokio select so that RA doesnt choke --- src/buffer/worker.rs | 192 ++++++++++++++++++++++++------------------- 1 file changed, 106 insertions(+), 86 deletions(-) diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 9e6bfa7..e1f568a 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use diamond_types::list::{Branch, OpLog}; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; @@ -27,6 +28,9 @@ struct BufferWorker { delta_req: mpsc::Receiver, controller: std::sync::Weak, callback: watch::Receiver>>, + oplog: OpLog, + branch: Branch, + timer: Timer, } impl BufferController { @@ -66,6 +70,9 @@ impl BufferController { content_checkout: req_rx, delta_req: recv_rx, callback: cb_rx, + oplog: OpLog::new(), + branch: Branch::new(), + timer: Timer::new(10), // TODO configurable! }; tokio::spawn(async move { @@ -76,11 +83,7 @@ impl BufferController { } async fn work(mut worker: BufferWorker, tx: mpsc::Sender, mut rx: Streaming) { - let mut branch = diamond_types::list::Branch::new(); - let mut oplog = diamond_types::list::OpLog::new(); - let mut timer = Timer::new(10); // TODO configurable!! tracing::debug!("controller worker started"); - loop { if worker.controller.upgrade().is_none() { break }; @@ -97,105 +100,28 @@ impl BufferController { // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some((change, ack)) => { - let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string()); - let last_ver = oplog.local_version(); - // clip to buffer extents - let clip_end = std::cmp::min(branch.len(), change.end as usize); - let clip_start = std::cmp::max(0, change.start as usize); - - // in case we have a "replace" span - if change.is_delete() { - branch.delete_without_content(&mut oplog, agent_id, clip_start..clip_end); - } - - if change.is_insert() { - branch.insert(&mut oplog, agent_id, clip_start, &change.content); - } - - if change.is_delete() || change.is_insert() { - tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await - .unwrap_or_warn("failed to send change!"); - worker.latest_version.send(oplog.local_version()) - .unwrap_or_warn("failed to update latest version!"); - } - ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); - }, + Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await, }, // received a message from server: add to oplog and update latest version (+unlock pollers) res = rx.message() => match res { Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path), Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path), - Ok(Some(change)) => match worker.controller.upgrade() { - None => break, // clean exit actually, just weird we caught it here - Some(controller) => match oplog.decode_and_add(&change.op.data) { - Ok(local_version) => { - worker.latest_version.send(local_version) - .unwrap_or_warn("failed to update latest version!"); - for tx in worker.pollers.drain(..) { - tx.send(()).unwrap_or_warn("could not wake up poller"); - } - if let Some(cb) = worker.callback.borrow().as_ref() { - cb.call(BufferController(controller)); // TODO should we run this on another task/thread? - } - }, - Err(e) => tracing::error!("could not deserialize operation from server: {}", e), - } - }, + Ok(Some(change)) => if worker.handle_server_change(change).await { break }, }, // controller is ready to apply change and recv(), calculate it and send it back res = worker.delta_req.recv() => match res { None => break tracing::error!("no more active controllers: can't send changes"), - Some((last_ver, tx)) => { - if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() { - // x.0.start should always be after lastver! - // this step_ver will be the version after we apply the operation - // we give it to the controller so that he knows where it's at. - let step_ver = oplog.version_union(&[lv.end-1], &last_ver); - branch.merge(&oplog, &step_ver); - let new_local_v = branch.local_version(); - - let hash = if timer.step() { - Some(crate::ext::hash(branch.content().to_string())) - } else { None }; - - let tc = match dtop.kind { - diamond_types::list::operation::OpKind::Ins => { - if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() { - tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); - } - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.start() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), - hash - } - }, - - diamond_types::list::operation::OpKind::Del => { - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.end() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), - hash - } - } - }; - tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); - } else { - tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); - } - }, + Some((last_ver, tx)) => worker.handle_delta_request(last_ver, tx).await, }, // received a request for full CRDT content res = worker.content_checkout.recv() => match res { None => break tracing::error!("no more active controllers: can't update content"), Some(tx) => { - branch.merge(&oplog, oplog.local_version_ref()); - let content = branch.content().to_string(); + worker.branch.merge(&worker.oplog, worker.oplog.local_version_ref()); + let content = worker.branch.content().to_string(); tx.send(content).unwrap_or_warn("checkout request dropped"); }, } @@ -206,6 +132,100 @@ impl BufferController { } } +impl BufferWorker { + async fn handle_editor_change(&mut self, change: TextChange, ack: oneshot::Sender, tx: &mpsc::Sender) { + let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); + let last_ver = self.oplog.local_version(); + // clip to buffer extents + let clip_end = std::cmp::min(self.branch.len(), change.end as usize); + let clip_start = std::cmp::max(0, change.start as usize); + + // in case we have a "replace" span + if change.is_delete() { + self.branch.delete_without_content(&mut self.oplog, agent_id, clip_start..clip_end); + } + + if change.is_insert() { + self.branch.insert(&mut self.oplog, agent_id, clip_start, &change.content); + } + + if change.is_delete() || change.is_insert() { + tx.send(Operation { data: self.oplog.encode_from(Default::default(), &last_ver) }).await + .unwrap_or_warn("failed to send change!"); + self.latest_version.send(self.oplog.local_version()) + .unwrap_or_warn("failed to update latest version!"); + } + ack.send(self.branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); + } + + async fn handle_server_change(&mut self, change: BufferEvent) -> bool { + match self.controller.upgrade() { + None => true, // clean exit actually, just weird we caught it here + Some(controller) => match self.oplog.decode_and_add(&change.op.data) { + Ok(local_version) => { + self.latest_version.send(local_version) + .unwrap_or_warn("failed to update latest version!"); + for tx in self.pollers.drain(..) { + tx.send(()).unwrap_or_warn("could not wake up poller"); + } + if let Some(cb) = self.callback.borrow().as_ref() { + cb.call(BufferController(controller)); // TODO should we run this on another task/thread? + } + false + }, + Err(e) => { + tracing::error!("could not deserialize operation from server: {}", e); + true + }, + } + } + } + + async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender) { + if let Some((lv, Some(dtop))) = self.oplog + .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) + .next() + { + // x.0.start should always be after lastver! + // this step_ver will be the version after we apply the operation + // we give it to the controller so that he knows where it's at. + let step_ver = self.oplog.version_union(&[lv.end-1], &last_ver); + self.branch.merge(&self.oplog, &step_ver); + let new_local_v = self.branch.local_version(); + + let hash = if self.timer.step() { + Some(crate::ext::hash(self.branch.content().to_string())) + } else { None }; + + let tc = match dtop.kind { + diamond_types::list::operation::OpKind::Ins => { + if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() { + tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); + } + crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.start() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash + } + }, + + diamond_types::list::operation::OpKind::Del => { + crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash + } + } + }; + tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); + } else { + tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + } + } +} + struct Timer(u32, u32); impl Timer { fn new(period: u32) -> Self { From b897b26bb988980ce2e9cb2d1810dfcc3b7ed920 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 03:07:14 +0200 Subject: [PATCH 5/8] feat: implemented Workspace receiver in glues --- dist/java/src/mp/code/Workspace.java | 58 +++++++++++++++++--- dist/java/src/mp/code/data/DetachResult.java | 15 ----- dist/lua/annotations.lua | 33 ++++++++++- dist/py/src/codemp/codemp.pyi | 8 +++ src/ffi/java/buffer.rs | 2 +- src/ffi/java/cursor.rs | 2 +- src/ffi/java/workspace.rs | 58 +++++++++++++++++++- src/ffi/js/buffer.rs | 2 +- src/ffi/js/cursor.rs | 2 +- src/ffi/js/workspace.rs | 44 ++++++++++++++- src/ffi/lua/ext/callback.rs | 3 + src/ffi/lua/workspace.rs | 26 +++++++-- src/ffi/python/controllers.rs | 2 +- src/ffi/python/workspace.rs | 46 ++++++++++++++-- src/prelude.rs | 2 + 15 files changed, 257 insertions(+), 46 deletions(-) delete mode 100644 dist/java/src/mp/code/data/DetachResult.java diff --git a/dist/java/src/mp/code/Workspace.java b/dist/java/src/mp/code/Workspace.java index 31ee6e3..10ae3b1 100644 --- a/dist/java/src/mp/code/Workspace.java +++ b/dist/java/src/mp/code/Workspace.java @@ -2,8 +2,8 @@ package mp.code; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; -import mp.code.data.DetachResult; import mp.code.exceptions.ConnectionException; import mp.code.exceptions.ConnectionRemoteException; import mp.code.exceptions.ControllerException; @@ -168,16 +168,58 @@ public final class Workspace { delete_buffer(this.ptr, path); } - private static native Event event(long self) throws ControllerException; + private static native Event try_recv(long self) throws ControllerException; /** - * Blocks until a workspace event occurs. - * You shouldn't call this, unless it's on a dedicated thread. - * @return the {@link Event} that has occurred - * @throws ControllerException if the event arrived while the underlying controller was already closed + * Tries to get a {@link Event} from the queue if any were present, and returns + * an empty optional otherwise. + * @return the first workspace event in queue, if any are present + * @throws ControllerException if the controller was stopped */ - public Event event() throws ControllerException { - return event(this.ptr); + public Optional tryRecv() throws ControllerException { + return Optional.ofNullable(try_recv(this.ptr)); + } + + private static native Event recv(long self) throws ControllerException; + + /** + * Blocks until a {@link Event} is available and returns it. + * @return the workspace event that occurred + * @throws ControllerException if the controller was stopped + */ + public Event recv() throws ControllerException { + return recv(this.ptr); + } + + private static native void callback(long self, Consumer cb); + + /** + * Registers a callback to be invoked whenever a new {@link Event} is ready to be received. + * This will not work unless a Java thread has been dedicated to the event loop. + * @see Extensions#drive(boolean) + */ + public void callback(Consumer cb) { + callback(this.ptr, cb); + } + + private static native void clear_callback(long self); + + /** + * Clears the registered callback. + * @see #callback(Consumer) + */ + public void clearCallback() { + clear_callback(this.ptr); + } + + private static native void poll(long self) throws ControllerException; + + /** + * Blocks until a {@link Event} is available. + * @throws ControllerException if the controller was stopped + */ + public void poll() throws ControllerException { + poll(this.ptr); } private static native void free(long self); diff --git a/dist/java/src/mp/code/data/DetachResult.java b/dist/java/src/mp/code/data/DetachResult.java deleted file mode 100644 index c367354..0000000 --- a/dist/java/src/mp/code/data/DetachResult.java +++ /dev/null @@ -1,15 +0,0 @@ -package mp.code.data; - -import mp.code.Workspace; - -/** - * The result of a {@link Workspace#detachFromBuffer(String)} operation. - */ -public enum DetachResult { - /** The user was not attached to this buffer. */ - NOT_ATTACHED, - /** The user detached from the buffer and stopped it. */ - DETACHING, - /** The user was attached, but the buffer was already stopped. */ - ALREADY_DETACHED -} diff --git a/dist/lua/annotations.lua b/dist/lua/annotations.lua index 04055ba..dc6d625 100644 --- a/dist/lua/annotations.lua +++ b/dist/lua/annotations.lua @@ -75,6 +75,16 @@ function WorkspaceEventPromise:await() end function WorkspaceEventPromise:and_then(cb) end +---@class (exact) MaybeWorkspaceEventPromise : Promise +local MaybeWorkspaceEventPromise = {} +--- block until promise is ready and return value +--- @return WorkspaceEvent | nil +function MaybeWorkspaceEventPromise:await() end +---@param cb fun(x: WorkspaceEvent | nil) callback to invoke +---invoke callback asynchronously as soon as promise is ready +function MaybeWorkspaceEventPromise:and_then(cb) end + + ---@class (exact) BufferControllerPromise : Promise local BufferControllerPromise = {} --- block until promise is ready and return value @@ -252,11 +262,30 @@ function Workspace:fetch_users(path) end ---@field type string ---@field value string +---@return MaybeWorkspaceEventPromise +---@async +---@nodiscard +---try to receive workspace events, returning nil if none is available +function Workspace:try_recv() end + ---@return WorkspaceEventPromise ---@async ---@nodiscard ----get next workspace event -function Workspace:event() end +---block until next workspace event and return it +function Workspace:recv() end + +---@return NilPromise +---@async +---@nodiscard +---block until next workspace event without returning it +function Workspace:poll() end + +---clears any previously registered workspace callback +function Workspace:clear_callback() end + +---@param cb fun(w: Workspace) callback to invoke on each workspace event received +---register a new callback to be called on workspace events (replaces any previously registered one) +function Workspace:callback(cb) end diff --git a/dist/py/src/codemp/codemp.pyi b/dist/py/src/codemp/codemp.pyi index 4df875b..6c088c5 100644 --- a/dist/py/src/codemp/codemp.pyi +++ b/dist/py/src/codemp/codemp.pyi @@ -52,6 +52,9 @@ class Client: def user_name(self) -> str: ... def refresh(self) -> Promise[None]: ... +class Event: + pass + class Workspace: """ Handle to a workspace inside codemp. It manages buffers. @@ -69,6 +72,11 @@ class Workspace: def buffer_by_name(self, path: str) -> Optional[BufferController]: ... def buffer_list(self) -> list[str]: ... def filetree(self, filter: Optional[str], strict: bool) -> list[str]: ... + def recv(self) -> Promise[Event]: ... + def try_recv(self) -> Promise[Optional[Event]]: ... + def poll(self) -> Promise[None]: ... + def clear_callback(self) -> None: ... + def callback(self, cb: Callable[[Workspace], None]) -> None: ... class TextChange: """ diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 8cc1ce4..158ac03 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -1,7 +1,7 @@ use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; -use crate::{api::{Controller, TextChange}, errors::ControllerError}; +use crate::{api::{controller::{AsyncReceiver, AsyncSender}, TextChange}, errors::ControllerError}; use super::null_check; diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 4cc1a4b..0514b39 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -1,6 +1,6 @@ use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; -use crate::{api::{Controller, Cursor}, errors::ControllerError}; +use crate::{api::{controller::{AsyncSender, AsyncReceiver}, Cursor}, errors::ControllerError}; use super::null_check; diff --git a/src/ffi/java/workspace.rs b/src/ffi/java/workspace.rs index 6fc38cd..c59ebc0 100644 --- a/src/ffi/java/workspace.rs +++ b/src/ffi/java/workspace.rs @@ -1,5 +1,6 @@ +use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; -use crate::{errors::{ConnectionError, ControllerError, RemoteError}, Workspace}; +use crate::{api::controller::AsyncReceiver, errors::{ConnectionError, ControllerError, RemoteError}, ffi::java::null_check, Workspace}; /// Get the workspace id. #[jni(package = "mp.code", class = "Workspace")] @@ -79,10 +80,61 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr super::tokio().block_on(workspace.delete(&path)) } +/// Block and receive a workspace event +#[jni(package = "mp.code", class = "Workspace")] +fn recv(workspace: &mut Workspace) -> Result { + super::tokio().block_on(workspace.recv()) +} + /// Receive a workspace event if present. #[jni(package = "mp.code", class = "Workspace")] -fn event(workspace: &mut Workspace) -> Result { - super::tokio().block_on(workspace.event()) +fn try_recv(workspace: &mut Workspace) -> Result, ControllerError> { + super::tokio().block_on(workspace.try_recv()) +} + +/// Block until a workspace event is available +#[jni(package = "mp.code", class = "Workspace")] +fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> { + super::tokio().block_on(workspace.poll()) +} + +/// Clear previously registered callback +#[jni(package = "mp.code", class = "Workspace")] +fn clear_callback(workspace: &mut Workspace) { + workspace.clear_callback(); +} + +/// Register a callback for workspace events. +#[jni(package = "mp.code", class = "Workspace")] +fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::Workspace, cb: JObject<'local>) { + null_check!(env, cb, {}); + let Ok(cb_ref) = env.new_global_ref(cb) else { + env.throw_new("mp/code/exceptions/JNIException", "Failed to pin callback reference!") + .expect("Failed to throw exception!"); + return; + }; + + controller.callback(move |workspace: crate::Workspace| { + let jvm = super::jvm(); + let mut env = jvm.attach_current_thread_permanently() + .expect("failed attaching to main JVM thread"); + if let Err(e) = env.with_local_frame(5, |env| { + use jni_toolbox::IntoJavaObject; + let jworkspace = workspace.into_java_object(env)?; + if let Err(e) = env.call_method( + &cb_ref, + "accept", + "(Ljava/lang/Object;)V", + &[jni::objects::JValueGen::Object(&jworkspace)] + ) { + tracing::error!("error invoking callback: {e:?}"); + }; + Ok::<(), jni::errors::Error>(()) + }) { + tracing::error!("error invoking callback: {e}"); + let _ = env.exception_describe(); + } + }); } /// Called by the Java GC to drop a [Workspace]. diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index a1a3727..b26d54d 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,7 +1,7 @@ use napi::threadsafe_function::{ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi_derive::napi; use crate::api::TextChange; -use crate::api::Controller; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::buffer::controller::BufferController; diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index b02f9f2..7fa0a47 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -1,7 +1,7 @@ use napi::threadsafe_function::ErrorStrategy::Fatal; use napi_derive::napi; use napi::threadsafe_function::{ThreadsafeFunction, ThreadSafeCallContext, ThreadsafeFunctionCallMode}; -use crate::api::Controller; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::cursor::controller::CursorController; diff --git a/src/ffi/js/workspace.rs b/src/ffi/js/workspace.rs index 6021c12..775a2e2 100644 --- a/src/ffi/js/workspace.rs +++ b/src/ffi/js/workspace.rs @@ -1,7 +1,10 @@ +use napi::threadsafe_function::ErrorStrategy::Fatal; +use napi::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi_derive::napi; use crate::Workspace; use crate::buffer::controller::BufferController; use crate::cursor::controller::CursorController; +use crate::api::controller::AsyncReceiver; #[napi(object, js_name = "Event")] pub struct JsEvent { @@ -61,8 +64,43 @@ impl Workspace { Ok(self.delete(&path).await?) } - #[napi(js_name = "event")] - pub async fn js_event(&self) -> napi::Result { - Ok(JsEvent::from(self.event().await?)) + #[napi(js_name = "recv")] + pub async fn js_recv(&self) -> napi::Result { + Ok(JsEvent::from(self.recv().await?)) + } + + #[napi(js_name = "try_recv")] + pub async fn js_try_recv(&self) -> napi::Result> { + Ok(self.try_recv().await?.map(JsEvent::from)) + } + + #[napi(js_name = "poll")] + pub async fn js_poll(&self) -> napi::Result<()> { + self.poll().await?; + Ok(()) + } + + #[napi(js_name = "clear_callback")] + pub fn js_clear_callbacl(&self) -> napi::Result<()> { + self.clear_callback(); + Ok(()) + } + + #[napi(js_name = "callback", ts_args_type = "fun: (event: Workspace) => void")] + pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()>{ + let tsfn : ThreadsafeFunction = + fun.create_threadsafe_function(0, + |ctx : ThreadSafeCallContext| { + Ok(vec![ctx.value]) + } + )?; + self.callback(move |controller : Workspace| { + + tsfn.call(controller.clone(), ThreadsafeFunctionCallMode::Blocking); //check this with tracing also we could use Ok(event) to get the error + // If it blocks the main thread too many time we have to change this + + }); + + Ok(()) } } diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index 636f9c0..db118d7 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -67,6 +67,7 @@ pub(crate) enum CallbackArg { BufferController(CodempBufferController), Workspace(CodempWorkspace), Event(CodempEvent), + MaybeEvent(Option), Cursor(CodempCursor), MaybeCursor(Option), TextChange(CodempTextChange), @@ -87,6 +88,7 @@ impl IntoLua for CallbackArg { CallbackArg::Workspace(x) => x.into_lua(lua), CallbackArg::VecStr(x) => x.into_lua(lua), CallbackArg::Event(x) => x.into_lua(lua), + CallbackArg::MaybeEvent(x) => x.into_lua(lua), CallbackArg::Cursor(x) => x.into_lua(lua), CallbackArg::MaybeCursor(x) => x.into_lua(lua), CallbackArg::TextChange(x) => x.into_lua(lua), @@ -107,3 +109,4 @@ impl From for CallbackArg { fn from(value: CodempCursor) -> Self { impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeCursor(value) } } impl From for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } } impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeTextChange(value) } } +impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeEvent(value) } } diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index 7868d0f..fc87c60 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -26,10 +26,6 @@ impl LuaUserData for CodempWorkspace { methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name))); - methods.add_method("event", |_, this, ()| - a_sync! { this => this.event().await? } - ); - methods.add_method("fetch_buffers", |_, this, ()| a_sync! { this => this.fetch_buffers().await? } ); @@ -44,6 +40,28 @@ impl LuaUserData for CodempWorkspace { methods.add_method("user_list", |_, this, ()| Ok(this.user_list()) ); + + methods.add_method("recv", |_, this, ()| + a_sync! { this => this.recv().await? } + ); + + methods.add_method("try_recv", |_, this, ()| + a_sync! { this => this.try_recv().await? } + ); + + methods.add_method("poll", |_, this, ()| + a_sync! { this => this.poll().await? } + ); + + methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { + this.callback(move |controller: CodempWorkspace| super::ext::callback().invoke(cb.clone(), controller)); + Ok(()) + }); + + methods.add_method("clear_callbacl", |_, this, ()| { + this.clear_callback(); + Ok(()) + }); } fn add_fields>(fields: &mut F) { diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index b1f8c21..e3bfd8d 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,4 +1,4 @@ -use crate::api::Controller; +use crate::api::controller::{AsyncSender, AsyncReceiver}; use crate::api::Cursor; use crate::api::TextChange; use crate::buffer::Controller as BufferController; diff --git a/src/ffi/python/workspace.rs b/src/ffi/python/workspace.rs index 4615be5..40898ca 100644 --- a/src/ffi/python/workspace.rs +++ b/src/ffi/python/workspace.rs @@ -1,6 +1,8 @@ use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; use crate::workspace::Workspace; +use crate::api::controller::AsyncReceiver; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use super::a_sync_allow_threads; @@ -26,12 +28,6 @@ impl Workspace { self.detach(path.as_str()) } - #[pyo3(name = "event")] - fn pyevent(&self, py: Python) -> PyResult { - let this = self.clone(); - a_sync_allow_threads!(py, this.event().await) - } - #[pyo3(name = "fetch_buffers")] fn pyfetch_buffers(&self, py: Python) -> PyResult { let this = self.clone(); @@ -87,4 +83,42 @@ impl Workspace { fn pyuser_list(&self) -> Vec { self.user_list() } + + #[pyo3(name = "recv")] + fn pyrecv(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.recv().await) + } + + #[pyo3(name = "try_recv")] + fn pytry_recv(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.try_recv().await) + } + + #[pyo3(name = "poll")] + fn pypoll(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.poll().await) + } + + #[pyo3(name = "clear_callback")] + fn pyclear_callbacl(&self, _py: Python) { + self.clear_callback(); + } + + #[pyo3(name = "callback")] + fn pycallback(&self, py: Python, cb: PyObject) -> PyResult<()> { + if !cb.bind_borrowed(py).is_callable() { + return Err(PyValueError::new_err("The object passed must be callable.")); + } + + self.callback(move |ws| { + Python::with_gil(|py| { + // TODO what to do with this error? + let _ = cb.call1(py, (ws,)); + }) + }); + Ok(()) + } } diff --git a/src/prelude.rs b/src/prelude.rs index a2fd045..2e0a640 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -3,6 +3,8 @@ pub use crate::api::{ Controller as CodempController, + controller::AsyncSender as CodempAsyncSender, + controller::AsyncReceiver as CodempAsyncReceiver, TextChange as CodempTextChange, Cursor as CodempCursor, User as CodempUser, From 226a02f8f7c56fd4a33588c478301645b5f81e14 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 03:43:20 +0200 Subject: [PATCH 6/8] test: update doctests --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f9172e1..8f66ca7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ //! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap(); //! # client.create_workspace("").await.unwrap(); //! # let workspace = client.join_workspace("").await.unwrap(); -//! use codemp::api::Controller; // needed to access trait methods +//! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods //! let cursor = workspace.cursor(); //! let event = cursor.recv().await.expect("disconnected while waiting for event!"); //! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer); @@ -66,7 +66,7 @@ //! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap(); //! # client.create_workspace("").await.unwrap(); //! # let workspace = client.join_workspace("").await.unwrap(); -//! # use codemp::api::Controller; +//! # use codemp::api::controller::{AsyncSender, AsyncReceiver}; //! let buffer = workspace.attach("/some/file.txt").await.expect("failed to attach"); //! buffer.content(); // force-sync //! if let Some(change) = buffer.try_recv().await.unwrap() { From 8225524cb3dbe20cb989855d39fbc6e93ea837a4 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 04:05:58 +0200 Subject: [PATCH 7/8] chore: fixed imports --- src/cursor/controller.rs | 5 ++++- src/ffi/java/buffer.rs | 5 ++++- src/ffi/java/cursor.rs | 9 ++++----- src/ffi/java/workspace.rs | 20 +++++++++++++++----- src/ffi/js/buffer.rs | 4 +--- src/ffi/js/cursor.rs | 3 --- src/ffi/js/workspace.rs | 29 ++++++++++++++--------------- 7 files changed, 42 insertions(+), 33 deletions(-) diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index e16c1d1..a0bced9 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -6,7 +6,10 @@ use std::sync::Arc; use tokio::sync::{mpsc, oneshot, watch}; use crate::{ - api::{controller::{AsyncReceiver, AsyncSender, ControllerCallback}, Controller, Cursor}, + api::{ + controller::{AsyncReceiver, AsyncSender, ControllerCallback}, + Controller, Cursor, + }, errors::ControllerResult, }; use codemp_proto::{ diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 3df1243..639c187 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -2,7 +2,10 @@ use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; use crate::{ - api::{controller::{AsyncReceiver, AsyncSender}, TextChange}, + api::{ + controller::{AsyncReceiver, AsyncSender}, + TextChange, + }, errors::ControllerError, }; diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index c4c8c94..65cb0c4 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -1,13 +1,12 @@ use crate::{ - api::{Controller, Cursor}, + api::{ + controller::{AsyncReceiver, AsyncSender}, + Cursor, + }, errors::ControllerError, }; use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; -use crate::{ - api::{controller::{AsyncSender, AsyncReceiver}, Cursor}, - errors::ControllerError -}; use super::null_check; diff --git a/src/ffi/java/workspace.rs b/src/ffi/java/workspace.rs index 50dde7a..4bd3da8 100644 --- a/src/ffi/java/workspace.rs +++ b/src/ffi/java/workspace.rs @@ -1,8 +1,10 @@ use crate::{ api::controller::AsyncReceiver, errors::{ConnectionError, ControllerError, RemoteError}, + ffi::java::null_check, Workspace, }; +use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; /// Get the workspace id. @@ -115,17 +117,25 @@ fn clear_callback(workspace: &mut Workspace) { /// Register a callback for workspace events. #[jni(package = "mp.code", class = "Workspace")] -fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::Workspace, cb: JObject<'local>) { +fn callback<'local>( + env: &mut JNIEnv<'local>, + controller: &mut crate::Workspace, + cb: JObject<'local>, +) { null_check!(env, cb, {}); let Ok(cb_ref) = env.new_global_ref(cb) else { - env.throw_new("mp/code/exceptions/JNIException", "Failed to pin callback reference!") - .expect("Failed to throw exception!"); + env.throw_new( + "mp/code/exceptions/JNIException", + "Failed to pin callback reference!", + ) + .expect("Failed to throw exception!"); return; }; controller.callback(move |workspace: crate::Workspace| { let jvm = super::jvm(); - let mut env = jvm.attach_current_thread_permanently() + let mut env = jvm + .attach_current_thread_permanently() .expect("failed attaching to main JVM thread"); if let Err(e) = env.with_local_frame(5, |env| { use jni_toolbox::IntoJavaObject; @@ -134,7 +144,7 @@ fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::Workspace, &cb_ref, "accept", "(Ljava/lang/Object;)V", - &[jni::objects::JValueGen::Object(&jworkspace)] + &[jni::objects::JValueGen::Object(&jworkspace)], ) { tracing::error!("error invoking callback: {e:?}"); }; diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index ce7bca3..df89f06 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,7 +1,5 @@ -use napi::threadsafe_function::{ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode}; -use napi_derive::napi; -use crate::api::TextChange; use crate::api::controller::{AsyncReceiver, AsyncSender}; +use crate::api::TextChange; use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index 565b059..2bb09e5 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -1,6 +1,3 @@ -use napi::threadsafe_function::ErrorStrategy::Fatal; -use napi_derive::napi; -use napi::threadsafe_function::{ThreadsafeFunction, ThreadSafeCallContext, ThreadsafeFunctionCallMode}; use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::cursor::controller::CursorController; use napi::threadsafe_function::ErrorStrategy::Fatal; diff --git a/src/ffi/js/workspace.rs b/src/ffi/js/workspace.rs index a7e3ba7..e44fe81 100644 --- a/src/ffi/js/workspace.rs +++ b/src/ffi/js/workspace.rs @@ -1,11 +1,14 @@ -use napi::threadsafe_function::ErrorStrategy::Fatal; -use napi::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode}; -use napi_derive::napi; -use crate::Workspace; +use crate::api::controller::AsyncReceiver; use crate::buffer::controller::BufferController; use crate::cursor::controller::CursorController; -use crate::api::controller::AsyncReceiver; +use crate::Workspace; +use napi::threadsafe_function::ErrorStrategy::Fatal; +use napi::threadsafe_function::{ + ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, +}; +use napi_derive::napi; +use super::client::JsUser; #[napi(object, js_name = "Event")] pub struct JsEvent { @@ -111,18 +114,14 @@ impl Workspace { } #[napi(js_name = "callback", ts_args_type = "fun: (event: Workspace) => void")] - pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()>{ - let tsfn : ThreadsafeFunction = - fun.create_threadsafe_function(0, - |ctx : ThreadSafeCallContext| { + pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()> { + let tsfn: ThreadsafeFunction = fun + .create_threadsafe_function(0, |ctx: ThreadSafeCallContext| { Ok(vec![ctx.value]) - } - )?; - self.callback(move |controller : Workspace| { - + })?; + self.callback(move |controller: Workspace| { tsfn.call(controller.clone(), ThreadsafeFunctionCallMode::Blocking); //check this with tracing also we could use Ok(event) to get the error - // If it blocks the main thread too many time we have to change this - + // If it blocks the main thread too many time we have to change this }); Ok(()) From 2fe217ad85dd2f5b581846f825dead7a997ff86c Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 04:06:54 +0200 Subject: [PATCH 8/8] chore: autofmt --- src/api/controller.rs | 11 +++--- src/buffer/worker.rs | 71 ++++++++++++++++++++++------------- src/ffi/lua/ext/a_sync.rs | 2 +- src/ffi/lua/workspace.rs | 34 ++++++++--------- src/ffi/python/controllers.rs | 2 +- src/ffi/python/workspace.rs | 2 +- src/lib.rs | 2 +- src/prelude.rs | 11 ++---- src/workspace.rs | 23 ++++++------ 9 files changed, 84 insertions(+), 74 deletions(-) diff --git a/src/api/controller.rs b/src/api/controller.rs index e15d7bb..dda3b3f 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -13,7 +13,7 @@ use crate::errors::ControllerResult; /// /// This generic trait is implemented by actors managing stream procedures, and will generally /// imply a background worker. -/// +/// /// Events can be enqueued for dispatching without blocking with [`AsyncSender::send`]. /// /// For receiving events from the server, an asynchronous API with [`AsyncReceiver::recv`] is @@ -26,11 +26,12 @@ use crate::errors::ControllerResult; /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait Controller : AsyncSender + AsyncReceiver +pub trait Controller: AsyncSender + AsyncReceiver where Tx: Sized + Sync + Send, Rx: Sized + Sync + Send, -{} +{ +} /// Asynchronous and thread-safe handle to send data over a stream. /// See [`Controller`]'s documentation for details. @@ -38,7 +39,7 @@ where /// Details about the receiving end are left to the implementor. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait AsyncSender : Sized + Send + Sync { +pub trait AsyncSender: Sized + Send + Sync { /// Enqueue a new value to be sent to all other users. async fn send(&self, x: T) -> ControllerResult<()>; } @@ -49,7 +50,7 @@ pub trait AsyncSender : Sized + Send + Sync { /// Details about the sender are left to the implementor. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait AsyncReceiver : Sized + Send + Sync { +pub trait AsyncReceiver: Sized + Send + Sync { /// Block until a value is available and returns it. async fn recv(&self) -> ControllerResult { loop { diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index de669b3..a5a730f 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -142,7 +142,12 @@ impl BufferController { } impl BufferWorker { - async fn handle_editor_change(&mut self, change: TextChange, ack: oneshot::Sender, tx: &mpsc::Sender) { + async fn handle_editor_change( + &mut self, + change: TextChange, + ack: oneshot::Sender, + tx: &mpsc::Sender, + ) { let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); let last_ver = self.oplog.local_version(); // clip to buffer extents @@ -151,20 +156,27 @@ impl BufferWorker { // in case we have a "replace" span if change.is_delete() { - self.branch.delete_without_content(&mut self.oplog, agent_id, clip_start..clip_end); + self.branch + .delete_without_content(&mut self.oplog, agent_id, clip_start..clip_end); } if change.is_insert() { - self.branch.insert(&mut self.oplog, agent_id, clip_start, &change.content); + self.branch + .insert(&mut self.oplog, agent_id, clip_start, &change.content); } if change.is_delete() || change.is_insert() { - tx.send(Operation { data: self.oplog.encode_from(Default::default(), &last_ver) }).await - .unwrap_or_warn("failed to send change!"); - self.latest_version.send(self.oplog.local_version()) + tx.send(Operation { + data: self.oplog.encode_from(Default::default(), &last_ver), + }) + .await + .unwrap_or_warn("failed to send change!"); + self.latest_version + .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); } - ack.send(self.branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); + ack.send(self.branch.local_version()) + .unwrap_or_warn("controller didn't wait for ack"); } async fn handle_server_change(&mut self, change: BufferEvent) -> bool { @@ -172,7 +184,8 @@ impl BufferWorker { None => true, // clean exit actually, just weird we caught it here Some(controller) => match self.oplog.decode_and_add(&change.op.data) { Ok(local_version) => { - self.latest_version.send(local_version) + self.latest_version + .send(local_version) .unwrap_or_warn("failed to update latest version!"); for tx in self.pollers.drain(..) { tx.send(()).unwrap_or_warn("could not wake up poller"); @@ -181,56 +194,60 @@ impl BufferWorker { cb.call(BufferController(controller)); // TODO should we run this on another task/thread? } false - }, + } Err(e) => { tracing::error!("could not deserialize operation from server: {}", e); true - }, - } + } + }, } } async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender) { - if let Some((lv, Some(dtop))) = self.oplog + if let Some((lv, Some(dtop))) = self + .oplog .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) .next() { // x.0.start should always be after lastver! // this step_ver will be the version after we apply the operation // we give it to the controller so that he knows where it's at. - let step_ver = self.oplog.version_union(&[lv.end-1], &last_ver); + let step_ver = self.oplog.version_union(&[lv.end - 1], &last_ver); self.branch.merge(&self.oplog, &step_ver); let new_local_v = self.branch.local_version(); let hash = if self.timer.step() { Some(crate::ext::hash(self.branch.content().to_string())) - } else { None }; + } else { + None + }; let tc = match dtop.kind { diamond_types::list::operation::OpKind::Ins => { - if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() { + if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() + { tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); } crate::api::change::TextChange { start: dtop.start() as u32, end: dtop.start() as u32, content: dtop.content_as_str().unwrap_or_default().to_string(), - hash - } - }, - - diamond_types::list::operation::OpKind::Del => { - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.end() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), - hash + hash, } } + + diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash, + }, }; - tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send((new_local_v, Some(tc))) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { - tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send((last_ver, None)) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); } } } diff --git a/src/ffi/lua/ext/a_sync.rs b/src/ffi/lua/ext/a_sync.rs index e53181d..02c672b 100644 --- a/src/ffi/lua/ext/a_sync.rs +++ b/src/ffi/lua/ext/a_sync.rs @@ -54,7 +54,7 @@ impl LuaUserData for Promise { Some(x) => { x.abort(); Ok(()) - }, + } }); methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| { match this.0.take() { diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index dfa707e..15167c9 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -33,9 +33,9 @@ impl LuaUserData for CodempWorkspace { Ok(this.buffer_by_name(&name)) }); - methods.add_method("fetch_buffers", |_, this, ()| - a_sync! { this => this.fetch_buffers().await? } - + methods.add_method( + "fetch_buffers", + |_, this, ()| a_sync! { this => this.fetch_buffers().await? }, ); methods.add_method( "fetch_users", @@ -49,24 +49,21 @@ impl LuaUserData for CodempWorkspace { }, ); - methods.add_method("user_list", |_, this, ()| - Ok(this.user_list()) + methods.add_method("user_list", |_, this, ()| Ok(this.user_list())); + + methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); + + methods.add_method( + "try_recv", + |_, this, ()| a_sync! { this => this.try_recv().await? }, ); - methods.add_method("recv", |_, this, ()| - a_sync! { this => this.recv().await? } - ); + methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method("try_recv", |_, this, ()| - a_sync! { this => this.try_recv().await? } - ); - - methods.add_method("poll", |_, this, ()| - a_sync! { this => this.poll().await? } - ); - - methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { - this.callback(move |controller: CodempWorkspace| super::ext::callback().invoke(cb.clone(), controller)); + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { + this.callback(move |controller: CodempWorkspace| { + super::ext::callback().invoke(cb.clone(), controller) + }); Ok(()) }); @@ -74,7 +71,6 @@ impl LuaUserData for CodempWorkspace { this.clear_callback(); Ok(()) }); - } fn add_fields>(fields: &mut F) { diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index e3bfd8d..6ee1e27 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,4 +1,4 @@ -use crate::api::controller::{AsyncSender, AsyncReceiver}; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::Cursor; use crate::api::TextChange; use crate::buffer::Controller as BufferController; diff --git a/src/ffi/python/workspace.rs b/src/ffi/python/workspace.rs index 40898ca..106495c 100644 --- a/src/ffi/python/workspace.rs +++ b/src/ffi/python/workspace.rs @@ -1,7 +1,7 @@ +use crate::api::controller::AsyncReceiver; use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; use crate::workspace::Workspace; -use crate::api::controller::AsyncReceiver; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; diff --git a/src/lib.rs b/src/lib.rs index 8f66ca7..40ad39f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ //! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap(); //! # client.create_workspace("").await.unwrap(); //! # let workspace = client.join_workspace("").await.unwrap(); -//! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods +//! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods //! let cursor = workspace.cursor(); //! let event = cursor.recv().await.expect("disconnected while waiting for event!"); //! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer); diff --git a/src/prelude.rs b/src/prelude.rs index 41f3bec..0fec44d 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -2,14 +2,9 @@ //! All-in-one renamed imports with `use codemp::prelude::*`. pub use crate::api::{ - Controller as CodempController, - controller::AsyncSender as CodempAsyncSender, - controller::AsyncReceiver as CodempAsyncReceiver, - TextChange as CodempTextChange, - Cursor as CodempCursor, - User as CodempUser, - Event as CodempEvent, - Config as CodempConfig, + controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, + Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, + Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, }; pub use crate::{ diff --git a/src/workspace.rs b/src/workspace.rs index a767c51..fe14388 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -4,8 +4,11 @@ //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. use crate::{ - api::{controller::{AsyncReceiver, ControllerCallback}, Event, User}, - buffer, cursor, + api::{ + controller::{AsyncReceiver, ControllerCallback}, + Event, User, + }, + buffer, cursor, errors::{ConnectionResult, ControllerResult, RemoteResult}, ext::InternallyMutable, network::Services, @@ -24,7 +27,7 @@ use codemp_proto::{ use dashmap::{DashMap, DashSet}; use std::{collections::BTreeSet, sync::Arc}; -use tokio::sync::{mpsc::error::TryRecvError, mpsc}; +use tokio::sync::{mpsc, mpsc::error::TryRecvError}; use tonic::Streaming; use uuid::Uuid; @@ -60,12 +63,7 @@ struct WorkspaceInner { impl AsyncReceiver for Workspace { async fn try_recv(&self) -> ControllerResult> { - match self.0 - .events - .lock() - .await - .try_recv() - { + match self.0.events.lock().await.try_recv() { Ok(x) => Ok(Some(x)), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Disconnected) => Err(crate::errors::ControllerError::Stopped), @@ -74,7 +72,9 @@ impl AsyncReceiver for Workspace { async fn poll(&self) -> ControllerResult<()> { loop { - if !self.0.events.lock().await.is_empty() { break Ok(()) } + if !self.0.events.lock().await.is_empty() { + break Ok(()); + } // TODO disgusting, please send help tokio::time::sleep(std::time::Duration::from_millis(200)).await; } @@ -312,7 +312,8 @@ impl Workspace { /// A filter may be applied, and it may be strict (equality check) or not (starts_with check). // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 pub fn filetree(&self, filter: Option<&str>, strict: bool) -> Vec { - let mut tree = self.0 + let mut tree = self + .0 .filetree .iter() .filter(|f| {