feat(lua): hand rolled a_sync! to the rescue

This commit is contained in:
əlemi 2024-08-17 00:06:57 +02:00
parent e7fa9f4a5b
commit fdcfc611b1
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 128 additions and 51 deletions

View file

@ -26,10 +26,10 @@ tokio-stream = "0.1"
dashmap = "5.5" dashmap = "5.5"
# glue (multiple) # glue (multiple)
lazy_static = { version = "1.4", optional = true }
tracing-subscriber = { version = "0.3", optional = true } tracing-subscriber = { version = "0.3", optional = true }
# glue (java) # glue (java)
lazy_static = { version = "1.4", optional = true }
jni = { version = "0.21", features = ["invocation"], optional = true } jni = { version = "0.21", features = ["invocation"], optional = true }
# glue (lua) # glue (lua)
@ -51,7 +51,7 @@ pyo3-build-config = { version = "0.19", optional = true }
[features] [features]
default = [] default = []
lua = ["mlua", "lazy_static", "tracing-subscriber"] lua = ["mlua", "tracing-subscriber"]
java = ["lazy_static", "jni", "tracing-subscriber"] java = ["lazy_static", "jni", "tracing-subscriber"]
js = ["napi-build", "tracing-subscriber", "napi", "napi-derive"] js = ["napi-build", "tracing-subscriber", "napi", "napi-derive"]
python = ["pyo3", "pyo3-asyncio", "tracing-subscriber", "pyo3-build-config"] python = ["pyo3", "pyo3-asyncio", "tracing-subscriber", "pyo3-build-config"]

View file

@ -17,13 +17,70 @@ impl From::<CodempError> for LuaError {
} }
} }
lazy_static::lazy_static!{ fn tokio() -> &'static tokio::runtime::Runtime {
static ref RT : tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("could not create tokio runtime"); use std::sync::OnceLock;
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RT.get_or_init(||
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("could not create tokio runtime")
)
}
struct Promise<T: Send + Sync + IntoLuaMulti>(Option<tokio::task::JoinHandle<LuaResult<T>>>);
impl<T: Send + Sync + IntoLuaMulti + 'static> LuaUserData for Promise<T> {
fn add_fields<'a, F: LuaUserDataFields<'a, Self>>(fields: &mut F) {
fields.add_field_method_get("ready", |_, this|
Ok(this.0.as_ref().map_or(true, |x| x.is_finished()))
);
}
fn add_methods<'a, M: LuaUserDataMethods<'a, Self>>(methods: &mut M) {
// TODO: await MUST NOT be used in callbacks!!
methods.add_method_mut("await", |_, this, ()| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
Some(x) => {
tokio()
.block_on(x)
.map_err(LuaError::runtime)?
},
});
methods.add_method_mut("and_then", |_, this, (cb,):(LuaFunction,)| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
Some(x) => {
tokio()
.spawn(async move {
match x.await {
Err(e) => tracing::error!("could not join promise to run callback: {e}"),
Ok(Err(e)) => tracing::error!("promise returned error: {e}"),
Ok(Ok(res)) => {
if let Err(e) = cb.call::<T,()>(res) {
tracing::error!("error running promise callback: {e}");
}
},
}
});
Ok(())
},
});
}
}
macro_rules! a_sync {
($($clone:ident)* => $x:expr) => {
{
$(let $clone = $clone.clone();)*
Ok(Promise(Some(tokio().spawn(async move { $x }))))
}
};
} }
fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult<Driver> { fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult<Driver> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || RT.block_on(async move { std::thread::spawn(move || tokio().block_on(async move {
tracing::info!(" :: driving runtime...");
tokio::select! { tokio::select! {
() = std::future::pending::<()>() => {}, () = std::future::pending::<()>() => {},
_ = rx.recv() => {}, _ = rx.recv() => {},
@ -52,12 +109,12 @@ impl LuaUserData for CodempClient {
// join a remote workspace and start processing cursor events // join a remote workspace and start processing cursor events
methods.add_method("join_workspace", |_, this, (session,):(String,)| methods.add_method("join_workspace", |_, this, (session,):(String,)|
Ok(RT.block_on(async { this.join_workspace(&session).await })?) a_sync! { this => Ok(this.join_workspace(&session).await?) }
); );
methods.add_method("leave_workspace", |_, this, (session,):(String,)| { methods.add_method("leave_workspace", |_, this, (session,):(String,)|
Ok(this.leave_workspace(&session)) Ok(this.leave_workspace(&session))
}); );
methods.add_method("get_workspace", |_, this, (session,):(String,)| Ok(this.get_workspace(&session))); methods.add_method("get_workspace", |_, this, (session,):(String,)| Ok(this.get_workspace(&session)));
methods.add_method("active_workspaces", |_, this, ()| Ok(this.active_workspaces())); methods.add_method("active_workspaces", |_, this, ()| Ok(this.active_workspaces()));
@ -69,28 +126,46 @@ impl LuaUserData for CodempClient {
impl LuaUserData for CodempWorkspace { impl LuaUserData for CodempWorkspace {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("create_buffer", |_, this, (name,):(String,)| { methods.add_method("create_buffer", |_, this, (name,):(String,)|
Ok(RT.block_on(async { this.create(&name).await })?) a_sync! { this => Ok(this.create(&name).await?) }
}); );
methods.add_method("attach", |_, this, (name,):(String,)| { methods.add_method("attach", |_, this, (name,):(String,)|
Ok(RT.block_on(async { this.attach(&name).await })?) a_sync! { this => Ok(this.attach(&name).await?) }
}); );
methods.add_method("detach", |_, this, (name,):(String,)| { methods.add_method("detach", |_, this, (name,):(String,)|
Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached)) Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached))
}); );
methods.add_method("delete_buffer", |_, this, (name,):(String,)| { methods.add_method("delete_buffer", |_, this, (name,):(String,)|
Ok(RT.block_on(this.delete(&name))?) a_sync! { this => Ok(this.delete(&name).await?) }
}); );
methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name))); methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name)));
methods.add_method("event", |_, this, ()| Ok(RT.block_on(this.event())?)); methods.add_method("event", |_, this, ()|
a_sync! { this => Ok(this.event().await?) }
);
methods.add_method("fetch_buffers", |_, this, ()| Ok(RT.block_on(this.fetch_buffers())?)); methods.add_method("fetch_buffers", |_, this, ()|
methods.add_method("fetch_users", |_, this, ()| Ok(RT.block_on(this.fetch_users())?)); a_sync! { this => Ok(this.fetch_buffers().await?) }
);
methods.add_method("fetch_users", |_, this, ()|
a_sync! { this => Ok(this.fetch_users().await?) }
);
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
let _this = this.clone();
tokio().spawn(async move {
while let Ok(ev) = _this.event().await {
if let Err(e) = cb.call::<CodempEvent,()>(ev) {
tracing::error!("error running workspace callback: {e}");
}
}
});
Ok(())
});
} }
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
@ -109,12 +184,12 @@ impl LuaUserData for CodempEvent {
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("type", |_, this| match this { fields.add_field_method_get("type", |_, this| match this {
CodempEvent::FileTreeUpdated => Ok("filetree"), CodempEvent::FileTreeUpdated(_) => Ok("filetree"),
CodempEvent::UserJoin(_) | CodempEvent::UserLeave(_) => Ok("user"), CodempEvent::UserJoin(_) | CodempEvent::UserLeave(_) => Ok("user"),
}); });
fields.add_field_method_get("value", |_, this| match this { fields.add_field_method_get("value", |_, this| match this {
CodempEvent::FileTreeUpdated => Ok(None), CodempEvent::FileTreeUpdated(x) => Ok(x.clone()),
CodempEvent::UserJoin(x) | CodempEvent::UserLeave(x) => Ok(Some(x.clone())), CodempEvent::UserJoin(x) | CodempEvent::UserLeave(x) => Ok(x.clone()),
}); });
} }
} }
@ -123,19 +198,21 @@ impl LuaUserData for CodempCursorController {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)| { methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)|
Ok(RT.block_on(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }))?) a_sync! { this => Ok(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }).await?) }
}); );
methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?)); methods.add_method("try_recv", |_, this, ()|
methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?)); a_sync! { this => Ok(this.try_recv().await?) }
methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?)); );
methods.add_method("recv", |_, this, ()| a_sync! { this => Ok(this.recv().await?) });
methods.add_method("poll", |_, this, ()| a_sync! { this => Ok(this.poll().await?) });
methods.add_method("stop", |_, this, ()| Ok(this.stop())); methods.add_method("stop", |_, this, ()| Ok(this.stop()));
methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(ControllerCallback::from(move |controller| { this.callback(ControllerCallback::from(move |controller: CodempCursorController| {
if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller,)) { if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller.clone(),)) {
tracing::error!("error running cursor callback: {e}"); tracing::error!("error running cursor callback: {e}");
} }
})); }));
@ -184,33 +261,33 @@ impl LuaUserData for CodempBufferController {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("send", |_, this, (start, end, text): (usize, usize, String)| { methods.add_method("send", |_, this, (start, end, content, hash): (usize, usize, String, Option<i64>)|
Ok( a_sync! { this => Ok(
RT.block_on(this.send( this.send(
CodempTextChange { CodempTextChange {
start: start as u32, start: start as u32,
end: end as u32, end: end as u32,
content: text, content,
hash: None, hash,
} }
))? ).await?
) )}
}); );
methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?)); methods.add_method("try_recv", |_, this, ()| a_sync! { this => Ok(this.try_recv().await?) });
methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?)); methods.add_method("recv", |_, this, ()| a_sync! { this => Ok(this.recv().await?) });
methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?)); methods.add_method("poll", |_, this, ()| a_sync! { this => Ok(this.poll().await?) });
methods.add_method("stop", |_, this, ()| Ok(this.stop())); methods.add_method("stop", |_, this, ()| Ok(this.stop()));
methods.add_method("content", |_, this, ()| Ok(RT.block_on(this.content())?)); methods.add_method("content", |_, this, ()| a_sync! { this => Ok(this.content().await?) });
methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempBufferController| { this.callback(move |controller: CodempBufferController| {
let _c = controller.clone(); let _c = controller.clone();
if let Err(e) = cb.call::<(CodempBufferController,), ()>((controller,)) { if let Err(e) = cb.call::<(CodempBufferController,), ()>((_c,)) {
tracing::error!("error running buffer#{} callback: {e}", _c.name()); tracing::error!("error running buffer#{} callback: {e}", controller.name());
} }
}); });
Ok(()) Ok(())
@ -240,7 +317,7 @@ fn codemp_lua(lua: &Lua) -> LuaResult<LuaTable> {
// entrypoint // entrypoint
exports.set("connect", lua.create_function(|_, (host, username, password):(String,String,String)| exports.set("connect", lua.create_function(|_, (host, username, password):(String,String,String)|
Ok(RT.block_on(CodempClient::new(host, username, password))?) a_sync! { => Ok(CodempClient::new(host, username, password).await?) }
)?)?; )?)?;
// utils // utils
@ -318,7 +395,7 @@ fn logger(_: &Lua, (printer, debug): (LuaValue, Option<bool>)) -> LuaResult<bool
.try_init() .try_init()
.is_ok(); .is_ok();
if res { if res {
RT.spawn(async move { tokio().spawn(async move {
while let Some(msg) = rx.recv().await { while let Some(msg) = rx.recv().await {
let _ = cb.call::<(String,),()>((msg,)); let _ = cb.call::<(String,),()>((msg,));
// if the logger fails logging who logs it? // if the logger fails logging who logs it?