diff --git a/dist/java/src/mp/code/Workspace.java b/dist/java/src/mp/code/Workspace.java index 31ee6e3..10ae3b1 100644 --- a/dist/java/src/mp/code/Workspace.java +++ b/dist/java/src/mp/code/Workspace.java @@ -2,8 +2,8 @@ package mp.code; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; -import mp.code.data.DetachResult; import mp.code.exceptions.ConnectionException; import mp.code.exceptions.ConnectionRemoteException; import mp.code.exceptions.ControllerException; @@ -168,16 +168,58 @@ public final class Workspace { delete_buffer(this.ptr, path); } - private static native Event event(long self) throws ControllerException; + private static native Event try_recv(long self) throws ControllerException; /** - * Blocks until a workspace event occurs. - * You shouldn't call this, unless it's on a dedicated thread. - * @return the {@link Event} that has occurred - * @throws ControllerException if the event arrived while the underlying controller was already closed + * Tries to get a {@link Event} from the queue if any were present, and returns + * an empty optional otherwise. + * @return the first workspace event in queue, if any are present + * @throws ControllerException if the controller was stopped */ - public Event event() throws ControllerException { - return event(this.ptr); + public Optional tryRecv() throws ControllerException { + return Optional.ofNullable(try_recv(this.ptr)); + } + + private static native Event recv(long self) throws ControllerException; + + /** + * Blocks until a {@link Event} is available and returns it. + * @return the workspace event that occurred + * @throws ControllerException if the controller was stopped + */ + public Event recv() throws ControllerException { + return recv(this.ptr); + } + + private static native void callback(long self, Consumer cb); + + /** + * Registers a callback to be invoked whenever a new {@link Event} is ready to be received. + * This will not work unless a Java thread has been dedicated to the event loop. + * @see Extensions#drive(boolean) + */ + public void callback(Consumer cb) { + callback(this.ptr, cb); + } + + private static native void clear_callback(long self); + + /** + * Clears the registered callback. + * @see #callback(Consumer) + */ + public void clearCallback() { + clear_callback(this.ptr); + } + + private static native void poll(long self) throws ControllerException; + + /** + * Blocks until a {@link Event} is available. + * @throws ControllerException if the controller was stopped + */ + public void poll() throws ControllerException { + poll(this.ptr); } private static native void free(long self); diff --git a/dist/java/src/mp/code/data/DetachResult.java b/dist/java/src/mp/code/data/DetachResult.java deleted file mode 100644 index c367354..0000000 --- a/dist/java/src/mp/code/data/DetachResult.java +++ /dev/null @@ -1,15 +0,0 @@ -package mp.code.data; - -import mp.code.Workspace; - -/** - * The result of a {@link Workspace#detachFromBuffer(String)} operation. - */ -public enum DetachResult { - /** The user was not attached to this buffer. */ - NOT_ATTACHED, - /** The user detached from the buffer and stopped it. */ - DETACHING, - /** The user was attached, but the buffer was already stopped. */ - ALREADY_DETACHED -} diff --git a/dist/lua/annotations.lua b/dist/lua/annotations.lua index b25ef76..0e573fa 100644 --- a/dist/lua/annotations.lua +++ b/dist/lua/annotations.lua @@ -89,6 +89,16 @@ function WorkspaceEventPromise:cancel() end function WorkspaceEventPromise:and_then(cb) end +---@class (exact) MaybeWorkspaceEventPromise : Promise +local MaybeWorkspaceEventPromise = {} +--- block until promise is ready and return value +--- @return WorkspaceEvent | nil +function MaybeWorkspaceEventPromise:await() end +---@param cb fun(x: WorkspaceEvent | nil) callback to invoke +---invoke callback asynchronously as soon as promise is ready +function MaybeWorkspaceEventPromise:and_then(cb) end + + ---@class (exact) BufferControllerPromise : Promise local BufferControllerPromise = {} --- block until promise is ready and return value @@ -276,11 +286,30 @@ function Workspace:fetch_users(path) end ---@field type string ---@field value string +---@return MaybeWorkspaceEventPromise +---@async +---@nodiscard +---try to receive workspace events, returning nil if none is available +function Workspace:try_recv() end + ---@return WorkspaceEventPromise ---@async ---@nodiscard ----get next workspace event -function Workspace:event() end +---block until next workspace event and return it +function Workspace:recv() end + +---@return NilPromise +---@async +---@nodiscard +---block until next workspace event without returning it +function Workspace:poll() end + +---clears any previously registered workspace callback +function Workspace:clear_callback() end + +---@param cb fun(w: Workspace) callback to invoke on each workspace event received +---register a new callback to be called on workspace events (replaces any previously registered one) +function Workspace:callback(cb) end diff --git a/dist/py/src/codemp/codemp.pyi b/dist/py/src/codemp/codemp.pyi index 4df875b..6c088c5 100644 --- a/dist/py/src/codemp/codemp.pyi +++ b/dist/py/src/codemp/codemp.pyi @@ -52,6 +52,9 @@ class Client: def user_name(self) -> str: ... def refresh(self) -> Promise[None]: ... +class Event: + pass + class Workspace: """ Handle to a workspace inside codemp. It manages buffers. @@ -69,6 +72,11 @@ class Workspace: def buffer_by_name(self, path: str) -> Optional[BufferController]: ... def buffer_list(self) -> list[str]: ... def filetree(self, filter: Optional[str], strict: bool) -> list[str]: ... + def recv(self) -> Promise[Event]: ... + def try_recv(self) -> Promise[Optional[Event]]: ... + def poll(self) -> Promise[None]: ... + def clear_callback(self) -> None: ... + def callback(self, cb: Callable[[Workspace], None]) -> None: ... class TextChange: """ diff --git a/src/api/controller.rs b/src/api/controller.rs index 9c384eb..dda3b3f 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -8,16 +8,17 @@ use crate::errors::ControllerResult; // note that we don't use thiserror's #[from] because we don't want the error structs to contain // these foreign types, and also we want these to be easily constructable -/// Asynchronous and thread-safe handle to a generic bidirectional stream. +/// Asynchronous and thread-safe handle to a generic bidirectional stream. Exists as a combination +/// of [`AsyncSender`] and [`AsyncReceiver`]. /// /// This generic trait is implemented by actors managing stream procedures, and will generally /// imply a background worker. /// -/// Events can be enqueued for dispatching without blocking with [`Controller::send`]. +/// Events can be enqueued for dispatching without blocking with [`AsyncSender::send`]. /// -/// For receiving events from the server, an asynchronous API with [`Controller::recv`] is -/// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively, -/// [`Controller::poll`] combined with [`Controller::try_recv`]. +/// For receiving events from the server, an asynchronous API with [`AsyncReceiver::recv`] is +/// provided; if that is not feasible, consider using [`AsyncReceiver::callback`] or, alternatively, +/// [`AsyncReceiver::poll`] combined with [`AsyncReceiver::try_recv`]. /// /// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have /// been dropped. @@ -25,10 +26,31 @@ use crate::errors::ControllerResult; /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait Controller: Sized + Send + Sync { +pub trait Controller: AsyncSender + AsyncReceiver +where + Tx: Sized + Sync + Send, + Rx: Sized + Sync + Send, +{ +} + +/// Asynchronous and thread-safe handle to send data over a stream. +/// See [`Controller`]'s documentation for details. +/// +/// Details about the receiving end are left to the implementor. +#[allow(async_fn_in_trait)] +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +pub trait AsyncSender: Sized + Send + Sync { /// Enqueue a new value to be sent to all other users. async fn send(&self, x: T) -> ControllerResult<()>; +} +/// Asynchronous and thread-safe handle to receive data from a stream. +/// See [`Controller`]'s documentation for details. +/// +/// Details about the sender are left to the implementor. +#[allow(async_fn_in_trait)] +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +pub trait AsyncReceiver: Sized + Send + Sync { /// Block until a value is available and returns it. async fn recv(&self) -> ControllerResult { loop { diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 73229be..bcf115e 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; -use crate::api::controller::{Controller, ControllerCallback}; +use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::TextChange; use crate::errors::ControllerResult; use crate::ext::InternallyMutable; @@ -53,7 +53,21 @@ pub(crate) struct BufferControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for BufferController { +impl Controller for BufferController {} + +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl AsyncSender for BufferController { + async fn send(&self, op: TextChange) -> ControllerResult<()> { + // we let the worker do the updating to the last version and send it back. + let (tx, rx) = oneshot::channel(); + self.0.ops_in.send((op, tx))?; + self.0.last_update.set(rx.await?); + Ok(()) + } +} + +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl AsyncReceiver for BufferController { async fn poll(&self) -> ControllerResult<()> { if self.0.last_update.get() != *self.0.latest_version.borrow() { return Ok(()); @@ -80,14 +94,6 @@ impl Controller for BufferController { Ok(change) } - async fn send(&self, op: TextChange) -> ControllerResult<()> { - // we let the worker do the updating to the last version and send it back. - let (tx, rx) = oneshot::channel(); - self.0.ops_in.send((op, tx))?; - self.0.last_update.set(rx.await?); - Ok(()) - } - fn callback(&self, cb: impl Into>) { if self.0.callback.send(Some(cb.into())).is_err() { // TODO should we panic? we failed what we were supposed to do diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 4855878..a5a730f 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use diamond_types::list::{Branch, OpLog}; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; @@ -27,6 +28,9 @@ struct BufferWorker { delta_req: mpsc::Receiver, controller: std::sync::Weak, callback: watch::Receiver>>, + oplog: OpLog, + branch: Branch, + timer: Timer, } impl BufferController { @@ -71,6 +75,9 @@ impl BufferController { content_checkout: req_rx, delta_req: recv_rx, callback: cb_rx, + oplog: OpLog::new(), + branch: Branch::new(), + timer: Timer::new(10), // TODO configurable! }; tokio::spawn(async move { BufferController::work(worker, tx, rx).await }); @@ -83,11 +90,7 @@ impl BufferController { tx: mpsc::Sender, mut rx: Streaming, ) { - let mut branch = diamond_types::list::Branch::new(); - let mut oplog = diamond_types::list::OpLog::new(); - let mut timer = Timer::new(10); // TODO configurable!! tracing::debug!("controller worker started"); - loop { if worker.controller.upgrade().is_none() { break; @@ -106,105 +109,28 @@ impl BufferController { // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some((change, ack)) => { - let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string()); - let last_ver = oplog.local_version(); - // clip to buffer extents - let clip_end = std::cmp::min(branch.len(), change.end as usize); - let clip_start = std::cmp::max(0, change.start as usize); - - // in case we have a "replace" span - if change.is_delete() { - branch.delete_without_content(&mut oplog, agent_id, clip_start..clip_end); - } - - if change.is_insert() { - branch.insert(&mut oplog, agent_id, clip_start, &change.content); - } - - if change.is_delete() || change.is_insert() { - tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await - .unwrap_or_warn("failed to send change!"); - worker.latest_version.send(oplog.local_version()) - .unwrap_or_warn("failed to update latest version!"); - } - ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); - }, + Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await, }, // received a message from server: add to oplog and update latest version (+unlock pollers) res = rx.message() => match res { Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path), Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path), - Ok(Some(change)) => match worker.controller.upgrade() { - None => break, // clean exit actually, just weird we caught it here - Some(controller) => match oplog.decode_and_add(&change.op.data) { - Ok(local_version) => { - worker.latest_version.send(local_version) - .unwrap_or_warn("failed to update latest version!"); - for tx in worker.pollers.drain(..) { - tx.send(()).unwrap_or_warn("could not wake up poller"); - } - if let Some(cb) = worker.callback.borrow().as_ref() { - cb.call(BufferController(controller)); // TODO should we run this on another task/thread? - } - }, - Err(e) => tracing::error!("could not deserialize operation from server: {}", e), - } - }, + Ok(Some(change)) => if worker.handle_server_change(change).await { break }, }, // controller is ready to apply change and recv(), calculate it and send it back res = worker.delta_req.recv() => match res { None => break tracing::error!("no more active controllers: can't send changes"), - Some((last_ver, tx)) => { - if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() { - // x.0.start should always be after lastver! - // this step_ver will be the version after we apply the operation - // we give it to the controller so that he knows where it's at. - let step_ver = oplog.version_union(&[lv.end-1], &last_ver); - branch.merge(&oplog, &step_ver); - let new_local_v = branch.local_version(); - - let hash = if timer.step() { - Some(crate::ext::hash(branch.content().to_string())) - } else { None }; - - let tc = match dtop.kind { - diamond_types::list::operation::OpKind::Ins => { - if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() { - tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); - } - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.start() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), - hash - } - }, - - diamond_types::list::operation::OpKind::Del => { - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.end() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), - hash - } - } - }; - tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); - } else { - tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); - } - }, + Some((last_ver, tx)) => worker.handle_delta_request(last_ver, tx).await, }, // received a request for full CRDT content res = worker.content_checkout.recv() => match res { None => break tracing::error!("no more active controllers: can't update content"), Some(tx) => { - branch.merge(&oplog, oplog.local_version_ref()); - let content = branch.content().to_string(); + worker.branch.merge(&worker.oplog, worker.oplog.local_version_ref()); + let content = worker.branch.content().to_string(); tx.send(content).unwrap_or_warn("checkout request dropped"); }, } @@ -215,6 +141,117 @@ impl BufferController { } } +impl BufferWorker { + async fn handle_editor_change( + &mut self, + change: TextChange, + ack: oneshot::Sender, + tx: &mpsc::Sender, + ) { + let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); + let last_ver = self.oplog.local_version(); + // clip to buffer extents + let clip_end = std::cmp::min(self.branch.len(), change.end as usize); + let clip_start = std::cmp::max(0, change.start as usize); + + // in case we have a "replace" span + if change.is_delete() { + self.branch + .delete_without_content(&mut self.oplog, agent_id, clip_start..clip_end); + } + + if change.is_insert() { + self.branch + .insert(&mut self.oplog, agent_id, clip_start, &change.content); + } + + if change.is_delete() || change.is_insert() { + tx.send(Operation { + data: self.oplog.encode_from(Default::default(), &last_ver), + }) + .await + .unwrap_or_warn("failed to send change!"); + self.latest_version + .send(self.oplog.local_version()) + .unwrap_or_warn("failed to update latest version!"); + } + ack.send(self.branch.local_version()) + .unwrap_or_warn("controller didn't wait for ack"); + } + + async fn handle_server_change(&mut self, change: BufferEvent) -> bool { + match self.controller.upgrade() { + None => true, // clean exit actually, just weird we caught it here + Some(controller) => match self.oplog.decode_and_add(&change.op.data) { + Ok(local_version) => { + self.latest_version + .send(local_version) + .unwrap_or_warn("failed to update latest version!"); + for tx in self.pollers.drain(..) { + tx.send(()).unwrap_or_warn("could not wake up poller"); + } + if let Some(cb) = self.callback.borrow().as_ref() { + cb.call(BufferController(controller)); // TODO should we run this on another task/thread? + } + false + } + Err(e) => { + tracing::error!("could not deserialize operation from server: {}", e); + true + } + }, + } + } + + async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender) { + if let Some((lv, Some(dtop))) = self + .oplog + .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) + .next() + { + // x.0.start should always be after lastver! + // this step_ver will be the version after we apply the operation + // we give it to the controller so that he knows where it's at. + let step_ver = self.oplog.version_union(&[lv.end - 1], &last_ver); + self.branch.merge(&self.oplog, &step_ver); + let new_local_v = self.branch.local_version(); + + let hash = if self.timer.step() { + Some(crate::ext::hash(self.branch.content().to_string())) + } else { + None + }; + + let tc = match dtop.kind { + diamond_types::list::operation::OpKind::Ins => { + if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() + { + tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); + } + crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.start() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash, + } + } + + diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash, + }, + }; + tx.send((new_local_v, Some(tc))) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); + } else { + tx.send((last_ver, None)) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); + } + } +} + struct Timer(u32, u32); impl Timer { fn new(period: u32) -> Self { diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index a8dc862..a0bced9 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -6,7 +6,10 @@ use std::sync::Arc; use tokio::sync::{mpsc, oneshot, watch}; use crate::{ - api::{controller::ControllerCallback, Controller, Cursor}, + api::{ + controller::{AsyncReceiver, AsyncSender, ControllerCallback}, + Controller, Cursor, + }, errors::ControllerResult, }; use codemp_proto::{ @@ -31,7 +34,10 @@ pub(crate) struct CursorControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for CursorController { +impl Controller for CursorController {} + +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl AsyncSender for CursorController { async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); @@ -54,7 +60,10 @@ impl Controller for CursorController { }) .await?) } +} +#[cfg_attr(feature = "async-trait", async_trait::async_trait)] +impl AsyncReceiver for CursorController { async fn try_recv(&self) -> ControllerResult> { let (tx, rx) = oneshot::channel(); self.0.stream.send(tx).await?; diff --git a/src/ext.rs b/src/ext.rs index 63fd3f2..06a3cc1 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -1,7 +1,7 @@ //! ### Extensions //! Contains a number of utils used internally or that may be of general interest. -use crate::{api::Controller, errors::ControllerResult}; +use crate::{api::controller::AsyncReceiver, errors::ControllerResult}; use tokio::sync::mpsc; /// Poll all given buffer controllers and wait, returning the first one ready. diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 3c3bcf8..639c187 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -2,7 +2,10 @@ use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; use crate::{ - api::{Controller, TextChange}, + api::{ + controller::{AsyncReceiver, AsyncSender}, + TextChange, + }, errors::ControllerError, }; diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 68c3885..65cb0c4 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -1,5 +1,8 @@ use crate::{ - api::{Controller, Cursor}, + api::{ + controller::{AsyncReceiver, AsyncSender}, + Cursor, + }, errors::ControllerError, }; use jni::{objects::JObject, JNIEnv}; diff --git a/src/ffi/java/workspace.rs b/src/ffi/java/workspace.rs index db3a76e..4bd3da8 100644 --- a/src/ffi/java/workspace.rs +++ b/src/ffi/java/workspace.rs @@ -1,7 +1,10 @@ use crate::{ + api::controller::AsyncReceiver, errors::{ConnectionError, ControllerError, RemoteError}, + ffi::java::null_check, Workspace, }; +use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; /// Get the workspace id. @@ -88,10 +91,69 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr super::tokio().block_on(workspace.delete(&path)) } +/// Block and receive a workspace event +#[jni(package = "mp.code", class = "Workspace")] +fn recv(workspace: &mut Workspace) -> Result { + super::tokio().block_on(workspace.recv()) +} + /// Receive a workspace event if present. #[jni(package = "mp.code", class = "Workspace")] -fn event(workspace: &mut Workspace) -> Result { - super::tokio().block_on(workspace.event()) +fn try_recv(workspace: &mut Workspace) -> Result, ControllerError> { + super::tokio().block_on(workspace.try_recv()) +} + +/// Block until a workspace event is available +#[jni(package = "mp.code", class = "Workspace")] +fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> { + super::tokio().block_on(workspace.poll()) +} + +/// Clear previously registered callback +#[jni(package = "mp.code", class = "Workspace")] +fn clear_callback(workspace: &mut Workspace) { + workspace.clear_callback(); +} + +/// Register a callback for workspace events. +#[jni(package = "mp.code", class = "Workspace")] +fn callback<'local>( + env: &mut JNIEnv<'local>, + controller: &mut crate::Workspace, + cb: JObject<'local>, +) { + null_check!(env, cb, {}); + let Ok(cb_ref) = env.new_global_ref(cb) else { + env.throw_new( + "mp/code/exceptions/JNIException", + "Failed to pin callback reference!", + ) + .expect("Failed to throw exception!"); + return; + }; + + controller.callback(move |workspace: crate::Workspace| { + let jvm = super::jvm(); + let mut env = jvm + .attach_current_thread_permanently() + .expect("failed attaching to main JVM thread"); + if let Err(e) = env.with_local_frame(5, |env| { + use jni_toolbox::IntoJavaObject; + let jworkspace = workspace.into_java_object(env)?; + if let Err(e) = env.call_method( + &cb_ref, + "accept", + "(Ljava/lang/Object;)V", + &[jni::objects::JValueGen::Object(&jworkspace)], + ) { + tracing::error!("error invoking callback: {e:?}"); + }; + Ok::<(), jni::errors::Error>(()) + }) { + tracing::error!("error invoking callback: {e}"); + let _ = env.exception_describe(); + } + }); } /// Called by the Java GC to drop a [Workspace]. diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index 80c7575..df89f06 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,4 +1,4 @@ -use crate::api::Controller; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::TextChange; use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index bdcc240..2bb09e5 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -1,4 +1,4 @@ -use crate::api::Controller; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::cursor::controller::CursorController; use napi::threadsafe_function::ErrorStrategy::Fatal; use napi::threadsafe_function::{ diff --git a/src/ffi/js/workspace.rs b/src/ffi/js/workspace.rs index dbbf651..e44fe81 100644 --- a/src/ffi/js/workspace.rs +++ b/src/ffi/js/workspace.rs @@ -1,9 +1,15 @@ +use crate::api::controller::AsyncReceiver; use crate::buffer::controller::BufferController; use crate::cursor::controller::CursorController; -use crate::ffi::js::client::JsUser; use crate::Workspace; +use napi::threadsafe_function::ErrorStrategy::Fatal; +use napi::threadsafe_function::{ + ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, +}; use napi_derive::napi; +use super::client::JsUser; + #[napi(object, js_name = "Event")] pub struct JsEvent { pub r#type: String, @@ -85,6 +91,42 @@ impl Workspace { Ok(self.delete(&path).await?) } + #[napi(js_name = "recv")] + pub async fn js_recv(&self) -> napi::Result { + Ok(JsEvent::from(self.recv().await?)) + } + + #[napi(js_name = "try_recv")] + pub async fn js_try_recv(&self) -> napi::Result> { + Ok(self.try_recv().await?.map(JsEvent::from)) + } + + #[napi(js_name = "poll")] + pub async fn js_poll(&self) -> napi::Result<()> { + self.poll().await?; + Ok(()) + } + + #[napi(js_name = "clear_callback")] + pub fn js_clear_callback(&self) -> napi::Result<()> { + self.clear_callback(); + Ok(()) + } + + #[napi(js_name = "callback", ts_args_type = "fun: (event: Workspace) => void")] + pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()> { + let tsfn: ThreadsafeFunction = fun + .create_threadsafe_function(0, |ctx: ThreadSafeCallContext| { + Ok(vec![ctx.value]) + })?; + self.callback(move |controller: Workspace| { + tsfn.call(controller.clone(), ThreadsafeFunctionCallMode::Blocking); //check this with tracing also we could use Ok(event) to get the error + // If it blocks the main thread too many time we have to change this + }); + + Ok(()) + } + /// Detach from an active buffer, stopping its underlying worker /// this method returns true if no reference or last reference was held, false if there are still /// dangling references to clear @@ -93,12 +135,6 @@ impl Workspace { self.detach(&path) } - /// Wait for next workspace event and return it - #[napi(js_name = "event")] - pub async fn js_event(&self) -> napi::Result { - Ok(JsEvent::from(self.event().await?)) - } - /// Re-fetch remote buffer list #[napi(js_name = "fetch_buffers")] pub async fn js_fetch_buffers(&self) -> napi::Result<()> { diff --git a/src/ffi/lua/ext/a_sync.rs b/src/ffi/lua/ext/a_sync.rs index e53181d..02c672b 100644 --- a/src/ffi/lua/ext/a_sync.rs +++ b/src/ffi/lua/ext/a_sync.rs @@ -54,7 +54,7 @@ impl LuaUserData for Promise { Some(x) => { x.abort(); Ok(()) - }, + } }); methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| { match this.0.take() { diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index 5ce69b5..dd84098 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -71,6 +71,7 @@ pub(crate) enum CallbackArg { BufferController(CodempBufferController), Workspace(CodempWorkspace), Event(CodempEvent), + MaybeEvent(Option), Cursor(CodempCursor), MaybeCursor(Option), TextChange(CodempTextChange), @@ -91,6 +92,7 @@ impl IntoLua for CallbackArg { CallbackArg::Workspace(x) => x.into_lua(lua), CallbackArg::VecStr(x) => x.into_lua(lua), CallbackArg::Event(x) => x.into_lua(lua), + CallbackArg::MaybeEvent(x) => x.into_lua(lua), CallbackArg::Cursor(x) => x.into_lua(lua), CallbackArg::MaybeCursor(x) => x.into_lua(lua), CallbackArg::TextChange(x) => x.into_lua(lua), @@ -99,63 +101,16 @@ impl IntoLua for CallbackArg { } } -impl From<()> for CallbackArg { - fn from(_: ()) -> Self { - CallbackArg::Nil - } -} -impl From for CallbackArg { - fn from(value: String) -> Self { - CallbackArg::Str(value) - } -} -impl From for CallbackArg { - fn from(value: CodempClient) -> Self { - CallbackArg::Client(value) - } -} -impl From for CallbackArg { - fn from(value: CodempCursorController) -> Self { - CallbackArg::CursorController(value) - } -} -impl From for CallbackArg { - fn from(value: CodempBufferController) -> Self { - CallbackArg::BufferController(value) - } -} -impl From for CallbackArg { - fn from(value: CodempWorkspace) -> Self { - CallbackArg::Workspace(value) - } -} -impl From> for CallbackArg { - fn from(value: Vec) -> Self { - CallbackArg::VecStr(value) - } -} -impl From for CallbackArg { - fn from(value: CodempEvent) -> Self { - CallbackArg::Event(value) - } -} -impl From for CallbackArg { - fn from(value: CodempCursor) -> Self { - CallbackArg::Cursor(value) - } -} -impl From> for CallbackArg { - fn from(value: Option) -> Self { - CallbackArg::MaybeCursor(value) - } -} -impl From for CallbackArg { - fn from(value: CodempTextChange) -> Self { - CallbackArg::TextChange(value) - } -} -impl From> for CallbackArg { - fn from(value: Option) -> Self { - CallbackArg::MaybeTextChange(value) - } -} +impl From<()> for CallbackArg { fn from(_: ()) -> Self { CallbackArg::Nil } } +impl From for CallbackArg { fn from(value: String) -> Self { CallbackArg::Str(value) } } +impl From for CallbackArg { fn from(value: CodempClient) -> Self { CallbackArg::Client(value) } } +impl From for CallbackArg { fn from(value: CodempCursorController) -> Self { CallbackArg::CursorController(value) } } +impl From for CallbackArg { fn from(value: CodempBufferController) -> Self { CallbackArg::BufferController(value) } } +impl From for CallbackArg { fn from(value: CodempWorkspace) -> Self { CallbackArg::Workspace(value) } } +impl From> for CallbackArg { fn from(value: Vec) -> Self { CallbackArg::VecStr(value) } } +impl From for CallbackArg { fn from(value: CodempEvent) -> Self { CallbackArg::Event(value) } } +impl From for CallbackArg { fn from(value: CodempCursor) -> Self { CallbackArg::Cursor(value) } } +impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeCursor(value) } } +impl From for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } } +impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeTextChange(value) } } +impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeEvent(value) } } diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index a6dfeb5..15167c9 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -33,11 +33,6 @@ impl LuaUserData for CodempWorkspace { Ok(this.buffer_by_name(&name)) }); - methods.add_method( - "event", - |_, this, ()| a_sync! { this => this.event().await? }, - ); - methods.add_method( "fetch_buffers", |_, this, ()| a_sync! { this => this.fetch_buffers().await? }, @@ -55,6 +50,27 @@ impl LuaUserData for CodempWorkspace { ); methods.add_method("user_list", |_, this, ()| Ok(this.user_list())); + + methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); + + methods.add_method( + "try_recv", + |_, this, ()| a_sync! { this => this.try_recv().await? }, + ); + + methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); + + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { + this.callback(move |controller: CodempWorkspace| { + super::ext::callback().invoke(cb.clone(), controller) + }); + Ok(()) + }); + + methods.add_method("clear_callbacl", |_, this, ()| { + this.clear_callback(); + Ok(()) + }); } fn add_fields>(fields: &mut F) { diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index b1f8c21..6ee1e27 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,4 +1,4 @@ -use crate::api::Controller; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::Cursor; use crate::api::TextChange; use crate::buffer::Controller as BufferController; diff --git a/src/ffi/python/workspace.rs b/src/ffi/python/workspace.rs index 4615be5..106495c 100644 --- a/src/ffi/python/workspace.rs +++ b/src/ffi/python/workspace.rs @@ -1,6 +1,8 @@ +use crate::api::controller::AsyncReceiver; use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; use crate::workspace::Workspace; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use super::a_sync_allow_threads; @@ -26,12 +28,6 @@ impl Workspace { self.detach(path.as_str()) } - #[pyo3(name = "event")] - fn pyevent(&self, py: Python) -> PyResult { - let this = self.clone(); - a_sync_allow_threads!(py, this.event().await) - } - #[pyo3(name = "fetch_buffers")] fn pyfetch_buffers(&self, py: Python) -> PyResult { let this = self.clone(); @@ -87,4 +83,42 @@ impl Workspace { fn pyuser_list(&self) -> Vec { self.user_list() } + + #[pyo3(name = "recv")] + fn pyrecv(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.recv().await) + } + + #[pyo3(name = "try_recv")] + fn pytry_recv(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.try_recv().await) + } + + #[pyo3(name = "poll")] + fn pypoll(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.poll().await) + } + + #[pyo3(name = "clear_callback")] + fn pyclear_callbacl(&self, _py: Python) { + self.clear_callback(); + } + + #[pyo3(name = "callback")] + fn pycallback(&self, py: Python, cb: PyObject) -> PyResult<()> { + if !cb.bind_borrowed(py).is_callable() { + return Err(PyValueError::new_err("The object passed must be callable.")); + } + + self.callback(move |ws| { + Python::with_gil(|py| { + // TODO what to do with this error? + let _ = cb.call1(py, (ws,)); + }) + }); + Ok(()) + } } diff --git a/src/lib.rs b/src/lib.rs index c57c12a..40ad39f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ //! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap(); //! # client.create_workspace("").await.unwrap(); //! # let workspace = client.join_workspace("").await.unwrap(); -//! use codemp::api::Controller; // needed to access trait methods +//! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods //! let cursor = workspace.cursor(); //! let event = cursor.recv().await.expect("disconnected while waiting for event!"); //! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer); @@ -66,7 +66,7 @@ //! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap(); //! # client.create_workspace("").await.unwrap(); //! # let workspace = client.join_workspace("").await.unwrap(); -//! # use codemp::api::Controller; +//! # use codemp::api::controller::{AsyncSender, AsyncReceiver}; //! let buffer = workspace.attach("/some/file.txt").await.expect("failed to attach"); //! buffer.content(); // force-sync //! if let Some(change) = buffer.try_recv().await.unwrap() { diff --git a/src/prelude.rs b/src/prelude.rs index b71512a..0fec44d 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -2,6 +2,7 @@ //! All-in-one renamed imports with `use codemp::prelude::*`. pub use crate::api::{ + controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, }; diff --git a/src/workspace.rs b/src/workspace.rs index 9f1fc2c..fe14388 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -4,7 +4,10 @@ //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. use crate::{ - api::{Event, User}, + api::{ + controller::{AsyncReceiver, ControllerCallback}, + Event, User, + }, buffer, cursor, errors::{ConnectionResult, ControllerResult, RemoteResult}, ext::InternallyMutable, @@ -24,7 +27,7 @@ use codemp_proto::{ use dashmap::{DashMap, DashSet}; use std::{collections::BTreeSet, sync::Arc}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, mpsc::error::TryRecvError}; use tonic::Streaming; use uuid::Uuid; @@ -55,6 +58,36 @@ struct WorkspaceInner { users: Arc>, // TODO can we drop the mutex? events: tokio::sync::Mutex>, + callback: std::sync::Mutex>>, // TODO lmao another one +} + +impl AsyncReceiver for Workspace { + async fn try_recv(&self) -> ControllerResult> { + match self.0.events.lock().await.try_recv() { + Ok(x) => Ok(Some(x)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(crate::errors::ControllerError::Stopped), + } + } + + async fn poll(&self) -> ControllerResult<()> { + loop { + if !self.0.events.lock().await.is_empty() { + break Ok(()); + } + // TODO disgusting, please send help + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + } + } + + // TODO please send HELP ASAP this is hurting me emotionally + fn clear_callback(&self) { + *self.0.callback.lock().expect("mutex poisoned") = None; + } + + fn callback(&self, cb: impl Into>) { + *self.0.callback.lock().expect("mutex poisoned") = Some(cb.into()); + } } impl Workspace { @@ -91,6 +124,7 @@ impl Workspace { users, events: tokio::sync::Mutex::new(ev_rx), services, + callback: std::sync::Mutex::new(None), })); ws.fetch_users().await?; @@ -166,18 +200,6 @@ impl Workspace { } } - /// Await next workspace [Event] and return it when it arrives. - // TODO this method is weird and ugly, can we make it more standard? - pub async fn event(&self) -> ControllerResult { - self.0 - .events - .lock() - .await - .recv() - .await - .ok_or(crate::errors::ControllerError::Unfulfilled) - } - /// Re-fetch the list of available buffers in the workspace. pub async fn fetch_buffers(&self) -> RemoteResult<()> { let mut workspace_client = self.0.services.ws(); @@ -290,7 +312,8 @@ impl Workspace { /// A filter may be applied, and it may be strict (equality check) or not (starts_with check). // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 pub fn filetree(&self, filter: Option<&str>, strict: bool) -> Vec { - let mut tree = self.0 + let mut tree = self + .0 .filetree .iter() .filter(|f| {