From b897b26bb988980ce2e9cb2d1810dfcc3b7ed920 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 03:07:14 +0200 Subject: [PATCH] feat: implemented Workspace receiver in glues --- dist/java/src/mp/code/Workspace.java | 58 +++++++++++++++++--- dist/java/src/mp/code/data/DetachResult.java | 15 ----- dist/lua/annotations.lua | 33 ++++++++++- dist/py/src/codemp/codemp.pyi | 8 +++ src/ffi/java/buffer.rs | 2 +- src/ffi/java/cursor.rs | 2 +- src/ffi/java/workspace.rs | 58 +++++++++++++++++++- src/ffi/js/buffer.rs | 2 +- src/ffi/js/cursor.rs | 2 +- src/ffi/js/workspace.rs | 44 ++++++++++++++- src/ffi/lua/ext/callback.rs | 3 + src/ffi/lua/workspace.rs | 26 +++++++-- src/ffi/python/controllers.rs | 2 +- src/ffi/python/workspace.rs | 46 ++++++++++++++-- src/prelude.rs | 2 + 15 files changed, 257 insertions(+), 46 deletions(-) delete mode 100644 dist/java/src/mp/code/data/DetachResult.java diff --git a/dist/java/src/mp/code/Workspace.java b/dist/java/src/mp/code/Workspace.java index 31ee6e3..10ae3b1 100644 --- a/dist/java/src/mp/code/Workspace.java +++ b/dist/java/src/mp/code/Workspace.java @@ -2,8 +2,8 @@ package mp.code; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; -import mp.code.data.DetachResult; import mp.code.exceptions.ConnectionException; import mp.code.exceptions.ConnectionRemoteException; import mp.code.exceptions.ControllerException; @@ -168,16 +168,58 @@ public final class Workspace { delete_buffer(this.ptr, path); } - private static native Event event(long self) throws ControllerException; + private static native Event try_recv(long self) throws ControllerException; /** - * Blocks until a workspace event occurs. - * You shouldn't call this, unless it's on a dedicated thread. - * @return the {@link Event} that has occurred - * @throws ControllerException if the event arrived while the underlying controller was already closed + * Tries to get a {@link Event} from the queue if any were present, and returns + * an empty optional otherwise. + * @return the first workspace event in queue, if any are present + * @throws ControllerException if the controller was stopped */ - public Event event() throws ControllerException { - return event(this.ptr); + public Optional tryRecv() throws ControllerException { + return Optional.ofNullable(try_recv(this.ptr)); + } + + private static native Event recv(long self) throws ControllerException; + + /** + * Blocks until a {@link Event} is available and returns it. + * @return the workspace event that occurred + * @throws ControllerException if the controller was stopped + */ + public Event recv() throws ControllerException { + return recv(this.ptr); + } + + private static native void callback(long self, Consumer cb); + + /** + * Registers a callback to be invoked whenever a new {@link Event} is ready to be received. + * This will not work unless a Java thread has been dedicated to the event loop. + * @see Extensions#drive(boolean) + */ + public void callback(Consumer cb) { + callback(this.ptr, cb); + } + + private static native void clear_callback(long self); + + /** + * Clears the registered callback. + * @see #callback(Consumer) + */ + public void clearCallback() { + clear_callback(this.ptr); + } + + private static native void poll(long self) throws ControllerException; + + /** + * Blocks until a {@link Event} is available. + * @throws ControllerException if the controller was stopped + */ + public void poll() throws ControllerException { + poll(this.ptr); } private static native void free(long self); diff --git a/dist/java/src/mp/code/data/DetachResult.java b/dist/java/src/mp/code/data/DetachResult.java deleted file mode 100644 index c367354..0000000 --- a/dist/java/src/mp/code/data/DetachResult.java +++ /dev/null @@ -1,15 +0,0 @@ -package mp.code.data; - -import mp.code.Workspace; - -/** - * The result of a {@link Workspace#detachFromBuffer(String)} operation. - */ -public enum DetachResult { - /** The user was not attached to this buffer. */ - NOT_ATTACHED, - /** The user detached from the buffer and stopped it. */ - DETACHING, - /** The user was attached, but the buffer was already stopped. */ - ALREADY_DETACHED -} diff --git a/dist/lua/annotations.lua b/dist/lua/annotations.lua index 04055ba..dc6d625 100644 --- a/dist/lua/annotations.lua +++ b/dist/lua/annotations.lua @@ -75,6 +75,16 @@ function WorkspaceEventPromise:await() end function WorkspaceEventPromise:and_then(cb) end +---@class (exact) MaybeWorkspaceEventPromise : Promise +local MaybeWorkspaceEventPromise = {} +--- block until promise is ready and return value +--- @return WorkspaceEvent | nil +function MaybeWorkspaceEventPromise:await() end +---@param cb fun(x: WorkspaceEvent | nil) callback to invoke +---invoke callback asynchronously as soon as promise is ready +function MaybeWorkspaceEventPromise:and_then(cb) end + + ---@class (exact) BufferControllerPromise : Promise local BufferControllerPromise = {} --- block until promise is ready and return value @@ -252,11 +262,30 @@ function Workspace:fetch_users(path) end ---@field type string ---@field value string +---@return MaybeWorkspaceEventPromise +---@async +---@nodiscard +---try to receive workspace events, returning nil if none is available +function Workspace:try_recv() end + ---@return WorkspaceEventPromise ---@async ---@nodiscard ----get next workspace event -function Workspace:event() end +---block until next workspace event and return it +function Workspace:recv() end + +---@return NilPromise +---@async +---@nodiscard +---block until next workspace event without returning it +function Workspace:poll() end + +---clears any previously registered workspace callback +function Workspace:clear_callback() end + +---@param cb fun(w: Workspace) callback to invoke on each workspace event received +---register a new callback to be called on workspace events (replaces any previously registered one) +function Workspace:callback(cb) end diff --git a/dist/py/src/codemp/codemp.pyi b/dist/py/src/codemp/codemp.pyi index 4df875b..6c088c5 100644 --- a/dist/py/src/codemp/codemp.pyi +++ b/dist/py/src/codemp/codemp.pyi @@ -52,6 +52,9 @@ class Client: def user_name(self) -> str: ... def refresh(self) -> Promise[None]: ... +class Event: + pass + class Workspace: """ Handle to a workspace inside codemp. It manages buffers. @@ -69,6 +72,11 @@ class Workspace: def buffer_by_name(self, path: str) -> Optional[BufferController]: ... def buffer_list(self) -> list[str]: ... def filetree(self, filter: Optional[str], strict: bool) -> list[str]: ... + def recv(self) -> Promise[Event]: ... + def try_recv(self) -> Promise[Optional[Event]]: ... + def poll(self) -> Promise[None]: ... + def clear_callback(self) -> None: ... + def callback(self, cb: Callable[[Workspace], None]) -> None: ... class TextChange: """ diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 8cc1ce4..158ac03 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -1,7 +1,7 @@ use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; -use crate::{api::{Controller, TextChange}, errors::ControllerError}; +use crate::{api::{controller::{AsyncReceiver, AsyncSender}, TextChange}, errors::ControllerError}; use super::null_check; diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 4cc1a4b..0514b39 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -1,6 +1,6 @@ use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; -use crate::{api::{Controller, Cursor}, errors::ControllerError}; +use crate::{api::{controller::{AsyncSender, AsyncReceiver}, Cursor}, errors::ControllerError}; use super::null_check; diff --git a/src/ffi/java/workspace.rs b/src/ffi/java/workspace.rs index 6fc38cd..c59ebc0 100644 --- a/src/ffi/java/workspace.rs +++ b/src/ffi/java/workspace.rs @@ -1,5 +1,6 @@ +use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; -use crate::{errors::{ConnectionError, ControllerError, RemoteError}, Workspace}; +use crate::{api::controller::AsyncReceiver, errors::{ConnectionError, ControllerError, RemoteError}, ffi::java::null_check, Workspace}; /// Get the workspace id. #[jni(package = "mp.code", class = "Workspace")] @@ -79,10 +80,61 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr super::tokio().block_on(workspace.delete(&path)) } +/// Block and receive a workspace event +#[jni(package = "mp.code", class = "Workspace")] +fn recv(workspace: &mut Workspace) -> Result { + super::tokio().block_on(workspace.recv()) +} + /// Receive a workspace event if present. #[jni(package = "mp.code", class = "Workspace")] -fn event(workspace: &mut Workspace) -> Result { - super::tokio().block_on(workspace.event()) +fn try_recv(workspace: &mut Workspace) -> Result, ControllerError> { + super::tokio().block_on(workspace.try_recv()) +} + +/// Block until a workspace event is available +#[jni(package = "mp.code", class = "Workspace")] +fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> { + super::tokio().block_on(workspace.poll()) +} + +/// Clear previously registered callback +#[jni(package = "mp.code", class = "Workspace")] +fn clear_callback(workspace: &mut Workspace) { + workspace.clear_callback(); +} + +/// Register a callback for workspace events. +#[jni(package = "mp.code", class = "Workspace")] +fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::Workspace, cb: JObject<'local>) { + null_check!(env, cb, {}); + let Ok(cb_ref) = env.new_global_ref(cb) else { + env.throw_new("mp/code/exceptions/JNIException", "Failed to pin callback reference!") + .expect("Failed to throw exception!"); + return; + }; + + controller.callback(move |workspace: crate::Workspace| { + let jvm = super::jvm(); + let mut env = jvm.attach_current_thread_permanently() + .expect("failed attaching to main JVM thread"); + if let Err(e) = env.with_local_frame(5, |env| { + use jni_toolbox::IntoJavaObject; + let jworkspace = workspace.into_java_object(env)?; + if let Err(e) = env.call_method( + &cb_ref, + "accept", + "(Ljava/lang/Object;)V", + &[jni::objects::JValueGen::Object(&jworkspace)] + ) { + tracing::error!("error invoking callback: {e:?}"); + }; + Ok::<(), jni::errors::Error>(()) + }) { + tracing::error!("error invoking callback: {e}"); + let _ = env.exception_describe(); + } + }); } /// Called by the Java GC to drop a [Workspace]. diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index a1a3727..b26d54d 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,7 +1,7 @@ use napi::threadsafe_function::{ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi_derive::napi; use crate::api::TextChange; -use crate::api::Controller; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::buffer::controller::BufferController; diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index b02f9f2..7fa0a47 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -1,7 +1,7 @@ use napi::threadsafe_function::ErrorStrategy::Fatal; use napi_derive::napi; use napi::threadsafe_function::{ThreadsafeFunction, ThreadSafeCallContext, ThreadsafeFunctionCallMode}; -use crate::api::Controller; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::cursor::controller::CursorController; diff --git a/src/ffi/js/workspace.rs b/src/ffi/js/workspace.rs index 6021c12..775a2e2 100644 --- a/src/ffi/js/workspace.rs +++ b/src/ffi/js/workspace.rs @@ -1,7 +1,10 @@ +use napi::threadsafe_function::ErrorStrategy::Fatal; +use napi::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi_derive::napi; use crate::Workspace; use crate::buffer::controller::BufferController; use crate::cursor::controller::CursorController; +use crate::api::controller::AsyncReceiver; #[napi(object, js_name = "Event")] pub struct JsEvent { @@ -61,8 +64,43 @@ impl Workspace { Ok(self.delete(&path).await?) } - #[napi(js_name = "event")] - pub async fn js_event(&self) -> napi::Result { - Ok(JsEvent::from(self.event().await?)) + #[napi(js_name = "recv")] + pub async fn js_recv(&self) -> napi::Result { + Ok(JsEvent::from(self.recv().await?)) + } + + #[napi(js_name = "try_recv")] + pub async fn js_try_recv(&self) -> napi::Result> { + Ok(self.try_recv().await?.map(JsEvent::from)) + } + + #[napi(js_name = "poll")] + pub async fn js_poll(&self) -> napi::Result<()> { + self.poll().await?; + Ok(()) + } + + #[napi(js_name = "clear_callback")] + pub fn js_clear_callbacl(&self) -> napi::Result<()> { + self.clear_callback(); + Ok(()) + } + + #[napi(js_name = "callback", ts_args_type = "fun: (event: Workspace) => void")] + pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()>{ + let tsfn : ThreadsafeFunction = + fun.create_threadsafe_function(0, + |ctx : ThreadSafeCallContext| { + Ok(vec![ctx.value]) + } + )?; + self.callback(move |controller : Workspace| { + + tsfn.call(controller.clone(), ThreadsafeFunctionCallMode::Blocking); //check this with tracing also we could use Ok(event) to get the error + // If it blocks the main thread too many time we have to change this + + }); + + Ok(()) } } diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index 636f9c0..db118d7 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -67,6 +67,7 @@ pub(crate) enum CallbackArg { BufferController(CodempBufferController), Workspace(CodempWorkspace), Event(CodempEvent), + MaybeEvent(Option), Cursor(CodempCursor), MaybeCursor(Option), TextChange(CodempTextChange), @@ -87,6 +88,7 @@ impl IntoLua for CallbackArg { CallbackArg::Workspace(x) => x.into_lua(lua), CallbackArg::VecStr(x) => x.into_lua(lua), CallbackArg::Event(x) => x.into_lua(lua), + CallbackArg::MaybeEvent(x) => x.into_lua(lua), CallbackArg::Cursor(x) => x.into_lua(lua), CallbackArg::MaybeCursor(x) => x.into_lua(lua), CallbackArg::TextChange(x) => x.into_lua(lua), @@ -107,3 +109,4 @@ impl From for CallbackArg { fn from(value: CodempCursor) -> Self { impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeCursor(value) } } impl From for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } } impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeTextChange(value) } } +impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeEvent(value) } } diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index 7868d0f..fc87c60 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -26,10 +26,6 @@ impl LuaUserData for CodempWorkspace { methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name))); - methods.add_method("event", |_, this, ()| - a_sync! { this => this.event().await? } - ); - methods.add_method("fetch_buffers", |_, this, ()| a_sync! { this => this.fetch_buffers().await? } ); @@ -44,6 +40,28 @@ impl LuaUserData for CodempWorkspace { methods.add_method("user_list", |_, this, ()| Ok(this.user_list()) ); + + methods.add_method("recv", |_, this, ()| + a_sync! { this => this.recv().await? } + ); + + methods.add_method("try_recv", |_, this, ()| + a_sync! { this => this.try_recv().await? } + ); + + methods.add_method("poll", |_, this, ()| + a_sync! { this => this.poll().await? } + ); + + methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { + this.callback(move |controller: CodempWorkspace| super::ext::callback().invoke(cb.clone(), controller)); + Ok(()) + }); + + methods.add_method("clear_callbacl", |_, this, ()| { + this.clear_callback(); + Ok(()) + }); } fn add_fields>(fields: &mut F) { diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index b1f8c21..e3bfd8d 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,4 +1,4 @@ -use crate::api::Controller; +use crate::api::controller::{AsyncSender, AsyncReceiver}; use crate::api::Cursor; use crate::api::TextChange; use crate::buffer::Controller as BufferController; diff --git a/src/ffi/python/workspace.rs b/src/ffi/python/workspace.rs index 4615be5..40898ca 100644 --- a/src/ffi/python/workspace.rs +++ b/src/ffi/python/workspace.rs @@ -1,6 +1,8 @@ use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; use crate::workspace::Workspace; +use crate::api::controller::AsyncReceiver; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use super::a_sync_allow_threads; @@ -26,12 +28,6 @@ impl Workspace { self.detach(path.as_str()) } - #[pyo3(name = "event")] - fn pyevent(&self, py: Python) -> PyResult { - let this = self.clone(); - a_sync_allow_threads!(py, this.event().await) - } - #[pyo3(name = "fetch_buffers")] fn pyfetch_buffers(&self, py: Python) -> PyResult { let this = self.clone(); @@ -87,4 +83,42 @@ impl Workspace { fn pyuser_list(&self) -> Vec { self.user_list() } + + #[pyo3(name = "recv")] + fn pyrecv(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.recv().await) + } + + #[pyo3(name = "try_recv")] + fn pytry_recv(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.try_recv().await) + } + + #[pyo3(name = "poll")] + fn pypoll(&self, py: Python) -> PyResult { + let this = self.clone(); + a_sync_allow_threads!(py, this.poll().await) + } + + #[pyo3(name = "clear_callback")] + fn pyclear_callbacl(&self, _py: Python) { + self.clear_callback(); + } + + #[pyo3(name = "callback")] + fn pycallback(&self, py: Python, cb: PyObject) -> PyResult<()> { + if !cb.bind_borrowed(py).is_callable() { + return Err(PyValueError::new_err("The object passed must be callable.")); + } + + self.callback(move |ws| { + Python::with_gil(|py| { + // TODO what to do with this error? + let _ = cb.call1(py, (ws,)); + }) + }); + Ok(()) + } } diff --git a/src/prelude.rs b/src/prelude.rs index a2fd045..2e0a640 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -3,6 +3,8 @@ pub use crate::api::{ Controller as CodempController, + controller::AsyncSender as CodempAsyncSender, + controller::AsyncReceiver as CodempAsyncReceiver, TextChange as CodempTextChange, Cursor as CodempCursor, User as CodempUser,