diff --git a/Cargo.toml b/Cargo.toml index 1ec49fd..324d739 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ tracing-subscriber = { version = "0.3", optional = true } jni = { version = "0.21", features = ["invocation"], optional = true } # glue (lua) -mlua = { version = "0.9", features = ["module", "luajit", "send"], optional = true } +mlua = { version = "0.10.0-beta.1", features = ["module", "luajit", "send"], optional = true } # glue (js) napi = { version = "2.16", features = ["full"], optional = true } diff --git a/dist/java/src/mp/code/Utils.java b/dist/java/src/mp/code/Utils.java new file mode 100644 index 0000000..7174ba7 --- /dev/null +++ b/dist/java/src/mp/code/Utils.java @@ -0,0 +1,10 @@ +package mp.code; + +public class Utils { + /** + * Hashes the given {@link String} using CodeMP's hashing algorithm (xxh3). + * @param input the string to hash + * @return the hash + */ + public static native long hash(String input); +} diff --git a/dist/java/src/mp/code/data/TextChange.java b/dist/java/src/mp/code/data/TextChange.java index e50b6f0..f4e614d 100644 --- a/dist/java/src/mp/code/data/TextChange.java +++ b/dist/java/src/mp/code/data/TextChange.java @@ -1,21 +1,17 @@ package mp.code.data; +import java.util.OptionalLong; + public class TextChange { public final long start; public final long end; public final String content; - private final long hash; // xxh3 hash + public final OptionalLong hash; // xxh3 hash - public TextChange(long start, long end, String content, long hash) { + public TextChange(long start, long end, String content, OptionalLong hash) { this.start = start; this.end = end; this.content = content; this.hash = hash; } - - private static native long hash(String content); - public boolean hashMatches(String content) { - // 0 is Rust default value and a very unlikely hash - return hash == 0L || this.hash == hash(content); - } } diff --git a/src/api/controller.rs b/src/api/controller.rs index 5f41bed..3170256 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -45,6 +45,14 @@ pub trait Controller : Sized + Send + Sync { } } + /// registers a callback to be called on receive. + /// + /// there can only be one callback at any given time. + fn callback(&self, cb: impl Into>); + + /// clears the currently registered callback. + fn clear_callback(&self); + /// block until next value is available without consuming it /// /// this is just an async trait function wrapped by `async_trait`: @@ -64,3 +72,25 @@ pub trait Controller : Sized + Send + Sync { /// (likely if worker is already stopped) fn stop(&self) -> bool; } + + +/// type wrapper for Boxed dyn callback +pub struct ControllerCallback(Box); + +impl ControllerCallback { + pub fn call(&self, x: T) { + self.0(x) // lmao at this syntax + } +} + +impl std::fmt::Debug for ControllerCallback { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ControllerCallback {{ {:p} }}", self.0) + } +} + +impl From for ControllerCallback { + fn from(value: X) -> Self { + Self(Box::new(value)) + } +} diff --git a/src/api/event.rs b/src/api/event.rs index f943af3..be26ccf 100644 --- a/src/api/event.rs +++ b/src/api/event.rs @@ -1,5 +1,6 @@ use codemp_proto::workspace::workspace_event::Event as WorkspaceEventInner; +#[derive(Debug, Clone)] #[cfg_attr(feature = "python", pyo3::pyclass)] pub enum Event { FileTreeUpdated(String), diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 9eee7f5..f6048a4 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -8,6 +8,7 @@ use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; use tonic::async_trait; +use crate::api::controller::ControllerCallback; use crate::api::Controller; use crate::api::TextChange; @@ -53,8 +54,8 @@ pub(crate) struct BufferControllerInner { pub(crate) poller: mpsc::UnboundedSender>, pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) content_request: mpsc::Sender>, - pub(crate) delta_request: - mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, + pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, + pub(crate) callback: watch::Sender>>, } #[async_trait] @@ -99,6 +100,19 @@ impl Controller for BufferController { Ok(()) } + fn callback(&self, cb: impl Into>) { + if self.0.callback.send(Some(cb.into())).is_err() { + // TODO should we panic? we failed what we were supposed to do + tracing::error!("no active buffer worker to run registered callback!"); + } + } + + fn clear_callback(&self) { + if self.0.callback.send(None).is_err() { + tracing::warn!("no active buffer worker to clear callback"); + } + } + fn stop(&self) -> bool { self.0.stopper.send(()).is_ok() } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 505dc38..960a7bf 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::{async_trait, Streaming}; use uuid::Uuid; -use crate::api::controller::ControllerWorker; +use crate::api::controller::{ControllerCallback, ControllerWorker}; use crate::api::TextChange; use crate::errors::IgnorableError; @@ -24,6 +24,7 @@ pub(crate) struct BufferWorker { delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, stop: mpsc::UnboundedReceiver<()>, controller: BufferController, + callback: watch::Receiver>>, } impl BufferWorker { @@ -35,6 +36,7 @@ impl BufferWorker { let (req_tx, req_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); + let (cb_tx, cb_rx) = watch::channel(None); let (poller_tx, poller_rx) = mpsc::unbounded_channel(); @@ -49,6 +51,7 @@ impl BufferWorker { stopper: end_tx, content_request: req_tx, delta_request: recv_tx, + callback: cb_tx, }; BufferWorker { @@ -61,6 +64,7 @@ impl BufferWorker { controller: BufferController(Arc::new(controller)), content_checkout: req_rx, delta_req: recv_rx, + callback: cb_rx, } } } @@ -130,6 +134,9 @@ impl ControllerWorker for BufferWorker { for tx in self.pollers.drain(..) { tx.send(()).unwrap_or_warn("could not wake up poller"); } + if let Some(cb) = self.callback.borrow().as_ref() { + cb.call(self.controller.clone()); // TODO should we run this on another task/thread? + } }, Err(e) => tracing::error!("could not deserialize operation from server: {}", e), } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 50237b3..c6edc38 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -3,10 +3,13 @@ //! a controller implementation for cursor actions use std::sync::Arc; -use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex}; +use tokio::sync::{ + broadcast::{self, error::TryRecvError}, + mpsc, watch, Mutex, +}; use tonic::async_trait; -use crate::api::{Controller, Cursor}; +use crate::api::{controller::ControllerCallback, Controller, Cursor}; use codemp_proto::cursor::{CursorEvent, CursorPosition}; /// the cursor controller implementation /// @@ -26,26 +29,11 @@ pub struct CursorController(pub(crate) Arc); #[derive(Debug)] pub(crate) struct CursorControllerInner { - op: mpsc::Sender, - last_op: Mutex>, - stream: Mutex>, - stop: mpsc::UnboundedSender<()>, -} - -impl CursorControllerInner { - pub(crate) fn new( - op: mpsc::Sender, - last_op: Mutex>, - stream: Mutex>, - stop: mpsc::UnboundedSender<()>, - ) -> Self { - Self { - op, - last_op, - stream, - stop, - } - } + pub(crate) op: mpsc::Sender, + pub(crate) last_op: Mutex>, + pub(crate) stream: Mutex>, + pub(crate) callback: watch::Sender>>, + pub(crate) stop: mpsc::UnboundedSender<()>, } #[async_trait] @@ -78,6 +66,19 @@ impl Controller for CursorController { Ok(self.0.last_op.lock().await.changed().await?) } + fn callback(&self, cb: impl Into>) { + if self.0.callback.send(Some(cb.into())).is_err() { + // TODO should we panic? we failed what we were supposed to do + tracing::error!("no active cursor worker to run registered callback!"); + } + } + + fn clear_callback(&self) { + if self.0.callback.send(None).is_err() { + tracing::warn!("no active cursor worker to clear callback"); + } + } + fn stop(&self) -> bool { self.0.stop.send(()).is_ok() } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index c35a9fc..10766bd 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tonic::{Streaming, async_trait}; -use crate::{api::{controller::ControllerWorker, Cursor}, errors::IgnorableError}; +use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, errors::IgnorableError}; use codemp_proto::cursor::{CursorPosition, CursorEvent}; use super::controller::{CursorController, CursorControllerInner}; @@ -14,6 +14,7 @@ pub(crate) struct CursorWorker { channel: broadcast::Sender, stop: mpsc::UnboundedReceiver<()>, controller: CursorController, + callback: watch::Receiver>>, } impl Default for CursorWorker { @@ -22,18 +23,21 @@ impl Default for CursorWorker { let (cur_tx, _cur_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (change_tx, change_rx) = watch::channel(CursorEvent::default()); - let controller = CursorControllerInner::new( - op_tx, - Mutex::new(change_rx), - Mutex::new(cur_tx.subscribe()), - end_tx - ); + let (cb_tx, cb_rx) = watch::channel(None); + let controller = CursorControllerInner { + op: op_tx, + last_op: Mutex::new(change_rx), + stream: Mutex::new(cur_tx.subscribe()), + stop: end_tx, + callback: cb_tx, + }; Self { op: op_rx, changed: change_tx, channel: cur_tx, stop: end_rx, controller: CursorController(Arc::new(controller)), + callback: cb_rx, } } } @@ -57,6 +61,9 @@ impl ControllerWorker for CursorWorker { Ok(Some(cur)) = rx.message() => { self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); self.changed.send(cur).unwrap_or_warn("could not update last event"); + if let Some(cb) = self.callback.borrow().as_ref() { + cb.call(self.controller.clone()); // TODO should this run in its own task/thread? + } }, else => break, } diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 02e9807..01dec82 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -63,71 +63,27 @@ fn recv_jni(env: &mut JNIEnv, change: Option) -> jobject None => JObject::default(), Some(event) => { let content = env.new_string(event.content).jexcept(env); + + let hash = env.find_class("java/util/OptionalLong").and_then(|class| { + if let Some(h) = event.hash { + env.call_static_method(class, "of", "(J)Ljava/util/OptionalLong;", &[JValueGen::Long(h)]) + } else { + env.call_static_method(class, "empty", "()Ljava/util/OptionalLong;", &[]) + } + }).and_then(|o| o.l()).jexcept(env); env.find_class("mp/code/data/TextChange") .and_then(|class| { env.new_object( class, - "(JJLjava/lang/String;J)V", + "(JJLjava/lang/String;Ljava/util/OptionalLong;)V", &[ JValueGen::Long(jlong::from(event.start)), JValueGen::Long(jlong::from(event.end)), JValueGen::Object(&content), - JValueGen::Long(event.hash.unwrap_or_default()) + JValueGen::Object(&hash) ] ) }).jexcept(env) } }.as_raw() } - -/// Receives from Java, converts and sends a [crate::api::TextChange]. -#[no_mangle] -pub extern "system" fn Java_mp_code_BufferController_send<'local>( - mut env: JNIEnv, - _class: JClass<'local>, - self_ptr: jlong, - input: JObject<'local> -) { - let start = env.get_field(&input, "start", "J").and_then(|s| s.j()).jexcept(&mut env); - let end = env.get_field(&input, "end", "J").and_then(|e| e.j()).jexcept(&mut env); - let content = env.get_field(&input, "content", "Ljava/lang/String;") - .and_then(|c| c.l()) - .map(|c| c.into()) - .jexcept(&mut env); - let content = unsafe { env.get_string_unchecked(&content) } - .map(|c| c.to_string_lossy().to_string()) - .jexcept(&mut env); - - let controller = unsafe { Box::leak(Box::from_raw(self_ptr as *mut crate::buffer::Controller)) }; - RT.block_on(controller.send(crate::api::TextChange { - start: start as u32, - end: end as u32, - content, - hash: None - })).jexcept(&mut env); -} - -/// Called by the Java GC to drop a [crate::buffer::Controller]. -#[no_mangle] -pub extern "system" fn Java_mp_code_BufferController_free( - _env: JNIEnv, - _class: JClass, - self_ptr: jlong, -) { - let _ = unsafe { Box::from_raw(self_ptr as *mut crate::cursor::Controller) }; -} - - -/// Calculates the XXH3 hash for a given String. -#[no_mangle] -pub extern "system" fn Java_mp_code_data_TextChange_hash<'local>( - mut env: JNIEnv, - _class: JClass<'local>, - content: JString<'local>, -) -> jlong { - let content: String = env.get_string(&content) - .map(|s| s.into()) - .jexcept(&mut env); - let hash = crate::ext::hash(content.as_bytes()); - i64::from_ne_bytes(hash.to_ne_bytes()) -} diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index 2d6d9b3..982f116 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -2,6 +2,7 @@ pub mod client; pub mod workspace; pub mod cursor; pub mod buffer; +pub mod utils; lazy_static::lazy_static! { pub(crate) static ref RT: tokio::runtime::Runtime = tokio::runtime::Runtime::new().expect("could not create tokio runtime"); diff --git a/src/ffi/java/utils.rs b/src/ffi/java/utils.rs new file mode 100644 index 0000000..ef421c4 --- /dev/null +++ b/src/ffi/java/utils.rs @@ -0,0 +1,17 @@ +use jni::{objects::{JClass, JString}, sys::jlong, JNIEnv}; + +use super::JExceptable; + +/// Calculates the XXH3 hash for a given String. +#[no_mangle] +pub extern "system" fn Java_mp_code_Utils_hash<'local>( + mut env: JNIEnv, + _class: JClass<'local>, + content: JString<'local>, +) -> jlong { + let content: String = env.get_string(&content) + .map(|s| s.into()) + .jexcept(&mut env); + let hash = crate::ext::hash(content.as_bytes()); + i64::from_ne_bytes(hash.to_ne_bytes()) +} diff --git a/src/ffi/lua.rs b/src/ffi/lua.rs index 09083c1..723c340 100644 --- a/src/ffi/lua.rs +++ b/src/ffi/lua.rs @@ -1,11 +1,12 @@ use std::io::Write; use std::sync::Mutex; +use crate::api::controller::ControllerCallback; use crate::api::Cursor; use crate::prelude::*; use crate::workspace::worker::DetachResult; use mlua::prelude::*; -use tokio::sync::broadcast; +use tokio::sync::mpsc; impl From:: for LuaError { fn from(value: CodempError) -> Self { @@ -18,16 +19,6 @@ impl From:: for LuaError { lazy_static::lazy_static!{ static ref RT : tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("could not create tokio runtime"); - static ref LOG : broadcast::Sender = broadcast::channel(32).0; - static ref STORE : dashmap::DashMap = dashmap::DashMap::default(); -} - -#[derive(Debug, Clone)] -struct Driver(tokio::sync::mpsc::UnboundedSender<()>); -impl LuaUserData for Driver { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { - methods.add_method("stop", |_, this, ()| Ok(this.0.send(()).is_ok())); - } } fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult { @@ -41,48 +32,35 @@ fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult { Ok(Driver(tx)) } -fn connect(_: &Lua, (host, username, password): (String, String, String)) -> LuaResult { - let client = RT.block_on(CodempClient::new(host, username, password))?; - STORE.insert(client.user_id().to_string(), client.clone()); - Ok(client) -} - -fn get_client(_: &Lua, (id,): (String,)) -> LuaResult> { - Ok(STORE.get(&id).map(|x| x.value().clone())) -} - -fn close_client(_: &Lua, (id,): (String,)) -> LuaResult { - if let Some((_id, client)) = STORE.remove(&id) { - for ws in client.active_workspaces() { - if !client.leave_workspace(&ws) { - tracing::warn!("could not leave workspace {ws}"); - } - } - Ok(true) - } else { - Ok(false) +#[derive(Debug, Clone)] +struct Driver(tokio::sync::mpsc::UnboundedSender<()>); +impl LuaUserData for Driver { + fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); + methods.add_method("stop", |_, this, ()| Ok(this.0.send(()).is_ok())); } } + impl LuaUserData for CodempClient { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fields.add_field_method_get("id", |_, this| Ok(this.user_id().to_string())); } fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); + // join a remote workspace and start processing cursor events - methods.add_method("join_workspace", |_, this, (session,):(String,)| { - tracing::info!("joining workspace {}", session); - let ws = RT.block_on(async { this.join_workspace(&session).await })?; - let cursor = ws.cursor(); - Ok(cursor) - }); + methods.add_method("join_workspace", |_, this, (session,):(String,)| + Ok(RT.block_on(async { this.join_workspace(&session).await })?) + ); methods.add_method("leave_workspace", |_, this, (session,):(String,)| { Ok(this.leave_workspace(&session)) }); methods.add_method("get_workspace", |_, this, (session,):(String,)| Ok(this.get_workspace(&session))); + methods.add_method("active_workspaces", |_, this, ()| Ok(this.active_workspaces())); } } @@ -90,6 +68,7 @@ impl LuaUserData for CodempClient { impl LuaUserData for CodempWorkspace { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_method("create_buffer", |_, this, (name,):(String,)| { Ok(RT.block_on(async { this.create(&name).await })?) }); @@ -109,16 +88,25 @@ impl LuaUserData for CodempWorkspace { methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name))); methods.add_method("event", |_, this, ()| Ok(RT.block_on(this.event())?)); + + methods.add_method("fetch_buffers", |_, this, ()| Ok(RT.block_on(this.fetch_buffers())?)); + methods.add_method("fetch_users", |_, this, ()| Ok(RT.block_on(this.fetch_users())?)); } fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { + fields.add_field_method_get("id", |_, this| Ok(this.id())); fields.add_field_method_get("cursor", |_, this| Ok(this.cursor())); fields.add_field_method_get("filetree", |_, this| Ok(this.filetree())); + fields.add_field_method_get("active_buffers", |_, this| Ok(this.buffer_list())); // fields.add_field_method_get("users", |_, this| Ok(this.0.users())); // TODO } } impl LuaUserData for CodempEvent { + fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); + } + fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fields.add_field_method_get("type", |_, this| match this { CodempEvent::FileTreeUpdated => Ok("filetree"), @@ -134,23 +122,33 @@ impl LuaUserData for CodempEvent { impl LuaUserData for CodempCursorController { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); + methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)| { Ok(RT.block_on(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }))?) }); - methods.add_method("try_recv", |_, this, ()| { - match RT.block_on(this.try_recv())? { - Some(x) => Ok(Some(x)), - None => Ok(None), - } - }); - methods.add_method("poll", |_, this, ()| { - RT.block_on(this.poll())?; + methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?)); + methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?)); + methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?)); + + methods.add_method("stop", |_, this, ()| Ok(this.stop())); + + methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); + methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { + this.callback(ControllerCallback::from(move |controller| { + if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller,)) { + tracing::error!("error running cursor callback: {e}"); + } + })); Ok(()) }); } } impl LuaUserData for Cursor { + fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); + } + fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fields.add_field_method_get("user", |_, this| Ok(this.user.map(|x| x.to_string()))); fields.add_field_method_get("buffer", |_, this| Ok(this.buffer.clone())); @@ -172,6 +170,10 @@ impl From<(i32, i32)> for RowCol { } impl LuaUserData for RowCol { + fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); + } + fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fields.add_field_method_get("row", |_, this| Ok(this.row)); fields.add_field_method_get("col", |_, this| Ok(this.col)); @@ -181,6 +183,7 @@ impl LuaUserData for RowCol { impl LuaUserData for CodempBufferController { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); + methods.add_method("send", |_, this, (start, end, text): (usize, usize, String)| { Ok( RT.block_on(this.send( @@ -193,19 +196,25 @@ impl LuaUserData for CodempBufferController { ))? ) }); - methods.add_method("try_recv", |_, this, ()| { - match RT.block_on(this.try_recv())? { - Some(x) => Ok(Some(x)), - None => Ok(None), - } - }); - methods.add_method("poll", |_, this, ()| { - RT.block_on(this.poll())?; + + methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?)); + methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?)); + methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?)); + + methods.add_method("stop", |_, this, ()| Ok(this.stop())); + + methods.add_method("content", |_, this, ()| Ok(RT.block_on(this.content())?)); + + methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); + methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { + this.callback(move |controller: CodempBufferController| { + let _c = controller.clone(); + if let Err(e) = cb.call::<(CodempBufferController,), ()>((controller,)) { + tracing::error!("error running buffer#{} callback: {e}", _c.name()); + } + }); Ok(()) }); - methods.add_method("content", |_, this, ()| - Ok(RT.block_on(this.content())?) - ); } } @@ -218,100 +227,107 @@ impl LuaUserData for CodempTextChange { } fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { - methods.add_meta_function(LuaMetaMethod::Call, |_, (start, end, txt, hash): (usize, usize, String, Option)| { - Ok(CodempTextChange { - start: start as u32, - end: end as u32, - content: txt, - hash, - }) - }); methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_method("apply", |_, this, (txt,):(String,)| Ok(this.apply(&txt))); } } - -// setup library logging to file -#[derive(Debug)] -struct LuaLogger(broadcast::Receiver); -impl LuaUserData for LuaLogger { - fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { - methods.add_method_mut("recv", |_, this, ()| { - Ok(this.0.blocking_recv().expect("logger channel closed")) - }); - } -} - -#[derive(Debug, Clone)] -struct LuaLoggerProducer; -impl Write for LuaLoggerProducer { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let _ = LOG.send(String::from_utf8_lossy(buf).to_string()); - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { Ok(()) } -} - -fn setup_logger(_: &Lua, (debug, path): (Option, Option)) -> LuaResult { - let format = tracing_subscriber::fmt::format() - .with_level(true) - .with_target(true) - .with_thread_ids(false) - .with_thread_names(false) - .with_ansi(false) - .with_file(false) - .with_line_number(false) - .with_source_location(false) - .compact(); - - let level = if debug.unwrap_or_default() { tracing::Level::DEBUG } else {tracing::Level::INFO }; - - let builder = tracing_subscriber::fmt() - .event_format(format) - .with_max_level(level); - - let result = if let Some(path) = path { - let logfile = std::fs::File::create(path).expect("failed creating logfile"); - builder.with_writer(Mutex::new(logfile)).try_init().is_ok() - } else { - builder.with_writer(Mutex::new(LuaLoggerProducer)).try_init().is_ok() - }; - - Ok(result) -} - -fn get_logger(_: &Lua, (): ()) -> LuaResult { - let sub = LOG.subscribe(); - Ok(LuaLogger(sub)) -} - -fn hash(_: &Lua, (txt,): (String,)) -> LuaResult { - Ok(crate::hash(txt)) -} - // define module and exports #[mlua::lua_module] fn codemp_lua(lua: &Lua) -> LuaResult { let exports = lua.create_table()?; // entrypoint - exports.set("connect", lua.create_function(connect)?)?; - exports.set("get_client", lua.create_function(get_client)?)?; - exports.set("close_client", lua.create_function(close_client)?)?; + exports.set("connect", lua.create_function(|_, (host, username, password):(String,String,String)| + Ok(RT.block_on(CodempClient::new(host, username, password))?) + )?)?; // utils - exports.set("hash", lua.create_function(hash)?)?; + exports.set("hash", lua.create_function(|_, (txt,):(String,)| + Ok(crate::hash(txt)) + )?)?; // runtime exports.set("runtime_drive_forever", lua.create_function(runtime_drive_forever)?)?; // logging - exports.set("setup_logger", lua.create_function(setup_logger)?)?; - exports.set("get_logger", lua.create_function(get_logger)?)?; + exports.set("logger", lua.create_function(logger)?)?; Ok(exports) } + +#[derive(Debug, Clone)] +struct LuaLoggerProducer(mpsc::UnboundedSender); +impl Write for LuaLoggerProducer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let _ = self.0.send(String::from_utf8_lossy(buf).to_string()); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { Ok(()) } +} + +// TODO can we make this less verbose? +fn logger(_: &Lua, (printer, debug): (LuaValue, Option)) -> LuaResult { + let level = if debug.unwrap_or_default() { tracing::Level::DEBUG } else {tracing::Level::INFO }; + let success = match printer { + LuaNil + | LuaValue::Boolean(_) + | LuaValue::LightUserData(_) + | LuaValue::Integer(_) + | LuaValue::Number(_) + | LuaValue::Table(_) + | LuaValue::Thread(_) + | LuaValue::UserData(_) + | LuaValue::Error(_) => return Err(LuaError::BindError), // TODO full BadArgument type?? + LuaValue::String(path) => { + let logfile = std::fs::File::create(path.to_string_lossy()).map_err(|e| LuaError::RuntimeError(e.to_string()))?; + let format = tracing_subscriber::fmt::format() + .with_level(true) + .with_target(true) + .with_thread_ids(true) + .with_thread_names(true) + .with_ansi(false) + .with_file(false) + .with_line_number(false) + .with_source_location(false); + tracing_subscriber::fmt() + .event_format(format) + .with_max_level(level) + .with_writer(Mutex::new(logfile)) + .try_init() + .is_ok() + }, + LuaValue::Function(cb) => { + let (tx, mut rx) = mpsc::unbounded_channel(); + let format = tracing_subscriber::fmt::format() + .with_level(true) + .with_target(true) + .with_thread_ids(false) + .with_thread_names(false) + .with_ansi(false) + .with_file(false) + .with_line_number(false) + .with_source_location(false); + let res = tracing_subscriber::fmt() + .event_format(format) + .with_max_level(level) + .with_writer(Mutex::new(LuaLoggerProducer(tx))) + .try_init() + .is_ok(); + if res { + RT.spawn(async move { + while let Some(msg) = rx.recv().await { + let _ = cb.call::<(String,),()>((msg,)); + // if the logger fails logging who logs it? + } + }); + } + res + }, + }; + + Ok(success) +} diff --git a/src/workspace/service.rs b/src/workspace/service.rs index 04f7ffa..7a69b01 100644 --- a/src/workspace/service.rs +++ b/src/workspace/service.rs @@ -1,18 +1,26 @@ -use codemp_proto::{auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, workspace::workspace_client::WorkspaceClient}; -use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}}; - +use codemp_proto::{ + auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, + workspace::workspace_client::WorkspaceClient, +}; +use tonic::{ + service::{interceptor::InterceptedService, Interceptor}, + transport::{Channel, Endpoint}, +}; #[derive(Clone)] pub struct WorkspaceInterceptor { - token: tokio::sync::watch::Receiver + token: tokio::sync::watch::Receiver, } impl Interceptor for WorkspaceInterceptor { - fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { + fn call( + &mut self, + mut request: tonic::Request<()>, + ) -> Result, tonic::Status> { if let Ok(token) = self.token.borrow().token.parse() { request.metadata_mut().insert("auth", token); } - + Ok(request) } } @@ -29,9 +37,7 @@ pub struct Services { impl Services { pub async fn try_new(dest: &str, token: Token) -> crate::Result { - let channel = Endpoint::from_shared(dest.to_string())? - .connect() - .await?; + let channel = Endpoint::from_shared(dest.to_string())?.connect().await?; let (token_tx, token_rx) = tokio::sync::watch::channel(token); let inter = WorkspaceInterceptor { token: token_rx }; Ok(Self { @@ -61,5 +67,4 @@ impl Services { pub fn cur(&self) -> CursorClient { self.cursor.clone() } - } diff --git a/src/workspace/worker.rs b/src/workspace/worker.rs index 94cb174..c1a66d9 100644 --- a/src/workspace/worker.rs +++ b/src/workspace/worker.rs @@ -1,5 +1,5 @@ use crate::{ - api::{controller::ControllerWorker, Controller, User}, + api::{controller::ControllerWorker, Controller, Event, User}, buffer::{self, worker::BufferWorker}, cursor::{self, worker::CursorWorker}, workspace::service::Services, @@ -220,7 +220,7 @@ impl Workspace { } /// await next workspace [crate::api::Event] and return it - pub async fn event(&self) -> crate::Result { + pub async fn event(&self) -> crate::Result { self.0 .events .lock() @@ -271,7 +271,7 @@ impl Workspace { /// get a list of the users attached to a specific buffer /// /// TODO: discuss implementation details - pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { + pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { let mut workspace_client = self.0.services.ws(); let buffer_users = workspace_client .list_buffer_users(tonic::Request::new(BufferNode { @@ -281,7 +281,7 @@ impl Workspace { .into_inner() .users .into_iter() - .map(|u| u.id) + .map(|id| id.into()) .collect(); Ok(buffer_users)