diff --git a/Cargo.toml b/Cargo.toml index c3922ca..5434c65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ lazy_static = { version = "1.4", optional = true } jni = { version = "0.21", features = ["invocation"], optional = true } # glue (lua) -mlua = { version = "0.10.0-beta.1", features = ["module", "luajit", "send"], optional = true } +mlua = { git = "https://github.com/mlua-rs/mlua" , rev = "ece66c46bfdc62685b758ffff16286f2806b2662", features = ["module", "luajit", "send"], optional = true } # glue (js) napi = { version = "2.16", features = ["full"], optional = true } @@ -51,7 +51,7 @@ pyo3-build-config = { version = "0.19", optional = true } [features] default = [] rust = [] # used for ci matrix -lua = ["mlua", "tracing-subscriber"] +lua = ["mlua", "tracing-subscriber", "lazy_static"] java = ["lazy_static", "jni", "tracing-subscriber"] js = ["napi-build", "tracing-subscriber", "napi", "napi-derive"] python = ["pyo3", "tracing-subscriber", "pyo3-build-config"] diff --git a/src/ffi/lua.rs b/src/ffi/lua.rs index 9539f7a..4bc9a31 100644 --- a/src/ffi/lua.rs +++ b/src/ffi/lua.rs @@ -1,12 +1,13 @@ use std::io::Write; use std::sync::Mutex; -use crate::api::controller::ControllerCallback; use crate::api::Cursor; +use crate::ext::IgnorableError; use crate::prelude::*; use crate::workspace::worker::DetachResult; use mlua::prelude::*; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TryRecvError; impl From:: for LuaError { fn from(value: crate::errors::ConnectionError) -> Self { @@ -37,16 +38,86 @@ fn tokio() -> &'static tokio::runtime::Runtime { ) } +// TODO cannot do Box ?? maybe its temporary because im using betas +enum CallbackArg { + CursorController(CodempCursorController), + BufferController(CodempBufferController), + Event(CodempEvent), +} + +impl From for CallbackArg { + fn from(value: CodempCursorController) -> Self { + Self::CursorController(value) + } +} + +impl From for CallbackArg { + fn from(value: CodempBufferController) -> Self { + Self::BufferController(value) + } +} + +impl From for CallbackArg { + fn from(value: CodempEvent) -> Self { + Self::Event(value) + } +} + +struct CallbackChannel { + tx: tokio::sync::mpsc::UnboundedSender<(LuaFunction, CallbackArg)>, + rx: std::sync::Mutex> +} + +impl Default for CallbackChannel { + fn default() -> Self { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let rx = std::sync::Mutex::new(rx); + Self { + tx, rx, + } + } +} + +impl CallbackChannel { + fn send(&self, cb: LuaFunction, arg: impl Into) { + self.tx.send((cb, arg.into())).unwrap_or_warn("error scheduling callback") + } + + fn recv(&self) -> Option<(LuaFunction, CallbackArg)> { + match self.rx.try_lock() { + Ok(mut lock) => match lock.try_recv() { + Ok(res) => Some(res), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Disconnected) => { + tracing::error!("callback channel closed"); + None + }, + }, + Err(e) => { + tracing::warn!("could not acquire callback channel mutex: {e}"); + None + }, + } + } +} + + +lazy_static::lazy_static! { + static ref CHANNEL: CallbackChannel = CallbackChannel::default(); +} + + + struct Promise(Option>>); impl LuaUserData for Promise { - fn add_fields<'a, F: LuaUserDataFields<'a, Self>>(fields: &mut F) { + fn add_fields>(fields: &mut F) { fields.add_field_method_get("ready", |_, this| Ok(this.0.as_ref().map_or(true, |x| x.is_finished())) ); } - fn add_methods<'a, M: LuaUserDataMethods<'a, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { // TODO: await MUST NOT be used in callbacks!! methods.add_method_mut("await", |_, this, ()| match this.0.take() { None => Err(LuaError::runtime("Promise already awaited")), @@ -56,24 +127,24 @@ impl LuaUserData for Promise { .map_err(LuaError::runtime)? }, }); - methods.add_method_mut("and_then", |_, this, (cb,):(LuaFunction,)| match this.0.take() { - None => Err(LuaError::runtime("Promise already awaited")), - Some(x) => { - tokio() - .spawn(async move { - match x.await { - Err(e) => tracing::error!("could not join promise to run callback: {e}"), - Ok(Err(e)) => tracing::error!("promise returned error: {e}"), - Ok(Ok(res)) => { - if let Err(e) = cb.call::(res) { - tracing::error!("error running promise callback: {e}"); - } - }, - } - }); - Ok(()) - }, - }); + // methods.add_method_mut("and_then", |_, this, (cb,):(LuaFunction,)| match this.0.take() { + // None => Err(LuaError::runtime("Promise already awaited")), + // Some(x) => { + // tokio() + // .spawn(async move { + // match x.await { + // Err(e) => tracing::error!("could not join promise to run callback: {e}"), + // Ok(Err(e)) => tracing::error!("promise returned error: {e}"), + // Ok(Ok(res)) => { + // if let Err(e) = cb.call::<()>(res) { + // tracing::error!("error running promise callback: {e}"); + // } + // }, + // } + // }); + // Ok(()) + // }, + // }); } } @@ -88,34 +159,47 @@ macro_rules! a_sync { fn spawn_runtime_driver(_: &Lua, ():()) -> LuaResult { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - std::thread::spawn(move || tokio().block_on(async move { + let handle = std::thread::spawn(move || tokio().block_on(async move { tracing::info!(" :: driving runtime..."); tokio::select! { () = std::future::pending::<()>() => {}, _ = rx.recv() => {}, } })); - Ok(Driver(tx)) + Ok(Driver(tx, Some(handle))) } -#[derive(Debug, Clone)] -struct Driver(tokio::sync::mpsc::UnboundedSender<()>); +#[derive(Debug)] +struct Driver(tokio::sync::mpsc::UnboundedSender<()>, Option>); impl LuaUserData for Driver { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); - methods.add_method("stop", |_, this, ()| Ok(this.0.send(()).is_ok())); + methods.add_method_mut("stop", |_, this, ()| { + match this.1.take() { + None => Ok(false), + Some(handle) => { + if this.0.send(()).is_err() { + tracing::warn!("found runtime already stopped while attempting to stop it"); + } + match handle.join() { + Err(e) => Err(LuaError::runtime(format!("runtime thread panicked: {e:?}"))), + Ok(()) => Ok(true), + } + }, + } + }); } } impl LuaUserData for CodempClient { - fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { + fn add_fields>(fields: &mut F) { fields.add_field_method_get("id", |_, this| Ok(this.user().id.to_string())); fields.add_field_method_get("username", |_, this| Ok(this.user().name.clone())); fields.add_field_method_get("active_workspaces", |_, this| Ok(this.active_workspaces())); } - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_method("refresh", |_, this, ()| @@ -152,17 +236,17 @@ impl LuaUserData for CodempClient { impl LuaUserData for CodempWorkspace { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_method("create_buffer", |_, this, (name,):(String,)| a_sync! { this => Ok(this.create(&name).await?) } ); - methods.add_method("attach", |_, this, (name,):(String,)| + methods.add_method("attach_buffer", |_, this, (name,):(String,)| a_sync! { this => Ok(this.attach(&name).await?) } ); - methods.add_method("detach", |_, this, (name,):(String,)| + methods.add_method("detach_buffer", |_, this, (name,):(String,)| Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached)) ); @@ -183,24 +267,24 @@ impl LuaUserData for CodempWorkspace { a_sync! { this => Ok(this.fetch_users().await?) } ); - methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { - let _this = this.clone(); - tokio().spawn(async move { - while let Ok(ev) = _this.event().await { - if let Err(e) = cb.call::(ev) { - tracing::error!("error running workspace callback: {e}"); - } - } - }); - Ok(()) - }); + // methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { + // let _this = this.clone(); + // tokio().spawn(async move { + // while let Ok(ev) = _this.event().await { + // if let Err(e) = cb.call::<()>(ev) { + // tracing::error!("error running workspace callback: {e}"); + // } + // } + // }); + // Ok(()) + // }); methods.add_method("filetree", |_, this, (filter,):(Option,)| Ok(this.filetree(filter.as_deref())) ); } - fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { + fn add_fields>(fields: &mut F) { fields.add_field_method_get("name", |_, this| Ok(this.id())); fields.add_field_method_get("cursor", |_, this| Ok(this.cursor())); fields.add_field_method_get("active_buffers", |_, this| Ok(this.buffer_list())); @@ -209,11 +293,11 @@ impl LuaUserData for CodempWorkspace { } impl LuaUserData for CodempEvent { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); } - fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { + fn add_fields>(fields: &mut F) { fields.add_field_method_get("type", |_, this| match this { CodempEvent::FileTreeUpdated(_) => Ok("filetree"), CodempEvent::UserJoin(_) | CodempEvent::UserLeave(_) => Ok("user"), @@ -226,7 +310,7 @@ impl LuaUserData for CodempEvent { } impl LuaUserData for CodempCursorController { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)| @@ -242,22 +326,18 @@ impl LuaUserData for CodempCursorController { methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) }); methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { - this.callback(ControllerCallback::from(move |controller: CodempCursorController| { - if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller.clone(),)) { - tracing::error!("error running cursor callback: {e}"); - } - })); + this.callback(move |controller: CodempCursorController| CHANNEL.send(cb.clone(), controller)); Ok(()) }); } } impl LuaUserData for Cursor { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); } - fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { + fn add_fields>(fields: &mut F) { fields.add_field_method_get("user", |_, this| Ok(this.user.map(|x| x.to_string()))); fields.add_field_method_get("buffer", |_, this| Ok(this.buffer.clone())); fields.add_field_method_get("start", |_, this| Ok(RowCol::from(this.start))); @@ -278,18 +358,18 @@ impl From<(i32, i32)> for RowCol { } impl LuaUserData for RowCol { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); } - fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { + fn add_fields>(fields: &mut F) { fields.add_field_method_get("row", |_, this| Ok(this.row)); fields.add_field_method_get("col", |_, this| Ok(this.col)); } } impl LuaUserData for CodempBufferController { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_method("send", |_, this, (start, end, content, hash): (usize, usize, String, Option)| @@ -315,26 +395,21 @@ impl LuaUserData for CodempBufferController { methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) }); methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { - this.callback(move |controller: CodempBufferController| { - let _c = controller.clone(); - if let Err(e) = cb.call::<(CodempBufferController,), ()>((_c,)) { - tracing::error!("error running buffer#{} callback: {e}", controller.name()); - } - }); + this.callback(move |controller: CodempBufferController| CHANNEL.send(cb.clone(), controller)); Ok(()) }); } } impl LuaUserData for CodempTextChange { - fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { + fn add_fields>(fields: &mut F) { fields.add_field_method_get("content", |_, this| Ok(this.content.clone())); fields.add_field_method_get("first", |_, this| Ok(this.start)); fields.add_field_method_get("last", |_, this| Ok(this.end)); fields.add_field_method_get("hash", |_, this| Ok(this.hash)); } - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_method("apply", |_, this, (txt,):(String,)| Ok(this.apply(&txt))); } @@ -420,7 +495,7 @@ fn logger(_: &Lua, (printer, debug): (LuaValue, Option)) -> LuaResult((msg,)); + let _ = cb.call::<()>((msg,)); // if the logger fails logging who logs it? } }); @@ -450,6 +525,15 @@ fn codemp_native(lua: &Lua) -> LuaResult { // runtime exports.set("spawn_runtime_driver", lua.create_function(spawn_runtime_driver)?)?; + exports.set("poll_callback", lua.create_function(|_, ()| { + // TODO pass args too + if let Some((cb, _arg)) = CHANNEL.recv() { + Ok(Some(cb)) + } else { + Ok(None) + } + })?)?; + // logging exports.set("logger", lua.create_function(logger)?)?;