feat(lua): introduce callbacks channel

this means that lua must poll but at least they run on main thread and
dont segfault the main process
This commit is contained in:
əlemi 2024-09-01 03:13:03 +02:00 committed by zaaarf
parent e50703fc77
commit 8296b473bb
No known key found for this signature in database
GPG key ID: 102E445F4C3F829B
2 changed files with 152 additions and 68 deletions

View file

@ -33,7 +33,7 @@ lazy_static = { version = "1.4", optional = true }
jni = { version = "0.21", features = ["invocation"], optional = true }
# glue (lua)
mlua = { version = "0.10.0-beta.1", features = ["module", "luajit", "send"], optional = true }
mlua = { git = "https://github.com/mlua-rs/mlua" , rev = "ece66c46bfdc62685b758ffff16286f2806b2662", features = ["module", "luajit", "send"], optional = true }
# glue (js)
napi = { version = "2.16", features = ["full"], optional = true }
@ -51,7 +51,7 @@ pyo3-build-config = { version = "0.19", optional = true }
[features]
default = []
rust = [] # used for ci matrix
lua = ["mlua", "tracing-subscriber"]
lua = ["mlua", "tracing-subscriber", "lazy_static"]
java = ["lazy_static", "jni", "tracing-subscriber"]
js = ["napi-build", "tracing-subscriber", "napi", "napi-derive"]
python = ["pyo3", "tracing-subscriber", "pyo3-build-config"]

View file

@ -1,12 +1,13 @@
use std::io::Write;
use std::sync::Mutex;
use crate::api::controller::ControllerCallback;
use crate::api::Cursor;
use crate::ext::IgnorableError;
use crate::prelude::*;
use crate::workspace::worker::DetachResult;
use mlua::prelude::*;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
impl From::<crate::errors::ConnectionError> for LuaError {
fn from(value: crate::errors::ConnectionError) -> Self {
@ -37,16 +38,86 @@ fn tokio() -> &'static tokio::runtime::Runtime {
)
}
// TODO cannot do Box<dyn IntoLuaMulti> ?? maybe its temporary because im using betas
enum CallbackArg {
CursorController(CodempCursorController),
BufferController(CodempBufferController),
Event(CodempEvent),
}
impl From<CodempCursorController> for CallbackArg {
fn from(value: CodempCursorController) -> Self {
Self::CursorController(value)
}
}
impl From<CodempBufferController> for CallbackArg {
fn from(value: CodempBufferController) -> Self {
Self::BufferController(value)
}
}
impl From<CodempEvent> for CallbackArg {
fn from(value: CodempEvent) -> Self {
Self::Event(value)
}
}
struct CallbackChannel {
tx: tokio::sync::mpsc::UnboundedSender<(LuaFunction, CallbackArg)>,
rx: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<(LuaFunction, CallbackArg)>>
}
impl Default for CallbackChannel {
fn default() -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let rx = std::sync::Mutex::new(rx);
Self {
tx, rx,
}
}
}
impl CallbackChannel {
fn send(&self, cb: LuaFunction, arg: impl Into<CallbackArg>) {
self.tx.send((cb, arg.into())).unwrap_or_warn("error scheduling callback")
}
fn recv(&self) -> Option<(LuaFunction, CallbackArg)> {
match self.rx.try_lock() {
Ok(mut lock) => match lock.try_recv() {
Ok(res) => Some(res),
Err(TryRecvError::Empty) => None,
Err(TryRecvError::Disconnected) => {
tracing::error!("callback channel closed");
None
},
},
Err(e) => {
tracing::warn!("could not acquire callback channel mutex: {e}");
None
},
}
}
}
lazy_static::lazy_static! {
static ref CHANNEL: CallbackChannel = CallbackChannel::default();
}
struct Promise<T: Send + Sync + IntoLuaMulti>(Option<tokio::task::JoinHandle<LuaResult<T>>>);
impl<T: Send + Sync + IntoLuaMulti + 'static> LuaUserData for Promise<T> {
fn add_fields<'a, F: LuaUserDataFields<'a, Self>>(fields: &mut F) {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("ready", |_, this|
Ok(this.0.as_ref().map_or(true, |x| x.is_finished()))
);
}
fn add_methods<'a, M: LuaUserDataMethods<'a, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
// TODO: await MUST NOT be used in callbacks!!
methods.add_method_mut("await", |_, this, ()| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
@ -56,24 +127,24 @@ impl<T: Send + Sync + IntoLuaMulti + 'static> LuaUserData for Promise<T> {
.map_err(LuaError::runtime)?
},
});
methods.add_method_mut("and_then", |_, this, (cb,):(LuaFunction,)| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
Some(x) => {
tokio()
.spawn(async move {
match x.await {
Err(e) => tracing::error!("could not join promise to run callback: {e}"),
Ok(Err(e)) => tracing::error!("promise returned error: {e}"),
Ok(Ok(res)) => {
if let Err(e) = cb.call::<T,()>(res) {
tracing::error!("error running promise callback: {e}");
}
},
}
});
Ok(())
},
});
// methods.add_method_mut("and_then", |_, this, (cb,):(LuaFunction,)| match this.0.take() {
// None => Err(LuaError::runtime("Promise already awaited")),
// Some(x) => {
// tokio()
// .spawn(async move {
// match x.await {
// Err(e) => tracing::error!("could not join promise to run callback: {e}"),
// Ok(Err(e)) => tracing::error!("promise returned error: {e}"),
// Ok(Ok(res)) => {
// if let Err(e) = cb.call::<()>(res) {
// tracing::error!("error running promise callback: {e}");
// }
// },
// }
// });
// Ok(())
// },
// });
}
}
@ -88,34 +159,47 @@ macro_rules! a_sync {
fn spawn_runtime_driver(_: &Lua, ():()) -> LuaResult<Driver> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || tokio().block_on(async move {
let handle = std::thread::spawn(move || tokio().block_on(async move {
tracing::info!(" :: driving runtime...");
tokio::select! {
() = std::future::pending::<()>() => {},
_ = rx.recv() => {},
}
}));
Ok(Driver(tx))
Ok(Driver(tx, Some(handle)))
}
#[derive(Debug, Clone)]
struct Driver(tokio::sync::mpsc::UnboundedSender<()>);
#[derive(Debug)]
struct Driver(tokio::sync::mpsc::UnboundedSender<()>, Option<std::thread::JoinHandle<()>>);
impl LuaUserData for Driver {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("stop", |_, this, ()| Ok(this.0.send(()).is_ok()));
methods.add_method_mut("stop", |_, this, ()| {
match this.1.take() {
None => Ok(false),
Some(handle) => {
if this.0.send(()).is_err() {
tracing::warn!("found runtime already stopped while attempting to stop it");
}
match handle.join() {
Err(e) => Err(LuaError::runtime(format!("runtime thread panicked: {e:?}"))),
Ok(()) => Ok(true),
}
},
}
});
}
}
impl LuaUserData for CodempClient {
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("id", |_, this| Ok(this.user().id.to_string()));
fields.add_field_method_get("username", |_, this| Ok(this.user().name.clone()));
fields.add_field_method_get("active_workspaces", |_, this| Ok(this.active_workspaces()));
}
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("refresh", |_, this, ()|
@ -152,17 +236,17 @@ impl LuaUserData for CodempClient {
impl LuaUserData for CodempWorkspace {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("create_buffer", |_, this, (name,):(String,)|
a_sync! { this => Ok(this.create(&name).await?) }
);
methods.add_method("attach", |_, this, (name,):(String,)|
methods.add_method("attach_buffer", |_, this, (name,):(String,)|
a_sync! { this => Ok(this.attach(&name).await?) }
);
methods.add_method("detach", |_, this, (name,):(String,)|
methods.add_method("detach_buffer", |_, this, (name,):(String,)|
Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached))
);
@ -183,24 +267,24 @@ impl LuaUserData for CodempWorkspace {
a_sync! { this => Ok(this.fetch_users().await?) }
);
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
let _this = this.clone();
tokio().spawn(async move {
while let Ok(ev) = _this.event().await {
if let Err(e) = cb.call::<CodempEvent,()>(ev) {
tracing::error!("error running workspace callback: {e}");
}
}
});
Ok(())
});
// methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
// let _this = this.clone();
// tokio().spawn(async move {
// while let Ok(ev) = _this.event().await {
// if let Err(e) = cb.call::<()>(ev) {
// tracing::error!("error running workspace callback: {e}");
// }
// }
// });
// Ok(())
// });
methods.add_method("filetree", |_, this, (filter,):(Option<String>,)|
Ok(this.filetree(filter.as_deref()))
);
}
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("name", |_, this| Ok(this.id()));
fields.add_field_method_get("cursor", |_, this| Ok(this.cursor()));
fields.add_field_method_get("active_buffers", |_, this| Ok(this.buffer_list()));
@ -209,11 +293,11 @@ impl LuaUserData for CodempWorkspace {
}
impl LuaUserData for CodempEvent {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
}
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("type", |_, this| match this {
CodempEvent::FileTreeUpdated(_) => Ok("filetree"),
CodempEvent::UserJoin(_) | CodempEvent::UserLeave(_) => Ok("user"),
@ -226,7 +310,7 @@ impl LuaUserData for CodempEvent {
}
impl LuaUserData for CodempCursorController {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)|
@ -242,22 +326,18 @@ impl LuaUserData for CodempCursorController {
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(ControllerCallback::from(move |controller: CodempCursorController| {
if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller.clone(),)) {
tracing::error!("error running cursor callback: {e}");
}
}));
this.callback(move |controller: CodempCursorController| CHANNEL.send(cb.clone(), controller));
Ok(())
});
}
}
impl LuaUserData for Cursor {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
}
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("user", |_, this| Ok(this.user.map(|x| x.to_string())));
fields.add_field_method_get("buffer", |_, this| Ok(this.buffer.clone()));
fields.add_field_method_get("start", |_, this| Ok(RowCol::from(this.start)));
@ -278,18 +358,18 @@ impl From<(i32, i32)> for RowCol {
}
impl LuaUserData for RowCol {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
}
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("row", |_, this| Ok(this.row));
fields.add_field_method_get("col", |_, this| Ok(this.col));
}
}
impl LuaUserData for CodempBufferController {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("send", |_, this, (start, end, content, hash): (usize, usize, String, Option<i64>)|
@ -315,26 +395,21 @@ impl LuaUserData for CodempBufferController {
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempBufferController| {
let _c = controller.clone();
if let Err(e) = cb.call::<(CodempBufferController,), ()>((_c,)) {
tracing::error!("error running buffer#{} callback: {e}", controller.name());
}
});
this.callback(move |controller: CodempBufferController| CHANNEL.send(cb.clone(), controller));
Ok(())
});
}
}
impl LuaUserData for CodempTextChange {
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("content", |_, this| Ok(this.content.clone()));
fields.add_field_method_get("first", |_, this| Ok(this.start));
fields.add_field_method_get("last", |_, this| Ok(this.end));
fields.add_field_method_get("hash", |_, this| Ok(this.hash));
}
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("apply", |_, this, (txt,):(String,)| Ok(this.apply(&txt)));
}
@ -420,7 +495,7 @@ fn logger(_: &Lua, (printer, debug): (LuaValue, Option<bool>)) -> LuaResult<bool
if res {
tokio().spawn(async move {
while let Some(msg) = rx.recv().await {
let _ = cb.call::<(String,),()>((msg,));
let _ = cb.call::<()>((msg,));
// if the logger fails logging who logs it?
}
});
@ -450,6 +525,15 @@ fn codemp_native(lua: &Lua) -> LuaResult<LuaTable> {
// runtime
exports.set("spawn_runtime_driver", lua.create_function(spawn_runtime_driver)?)?;
exports.set("poll_callback", lua.create_function(|_, ()| {
// TODO pass args too
if let Some((cb, _arg)) = CHANNEL.recv() {
Ok(Some(cb))
} else {
Ok(None)
}
})?)?;
// logging
exports.set("logger", lua.create_function(logger)?)?;