From fdcfc611b17a172c67a3b6ddf6ea621f5175bee8 Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 17 Aug 2024 00:06:57 +0200 Subject: [PATCH] feat(lua): hand rolled a_sync! to the rescue --- Cargo.toml | 4 +- src/ffi/lua.rs | 175 +++++++++++++++++++++++++++++++++++-------------- 2 files changed, 128 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 74d096f..b78f720 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,10 +26,10 @@ tokio-stream = "0.1" dashmap = "5.5" # glue (multiple) -lazy_static = { version = "1.4", optional = true } tracing-subscriber = { version = "0.3", optional = true } # glue (java) +lazy_static = { version = "1.4", optional = true } jni = { version = "0.21", features = ["invocation"], optional = true } # glue (lua) @@ -51,7 +51,7 @@ pyo3-build-config = { version = "0.19", optional = true } [features] default = [] -lua = ["mlua", "lazy_static", "tracing-subscriber"] +lua = ["mlua", "tracing-subscriber"] java = ["lazy_static", "jni", "tracing-subscriber"] js = ["napi-build", "tracing-subscriber", "napi", "napi-derive"] python = ["pyo3", "pyo3-asyncio", "tracing-subscriber", "pyo3-build-config"] diff --git a/src/ffi/lua.rs b/src/ffi/lua.rs index 723c340..d0967fd 100644 --- a/src/ffi/lua.rs +++ b/src/ffi/lua.rs @@ -17,13 +17,70 @@ impl From:: for LuaError { } } -lazy_static::lazy_static!{ - static ref RT : tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("could not create tokio runtime"); +fn tokio() -> &'static tokio::runtime::Runtime { + use std::sync::OnceLock; + static RT: OnceLock = OnceLock::new(); + RT.get_or_init(|| + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("could not create tokio runtime") + ) +} + +struct Promise(Option>>); + +impl LuaUserData for Promise { + fn add_fields<'a, F: LuaUserDataFields<'a, Self>>(fields: &mut F) { + fields.add_field_method_get("ready", |_, this| + Ok(this.0.as_ref().map_or(true, |x| x.is_finished())) + ); + } + + fn add_methods<'a, M: LuaUserDataMethods<'a, Self>>(methods: &mut M) { + // TODO: await MUST NOT be used in callbacks!! + methods.add_method_mut("await", |_, this, ()| match this.0.take() { + None => Err(LuaError::runtime("Promise already awaited")), + Some(x) => { + tokio() + .block_on(x) + .map_err(LuaError::runtime)? + }, + }); + methods.add_method_mut("and_then", |_, this, (cb,):(LuaFunction,)| match this.0.take() { + None => Err(LuaError::runtime("Promise already awaited")), + Some(x) => { + tokio() + .spawn(async move { + match x.await { + Err(e) => tracing::error!("could not join promise to run callback: {e}"), + Ok(Err(e)) => tracing::error!("promise returned error: {e}"), + Ok(Ok(res)) => { + if let Err(e) = cb.call::(res) { + tracing::error!("error running promise callback: {e}"); + } + }, + } + }); + Ok(()) + }, + }); + } +} + +macro_rules! a_sync { + ($($clone:ident)* => $x:expr) => { + { + $(let $clone = $clone.clone();)* + Ok(Promise(Some(tokio().spawn(async move { $x })))) + } + }; } fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - std::thread::spawn(move || RT.block_on(async move { + std::thread::spawn(move || tokio().block_on(async move { + tracing::info!(" :: driving runtime..."); tokio::select! { () = std::future::pending::<()>() => {}, _ = rx.recv() => {}, @@ -52,12 +109,12 @@ impl LuaUserData for CodempClient { // join a remote workspace and start processing cursor events methods.add_method("join_workspace", |_, this, (session,):(String,)| - Ok(RT.block_on(async { this.join_workspace(&session).await })?) + a_sync! { this => Ok(this.join_workspace(&session).await?) } ); - methods.add_method("leave_workspace", |_, this, (session,):(String,)| { + methods.add_method("leave_workspace", |_, this, (session,):(String,)| Ok(this.leave_workspace(&session)) - }); + ); methods.add_method("get_workspace", |_, this, (session,):(String,)| Ok(this.get_workspace(&session))); methods.add_method("active_workspaces", |_, this, ()| Ok(this.active_workspaces())); @@ -69,28 +126,46 @@ impl LuaUserData for CodempClient { impl LuaUserData for CodempWorkspace { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); - methods.add_method("create_buffer", |_, this, (name,):(String,)| { - Ok(RT.block_on(async { this.create(&name).await })?) - }); + methods.add_method("create_buffer", |_, this, (name,):(String,)| + a_sync! { this => Ok(this.create(&name).await?) } + ); - methods.add_method("attach", |_, this, (name,):(String,)| { - Ok(RT.block_on(async { this.attach(&name).await })?) - }); + methods.add_method("attach", |_, this, (name,):(String,)| + a_sync! { this => Ok(this.attach(&name).await?) } + ); - methods.add_method("detach", |_, this, (name,):(String,)| { + methods.add_method("detach", |_, this, (name,):(String,)| Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached)) - }); + ); - methods.add_method("delete_buffer", |_, this, (name,):(String,)| { - Ok(RT.block_on(this.delete(&name))?) - }); + methods.add_method("delete_buffer", |_, this, (name,):(String,)| + a_sync! { this => Ok(this.delete(&name).await?) } + ); methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name))); - methods.add_method("event", |_, this, ()| Ok(RT.block_on(this.event())?)); + methods.add_method("event", |_, this, ()| + a_sync! { this => Ok(this.event().await?) } + ); - methods.add_method("fetch_buffers", |_, this, ()| Ok(RT.block_on(this.fetch_buffers())?)); - methods.add_method("fetch_users", |_, this, ()| Ok(RT.block_on(this.fetch_users())?)); + methods.add_method("fetch_buffers", |_, this, ()| + a_sync! { this => Ok(this.fetch_buffers().await?) } + ); + methods.add_method("fetch_users", |_, this, ()| + a_sync! { this => Ok(this.fetch_users().await?) } + ); + + methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { + let _this = this.clone(); + tokio().spawn(async move { + while let Ok(ev) = _this.event().await { + if let Err(e) = cb.call::(ev) { + tracing::error!("error running workspace callback: {e}"); + } + } + }); + Ok(()) + }); } fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { @@ -109,12 +184,12 @@ impl LuaUserData for CodempEvent { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fields.add_field_method_get("type", |_, this| match this { - CodempEvent::FileTreeUpdated => Ok("filetree"), + CodempEvent::FileTreeUpdated(_) => Ok("filetree"), CodempEvent::UserJoin(_) | CodempEvent::UserLeave(_) => Ok("user"), }); fields.add_field_method_get("value", |_, this| match this { - CodempEvent::FileTreeUpdated => Ok(None), - CodempEvent::UserJoin(x) | CodempEvent::UserLeave(x) => Ok(Some(x.clone())), + CodempEvent::FileTreeUpdated(x) => Ok(x.clone()), + CodempEvent::UserJoin(x) | CodempEvent::UserLeave(x) => Ok(x.clone()), }); } } @@ -123,19 +198,21 @@ impl LuaUserData for CodempCursorController { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); - methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)| { - Ok(RT.block_on(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }))?) - }); - methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?)); - methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?)); - methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?)); + methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)| + a_sync! { this => Ok(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }).await?) } + ); + methods.add_method("try_recv", |_, this, ()| + a_sync! { this => Ok(this.try_recv().await?) } + ); + methods.add_method("recv", |_, this, ()| a_sync! { this => Ok(this.recv().await?) }); + methods.add_method("poll", |_, this, ()| a_sync! { this => Ok(this.poll().await?) }); methods.add_method("stop", |_, this, ()| Ok(this.stop())); - methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); + methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) }); methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { - this.callback(ControllerCallback::from(move |controller| { - if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller,)) { + this.callback(ControllerCallback::from(move |controller: CodempCursorController| { + if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller.clone(),)) { tracing::error!("error running cursor callback: {e}"); } })); @@ -184,33 +261,33 @@ impl LuaUserData for CodempBufferController { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); - methods.add_method("send", |_, this, (start, end, text): (usize, usize, String)| { - Ok( - RT.block_on(this.send( + methods.add_method("send", |_, this, (start, end, content, hash): (usize, usize, String, Option)| + a_sync! { this => Ok( + this.send( CodempTextChange { start: start as u32, end: end as u32, - content: text, - hash: None, + content, + hash, } - ))? - ) - }); + ).await? + )} + ); - methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?)); - methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?)); - methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?)); + methods.add_method("try_recv", |_, this, ()| a_sync! { this => Ok(this.try_recv().await?) }); + methods.add_method("recv", |_, this, ()| a_sync! { this => Ok(this.recv().await?) }); + methods.add_method("poll", |_, this, ()| a_sync! { this => Ok(this.poll().await?) }); methods.add_method("stop", |_, this, ()| Ok(this.stop())); - methods.add_method("content", |_, this, ()| Ok(RT.block_on(this.content())?)); + methods.add_method("content", |_, this, ()| a_sync! { this => Ok(this.content().await?) }); - methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); + methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) }); methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { this.callback(move |controller: CodempBufferController| { let _c = controller.clone(); - if let Err(e) = cb.call::<(CodempBufferController,), ()>((controller,)) { - tracing::error!("error running buffer#{} callback: {e}", _c.name()); + if let Err(e) = cb.call::<(CodempBufferController,), ()>((_c,)) { + tracing::error!("error running buffer#{} callback: {e}", controller.name()); } }); Ok(()) @@ -240,7 +317,7 @@ fn codemp_lua(lua: &Lua) -> LuaResult { // entrypoint exports.set("connect", lua.create_function(|_, (host, username, password):(String,String,String)| - Ok(RT.block_on(CodempClient::new(host, username, password))?) + a_sync! { => Ok(CodempClient::new(host, username, password).await?) } )?)?; // utils @@ -318,7 +395,7 @@ fn logger(_: &Lua, (printer, debug): (LuaValue, Option)) -> LuaResult((msg,)); // if the logger fails logging who logs it?