Merge branch 'dev' into feat/lua-promise-abort

This commit is contained in:
əlemi 2024-10-01 17:48:09 +02:00 committed by GitHub
commit fe835c84ff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 718 additions and 406 deletions

View file

@ -25,7 +25,7 @@ use crate::errors::ControllerResult;
/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
#[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
pub trait Controller<T: Sized + Send + Sync>: Sized + Send + Sync {
/// Enqueue a new value to be sent to all other users.
async fn send(&self, x: T) -> ControllerResult<()>;
@ -54,7 +54,6 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
async fn try_recv(&self) -> ControllerResult<Option<T>>;
}
/// Type wrapper for Boxed dynamic callback.
pub struct ControllerCallback<T>(pub Box<dyn Sync + Send + Fn(T)>);

View file

@ -19,9 +19,9 @@ pub mod event;
/// data structure for remote users
pub mod user;
pub use controller::Controller;
pub use change::TextChange;
pub use config::Config;
pub use controller::Controller;
pub use cursor::Cursor;
pub use event::Event;
pub use user::User;

View file

@ -30,7 +30,12 @@ struct BufferWorker {
}
impl BufferController {
pub(crate) fn spawn(user_id: Uuid, path: &str, tx: mpsc::Sender<Operation>, rx: Streaming<BufferEvent>) -> Self {
pub(crate) fn spawn(
user_id: Uuid,
path: &str,
tx: mpsc::Sender<Operation>,
rx: Streaming<BufferEvent>,
) -> Self {
let init = diamond_types::LocalVersion::default();
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
@ -68,21 +73,25 @@ impl BufferController {
callback: cb_rx,
};
tokio::spawn(async move {
BufferController::work(worker, tx, rx).await
});
tokio::spawn(async move { BufferController::work(worker, tx, rx).await });
BufferController(controller)
}
async fn work(mut worker: BufferWorker, tx: mpsc::Sender<Operation>, mut rx: Streaming<BufferEvent>) {
async fn work(
mut worker: BufferWorker,
tx: mpsc::Sender<Operation>,
mut rx: Streaming<BufferEvent>,
) {
let mut branch = diamond_types::list::Branch::new();
let mut oplog = diamond_types::list::OpLog::new();
let mut timer = Timer::new(10); // TODO configurable!!
tracing::debug!("controller worker started");
loop {
if worker.controller.upgrade().is_none() { break };
if worker.controller.upgrade().is_none() {
break;
};
// block until one of these is ready
tokio::select! {

View file

@ -4,8 +4,11 @@ use tokio::sync::{mpsc, oneshot, watch};
use tonic::Streaming;
use uuid::Uuid;
use crate::{api::{controller::ControllerCallback, Cursor, User}, ext::IgnorableError};
use codemp_proto::cursor::{CursorPosition, CursorEvent};
use crate::{
api::{controller::ControllerCallback, Cursor, User},
ext::IgnorableError,
};
use codemp_proto::cursor::{CursorEvent, CursorPosition};
use super::controller::{CursorController, CursorControllerInner};
@ -21,7 +24,11 @@ struct CursorWorker {
}
impl CursorController {
pub(crate) fn spawn(user_map: Arc<dashmap::DashMap<Uuid, User>>, tx: mpsc::Sender<CursorPosition>, rx: Streaming<CursorEvent>) -> Self {
pub(crate) fn spawn(
user_map: Arc<dashmap::DashMap<Uuid, User>>,
tx: mpsc::Sender<CursorPosition>,
rx: Streaming<CursorEvent>,
) -> Self {
// TODO we should tweak the channel buffer size to better propagate backpressure
let (op_tx, op_rx) = mpsc::channel(64);
let (stream_tx, stream_rx) = mpsc::channel(1);
@ -52,11 +59,17 @@ impl CursorController {
CursorController(controller)
}
async fn work(mut worker: CursorWorker, tx: mpsc::Sender<CursorPosition>, mut rx: Streaming<CursorEvent>) {
async fn work(
mut worker: CursorWorker,
tx: mpsc::Sender<CursorPosition>,
mut rx: Streaming<CursorEvent>,
) {
loop {
tracing::debug!("cursor worker polling");
if worker.controller.upgrade().is_none() { break }; // clean exit: all controllers dropped
tokio::select!{
if worker.controller.upgrade().is_none() {
break;
}; // clean exit: all controllers dropped
tokio::select! {
biased;
// new poller

View file

@ -61,4 +61,3 @@ impl From<tokio::sync::oneshot::error::RecvError> for ControllerError {
/// Wraps [std::result::Result] with a [ControllerError].
pub type ControllerResult<T> = std::result::Result<T, ControllerError>;

View file

@ -16,7 +16,7 @@ use tokio::sync::mpsc;
pub async fn select_buffer(
buffers: &[crate::buffer::Controller],
timeout: Option<std::time::Duration>,
runtime: &tokio::runtime::Runtime
runtime: &tokio::runtime::Runtime,
) -> ControllerResult<Option<crate::buffer::Controller>> {
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tasks = Vec::new();
@ -46,7 +46,7 @@ pub async fn select_buffer(
t.abort();
}
return Ok(x);
},
}
}
}
}
@ -104,11 +104,13 @@ pub trait IgnorableError {
}
impl<T, E> IgnorableError for std::result::Result<T, E>
where E : std::fmt::Debug {
where
E: std::fmt::Debug,
{
/// Logs the error as a warning and returns a unit.
fn unwrap_or_warn(self, msg: &str) {
match self {
Ok(_) => {},
Ok(_) => {}
Err(e) => tracing::warn!("{}: {:?}", msg, e),
}
}

View file

@ -1,7 +1,10 @@
use jni::{objects::JObject, JNIEnv};
use jni_toolbox::jni;
use crate::{api::{Controller, TextChange}, errors::ControllerError};
use crate::{
api::{Controller, TextChange},
errors::ControllerError,
};
use super::null_check;
@ -19,7 +22,9 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result<String, Con
/// Try to fetch a [TextChange], or return null if there's nothing.
#[jni(package = "mp.code", class = "BufferController")]
fn try_recv(controller: &mut crate::buffer::Controller) -> Result<Option<TextChange>, ControllerError> {
fn try_recv(
controller: &mut crate::buffer::Controller,
) -> Result<Option<TextChange>, ControllerError> {
super::tokio().block_on(controller.try_recv())
}
@ -31,23 +36,34 @@ fn recv(controller: &mut crate::buffer::Controller) -> Result<TextChange, Contro
/// Send a [TextChange] to the server.
#[jni(package = "mp.code", class = "BufferController")]
fn send(controller: &mut crate::buffer::Controller, change: TextChange) -> Result<(), ControllerError> {
fn send(
controller: &mut crate::buffer::Controller,
change: TextChange,
) -> Result<(), ControllerError> {
super::tokio().block_on(controller.send(change))
}
/// Register a callback for buffer changes.
#[jni(package = "mp.code", class = "BufferController")]
fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::buffer::Controller, cb: JObject<'local>) {
fn callback<'local>(
env: &mut JNIEnv<'local>,
controller: &mut crate::buffer::Controller,
cb: JObject<'local>,
) {
null_check!(env, cb, {});
let Ok(cb_ref) = env.new_global_ref(cb) else {
env.throw_new("mp/code/exceptions/JNIException", "Failed to pin callback reference!")
env.throw_new(
"mp/code/exceptions/JNIException",
"Failed to pin callback reference!",
)
.expect("Failed to throw exception!");
return;
};
controller.callback(move |controller: crate::buffer::Controller| {
let jvm = super::jvm();
let mut env = jvm.attach_current_thread_permanently()
let mut env = jvm
.attach_current_thread_permanently()
.expect("failed attaching to main JVM thread");
if let Err(e) = env.with_local_frame(5, |env| {
use jni_toolbox::IntoJavaObject;
@ -56,7 +72,7 @@ fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::buffer::Co
&cb_ref,
"accept",
"(Ljava/lang/Object;)V",
&[jni::objects::JValueGen::Object(&jcontroller)]
&[jni::objects::JValueGen::Object(&jcontroller)],
) {
tracing::error!("error invoking callback: {e:?}");
};

View file

@ -1,5 +1,10 @@
use crate::{
api::Config,
client::Client,
errors::{ConnectionError, RemoteError},
Workspace,
};
use jni_toolbox::jni;
use crate::{api::Config, client::Client, errors::{ConnectionError, RemoteError}, Workspace};
/// Connect using the given credentials to the default server, and return a [Client] to interact with it.
#[jni(package = "mp.code", class = "Client", ptr)]
@ -33,13 +38,21 @@ fn delete_workspace(client: &mut Client, workspace: String) -> Result<(), Remote
/// Invite another user to an owned workspace.
#[jni(package = "mp.code", class = "Client")]
fn invite_to_workspace(client: &mut Client, workspace: String, user: String) -> Result<(), RemoteError> {
fn invite_to_workspace(
client: &mut Client,
workspace: String,
user: String,
) -> Result<(), RemoteError> {
super::tokio().block_on(client.invite_to_workspace(workspace, user))
}
/// List available workspaces.
#[jni(package = "mp.code", class = "Client")]
fn list_workspaces(client: &mut Client, owned: bool, invited: bool) -> Result<Vec<String>, RemoteError> {
fn list_workspaces(
client: &mut Client,
owned: bool,
invited: bool,
) -> Result<Vec<String>, RemoteError> {
super::tokio().block_on(client.list_workspaces(owned, invited))
}

View file

@ -1,6 +1,9 @@
use crate::{
api::{Controller, Cursor},
errors::ControllerError,
};
use jni::{objects::JObject, JNIEnv};
use jni_toolbox::jni;
use crate::{api::{Controller, Cursor}, errors::ControllerError};
use super::null_check;
@ -24,17 +27,25 @@ fn send(controller: &mut crate::cursor::Controller, cursor: Cursor) -> Result<()
/// Register a callback for cursor changes.
#[jni(package = "mp.code", class = "CursorController")]
fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::cursor::Controller, cb: JObject<'local>) {
fn callback<'local>(
env: &mut JNIEnv<'local>,
controller: &mut crate::cursor::Controller,
cb: JObject<'local>,
) {
null_check!(env, cb, {});
let Ok(cb_ref) = env.new_global_ref(cb) else {
env.throw_new("mp/code/exceptions/JNIException", "Failed to pin callback reference!")
env.throw_new(
"mp/code/exceptions/JNIException",
"Failed to pin callback reference!",
)
.expect("Failed to throw exception!");
return;
};
controller.callback(move |controller: crate::cursor::Controller| {
let jvm = super::jvm();
let mut env = jvm.attach_current_thread_permanently()
let mut env = jvm
.attach_current_thread_permanently()
.expect("failed attaching to main JVM thread");
if let Err(e) = env.with_local_frame(5, |env| {
use jni_toolbox::IntoJavaObject;
@ -43,7 +54,7 @@ fn callback<'local>(env: &mut JNIEnv<'local>, controller: &mut crate::cursor::Co
&cb_ref,
"accept",
"(Ljava/lang/Object;)V",
&[jni::objects::JValueGen::Object(&jcontroller)]
&[jni::objects::JValueGen::Object(&jcontroller)],
) {
tracing::error!("error invoking callback: {e:?}");
};

View file

@ -1,19 +1,19 @@
pub mod client;
pub mod workspace;
pub mod cursor;
pub mod buffer;
pub mod client;
pub mod cursor;
pub mod ext;
pub mod workspace;
/// Gets or creates the relevant [tokio::runtime::Runtime].
fn tokio() -> &'static tokio::runtime::Runtime {
use std::sync::OnceLock;
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RT.get_or_init(||
RT.get_or_init(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("could not create tokio runtime")
)
})
}
/// A static reference to [jni::JavaVM] that is set on JNI load.
@ -27,10 +27,7 @@ pub(crate) fn jvm() -> std::sync::Arc<jni::JavaVM> {
/// Called upon initialisation of the JVM.
#[allow(non_snake_case)]
#[no_mangle]
pub extern "system" fn JNI_OnLoad(
vm: jni::JavaVM,
_: *mut std::ffi::c_void
) -> jni::sys::jint {
pub extern "system" fn JNI_OnLoad(vm: jni::JavaVM, _: *mut std::ffi::c_void) -> jni::sys::jint {
unsafe { JVM = Some(std::sync::Arc::new(vm)) };
jni::sys::JNI_VERSION_1_1
}
@ -48,7 +45,11 @@ pub(crate) fn setup_logger(debug: bool, path: Option<String>) {
.with_source_location(false)
.compact();
let level = if debug { tracing::Level::DEBUG } else {tracing::Level::INFO };
let level = if debug {
tracing::Level::DEBUG
} else {
tracing::Level::INFO
};
let builder = tracing_subscriber::fmt()
.event_format(format)
@ -58,13 +59,16 @@ pub(crate) fn setup_logger(debug: bool, path: Option<String>) {
let logfile = std::fs::File::create(path).expect("failed creating logfile");
builder.with_writer(std::sync::Mutex::new(logfile)).init();
} else {
builder.with_writer(std::sync::Mutex::new(std::io::stdout())).init();
builder
.with_writer(std::sync::Mutex::new(std::io::stdout()))
.init();
}
}
/// Performs a null check on the given variable and throws a NullPointerException on the Java side
/// if it is null. Finally, it returns with the given default value.
macro_rules! null_check { // TODO replace
macro_rules! null_check {
// TODO replace
($env: ident, $var: ident, $return: expr) => {
if $var.is_null() {
let mut message = stringify!($var).to_string();
@ -80,9 +84,14 @@ pub(crate) use null_check;
impl jni_toolbox::JniToolboxError for crate::errors::ConnectionError {
fn jclass(&self) -> String {
match self {
crate::errors::ConnectionError::Transport(_) => "mp/code/exceptions/ConnectionTransportException",
crate::errors::ConnectionError::Remote(_) => "mp/code/exceptions/ConnectionRemoteException"
}.to_string()
crate::errors::ConnectionError::Transport(_) => {
"mp/code/exceptions/ConnectionTransportException"
}
crate::errors::ConnectionError::Remote(_) => {
"mp/code/exceptions/ConnectionRemoteException"
}
}
.to_string()
}
}
@ -95,9 +104,14 @@ impl jni_toolbox::JniToolboxError for crate::errors::RemoteError {
impl jni_toolbox::JniToolboxError for crate::errors::ControllerError {
fn jclass(&self) -> String {
match self {
crate::errors::ControllerError::Stopped => "mp/code/exceptions/ControllerStoppedException",
crate::errors::ControllerError::Unfulfilled => "mp/code/exceptions/ControllerUnfulfilledException",
}.to_string()
crate::errors::ControllerError::Stopped => {
"mp/code/exceptions/ControllerStoppedException"
}
crate::errors::ControllerError::Unfulfilled => {
"mp/code/exceptions/ControllerUnfulfilledException"
}
}
.to_string()
}
}
@ -106,12 +120,17 @@ macro_rules! into_java_ptr_class {
($type: ty, $jclass: literal) => {
impl<'j> jni_toolbox::IntoJavaObject<'j> for $type {
const CLASS: &'static str = $jclass;
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let class = env.find_class(Self::CLASS)?;
env.new_object(
class,
"(J)V",
&[jni::objects::JValueGen::Long(Box::into_raw(Box::new(self)) as jni::sys::jlong)]
&[jni::objects::JValueGen::Long(
Box::into_raw(Box::new(self)) as jni::sys::jlong
)],
)
}
}
@ -125,7 +144,10 @@ into_java_ptr_class!(crate::buffer::Controller, "mp/code/BufferController");
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::User {
const CLASS: &'static str = "mp/code/data/User";
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let id_field = self.id.into_java_object(env)?;
let name_field = env.new_string(self.name)?;
let class = env.find_class(Self::CLASS)?;
@ -134,15 +156,18 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::User {
"(Ljava/util/UUID;Ljava/lang/String;)V",
&[
jni::objects::JValueGen::Object(&id_field),
jni::objects::JValueGen::Object(&name_field)
]
jni::objects::JValueGen::Object(&name_field),
],
)
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event {
const CLASS: &'static str = "mp/code/Workspace$Event";
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let (ordinal, arg) = match self {
crate::api::Event::UserJoin(arg) => (0, env.new_string(arg)?),
crate::api::Event::UserLeave(arg) => (1, env.new_string(arg)?),
@ -150,12 +175,10 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event {
};
let type_class = env.find_class("mp/code/Workspace$Event$Type")?;
let variants: jni::objects::JObjectArray = env.call_method(
type_class,
"getEnumConstants",
"()[Ljava/lang/Object;",
&[]
)?.l()?.into();
let variants: jni::objects::JObjectArray = env
.call_method(type_class, "getEnumConstants", "()[Ljava/lang/Object;", &[])?
.l()?
.into();
let event_type = env.get_object_array_element(variants, ordinal)?;
let event_class = env.find_class(Self::CLASS)?;
@ -164,23 +187,32 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event {
"(Lmp/code/Workspace$Event$Type;Ljava/lang/String;)V",
&[
jni::objects::JValueGen::Object(&event_type),
jni::objects::JValueGen::Object(&arg)
]
jni::objects::JValueGen::Object(&arg),
],
)
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
const CLASS: &'static str = "mp/code/data/TextChange";
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let content = env.new_string(self.content)?;
let hash_class = env.find_class("java/util/OptionalLong")?;
let hash = if let Some(h) = self.hash {
env.call_static_method(hash_class, "of", "(J)Ljava/util/OptionalLong;", &[jni::objects::JValueGen::Long(h)])
env.call_static_method(
hash_class,
"of",
"(J)Ljava/util/OptionalLong;",
&[jni::objects::JValueGen::Long(h)],
)
} else {
env.call_static_method(hash_class, "empty", "()Ljava/util/OptionalLong;", &[])
}?.l()?;
}?
.l()?;
let class = env.find_class(Self::CLASS)?;
env.new_object(
@ -190,15 +222,18 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
jni::objects::JValueGen::Long(self.start.into()),
jni::objects::JValueGen::Long(self.end.into()),
jni::objects::JValueGen::Object(&content),
jni::objects::JValueGen::Object(&hash)
]
jni::objects::JValueGen::Object(&hash),
],
)
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Cursor {
const CLASS: &'static str = "mp/code/data/Cursor";
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let class = env.find_class("mp/code/data/Cursor")?;
let buffer = env.new_string(&self.buffer)?;
let user = if let Some(user) = self.user {
@ -216,8 +251,8 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Cursor {
jni::objects::JValueGen::Int(self.end.0),
jni::objects::JValueGen::Int(self.end.1),
jni::objects::JValueGen::Object(&buffer),
jni::objects::JValueGen::Object(&user)
]
jni::objects::JValueGen::Object(&user),
],
)
}
}
@ -226,7 +261,10 @@ macro_rules! from_java_ptr {
($type: ty) => {
impl<'j> jni_toolbox::FromJava<'j> for &mut $type {
type From = jni::sys::jobject;
fn from_java(_env: &mut jni::JNIEnv<'j>, value: Self::From) -> Result<Self, jni::errors::Error> {
fn from_java(
_env: &mut jni::JNIEnv<'j>,
value: Self::From,
) -> Result<Self, jni::errors::Error> {
Ok(unsafe { Box::leak(Box::from_raw(value as *mut $type)) })
}
}
@ -240,9 +278,14 @@ from_java_ptr!(crate::buffer::Controller);
impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config {
type From = jni::objects::JObject<'j>;
fn from_java(env: &mut jni::JNIEnv<'j>, config: Self::From) -> Result<Self, jni::errors::Error> {
fn from_java(
env: &mut jni::JNIEnv<'j>,
config: Self::From,
) -> Result<Self, jni::errors::Error> {
let username = {
let jfield = env.get_field(&config, "username", "Ljava/lang/String;")?.l()?;
let jfield = env
.get_field(&config, "username", "Ljava/lang/String;")?
.l()?;
if jfield.is_null() {
return Err(jni::errors::Error::NullPtr("Username can never be null!"));
}
@ -250,7 +293,9 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config {
};
let password = {
let jfield = env.get_field(&config, "password", "Ljava/lang/String;")?.l()?;
let jfield = env
.get_field(&config, "password", "Ljava/lang/String;")?
.l()?;
if jfield.is_null() {
return Err(jni::errors::Error::NullPtr("Password can never be null!"));
}
@ -258,9 +303,13 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config {
};
let host = {
let jfield = env.get_field(&config, "host", "Ljava/util/Optional;")?.l()?;
let jfield = env
.get_field(&config, "host", "Ljava/util/Optional;")?
.l()?;
if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? {
let field = env.call_method(&jfield, "get", "()Ljava/lang/Object;", &[])?.l()?;
let field = env
.call_method(&jfield, "get", "()Ljava/lang/Object;", &[])?
.l()?;
Some(unsafe { env.get_string_unchecked(&field.into()) }?.into())
} else {
None
@ -268,7 +317,9 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config {
};
let port = {
let jfield = env.get_field(&config, "port", "Ljava/util/OptionalInt;")?.l()?;
let jfield = env
.get_field(&config, "port", "Ljava/util/OptionalInt;")?
.l()?;
if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? {
let ivalue = env.call_method(&jfield, "getAsInt", "()I", &[])?.i()?;
Some(ivalue.clamp(0, 65535) as u16)
@ -278,35 +329,55 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config {
};
let tls = {
let jfield = env.get_field(&config, "host", "Ljava/util/Optional;")?.l()?;
let jfield = env
.get_field(&config, "host", "Ljava/util/Optional;")?
.l()?;
if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? {
let field = env.call_method(&jfield, "get", "()Ljava/lang/Object;", &[])?.l()?;
let bool_true = env.get_static_field("java/lang/Boolean", "TRUE", "Ljava/lang/Boolean;")?.l()?;
Some(env.call_method(
let field = env
.call_method(&jfield, "get", "()Ljava/lang/Object;", &[])?
.l()?;
let bool_true = env
.get_static_field("java/lang/Boolean", "TRUE", "Ljava/lang/Boolean;")?
.l()?;
Some(
env.call_method(
field,
"equals",
"(Ljava/lang/Object;)Z",
&[jni::objects::JValueGen::Object(&bool_true)]
)?.z()?) // what a joke
&[jni::objects::JValueGen::Object(&bool_true)],
)?
.z()?,
) // what a joke
} else {
None
}
};
Ok(Self { username, password, host, port, tls })
Ok(Self {
username,
password,
host,
port,
tls,
})
}
}
impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor {
type From = jni::objects::JObject<'j>;
fn from_java(env: &mut jni::JNIEnv<'j>, cursor: Self::From) -> Result<Self, jni::errors::Error> {
fn from_java(
env: &mut jni::JNIEnv<'j>,
cursor: Self::From,
) -> Result<Self, jni::errors::Error> {
let start_row = env.get_field(&cursor, "startRow", "I")?.i()?;
let start_col = env.get_field(&cursor, "startCol", "I")?.i()?;
let end_row = env.get_field(&cursor, "endRow", "I")?.i()?;
let end_col = env.get_field(&cursor, "endCol", "I")?.i()?;
let buffer = {
let jfield = env.get_field(&cursor, "buffer", "Ljava/lang/String;")?.l()?;
let jfield = env
.get_field(&cursor, "buffer", "Ljava/lang/String;")?
.l()?;
if jfield.is_null() {
return Err(jni::errors::Error::NullPtr("Buffer can never be null!"));
}
@ -322,18 +393,34 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor {
}
};
Ok(Self { start: (start_row, start_col), end: (end_row, end_col), buffer, user })
Ok(Self {
start: (start_row, start_col),
end: (end_row, end_col),
buffer,
user,
})
}
}
impl<'j> jni_toolbox::FromJava<'j> for crate::api::TextChange {
type From = jni::objects::JObject<'j>;
fn from_java(env: &mut jni::JNIEnv<'j>, change: Self::From) -> Result<Self, jni::errors::Error> {
let start = env.get_field(&change, "start", "J")?.j()?.clamp(0, u32::MAX.into()) as u32;
let end = env.get_field(&change, "end", "J")?.j()?.clamp(0, u32::MAX.into()) as u32;
fn from_java(
env: &mut jni::JNIEnv<'j>,
change: Self::From,
) -> Result<Self, jni::errors::Error> {
let start = env
.get_field(&change, "start", "J")?
.j()?
.clamp(0, u32::MAX.into()) as u32;
let end = env
.get_field(&change, "end", "J")?
.j()?
.clamp(0, u32::MAX.into()) as u32;
let content = {
let jfield = env.get_field(&change, "content", "Ljava/lang/String;")?.l()?;
let jfield = env
.get_field(&change, "content", "Ljava/lang/String;")?
.l()?;
if jfield.is_null() {
return Err(jni::errors::Error::NullPtr("Content can never be null!"));
}
@ -341,13 +428,20 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::TextChange {
};
let hash = {
let jfield = env.get_field(&change, "hash", "Ljava/util/OptionalLong;")?.l()?;
let jfield = env
.get_field(&change, "hash", "Ljava/util/OptionalLong;")?
.l()?;
if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? {
Some(env.call_method(&jfield, "getAsLong", "()J", &[])?.j()?)
} else {
None
}
};
Ok(Self { start, end, content, hash })
Ok(Self {
start,
end,
content,
hash,
})
}
}

View file

@ -1,5 +1,8 @@
use crate::{
errors::{ConnectionError, ControllerError, RemoteError},
Workspace,
};
use jni_toolbox::jni;
use crate::{errors::{ConnectionError, ControllerError, RemoteError}, Workspace};
/// Get the workspace id.
#[jni(package = "mp.code", class = "Workspace")]
@ -45,7 +48,10 @@ fn create_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr
/// Attach to a buffer and return a pointer to its [crate::buffer::Controller].
#[jni(package = "mp.code", class = "Workspace")]
fn attach_to_buffer(workspace: &mut Workspace, path: String) -> Result<crate::buffer::Controller, ConnectionError> {
fn attach_to_buffer(
workspace: &mut Workspace,
path: String,
) -> Result<crate::buffer::Controller, ConnectionError> {
super::tokio().block_on(workspace.attach(&path))
}
@ -69,7 +75,10 @@ fn fetch_users(workspace: &mut Workspace) -> Result<(), RemoteError> {
/// List users attached to a buffer.
#[jni(package = "mp.code", class = "Workspace")]
fn list_buffer_users(workspace: &mut Workspace, path: String) -> Result<Vec<crate::api::User>, RemoteError> {
fn list_buffer_users(
workspace: &mut Workspace,
path: String,
) -> Result<Vec<crate::api::User>, RemoteError> {
super::tokio().block_on(workspace.list_buffer_users(&path))
}

View file

@ -1,24 +1,28 @@
use napi::threadsafe_function::{ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi_derive::napi;
use crate::api::TextChange;
use crate::api::Controller;
use crate::api::TextChange;
use crate::buffer::controller::BufferController;
use napi::threadsafe_function::{
ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
};
use napi_derive::napi;
#[napi]
impl BufferController {
/// Register a callback to be invoked every time a new event is available to consume
/// There can only be one callback registered at any given time.
#[napi(js_name = "callback", ts_args_type = "fun: (event: BufferController) => void")]
pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()>{
let tsfn : ThreadsafeFunction<crate::buffer::controller::BufferController, Fatal> =
fun.create_threadsafe_function(0,
|ctx : ThreadSafeCallContext<crate::buffer::controller::BufferController>| {
#[napi(
js_name = "callback",
ts_args_type = "fun: (event: BufferController) => void"
)]
pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()> {
let tsfn: ThreadsafeFunction<crate::buffer::controller::BufferController, Fatal> = fun
.create_threadsafe_function(
0,
|ctx: ThreadSafeCallContext<crate::buffer::controller::BufferController>| {
Ok(vec![ctx.value])
}
},
)?;
self.callback(move |controller : BufferController| {
self.callback(move |controller: BufferController| {
tsfn.call(controller.clone(), ThreadsafeFunctionCallMode::Blocking);
//check this with tracing also we could use Ok(event) to get the error
// If it blocks the main thread too many time we have to change this
@ -41,7 +45,7 @@ impl BufferController {
/// Block until next buffer event without returning it
#[napi(js_name = "poll")]
pub async fn js_poll(&self) -> napi::Result<()>{
pub async fn js_poll(&self) -> napi::Result<()> {
Ok(self.poll().await?)
}

View file

@ -1,5 +1,5 @@
use napi_derive::napi;
use crate::{Client, Workspace};
use napi_derive::napi;
#[napi(object, js_name = "User")]
pub struct JsUser {
@ -28,7 +28,7 @@ impl From<crate::api::User> for JsUser {
#[napi]
/// connect to codemp servers and return a client session
pub async fn connect(config: crate::api::Config) -> napi::Result<crate::Client>{
pub async fn connect(config: crate::api::Config) -> napi::Result<crate::Client> {
Ok(crate::Client::connect(config).await?)
}
@ -48,13 +48,21 @@ impl Client {
#[napi(js_name = "list_workspaces")]
/// list available workspaces
pub async fn js_list_workspaces(&self, owned: bool, invited: bool) -> napi::Result<Vec<String>> {
pub async fn js_list_workspaces(
&self,
owned: bool,
invited: bool,
) -> napi::Result<Vec<String>> {
Ok(self.list_workspaces(owned, invited).await?)
}
#[napi(js_name = "invite_to_workspace")]
/// invite user to given workspace, if able to
pub async fn js_invite_to_workspace(&self, workspace: String, user: String) -> napi::Result<()> {
pub async fn js_invite_to_workspace(
&self,
workspace: String,
user: String,
) -> napi::Result<()> {
Ok(self.invite_to_workspace(workspace, user).await?)
}

View file

@ -1,8 +1,10 @@
use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi_derive::napi;
use napi::threadsafe_function::{ThreadsafeFunction, ThreadSafeCallContext, ThreadsafeFunctionCallMode};
use crate::api::Controller;
use crate::cursor::controller::CursorController;
use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi::threadsafe_function::{
ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
};
use napi_derive::napi;
#[napi(object, js_name = "Cursor")]
pub struct JsCursor {
@ -18,7 +20,7 @@ pub struct JsCursor {
impl From<JsCursor> for crate::api::Cursor {
fn from(value: JsCursor) -> Self {
crate::api::Cursor {
start : (value.start_row, value.start_col),
start: (value.start_row, value.start_col),
end: (value.end_row, value.end_col),
buffer: value.buffer,
user: value.user,
@ -29,32 +31,33 @@ impl From<JsCursor> for crate::api::Cursor {
impl From<crate::api::Cursor> for JsCursor {
fn from(value: crate::api::Cursor) -> Self {
JsCursor {
start_row : value.start.0,
start_col : value.start.1,
end_row : value.end.0,
start_row: value.start.0,
start_col: value.start.1,
end_row: value.end.0,
end_col: value.end.1,
buffer: value.buffer,
user: value.user.map(|x| x.to_string())
user: value.user.map(|x| x.to_string()),
}
}
}
#[napi]
impl CursorController {
/// Register a callback to be called on receive.
/// There can only be one callback registered at any given time.
#[napi(js_name = "callback", ts_args_type = "fun: (event: CursorController) => void")]
pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()>{
let tsfn : ThreadsafeFunction<crate::cursor::controller::CursorController, Fatal> =
fun.create_threadsafe_function(0,
|ctx : ThreadSafeCallContext<crate::cursor::controller::CursorController>| {
#[napi(
js_name = "callback",
ts_args_type = "fun: (event: CursorController) => void"
)]
pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()> {
let tsfn: ThreadsafeFunction<crate::cursor::controller::CursorController, Fatal> = fun
.create_threadsafe_function(
0,
|ctx: ThreadSafeCallContext<crate::cursor::controller::CursorController>| {
Ok(vec![ctx.value])
}
},
)?;
self.callback(move |controller : CursorController| {
self.callback(move |controller: CursorController| {
tsfn.call(controller.clone(), ThreadsafeFunctionCallMode::Blocking);
//check this with tracing also we could use Ok(event) to get the error
// If it blocks the main thread too many time we have to change this
@ -75,16 +78,14 @@ impl CursorController {
Ok(self.send(crate::api::Cursor::from(pos)).await?)
}
/// Get next cursor event if available without blocking
#[napi(js_name= "try_recv")]
#[napi(js_name = "try_recv")]
pub async fn js_try_recv(&self) -> napi::Result<Option<JsCursor>> {
Ok(self.try_recv().await?
.map(JsCursor::from))
Ok(self.try_recv().await?.map(JsCursor::from))
}
/// Block until next
#[napi(js_name= "recv")]
#[napi(js_name = "recv")]
pub async fn js_recv(&self) -> napi::Result<JsCursor> {
Ok(self.recv().await?.into())
}

View file

@ -1,13 +1,11 @@
use napi_derive::napi;
/// Hash function
#[napi(js_name = "hash")]
pub fn js_hash(data: String) -> i64 {
crate::ext::hash(data)
}
/// Get the current version of the client
#[napi(js_name = "version")]
pub fn js_version() -> String {

View file

@ -1,9 +1,8 @@
pub mod client;
pub mod workspace;
pub mod cursor;
pub mod buffer;
pub mod client;
pub mod cursor;
pub mod ext;
pub mod workspace;
impl From<crate::errors::ConnectionError> for napi::Error {
fn from(value: crate::errors::ConnectionError) -> Self {
@ -33,7 +32,11 @@ impl JsLogger {
#[napi(constructor)]
pub fn new(debug: Option<bool>) -> JsLogger {
let (tx, rx) = tokio::sync::mpsc::channel(256);
let level = if debug.unwrap_or(false) { tracing::Level::DEBUG } else {tracing::Level::INFO }; //TODO: study this tracing subscriber and customize it
let level = if debug.unwrap_or(false) {
tracing::Level::DEBUG
} else {
tracing::Level::INFO
}; //TODO: study this tracing subscriber and customize it
let format = tracing_subscriber::fmt::format()
.with_level(true)
.with_target(true)
@ -55,11 +58,7 @@ impl JsLogger {
#[napi]
pub async fn message(&self) -> Option<String> {
self.0
.lock()
.await
.recv()
.await
self.0.lock().await.recv().await
}
}
@ -73,5 +72,7 @@ impl std::io::Write for JsLoggerProducer {
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> { Ok(()) }
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

View file

@ -1,8 +1,8 @@
use napi_derive::napi;
use crate::Workspace;
use crate::buffer::controller::BufferController;
use crate::cursor::controller::CursorController;
use crate::ffi::js::client::JsUser;
use crate::Workspace;
use napi_derive::napi;
#[napi(object, js_name = "Event")]
pub struct JsEvent {
@ -13,9 +13,18 @@ pub struct JsEvent {
impl From<crate::api::Event> for JsEvent {
fn from(value: crate::api::Event) -> Self {
match value {
crate::api::Event::FileTreeUpdated(value) => Self { r#type: "filetree".into(), value },
crate::api::Event::UserJoin(value) => Self { r#type: "join".into(), value },
crate::api::Event::UserLeave(value) => Self { r#type: "leave".into(), value },
crate::api::Event::FileTreeUpdated(value) => Self {
r#type: "filetree".into(),
value,
},
crate::api::Event::UserJoin(value) => Self {
r#type: "join".into(),
value,
},
crate::api::Event::UserLeave(value) => Self {
r#type: "leave".into(),
value,
},
}
}
}
@ -64,7 +73,6 @@ impl Workspace {
Ok(self.create(&path).await?)
}
/// Attach to a workspace buffer, starting a BufferController
#[napi(js_name = "attach")]
pub async fn js_attach(&self, path: String) -> napi::Result<BufferController> {
@ -105,8 +113,7 @@ impl Workspace {
/// List users attached to a specific buffer
#[napi(js_name = "list_buffer_users")]
pub async fn js_list_buffer_users(&self, path: String) -> napi::Result<Vec<JsUser>> {
Ok(
self
Ok(self
.list_buffer_users(&path)
.await?
.into_iter()

View file

@ -1,28 +1,41 @@
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use crate::prelude::*;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
use super::ext::a_sync::a_sync;
use super::ext::from_lua_serde;
impl LuaUserData for CodempBufferController {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
methods.add_method("send", |_, this, (change,): (CodempTextChange,)|
a_sync! { this => this.send(change).await? }
methods.add_method(
"send",
|_, this, (change,): (CodempTextChange,)| a_sync! { this => this.send(change).await? },
);
methods.add_method("try_recv", |_, this, ()| a_sync! { this => this.try_recv().await? });
methods.add_method(
"try_recv",
|_, this, ()| a_sync! { this => this.try_recv().await? },
);
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
methods.add_method("content", |_, this, ()| a_sync! { this => this.content().await? });
methods.add_method(
"content",
|_, this, ()| a_sync! { this => this.content().await? },
);
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempBufferController| super::ext::callback().invoke(cb.clone(), controller));
methods.add_method("clear_callback", |_, this, ()| {
this.clear_callback();
Ok(())
});
methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| {
this.callback(move |controller: CodempBufferController| {
super::ext::callback().invoke(cb.clone(), controller)
});
Ok(())
});
}
@ -40,7 +53,9 @@ impl LuaUserData for CodempTextChange {
}
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("apply", |_, this, (txt,):(String,)| Ok(this.apply(&txt)));
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
methods.add_method("apply", |_, this, (txt,): (String,)| Ok(this.apply(&txt)));
}
}

View file

@ -1,6 +1,6 @@
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use crate::prelude::*;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
use super::ext::a_sync::a_sync;
use super::ext::from_lua_serde;
@ -13,22 +13,28 @@ impl LuaUserData for CodempClient {
}
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
methods.add_method("refresh", |_, this, ()|
a_sync! { this => this.refresh().await? }
methods.add_method(
"refresh",
|_, this, ()| a_sync! { this => this.refresh().await? },
);
methods.add_method("join_workspace", |_, this, (ws,):(String,)|
a_sync! { this => this.join_workspace(ws).await? }
methods.add_method(
"join_workspace",
|_, this, (ws,): (String,)| a_sync! { this => this.join_workspace(ws).await? },
);
methods.add_method("create_workspace", |_, this, (ws,):(String,)|
a_sync! { this => this.create_workspace(ws).await? }
methods.add_method(
"create_workspace",
|_, this, (ws,): (String,)| a_sync! { this => this.create_workspace(ws).await? },
);
methods.add_method("delete_workspace", |_, this, (ws,):(String,)|
a_sync! { this => this.delete_workspace(ws).await? }
methods.add_method(
"delete_workspace",
|_, this, (ws,): (String,)| a_sync! { this => this.delete_workspace(ws).await? },
);
methods.add_method("invite_to_workspace", |_, this, (ws,user):(String,String)|
@ -39,11 +45,13 @@ impl LuaUserData for CodempClient {
a_sync! { this => this.list_workspaces(owned.unwrap_or(true), invited.unwrap_or(true)).await? }
);
methods.add_method("leave_workspace", |_, this, (ws,):(String,)|
methods.add_method("leave_workspace", |_, this, (ws,): (String,)| {
Ok(this.leave_workspace(&ws))
);
});
methods.add_method("get_workspace", |_, this, (ws,):(String,)| Ok(this.get_workspace(&ws)));
methods.add_method("get_workspace", |_, this, (ws,): (String,)| {
Ok(this.get_workspace(&ws))
});
}
}

View file

@ -1,6 +1,6 @@
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use crate::prelude::*;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
use super::ext::a_sync::a_sync;
use super::ext::from_lua_serde;
@ -8,20 +8,29 @@ use super::ext::lua_tuple;
impl LuaUserData for CodempCursorController {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
methods.add_method("send", |_, this, (cursor,):(CodempCursor,)|
a_sync! { this => this.send(cursor).await? }
methods.add_method(
"send",
|_, this, (cursor,): (CodempCursor,)| a_sync! { this => this.send(cursor).await? },
);
methods.add_method("try_recv", |_, this, ()|
a_sync! { this => this.try_recv().await? }
methods.add_method(
"try_recv",
|_, this, ()| a_sync! { this => this.try_recv().await? },
);
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempCursorController| super::ext::callback().invoke(cb.clone(), controller));
methods.add_method("clear_callback", |_, this, ()| {
this.clear_callback();
Ok(())
});
methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| {
this.callback(move |controller: CodempCursorController| {
super::ext::callback().invoke(cb.clone(), controller)
});
Ok(())
});
}
@ -30,7 +39,9 @@ impl LuaUserData for CodempCursorController {
from_lua_serde! { CodempCursor }
impl LuaUserData for CodempCursor {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
}
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {

View file

@ -1,15 +1,15 @@
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
pub(crate) fn tokio() -> &'static tokio::runtime::Runtime {
use std::sync::OnceLock;
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RT.get_or_init(||
RT.get_or_init(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("could not create tokio runtime")
)
})
}
macro_rules! a_sync {
@ -32,24 +32,22 @@ macro_rules! a_sync {
pub(crate) use a_sync;
pub(crate) struct Promise(pub(crate) Option<tokio::task::JoinHandle<LuaResult<super::callback::CallbackArg>>>);
pub(crate) struct Promise(
pub(crate) Option<tokio::task::JoinHandle<LuaResult<super::callback::CallbackArg>>>,
);
impl LuaUserData for Promise {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("ready", |_, this|
fields.add_field_method_get("ready", |_, this| {
Ok(this.0.as_ref().map_or(true, |x| x.is_finished()))
);
});
}
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
// TODO: await MUST NOT be used in callbacks!!
methods.add_method_mut("await", |_, this, ()| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
Some(x) => {
tokio()
.block_on(x)
.map_err(LuaError::runtime)?
},
Some(x) => tokio().block_on(x).map_err(LuaError::runtime)?,
});
methods.add_method_mut("cancel", |_, this, ()| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
@ -58,13 +56,15 @@ impl LuaUserData for Promise {
Ok(())
},
});
methods.add_method_mut("and_then", |_, this, (cb,):(LuaFunction,)| match this.0.take() {
methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| {
match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
Some(x) => {
tokio()
.spawn(async move {
tokio().spawn(async move {
match x.await {
Err(e) => tracing::error!("could not join promise to run callback: {e}"),
Err(e) => {
tracing::error!("could not join promise to run callback: {e}")
}
Ok(res) => match res {
Err(e) => super::callback().failure(e),
Ok(val) => super::callback().invoke(cb, val),
@ -72,12 +72,13 @@ impl LuaUserData for Promise {
}
});
Ok(())
},
}
}
});
}
}
pub(crate) fn setup_driver(_: &Lua, (block,):(Option<bool>,)) -> LuaResult<Option<Driver>> {
pub(crate) fn setup_driver(_: &Lua, (block,): (Option<bool>,)) -> LuaResult<Option<Driver>> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let future = async move {
tracing::info!(" :: driving runtime...");
@ -96,12 +97,16 @@ pub(crate) fn setup_driver(_: &Lua, (block,):(Option<bool>,)) -> LuaResult<Optio
}
#[derive(Debug)]
pub(crate) struct Driver(pub(crate) tokio::sync::mpsc::UnboundedSender<()>, Option<std::thread::JoinHandle<()>>);
pub(crate) struct Driver(
pub(crate) tokio::sync::mpsc::UnboundedSender<()>,
Option<std::thread::JoinHandle<()>>,
);
impl LuaUserData for Driver {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method_mut("stop", |_, this, ()| {
match this.1.take() {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
methods.add_method_mut("stop", |_, this, ()| match this.1.take() {
None => Ok(false),
Some(handle) => {
if this.0.send(()).is_err() {
@ -111,10 +116,7 @@ impl LuaUserData for Driver {
Err(e) => Err(LuaError::runtime(format!("runtime thread panicked: {e:?}"))),
Ok(()) => Ok(true),
}
},
}
});
}
}

View file

@ -1,7 +1,7 @@
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use crate::prelude::*;
use crate::ext::IgnorableError;
use crate::prelude::*;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
pub(crate) fn callback() -> &'static CallbackChannel<LuaCallback> {
static CHANNEL: std::sync::OnceLock<CallbackChannel<LuaCallback>> = std::sync::OnceLock::new();
@ -10,7 +10,7 @@ pub(crate) fn callback() -> &'static CallbackChannel<LuaCallback> {
pub(crate) struct CallbackChannel<T> {
tx: std::sync::Arc<tokio::sync::mpsc::UnboundedSender<T>>,
rx: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<T>>
rx: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<T>>,
}
impl Default for CallbackChannel<LuaCallback> {
@ -26,12 +26,16 @@ impl Default for CallbackChannel<LuaCallback> {
impl CallbackChannel<LuaCallback> {
pub(crate) fn invoke(&self, cb: LuaFunction, arg: impl Into<CallbackArg>) {
self.tx.send(LuaCallback::Invoke(cb, arg.into()))
self.tx
.send(LuaCallback::Invoke(cb, arg.into()))
.unwrap_or_warn("error scheduling callback")
}
pub(crate) fn failure(&self, err: impl std::error::Error) {
self.tx.send(LuaCallback::Fail(format!("promise failed with error: {err:?}")))
self.tx
.send(LuaCallback::Fail(format!(
"promise failed with error: {err:?}"
)))
.unwrap_or_warn("error scheduling callback failure")
}
@ -40,12 +44,12 @@ impl CallbackChannel<LuaCallback> {
Err(e) => {
tracing::debug!("backing off from callback mutex: {e}");
None
},
}
Ok(mut lock) => match lock.try_recv() {
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
tracing::error!("callback channel closed");
None
},
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => None,
Ok(cb) => Some(cb),
},
@ -95,15 +99,63 @@ impl IntoLua for CallbackArg {
}
}
impl From<()> for CallbackArg { fn from(_: ()) -> Self { CallbackArg::Nil } }
impl From<String> for CallbackArg { fn from(value: String) -> Self { CallbackArg::Str(value) } }
impl From<CodempClient> for CallbackArg { fn from(value: CodempClient) -> Self { CallbackArg::Client(value) } }
impl From<CodempCursorController> for CallbackArg { fn from(value: CodempCursorController) -> Self { CallbackArg::CursorController(value) } }
impl From<CodempBufferController> for CallbackArg { fn from(value: CodempBufferController) -> Self { CallbackArg::BufferController(value) } }
impl From<CodempWorkspace> for CallbackArg { fn from(value: CodempWorkspace) -> Self { CallbackArg::Workspace(value) } }
impl From<Vec<String>> for CallbackArg { fn from(value: Vec<String>) -> Self { CallbackArg::VecStr(value) } }
impl From<CodempEvent> for CallbackArg { fn from(value: CodempEvent) -> Self { CallbackArg::Event(value) } }
impl From<CodempCursor> for CallbackArg { fn from(value: CodempCursor) -> Self { CallbackArg::Cursor(value) } }
impl From<Option<CodempCursor>> for CallbackArg { fn from(value: Option<CodempCursor>) -> Self { CallbackArg::MaybeCursor(value) } }
impl From<CodempTextChange> for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } }
impl From<Option<CodempTextChange>> for CallbackArg { fn from(value: Option<CodempTextChange>) -> Self { CallbackArg::MaybeTextChange(value) } }
impl From<()> for CallbackArg {
fn from(_: ()) -> Self {
CallbackArg::Nil
}
}
impl From<String> for CallbackArg {
fn from(value: String) -> Self {
CallbackArg::Str(value)
}
}
impl From<CodempClient> for CallbackArg {
fn from(value: CodempClient) -> Self {
CallbackArg::Client(value)
}
}
impl From<CodempCursorController> for CallbackArg {
fn from(value: CodempCursorController) -> Self {
CallbackArg::CursorController(value)
}
}
impl From<CodempBufferController> for CallbackArg {
fn from(value: CodempBufferController) -> Self {
CallbackArg::BufferController(value)
}
}
impl From<CodempWorkspace> for CallbackArg {
fn from(value: CodempWorkspace) -> Self {
CallbackArg::Workspace(value)
}
}
impl From<Vec<String>> for CallbackArg {
fn from(value: Vec<String>) -> Self {
CallbackArg::VecStr(value)
}
}
impl From<CodempEvent> for CallbackArg {
fn from(value: CodempEvent) -> Self {
CallbackArg::Event(value)
}
}
impl From<CodempCursor> for CallbackArg {
fn from(value: CodempCursor) -> Self {
CallbackArg::Cursor(value)
}
}
impl From<Option<CodempCursor>> for CallbackArg {
fn from(value: Option<CodempCursor>) -> Self {
CallbackArg::MaybeCursor(value)
}
}
impl From<CodempTextChange> for CallbackArg {
fn from(value: CodempTextChange) -> Self {
CallbackArg::TextChange(value)
}
}
impl From<Option<CodempTextChange>> for CallbackArg {
fn from(value: Option<CodempTextChange>) -> Self {
CallbackArg::MaybeTextChange(value)
}
}

View file

@ -1,7 +1,7 @@
use std::{io::Write, sync::Mutex};
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
use tokio::sync::mpsc;
#[derive(Debug, Clone)]
@ -12,12 +12,21 @@ impl Write for LuaLoggerProducer {
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> { Ok(()) }
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
// TODO can we make this less verbose?
pub(crate) fn setup_tracing(_: &Lua, (printer, debug): (LuaValue, Option<bool>)) -> LuaResult<bool> {
let level = if debug.unwrap_or_default() { tracing::Level::DEBUG } else {tracing::Level::INFO };
pub(crate) fn setup_tracing(
_: &Lua,
(printer, debug): (LuaValue, Option<bool>),
) -> LuaResult<bool> {
let level = if debug.unwrap_or_default() {
tracing::Level::DEBUG
} else {
tracing::Level::INFO
};
let format = tracing_subscriber::fmt::format()
.with_level(true)
.with_target(true)
@ -36,16 +45,15 @@ pub(crate) fn setup_tracing(_: &Lua, (printer, debug): (LuaValue, Option<bool>))
| LuaValue::Thread(_)
| LuaValue::UserData(_)
| LuaValue::Error(_) => return Err(LuaError::BindError), // TODO full BadArgument type??
LuaValue::Nil => {
tracing_subscriber::fmt()
LuaValue::Nil => tracing_subscriber::fmt()
.event_format(format)
.with_max_level(level)
.with_writer(std::sync::Mutex::new(std::io::stderr()))
.try_init()
.is_ok()
},
.is_ok(),
LuaValue::String(path) => {
let logfile = std::fs::File::create(path.to_string_lossy()).map_err(|e| LuaError::RuntimeError(e.to_string()))?;
let logfile = std::fs::File::create(path.to_string_lossy())
.map_err(|e| LuaError::RuntimeError(e.to_string()))?;
tracing_subscriber::fmt()
.event_format(format)
.with_max_level(level)
@ -53,7 +61,7 @@ pub(crate) fn setup_tracing(_: &Lua, (printer, debug): (LuaValue, Option<bool>))
.with_ansi(false)
.try_init()
.is_ok()
},
}
LuaValue::Function(cb) => {
let (tx, mut rx) = mpsc::unbounded_channel();
let res = tracing_subscriber::fmt()
@ -71,7 +79,7 @@ pub(crate) fn setup_tracing(_: &Lua, (printer, debug): (LuaValue, Option<bool>))
});
}
res
},
}
};
Ok(success)

View file

@ -2,8 +2,8 @@ pub mod a_sync;
pub mod callback;
pub mod log;
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
pub(crate) use a_sync::tokio;
pub(crate) use callback::callback;

View file

@ -1,42 +1,64 @@
mod client;
mod workspace;
mod cursor;
mod buffer;
mod client;
mod cursor;
mod ext;
mod workspace;
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use crate::prelude::*;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
// define multiple entrypoints, so this library can have multiple names and still work
#[mlua::lua_module(name = "codemp")] fn entry_1(lua: &Lua) -> LuaResult<LuaTable> { entrypoint(lua) }
#[mlua::lua_module(name = "libcodemp")] fn entry_2(lua: &Lua) -> LuaResult<LuaTable> { entrypoint(lua) }
#[mlua::lua_module(name = "codemp_native")] fn entry_3(lua: &Lua) -> LuaResult<LuaTable> { entrypoint(lua) }
#[mlua::lua_module(name = "codemp_lua")] fn entry_4(lua: &Lua) -> LuaResult<LuaTable> { entrypoint(lua) }
#[mlua::lua_module(name = "codemp")]
fn entry_1(lua: &Lua) -> LuaResult<LuaTable> {
entrypoint(lua)
}
#[mlua::lua_module(name = "libcodemp")]
fn entry_2(lua: &Lua) -> LuaResult<LuaTable> {
entrypoint(lua)
}
#[mlua::lua_module(name = "codemp_native")]
fn entry_3(lua: &Lua) -> LuaResult<LuaTable> {
entrypoint(lua)
}
#[mlua::lua_module(name = "codemp_lua")]
fn entry_4(lua: &Lua) -> LuaResult<LuaTable> {
entrypoint(lua)
}
fn entrypoint(lua: &Lua) -> LuaResult<LuaTable> {
let exports = lua.create_table()?;
// entrypoint
exports.set("connect", lua.create_function(|_, (config,):(CodempConfig,)|
ext::a_sync::a_sync! { => CodempClient::connect(config).await? }
)?)?;
exports.set(
"connect",
lua.create_function(
|_, (config,): (CodempConfig,)| ext::a_sync::a_sync! { => CodempClient::connect(config).await? },
)?,
)?;
// utils
exports.set("hash", lua.create_function(|_, (txt,):(String,)|
Ok(crate::ext::hash(txt))
)?)?;
exports.set(
"hash",
lua.create_function(|_, (txt,): (String,)| Ok(crate::ext::hash(txt)))?,
)?;
exports.set("version", lua.create_function(|_, ()|
Ok(crate::version())
)?)?;
exports.set(
"version",
lua.create_function(|_, ()| Ok(crate::version()))?,
)?;
// runtime
exports.set("setup_driver", lua.create_function(ext::a_sync::setup_driver)?)?;
exports.set("poll_callback", lua.create_function(|lua, ()| {
exports.set(
"setup_driver",
lua.create_function(ext::a_sync::setup_driver)?,
)?;
exports.set(
"poll_callback",
lua.create_function(|lua, ()| {
let mut val = LuaMultiValue::new();
match ext::callback().recv() {
None => {},
None => {}
Some(ext::callback::LuaCallback::Invoke(cb, arg)) => {
val.push_back(LuaValue::Function(cb));
val.push_back(arg.into_lua(lua)?);
@ -44,30 +66,34 @@ fn entrypoint(lua: &Lua) -> LuaResult<LuaTable> {
Some(ext::callback::LuaCallback::Fail(msg)) => {
val.push_back(false.into_lua(lua)?);
val.push_back(msg.into_lua(lua)?);
},
}
}
Ok(val)
})?)?;
})?,
)?;
// logging
exports.set("setup_tracing", lua.create_function(ext::log::setup_tracing)?)?;
exports.set(
"setup_tracing",
lua.create_function(ext::log::setup_tracing)?,
)?;
Ok(exports)
}
impl From::<crate::errors::ConnectionError> for LuaError {
impl From<crate::errors::ConnectionError> for LuaError {
fn from(value: crate::errors::ConnectionError) -> Self {
LuaError::runtime(value.to_string())
}
}
impl From::<crate::errors::RemoteError> for LuaError {
impl From<crate::errors::RemoteError> for LuaError {
fn from(value: crate::errors::RemoteError) -> Self {
LuaError::runtime(value.to_string())
}
}
impl From::<crate::errors::ControllerError> for LuaError {
impl From<crate::errors::ControllerError> for LuaError {
fn from(value: crate::errors::ControllerError) -> Self {
LuaError::runtime(value.to_string())
}

View file

@ -1,49 +1,60 @@
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use crate::prelude::*;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
use super::ext::a_sync::a_sync;
use super::ext::from_lua_serde;
impl LuaUserData for CodempWorkspace {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("create", |_, this, (name,):(String,)|
a_sync! { this => this.create(&name).await? }
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
methods.add_method(
"create",
|_, this, (name,): (String,)| a_sync! { this => this.create(&name).await? },
);
methods.add_method("attach", |_, this, (name,):(String,)|
a_sync! { this => this.attach(&name).await? }
methods.add_method(
"attach",
|_, this, (name,): (String,)| a_sync! { this => this.attach(&name).await? },
);
methods.add_method("detach", |_, this, (name,):(String,)|
methods.add_method("detach", |_, this, (name,): (String,)| {
Ok(this.detach(&name))
});
methods.add_method(
"delete",
|_, this, (name,): (String,)| a_sync! { this => this.delete(&name).await? },
);
methods.add_method("delete", |_, this, (name,):(String,)|
a_sync! { this => this.delete(&name).await? }
methods.add_method("get_buffer", |_, this, (name,): (String,)| {
Ok(this.buffer_by_name(&name))
});
methods.add_method(
"event",
|_, this, ()| a_sync! { this => this.event().await? },
);
methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name)));
methods.add_method("event", |_, this, ()|
a_sync! { this => this.event().await? }
methods.add_method(
"fetch_buffers",
|_, this, ()| a_sync! { this => this.fetch_buffers().await? },
);
methods.add_method(
"fetch_users",
|_, this, ()| a_sync! { this => this.fetch_users().await? },
);
methods.add_method("fetch_buffers", |_, this, ()|
a_sync! { this => this.fetch_buffers().await? }
);
methods.add_method("fetch_users", |_, this, ()|
a_sync! { this => this.fetch_users().await? }
);
methods.add_method("filetree", |_, this, (filter, strict,):(Option<String>, Option<bool>,)|
methods.add_method(
"filetree",
|_, this, (filter, strict): (Option<String>, Option<bool>)| {
Ok(this.filetree(filter.as_deref(), strict.unwrap_or(false)))
},
);
methods.add_method("user_list", |_, this, ()|
Ok(this.user_list())
);
methods.add_method("user_list", |_, this, ()| Ok(this.user_list()));
}
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
@ -57,7 +68,9 @@ impl LuaUserData for CodempWorkspace {
from_lua_serde! { CodempEvent }
impl LuaUserData for CodempEvent {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
}
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
@ -69,8 +82,7 @@ impl LuaUserData for CodempEvent {
fields.add_field_method_get("value", |_, this| match this {
CodempEvent::FileTreeUpdated(x)
| CodempEvent::UserJoin(x)
| CodempEvent::UserLeave(x)
=> Ok(x.clone()),
| CodempEvent::UserLeave(x) => Ok(x.clone()),
});
}
}

View file

@ -1,5 +1,5 @@
use codemp_proto::{
common::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient,
buffer::buffer_client::BufferClient, common::Token, cursor::cursor_client::CursorClient,
workspace::workspace_client::WorkspaceClient,
};
use tonic::{
@ -14,10 +14,7 @@ type AuthedService = InterceptedService<Channel, WorkspaceInterceptor>;
#[derive(Debug, Clone)]
pub struct SessionInterceptor(pub tokio::sync::watch::Receiver<codemp_proto::common::Token>);
impl tonic::service::Interceptor for SessionInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> tonic::Result<tonic::Request<()>> {
fn call(&mut self, mut request: tonic::Request<()>) -> tonic::Result<tonic::Request<()>> {
if let Ok(token) = self.0.borrow().token.parse() {
request.metadata_mut().insert("session", token);
}

View file

@ -2,17 +2,11 @@
//! All-in-one renamed imports with `use codemp::prelude::*`.
pub use crate::api::{
Controller as CodempController,
TextChange as CodempTextChange,
Cursor as CodempCursor,
User as CodempUser,
Event as CodempEvent,
Config as CodempConfig,
Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor,
Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser,
};
pub use crate::{
client::Client as CodempClient,
workspace::Workspace as CodempWorkspace,
cursor::Controller as CodempCursorController,
buffer::Controller as CodempBufferController,
buffer::Controller as CodempBufferController, client::Client as CodempClient,
cursor::Controller as CodempCursorController, workspace::Workspace as CodempWorkspace,
};

View file

@ -157,7 +157,7 @@ impl Workspace {
Some((_name, controller)) => match Arc::into_inner(controller.0) {
None => false, // dangling ref! we can't drop this
Some(_) => true, // dropping it now
}
},
}
}
@ -239,7 +239,6 @@ impl Workspace {
}))
.await?;
self.0.filetree.remove(path);
Ok(())
@ -315,11 +314,15 @@ impl Workspace {
tokio::spawn(async move {
loop {
// TODO can we stop responsively rather than poll for Arc being dropped?
if weak.upgrade().is_none() { break };
if weak.upgrade().is_none() {
break;
};
let Some(res) = tokio::select!(
x = stream.message() => Some(x),
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None,
) else { continue };
) else {
continue;
};
match res {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
Ok(None) => break tracing::info!("leaving workspace {}", name),