feat(lua): also pass errors in callbacks

This commit is contained in:
əlemi 2024-09-17 23:27:27 +02:00
parent 27b56cbd03
commit 3047d21870
Signed by: alemi
GPG key ID: A4895B84D311642C
8 changed files with 42 additions and 26 deletions

View file

@ -4,6 +4,7 @@ on:
push:
branches:
- stable
- dev
permissions:
contents: read

View file

@ -4,7 +4,6 @@ use crate::prelude::*;
use super::ext::a_sync::a_sync;
use super::ext::from_lua_serde;
use super::ext::callback::CHANNEL;
impl LuaUserData for CodempBufferController {
@ -25,7 +24,7 @@ impl LuaUserData for CodempBufferController {
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempBufferController| CHANNEL.send(cb.clone(), controller));
this.callback(move |controller: CodempBufferController| super::ext::callback().invoke(cb.clone(), controller));
Ok(())
});
}

View file

@ -4,7 +4,6 @@ use crate::prelude::*;
use super::ext::a_sync::a_sync;
use super::ext::from_lua_serde;
use super::ext::callback::CHANNEL;
use super::ext::lua_tuple;
impl LuaUserData for CodempCursorController {
@ -24,7 +23,7 @@ impl LuaUserData for CodempCursorController {
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempCursorController| CHANNEL.send(cb.clone(), controller));
this.callback(move |controller: CodempCursorController| super::ext::callback().invoke(cb.clone(), controller));
Ok(())
});
}

View file

@ -1,8 +1,6 @@
use mlua_codemp_patch as mlua;
use mlua::prelude::*;
use super::callback::CHANNEL;
pub(crate) fn tokio() -> &'static tokio::runtime::Runtime {
use std::sync::OnceLock;
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
@ -60,8 +58,10 @@ impl LuaUserData for Promise {
.spawn(async move {
match x.await {
Err(e) => tracing::error!("could not join promise to run callback: {e}"),
Ok(Err(e)) => tracing::error!("promise returned error: {e}"),
Ok(Ok(res)) => CHANNEL.send(cb, res),
Ok(res) => match res {
Err(e) => super::callback().failure(e),
Ok(val) => super::callback().invoke(cb, val),
},
}
});
Ok(())

View file

@ -3,16 +3,17 @@ use mlua::prelude::*;
use crate::prelude::*;
use crate::ext::IgnorableError;
lazy_static::lazy_static! {
pub(crate) static ref CHANNEL: CallbackChannel = CallbackChannel::default();
pub(crate) fn callback() -> &'static CallbackChannel<LuaCallback> {
static CHANNEL: std::sync::OnceLock<CallbackChannel<LuaCallback>> = std::sync::OnceLock::new();
CHANNEL.get_or_init(CallbackChannel::default)
}
pub(crate) struct CallbackChannel {
tx: std::sync::Arc<tokio::sync::mpsc::UnboundedSender<(LuaFunction, CallbackArg)>>,
rx: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<(LuaFunction, CallbackArg)>>
pub(crate) struct CallbackChannel<T> {
tx: std::sync::Arc<tokio::sync::mpsc::UnboundedSender<T>>,
rx: std::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<T>>
}
impl Default for CallbackChannel {
impl Default for CallbackChannel<LuaCallback> {
fn default() -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let rx = std::sync::Mutex::new(rx);
@ -23,30 +24,40 @@ impl Default for CallbackChannel {
}
}
impl CallbackChannel {
pub(crate) fn send(&self, cb: LuaFunction, arg: impl Into<CallbackArg>) {
self.tx.send((cb, arg.into()))
impl CallbackChannel<LuaCallback> {
pub(crate) fn invoke(&self, cb: LuaFunction, arg: impl Into<CallbackArg>) {
self.tx.send(LuaCallback::Invoke(cb, arg.into()))
.unwrap_or_warn("error scheduling callback")
}
pub(crate) fn recv(&self) -> Option<(LuaFunction, CallbackArg)> {
pub(crate) fn failure(&self, err: impl std::error::Error) {
self.tx.send(LuaCallback::Fail(format!("promise failed with error: {err:?}")))
.unwrap_or_warn("error scheduling callback failure")
}
pub(crate) fn recv(&self) -> Option<LuaCallback> {
match self.rx.try_lock() {
Err(e) => {
tracing::warn!("could not acquire callback channel mutex: {e}");
tracing::debug!("backing off from callback mutex: {e}");
None
},
Ok(mut lock) => match lock.try_recv() {
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => None,
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
tracing::error!("callback channel closed");
None
},
Ok((cb, arg)) => Some((cb, arg)),
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => None,
Ok(cb) => Some(cb),
},
}
}
}
pub(crate) enum LuaCallback {
Fail(String),
Invoke(LuaFunction, CallbackArg),
}
pub(crate) enum CallbackArg {
Nil,
Str(String),

View file

@ -66,7 +66,7 @@ pub(crate) fn logger(_: &Lua, (printer, debug): (LuaValue, Option<bool>)) -> Lua
if res {
super::a_sync::tokio().spawn(async move {
while let Some(msg) = rx.recv().await {
super::callback::CHANNEL.send(cb.clone(), msg);
super::callback().invoke(cb.clone(), msg);
}
});
}

View file

@ -6,6 +6,7 @@ use mlua_codemp_patch as mlua;
use mlua::prelude::*;
pub(crate) use a_sync::tokio;
pub(crate) use callback::callback;
pub(crate) fn lua_tuple<T: IntoLua>(lua: &Lua, (a, b): (T, T)) -> LuaResult<LuaTable> {
let table = lua.create_table()?;

View file

@ -30,11 +30,16 @@ fn entrypoint(lua: &Lua) -> LuaResult<LuaTable> {
// runtime
exports.set("spawn_runtime_driver", lua.create_function(ext::a_sync::spawn_runtime_driver)?)?;
exports.set("poll_callback", lua.create_function(|lua, ()| {
// TODO pass args too
let mut val = LuaMultiValue::new();
if let Some((cb, arg)) = ext::callback::CHANNEL.recv() {
val.push_back(LuaValue::Function(cb));
val.push_back(arg.into_lua(lua)?);
match ext::callback().recv() {
None => {},
Some(ext::callback::LuaCallback::Invoke(cb, arg)) => {
val.push_back(LuaValue::Function(cb));
val.push_back(arg.into_lua(lua)?);
}
Some(ext::callback::LuaCallback::Fail(msg)) => {
return Err(LuaError::runtime(msg));
},
}
Ok(val)
})?)?;