feat: implemented Workspace receiver in glues

This commit is contained in:
əlemi 2024-10-03 03:07:14 +02:00
parent ca04601bea
commit b897b26bb9
Signed by: alemi
GPG key ID: A4895B84D311642C
15 changed files with 257 additions and 46 deletions

View file

@ -2,8 +2,8 @@ package mp.code;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import mp.code.data.DetachResult;
import mp.code.exceptions.ConnectionException; import mp.code.exceptions.ConnectionException;
import mp.code.exceptions.ConnectionRemoteException; import mp.code.exceptions.ConnectionRemoteException;
import mp.code.exceptions.ControllerException; import mp.code.exceptions.ControllerException;
@ -168,16 +168,58 @@ public final class Workspace {
delete_buffer(this.ptr, path); delete_buffer(this.ptr, path);
} }
private static native Event event(long self) throws ControllerException; private static native Event try_recv(long self) throws ControllerException;
/** /**
* Blocks until a workspace event occurs. * Tries to get a {@link Event} from the queue if any were present, and returns
* You shouldn't call this, unless it's on a dedicated thread. * an empty optional otherwise.
* @return the {@link Event} that has occurred * @return the first workspace event in queue, if any are present
* @throws ControllerException if the event arrived while the underlying controller was already closed * @throws ControllerException if the controller was stopped
*/ */
public Event event() throws ControllerException { public Optional<Event> tryRecv() throws ControllerException {
return event(this.ptr); return Optional.ofNullable(try_recv(this.ptr));
}
private static native Event recv(long self) throws ControllerException;
/**
* Blocks until a {@link Event} is available and returns it.
* @return the workspace event that occurred
* @throws ControllerException if the controller was stopped
*/
public Event recv() throws ControllerException {
return recv(this.ptr);
}
private static native void callback(long self, Consumer<Workspace> cb);
/**
* Registers a callback to be invoked whenever a new {@link Event} is ready to be received.
* This will not work unless a Java thread has been dedicated to the event loop.
* @see Extensions#drive(boolean)
*/
public void callback(Consumer<Workspace> cb) {
callback(this.ptr, cb);
}
private static native void clear_callback(long self);
/**
* Clears the registered callback.
* @see #callback(Consumer)
*/
public void clearCallback() {
clear_callback(this.ptr);
}
private static native void poll(long self) throws ControllerException;
/**
* Blocks until a {@link Event} is available.
* @throws ControllerException if the controller was stopped
*/
public void poll() throws ControllerException {
poll(this.ptr);
} }
private static native void free(long self); private static native void free(long self);

View file

@ -1,15 +0,0 @@
package mp.code.data;
import mp.code.Workspace;
/**
* The result of a {@link Workspace#detachFromBuffer(String)} operation.
*/
public enum DetachResult {
/** The user was not attached to this buffer. */
NOT_ATTACHED,
/** The user detached from the buffer and stopped it. */
DETACHING,
/** The user was attached, but the buffer was already stopped. */
ALREADY_DETACHED
}

View file

@ -75,6 +75,16 @@ function WorkspaceEventPromise:await() end
function WorkspaceEventPromise:and_then(cb) end function WorkspaceEventPromise:and_then(cb) end
---@class (exact) MaybeWorkspaceEventPromise : Promise
local MaybeWorkspaceEventPromise = {}
--- block until promise is ready and return value
--- @return WorkspaceEvent | nil
function MaybeWorkspaceEventPromise:await() end
---@param cb fun(x: WorkspaceEvent | nil) callback to invoke
---invoke callback asynchronously as soon as promise is ready
function MaybeWorkspaceEventPromise:and_then(cb) end
---@class (exact) BufferControllerPromise : Promise ---@class (exact) BufferControllerPromise : Promise
local BufferControllerPromise = {} local BufferControllerPromise = {}
--- block until promise is ready and return value --- block until promise is ready and return value
@ -252,11 +262,30 @@ function Workspace:fetch_users(path) end
---@field type string ---@field type string
---@field value string ---@field value string
---@return MaybeWorkspaceEventPromise
---@async
---@nodiscard
---try to receive workspace events, returning nil if none is available
function Workspace:try_recv() end
---@return WorkspaceEventPromise ---@return WorkspaceEventPromise
---@async ---@async
---@nodiscard ---@nodiscard
---get next workspace event ---block until next workspace event and return it
function Workspace:event() end function Workspace:recv() end
---@return NilPromise
---@async
---@nodiscard
---block until next workspace event without returning it
function Workspace:poll() end
---clears any previously registered workspace callback
function Workspace:clear_callback() end
---@param cb fun(w: Workspace) callback to invoke on each workspace event received
---register a new callback to be called on workspace events (replaces any previously registered one)
function Workspace:callback(cb) end

View file

@ -52,6 +52,9 @@ class Client:
def user_name(self) -> str: ... def user_name(self) -> str: ...
def refresh(self) -> Promise[None]: ... def refresh(self) -> Promise[None]: ...
class Event:
pass
class Workspace: class Workspace:
""" """
Handle to a workspace inside codemp. It manages buffers. Handle to a workspace inside codemp. It manages buffers.
@ -69,6 +72,11 @@ class Workspace:
def buffer_by_name(self, path: str) -> Optional[BufferController]: ... def buffer_by_name(self, path: str) -> Optional[BufferController]: ...
def buffer_list(self) -> list[str]: ... def buffer_list(self) -> list[str]: ...
def filetree(self, filter: Optional[str], strict: bool) -> list[str]: ... def filetree(self, filter: Optional[str], strict: bool) -> list[str]: ...
def recv(self) -> Promise[Event]: ...
def try_recv(self) -> Promise[Optional[Event]]: ...
def poll(self) -> Promise[None]: ...
def clear_callback(self) -> None: ...
def callback(self, cb: Callable[[Workspace], None]) -> None: ...
class TextChange: class TextChange:
""" """

View file

@ -1,7 +1,7 @@
use jni::{objects::JObject, JNIEnv}; use jni::{objects::JObject, JNIEnv};
use jni_toolbox::jni; use jni_toolbox::jni;
use crate::{api::{Controller, TextChange}, errors::ControllerError}; use crate::{api::{controller::{AsyncReceiver, AsyncSender}, TextChange}, errors::ControllerError};
use super::null_check; use super::null_check;

View file

@ -1,6 +1,6 @@
use jni::{objects::JObject, JNIEnv}; use jni::{objects::JObject, JNIEnv};
use jni_toolbox::jni; use jni_toolbox::jni;
use crate::{api::{Controller, Cursor}, errors::ControllerError}; use crate::{api::{controller::{AsyncSender, AsyncReceiver}, Cursor}, errors::ControllerError};
use super::null_check; use super::null_check;

View file

@ -1,5 +1,6 @@
use jni::{objects::JObject, JNIEnv};
use jni_toolbox::jni; use jni_toolbox::jni;
use crate::{errors::{ConnectionError, ControllerError, RemoteError}, Workspace}; use crate::{api::controller::AsyncReceiver, errors::{ConnectionError, ControllerError, RemoteError}, ffi::java::null_check, Workspace};
/// Get the workspace id. /// Get the workspace id.
#[jni(package = "mp.code", class = "Workspace")] #[jni(package = "mp.code", class = "Workspace")]
@ -79,10 +80,61 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr
super::tokio().block_on(workspace.delete(&path)) super::tokio().block_on(workspace.delete(&path))
} }
/// Block and receive a workspace event
#[jni(package = "mp.code", class = "Workspace")]
fn recv(workspace: &mut Workspace) -> Result<crate::api::Event, ControllerError> {
super::tokio().block_on(workspace.recv())
}
/// Receive a workspace event if present. /// Receive a workspace event if present.
#[jni(package = "mp.code", class = "Workspace")] #[jni(package = "mp.code", class = "Workspace")]
fn event(workspace: &mut Workspace) -> Result<crate::api::Event, ControllerError> { fn try_recv(workspace: &mut Workspace) -> Result<Option<crate::api::Event>, ControllerError> {
super::tokio().block_on(workspace.event()) super::tokio().block_on(workspace.try_recv())
}
/// Block until a workspace event is available
#[jni(package = "mp.code", class = "Workspace")]
fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> {
super::tokio().block_on(workspace.poll())
}
/// Clear previously registered callback
#[jni(package = "mp.code", class = "Workspace")]
fn clear_callback(workspace: &mut Workspace) {
workspace.clear_callback();
}
/// Register a callback for workspace events.
#[jni(package = "mp.code", class = "Workspace")]
fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::Workspace, cb: JObject<'local>) {
null_check!(env, cb, {});
let Ok(cb_ref) = env.new_global_ref(cb) else {
env.throw_new("mp/code/exceptions/JNIException", "Failed to pin callback reference!")
.expect("Failed to throw exception!");
return;
};
controller.callback(move |workspace: crate::Workspace| {
let jvm = super::jvm();
let mut env = jvm.attach_current_thread_permanently()
.expect("failed attaching to main JVM thread");
if let Err(e) = env.with_local_frame(5, |env| {
use jni_toolbox::IntoJavaObject;
let jworkspace = workspace.into_java_object(env)?;
if let Err(e) = env.call_method(
&cb_ref,
"accept",
"(Ljava/lang/Object;)V",
&[jni::objects::JValueGen::Object(&jworkspace)]
) {
tracing::error!("error invoking callback: {e:?}");
};
Ok::<(), jni::errors::Error>(())
}) {
tracing::error!("error invoking callback: {e}");
let _ = env.exception_describe();
}
});
} }
/// Called by the Java GC to drop a [Workspace]. /// Called by the Java GC to drop a [Workspace].

View file

@ -1,7 +1,7 @@
use napi::threadsafe_function::{ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi::threadsafe_function::{ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi_derive::napi; use napi_derive::napi;
use crate::api::TextChange; use crate::api::TextChange;
use crate::api::Controller; use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::buffer::controller::BufferController; use crate::buffer::controller::BufferController;

View file

@ -1,7 +1,7 @@
use napi::threadsafe_function::ErrorStrategy::Fatal; use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi_derive::napi; use napi_derive::napi;
use napi::threadsafe_function::{ThreadsafeFunction, ThreadSafeCallContext, ThreadsafeFunctionCallMode}; use napi::threadsafe_function::{ThreadsafeFunction, ThreadSafeCallContext, ThreadsafeFunctionCallMode};
use crate::api::Controller; use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::cursor::controller::CursorController; use crate::cursor::controller::CursorController;

View file

@ -1,7 +1,10 @@
use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi_derive::napi; use napi_derive::napi;
use crate::Workspace; use crate::Workspace;
use crate::buffer::controller::BufferController; use crate::buffer::controller::BufferController;
use crate::cursor::controller::CursorController; use crate::cursor::controller::CursorController;
use crate::api::controller::AsyncReceiver;
#[napi(object, js_name = "Event")] #[napi(object, js_name = "Event")]
pub struct JsEvent { pub struct JsEvent {
@ -61,8 +64,43 @@ impl Workspace {
Ok(self.delete(&path).await?) Ok(self.delete(&path).await?)
} }
#[napi(js_name = "event")] #[napi(js_name = "recv")]
pub async fn js_event(&self) -> napi::Result<JsEvent> { pub async fn js_recv(&self) -> napi::Result<JsEvent> {
Ok(JsEvent::from(self.event().await?)) Ok(JsEvent::from(self.recv().await?))
}
#[napi(js_name = "try_recv")]
pub async fn js_try_recv(&self) -> napi::Result<Option<JsEvent>> {
Ok(self.try_recv().await?.map(JsEvent::from))
}
#[napi(js_name = "poll")]
pub async fn js_poll(&self) -> napi::Result<()> {
self.poll().await?;
Ok(())
}
#[napi(js_name = "clear_callback")]
pub fn js_clear_callbacl(&self) -> napi::Result<()> {
self.clear_callback();
Ok(())
}
#[napi(js_name = "callback", ts_args_type = "fun: (event: Workspace) => void")]
pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()>{
let tsfn : ThreadsafeFunction<crate::Workspace, Fatal> =
fun.create_threadsafe_function(0,
|ctx : ThreadSafeCallContext<crate::Workspace>| {
Ok(vec![ctx.value])
}
)?;
self.callback(move |controller : Workspace| {
tsfn.call(controller.clone(), ThreadsafeFunctionCallMode::Blocking); //check this with tracing also we could use Ok(event) to get the error
// If it blocks the main thread too many time we have to change this
});
Ok(())
} }
} }

View file

@ -67,6 +67,7 @@ pub(crate) enum CallbackArg {
BufferController(CodempBufferController), BufferController(CodempBufferController),
Workspace(CodempWorkspace), Workspace(CodempWorkspace),
Event(CodempEvent), Event(CodempEvent),
MaybeEvent(Option<CodempEvent>),
Cursor(CodempCursor), Cursor(CodempCursor),
MaybeCursor(Option<CodempCursor>), MaybeCursor(Option<CodempCursor>),
TextChange(CodempTextChange), TextChange(CodempTextChange),
@ -87,6 +88,7 @@ impl IntoLua for CallbackArg {
CallbackArg::Workspace(x) => x.into_lua(lua), CallbackArg::Workspace(x) => x.into_lua(lua),
CallbackArg::VecStr(x) => x.into_lua(lua), CallbackArg::VecStr(x) => x.into_lua(lua),
CallbackArg::Event(x) => x.into_lua(lua), CallbackArg::Event(x) => x.into_lua(lua),
CallbackArg::MaybeEvent(x) => x.into_lua(lua),
CallbackArg::Cursor(x) => x.into_lua(lua), CallbackArg::Cursor(x) => x.into_lua(lua),
CallbackArg::MaybeCursor(x) => x.into_lua(lua), CallbackArg::MaybeCursor(x) => x.into_lua(lua),
CallbackArg::TextChange(x) => x.into_lua(lua), CallbackArg::TextChange(x) => x.into_lua(lua),
@ -107,3 +109,4 @@ impl From<CodempCursor> for CallbackArg { fn from(value: CodempCursor) -> Self {
impl From<Option<CodempCursor>> for CallbackArg { fn from(value: Option<CodempCursor>) -> Self { CallbackArg::MaybeCursor(value) } } impl From<Option<CodempCursor>> for CallbackArg { fn from(value: Option<CodempCursor>) -> Self { CallbackArg::MaybeCursor(value) } }
impl From<CodempTextChange> for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } } impl From<CodempTextChange> for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } }
impl From<Option<CodempTextChange>> for CallbackArg { fn from(value: Option<CodempTextChange>) -> Self { CallbackArg::MaybeTextChange(value) } } impl From<Option<CodempTextChange>> for CallbackArg { fn from(value: Option<CodempTextChange>) -> Self { CallbackArg::MaybeTextChange(value) } }
impl From<Option<CodempEvent>> for CallbackArg { fn from(value: Option<CodempEvent>) -> Self { CallbackArg::MaybeEvent(value) } }

View file

@ -26,10 +26,6 @@ impl LuaUserData for CodempWorkspace {
methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name))); methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name)));
methods.add_method("event", |_, this, ()|
a_sync! { this => this.event().await? }
);
methods.add_method("fetch_buffers", |_, this, ()| methods.add_method("fetch_buffers", |_, this, ()|
a_sync! { this => this.fetch_buffers().await? } a_sync! { this => this.fetch_buffers().await? }
); );
@ -44,6 +40,28 @@ impl LuaUserData for CodempWorkspace {
methods.add_method("user_list", |_, this, ()| methods.add_method("user_list", |_, this, ()|
Ok(this.user_list()) Ok(this.user_list())
); );
methods.add_method("recv", |_, this, ()|
a_sync! { this => this.recv().await? }
);
methods.add_method("try_recv", |_, this, ()|
a_sync! { this => this.try_recv().await? }
);
methods.add_method("poll", |_, this, ()|
a_sync! { this => this.poll().await? }
);
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempWorkspace| super::ext::callback().invoke(cb.clone(), controller));
Ok(())
});
methods.add_method("clear_callbacl", |_, this, ()| {
this.clear_callback();
Ok(())
});
} }
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) { fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {

View file

@ -1,4 +1,4 @@
use crate::api::Controller; use crate::api::controller::{AsyncSender, AsyncReceiver};
use crate::api::Cursor; use crate::api::Cursor;
use crate::api::TextChange; use crate::api::TextChange;
use crate::buffer::Controller as BufferController; use crate::buffer::Controller as BufferController;

View file

@ -1,6 +1,8 @@
use crate::buffer::Controller as BufferController; use crate::buffer::Controller as BufferController;
use crate::cursor::Controller as CursorController; use crate::cursor::Controller as CursorController;
use crate::workspace::Workspace; use crate::workspace::Workspace;
use crate::api::controller::AsyncReceiver;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*; use pyo3::prelude::*;
use super::a_sync_allow_threads; use super::a_sync_allow_threads;
@ -26,12 +28,6 @@ impl Workspace {
self.detach(path.as_str()) self.detach(path.as_str())
} }
#[pyo3(name = "event")]
fn pyevent(&self, py: Python) -> PyResult<Promise> {
let this = self.clone();
a_sync_allow_threads!(py, this.event().await)
}
#[pyo3(name = "fetch_buffers")] #[pyo3(name = "fetch_buffers")]
fn pyfetch_buffers(&self, py: Python) -> PyResult<Promise> { fn pyfetch_buffers(&self, py: Python) -> PyResult<Promise> {
let this = self.clone(); let this = self.clone();
@ -87,4 +83,42 @@ impl Workspace {
fn pyuser_list(&self) -> Vec<String> { fn pyuser_list(&self) -> Vec<String> {
self.user_list() self.user_list()
} }
#[pyo3(name = "recv")]
fn pyrecv(&self, py: Python) -> PyResult<Promise> {
let this = self.clone();
a_sync_allow_threads!(py, this.recv().await)
}
#[pyo3(name = "try_recv")]
fn pytry_recv(&self, py: Python) -> PyResult<Promise> {
let this = self.clone();
a_sync_allow_threads!(py, this.try_recv().await)
}
#[pyo3(name = "poll")]
fn pypoll(&self, py: Python) -> PyResult<Promise> {
let this = self.clone();
a_sync_allow_threads!(py, this.poll().await)
}
#[pyo3(name = "clear_callback")]
fn pyclear_callbacl(&self, _py: Python) {
self.clear_callback();
}
#[pyo3(name = "callback")]
fn pycallback(&self, py: Python, cb: PyObject) -> PyResult<()> {
if !cb.bind_borrowed(py).is_callable() {
return Err(PyValueError::new_err("The object passed must be callable."));
}
self.callback(move |ws| {
Python::with_gil(|py| {
// TODO what to do with this error?
let _ = cb.call1(py, (ws,));
})
});
Ok(())
}
} }

View file

@ -3,6 +3,8 @@
pub use crate::api::{ pub use crate::api::{
Controller as CodempController, Controller as CodempController,
controller::AsyncSender as CodempAsyncSender,
controller::AsyncReceiver as CodempAsyncReceiver,
TextChange as CodempTextChange, TextChange as CodempTextChange,
Cursor as CodempCursor, Cursor as CodempCursor,
User as CodempUser, User as CodempUser,