Merge branch 'dev' into pyo3_bump

This commit is contained in:
cschen 2024-08-21 15:03:10 +02:00
commit e7272753e6
4 changed files with 139 additions and 55 deletions

View file

@ -26,10 +26,10 @@ tokio-stream = "0.1"
dashmap = "5.5"
# glue (multiple)
lazy_static = { version = "1.4", optional = true }
tracing-subscriber = { version = "0.3", optional = true }
# glue (java)
lazy_static = { version = "1.4", optional = true }
jni = { version = "0.21", features = ["invocation"], optional = true }
# glue (lua)
@ -50,7 +50,7 @@ pyo3-build-config = { version = "0.19", optional = true }
[features]
default = []
lua = ["mlua", "lazy_static", "tracing-subscriber"]
lua = ["mlua", "tracing-subscriber"]
java = ["lazy_static", "jni", "tracing-subscriber"]
js = ["napi-build", "tracing-subscriber", "napi", "napi-derive"]
python = ["pyo3", "tracing-subscriber", "pyo3-build-config"]

View file

@ -15,6 +15,8 @@ use crate::api::TextChange;
use crate::ext::InternallyMutable;
use super::worker::DeltaRequest;
/// the buffer controller implementation
///
/// for each controller a worker exists, managing outgoing and inbound
@ -54,7 +56,7 @@ pub(crate) struct BufferControllerInner {
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
}
@ -87,7 +89,7 @@ impl Controller<TextChange> for BufferController {
self.0.delta_request.send((last_update, tx)).await?;
let (v, change) = rx.await?;
self.0.last_update.set(v);
Ok(Some(change))
Ok(change)
}
/// enqueue a text change for processing

View file

@ -14,6 +14,9 @@ use codemp_proto::buffer::{BufferEvent, Operation};
use super::controller::{BufferController, BufferControllerInner};
pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>);
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);
pub(crate) struct BufferWorker {
user_id: Uuid,
latest_version: watch::Sender<diamond_types::LocalVersion>,
@ -21,7 +24,7 @@ pub(crate) struct BufferWorker {
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>,
content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
delta_req: mpsc::Receiver<DeltaRequest>,
stop: mpsc::UnboundedReceiver<()>,
controller: BufferController,
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
@ -181,7 +184,9 @@ impl ControllerWorker<TextChange> for BufferWorker {
}
}
};
tx.send((new_local_v, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?");
tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?");
} else {
tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?");
}
},
},

View file

@ -17,13 +17,70 @@ impl From::<CodempError> for LuaError {
}
}
lazy_static::lazy_static!{
static ref RT : tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("could not create tokio runtime");
fn tokio() -> &'static tokio::runtime::Runtime {
use std::sync::OnceLock;
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RT.get_or_init(||
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("could not create tokio runtime")
)
}
struct Promise<T: Send + Sync + IntoLuaMulti>(Option<tokio::task::JoinHandle<LuaResult<T>>>);
impl<T: Send + Sync + IntoLuaMulti + 'static> LuaUserData for Promise<T> {
fn add_fields<'a, F: LuaUserDataFields<'a, Self>>(fields: &mut F) {
fields.add_field_method_get("ready", |_, this|
Ok(this.0.as_ref().map_or(true, |x| x.is_finished()))
);
}
fn add_methods<'a, M: LuaUserDataMethods<'a, Self>>(methods: &mut M) {
// TODO: await MUST NOT be used in callbacks!!
methods.add_method_mut("await", |_, this, ()| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
Some(x) => {
tokio()
.block_on(x)
.map_err(LuaError::runtime)?
},
});
methods.add_method_mut("and_then", |_, this, (cb,):(LuaFunction,)| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
Some(x) => {
tokio()
.spawn(async move {
match x.await {
Err(e) => tracing::error!("could not join promise to run callback: {e}"),
Ok(Err(e)) => tracing::error!("promise returned error: {e}"),
Ok(Ok(res)) => {
if let Err(e) = cb.call::<T,()>(res) {
tracing::error!("error running promise callback: {e}");
}
},
}
});
Ok(())
},
});
}
}
macro_rules! a_sync {
($($clone:ident)* => $x:expr) => {
{
$(let $clone = $clone.clone();)*
Ok(Promise(Some(tokio().spawn(async move { $x }))))
}
};
}
fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult<Driver> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || RT.block_on(async move {
std::thread::spawn(move || tokio().block_on(async move {
tracing::info!(" :: driving runtime...");
tokio::select! {
() = std::future::pending::<()>() => {},
_ = rx.recv() => {},
@ -52,12 +109,12 @@ impl LuaUserData for CodempClient {
// join a remote workspace and start processing cursor events
methods.add_method("join_workspace", |_, this, (session,):(String,)|
Ok(RT.block_on(async { this.join_workspace(&session).await })?)
a_sync! { this => Ok(this.join_workspace(&session).await?) }
);
methods.add_method("leave_workspace", |_, this, (session,):(String,)| {
methods.add_method("leave_workspace", |_, this, (session,):(String,)|
Ok(this.leave_workspace(&session))
});
);
methods.add_method("get_workspace", |_, this, (session,):(String,)| Ok(this.get_workspace(&session)));
methods.add_method("active_workspaces", |_, this, ()| Ok(this.active_workspaces()));
@ -69,28 +126,46 @@ impl LuaUserData for CodempClient {
impl LuaUserData for CodempWorkspace {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("create_buffer", |_, this, (name,):(String,)| {
Ok(RT.block_on(async { this.create(&name).await })?)
});
methods.add_method("create_buffer", |_, this, (name,):(String,)|
a_sync! { this => Ok(this.create(&name).await?) }
);
methods.add_method("attach", |_, this, (name,):(String,)| {
Ok(RT.block_on(async { this.attach(&name).await })?)
});
methods.add_method("attach", |_, this, (name,):(String,)|
a_sync! { this => Ok(this.attach(&name).await?) }
);
methods.add_method("detach", |_, this, (name,):(String,)| {
methods.add_method("detach", |_, this, (name,):(String,)|
Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached))
});
);
methods.add_method("delete_buffer", |_, this, (name,):(String,)| {
Ok(RT.block_on(this.delete(&name))?)
});
methods.add_method("delete_buffer", |_, this, (name,):(String,)|
a_sync! { this => Ok(this.delete(&name).await?) }
);
methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name)));
methods.add_method("event", |_, this, ()| Ok(RT.block_on(this.event())?));
methods.add_method("event", |_, this, ()|
a_sync! { this => Ok(this.event().await?) }
);
methods.add_method("fetch_buffers", |_, this, ()| Ok(RT.block_on(this.fetch_buffers())?));
methods.add_method("fetch_users", |_, this, ()| Ok(RT.block_on(this.fetch_users())?));
methods.add_method("fetch_buffers", |_, this, ()|
a_sync! { this => Ok(this.fetch_buffers().await?) }
);
methods.add_method("fetch_users", |_, this, ()|
a_sync! { this => Ok(this.fetch_users().await?) }
);
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
let _this = this.clone();
tokio().spawn(async move {
while let Ok(ev) = _this.event().await {
if let Err(e) = cb.call::<CodempEvent,()>(ev) {
tracing::error!("error running workspace callback: {e}");
}
}
});
Ok(())
});
}
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
@ -109,12 +184,12 @@ impl LuaUserData for CodempEvent {
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("type", |_, this| match this {
CodempEvent::FileTreeUpdated => Ok("filetree"),
CodempEvent::FileTreeUpdated(_) => Ok("filetree"),
CodempEvent::UserJoin(_) | CodempEvent::UserLeave(_) => Ok("user"),
});
fields.add_field_method_get("value", |_, this| match this {
CodempEvent::FileTreeUpdated => Ok(None),
CodempEvent::UserJoin(x) | CodempEvent::UserLeave(x) => Ok(Some(x.clone())),
CodempEvent::FileTreeUpdated(x) => Ok(x.clone()),
CodempEvent::UserJoin(x) | CodempEvent::UserLeave(x) => Ok(x.clone()),
});
}
}
@ -123,19 +198,21 @@ impl LuaUserData for CodempCursorController {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)| {
Ok(RT.block_on(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }))?)
});
methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?));
methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?));
methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?));
methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)|
a_sync! { this => Ok(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }).await?) }
);
methods.add_method("try_recv", |_, this, ()|
a_sync! { this => Ok(this.try_recv().await?) }
);
methods.add_method("recv", |_, this, ()| a_sync! { this => Ok(this.recv().await?) });
methods.add_method("poll", |_, this, ()| a_sync! { this => Ok(this.poll().await?) });
methods.add_method("stop", |_, this, ()| Ok(this.stop()));
methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback()));
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(ControllerCallback::from(move |controller| {
if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller,)) {
this.callback(ControllerCallback::from(move |controller: CodempCursorController| {
if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller.clone(),)) {
tracing::error!("error running cursor callback: {e}");
}
}));
@ -184,33 +261,33 @@ impl LuaUserData for CodempBufferController {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("send", |_, this, (start, end, text): (usize, usize, String)| {
Ok(
RT.block_on(this.send(
methods.add_method("send", |_, this, (start, end, content, hash): (usize, usize, String, Option<i64>)|
a_sync! { this => Ok(
this.send(
CodempTextChange {
start: start as u32,
end: end as u32,
content: text,
hash: None,
content,
hash,
}
))?
)
});
).await?
)}
);
methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?));
methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?));
methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?));
methods.add_method("try_recv", |_, this, ()| a_sync! { this => Ok(this.try_recv().await?) });
methods.add_method("recv", |_, this, ()| a_sync! { this => Ok(this.recv().await?) });
methods.add_method("poll", |_, this, ()| a_sync! { this => Ok(this.poll().await?) });
methods.add_method("stop", |_, this, ()| Ok(this.stop()));
methods.add_method("content", |_, this, ()| Ok(RT.block_on(this.content())?));
methods.add_method("content", |_, this, ()| a_sync! { this => Ok(this.content().await?) });
methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback()));
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempBufferController| {
let _c = controller.clone();
if let Err(e) = cb.call::<(CodempBufferController,), ()>((controller,)) {
tracing::error!("error running buffer#{} callback: {e}", _c.name());
if let Err(e) = cb.call::<(CodempBufferController,), ()>((_c,)) {
tracing::error!("error running buffer#{} callback: {e}", controller.name());
}
});
Ok(())
@ -240,7 +317,7 @@ fn codemp_lua(lua: &Lua) -> LuaResult<LuaTable> {
// entrypoint
exports.set("connect", lua.create_function(|_, (host, username, password):(String,String,String)|
Ok(RT.block_on(CodempClient::new(host, username, password))?)
a_sync! { => Ok(CodempClient::new(host, username, password).await?) }
)?)?;
// utils
@ -318,7 +395,7 @@ fn logger(_: &Lua, (printer, debug): (LuaValue, Option<bool>)) -> LuaResult<bool
.try_init()
.is_ok();
if res {
RT.spawn(async move {
tokio().spawn(async move {
while let Some(msg) = rx.recv().await {
let _ = cb.call::<(String,),()>((msg,));
// if the logger fails logging who logs it?