Merge branch 'dev' into pyo3_bump

This commit is contained in:
cschen 2024-08-16 16:50:55 +02:00
commit ef2285d0f3
15 changed files with 300 additions and 239 deletions

View file

@ -33,7 +33,7 @@ tracing-subscriber = { version = "0.3", optional = true }
jni = { version = "0.21", features = ["invocation"], optional = true } jni = { version = "0.21", features = ["invocation"], optional = true }
# glue (lua) # glue (lua)
mlua = { version = "0.9", features = ["module", "luajit", "send"], optional = true } mlua = { version = "0.10.0-beta.1", features = ["module", "luajit", "send"], optional = true }
# glue (js) # glue (js)
napi = { version = "2.16", features = ["full"], optional = true } napi = { version = "2.16", features = ["full"], optional = true }

10
dist/java/src/mp/code/Utils.java vendored Normal file
View file

@ -0,0 +1,10 @@
package mp.code;
public class Utils {
/**
* Hashes the given {@link String} using CodeMP's hashing algorithm (xxh3).
* @param input the string to hash
* @return the hash
*/
public static native long hash(String input);
}

View file

@ -1,21 +1,17 @@
package mp.code.data; package mp.code.data;
import java.util.OptionalLong;
public class TextChange { public class TextChange {
public final long start; public final long start;
public final long end; public final long end;
public final String content; public final String content;
private final long hash; // xxh3 hash public final OptionalLong hash; // xxh3 hash
public TextChange(long start, long end, String content, long hash) { public TextChange(long start, long end, String content, OptionalLong hash) {
this.start = start; this.start = start;
this.end = end; this.end = end;
this.content = content; this.content = content;
this.hash = hash; this.hash = hash;
} }
private static native long hash(String content);
public boolean hashMatches(String content) {
// 0 is Rust default value and a very unlikely hash
return hash == 0L || this.hash == hash(content);
}
} }

View file

@ -45,6 +45,14 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
} }
} }
/// registers a callback to be called on receive.
///
/// there can only be one callback at any given time.
fn callback(&self, cb: impl Into<ControllerCallback<Self>>);
/// clears the currently registered callback.
fn clear_callback(&self);
/// block until next value is available without consuming it /// block until next value is available without consuming it
/// ///
/// this is just an async trait function wrapped by `async_trait`: /// this is just an async trait function wrapped by `async_trait`:
@ -64,3 +72,25 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// (likely if worker is already stopped) /// (likely if worker is already stopped)
fn stop(&self) -> bool; fn stop(&self) -> bool;
} }
/// type wrapper for Boxed dyn callback
pub struct ControllerCallback<T>(Box<dyn Sync + Send + Fn(T)>);
impl<T> ControllerCallback<T> {
pub fn call(&self, x: T) {
self.0(x) // lmao at this syntax
}
}
impl<T> std::fmt::Debug for ControllerCallback<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ControllerCallback {{ {:p} }}", self.0)
}
}
impl<T, X: Sync + Send + Fn(T) + 'static> From<X> for ControllerCallback<T> {
fn from(value: X) -> Self {
Self(Box::new(value))
}
}

View file

@ -1,5 +1,6 @@
use codemp_proto::workspace::workspace_event::Event as WorkspaceEventInner; use codemp_proto::workspace::workspace_event::Event as WorkspaceEventInner;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "python", pyo3::pyclass)]
pub enum Event { pub enum Event {
FileTreeUpdated(String), FileTreeUpdated(String),

View file

@ -8,6 +8,7 @@ use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tonic::async_trait; use tonic::async_trait;
use crate::api::controller::ControllerCallback;
use crate::api::Controller; use crate::api::Controller;
use crate::api::TextChange; use crate::api::TextChange;
@ -53,8 +54,8 @@ pub(crate) struct BufferControllerInner {
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>, pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
} }
#[async_trait] #[async_trait]
@ -99,6 +100,19 @@ impl Controller<TextChange> for BufferController {
Ok(()) Ok(())
} }
fn callback(&self, cb: impl Into<ControllerCallback<BufferController>>) {
if self.0.callback.send(Some(cb.into())).is_err() {
// TODO should we panic? we failed what we were supposed to do
tracing::error!("no active buffer worker to run registered callback!");
}
}
fn clear_callback(&self) {
if self.0.callback.send(None).is_err() {
tracing::warn!("no active buffer worker to clear callback");
}
}
fn stop(&self) -> bool { fn stop(&self) -> bool {
self.0.stopper.send(()).is_ok() self.0.stopper.send(()).is_ok()
} }

View file

@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch};
use tonic::{async_trait, Streaming}; use tonic::{async_trait, Streaming};
use uuid::Uuid; use uuid::Uuid;
use crate::api::controller::ControllerWorker; use crate::api::controller::{ControllerCallback, ControllerWorker};
use crate::api::TextChange; use crate::api::TextChange;
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
@ -24,6 +24,7 @@ pub(crate) struct BufferWorker {
delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
controller: BufferController, controller: BufferController,
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
} }
impl BufferWorker { impl BufferWorker {
@ -35,6 +36,7 @@ impl BufferWorker {
let (req_tx, req_rx) = mpsc::channel(1); let (req_tx, req_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1);
let (cb_tx, cb_rx) = watch::channel(None);
let (poller_tx, poller_rx) = mpsc::unbounded_channel(); let (poller_tx, poller_rx) = mpsc::unbounded_channel();
@ -49,6 +51,7 @@ impl BufferWorker {
stopper: end_tx, stopper: end_tx,
content_request: req_tx, content_request: req_tx,
delta_request: recv_tx, delta_request: recv_tx,
callback: cb_tx,
}; };
BufferWorker { BufferWorker {
@ -61,6 +64,7 @@ impl BufferWorker {
controller: BufferController(Arc::new(controller)), controller: BufferController(Arc::new(controller)),
content_checkout: req_rx, content_checkout: req_rx,
delta_req: recv_rx, delta_req: recv_rx,
callback: cb_rx,
} }
} }
} }
@ -130,6 +134,9 @@ impl ControllerWorker<TextChange> for BufferWorker {
for tx in self.pollers.drain(..) { for tx in self.pollers.drain(..) {
tx.send(()).unwrap_or_warn("could not wake up poller"); tx.send(()).unwrap_or_warn("could not wake up poller");
} }
if let Some(cb) = self.callback.borrow().as_ref() {
cb.call(self.controller.clone()); // TODO should we run this on another task/thread?
}
}, },
Err(e) => tracing::error!("could not deserialize operation from server: {}", e), Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
} }

View file

@ -3,10 +3,13 @@
//! a controller implementation for cursor actions //! a controller implementation for cursor actions
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex}; use tokio::sync::{
broadcast::{self, error::TryRecvError},
mpsc, watch, Mutex,
};
use tonic::async_trait; use tonic::async_trait;
use crate::api::{Controller, Cursor}; use crate::api::{controller::ControllerCallback, Controller, Cursor};
use codemp_proto::cursor::{CursorEvent, CursorPosition}; use codemp_proto::cursor::{CursorEvent, CursorPosition};
/// the cursor controller implementation /// the cursor controller implementation
/// ///
@ -26,26 +29,11 @@ pub struct CursorController(pub(crate) Arc<CursorControllerInner>);
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct CursorControllerInner { pub(crate) struct CursorControllerInner {
op: mpsc::Sender<CursorPosition>, pub(crate) op: mpsc::Sender<CursorPosition>,
last_op: Mutex<watch::Receiver<CursorEvent>>, pub(crate) last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, pub(crate) stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
} pub(crate) stop: mpsc::UnboundedSender<()>,
impl CursorControllerInner {
pub(crate) fn new(
op: mpsc::Sender<CursorPosition>,
last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>,
) -> Self {
Self {
op,
last_op,
stream,
stop,
}
}
} }
#[async_trait] #[async_trait]
@ -78,6 +66,19 @@ impl Controller<Cursor> for CursorController {
Ok(self.0.last_op.lock().await.changed().await?) Ok(self.0.last_op.lock().await.changed().await?)
} }
fn callback(&self, cb: impl Into<ControllerCallback<CursorController>>) {
if self.0.callback.send(Some(cb.into())).is_err() {
// TODO should we panic? we failed what we were supposed to do
tracing::error!("no active cursor worker to run registered callback!");
}
}
fn clear_callback(&self) {
if self.0.callback.send(None).is_err() {
tracing::warn!("no active cursor worker to clear callback");
}
}
fn stop(&self) -> bool { fn stop(&self) -> bool {
self.0.stop.send(()).is_ok() self.0.stop.send(()).is_ok()
} }

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch};
use tonic::{Streaming, async_trait}; use tonic::{Streaming, async_trait};
use crate::{api::{controller::ControllerWorker, Cursor}, errors::IgnorableError}; use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, errors::IgnorableError};
use codemp_proto::cursor::{CursorPosition, CursorEvent}; use codemp_proto::cursor::{CursorPosition, CursorEvent};
use super::controller::{CursorController, CursorControllerInner}; use super::controller::{CursorController, CursorControllerInner};
@ -14,6 +14,7 @@ pub(crate) struct CursorWorker {
channel: broadcast::Sender<CursorEvent>, channel: broadcast::Sender<CursorEvent>,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
controller: CursorController, controller: CursorController,
callback: watch::Receiver<Option<ControllerCallback<CursorController>>>,
} }
impl Default for CursorWorker { impl Default for CursorWorker {
@ -22,18 +23,21 @@ impl Default for CursorWorker {
let (cur_tx, _cur_rx) = broadcast::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64);
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (change_tx, change_rx) = watch::channel(CursorEvent::default()); let (change_tx, change_rx) = watch::channel(CursorEvent::default());
let controller = CursorControllerInner::new( let (cb_tx, cb_rx) = watch::channel(None);
op_tx, let controller = CursorControllerInner {
Mutex::new(change_rx), op: op_tx,
Mutex::new(cur_tx.subscribe()), last_op: Mutex::new(change_rx),
end_tx stream: Mutex::new(cur_tx.subscribe()),
); stop: end_tx,
callback: cb_tx,
};
Self { Self {
op: op_rx, op: op_rx,
changed: change_tx, changed: change_tx,
channel: cur_tx, channel: cur_tx,
stop: end_rx, stop: end_rx,
controller: CursorController(Arc::new(controller)), controller: CursorController(Arc::new(controller)),
callback: cb_rx,
} }
} }
} }
@ -57,6 +61,9 @@ impl ControllerWorker<Cursor> for CursorWorker {
Ok(Some(cur)) = rx.message() => { Ok(Some(cur)) = rx.message() => {
self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event");
self.changed.send(cur).unwrap_or_warn("could not update last event"); self.changed.send(cur).unwrap_or_warn("could not update last event");
if let Some(cb) = self.callback.borrow().as_ref() {
cb.call(self.controller.clone()); // TODO should this run in its own task/thread?
}
}, },
else => break, else => break,
} }

View file

@ -63,71 +63,27 @@ fn recv_jni(env: &mut JNIEnv, change: Option<crate::api::TextChange>) -> jobject
None => JObject::default(), None => JObject::default(),
Some(event) => { Some(event) => {
let content = env.new_string(event.content).jexcept(env); let content = env.new_string(event.content).jexcept(env);
let hash = env.find_class("java/util/OptionalLong").and_then(|class| {
if let Some(h) = event.hash {
env.call_static_method(class, "of", "(J)Ljava/util/OptionalLong;", &[JValueGen::Long(h)])
} else {
env.call_static_method(class, "empty", "()Ljava/util/OptionalLong;", &[])
}
}).and_then(|o| o.l()).jexcept(env);
env.find_class("mp/code/data/TextChange") env.find_class("mp/code/data/TextChange")
.and_then(|class| { .and_then(|class| {
env.new_object( env.new_object(
class, class,
"(JJLjava/lang/String;J)V", "(JJLjava/lang/String;Ljava/util/OptionalLong;)V",
&[ &[
JValueGen::Long(jlong::from(event.start)), JValueGen::Long(jlong::from(event.start)),
JValueGen::Long(jlong::from(event.end)), JValueGen::Long(jlong::from(event.end)),
JValueGen::Object(&content), JValueGen::Object(&content),
JValueGen::Long(event.hash.unwrap_or_default()) JValueGen::Object(&hash)
] ]
) )
}).jexcept(env) }).jexcept(env)
} }
}.as_raw() }.as_raw()
} }
/// Receives from Java, converts and sends a [crate::api::TextChange].
#[no_mangle]
pub extern "system" fn Java_mp_code_BufferController_send<'local>(
mut env: JNIEnv,
_class: JClass<'local>,
self_ptr: jlong,
input: JObject<'local>
) {
let start = env.get_field(&input, "start", "J").and_then(|s| s.j()).jexcept(&mut env);
let end = env.get_field(&input, "end", "J").and_then(|e| e.j()).jexcept(&mut env);
let content = env.get_field(&input, "content", "Ljava/lang/String;")
.and_then(|c| c.l())
.map(|c| c.into())
.jexcept(&mut env);
let content = unsafe { env.get_string_unchecked(&content) }
.map(|c| c.to_string_lossy().to_string())
.jexcept(&mut env);
let controller = unsafe { Box::leak(Box::from_raw(self_ptr as *mut crate::buffer::Controller)) };
RT.block_on(controller.send(crate::api::TextChange {
start: start as u32,
end: end as u32,
content,
hash: None
})).jexcept(&mut env);
}
/// Called by the Java GC to drop a [crate::buffer::Controller].
#[no_mangle]
pub extern "system" fn Java_mp_code_BufferController_free(
_env: JNIEnv,
_class: JClass,
self_ptr: jlong,
) {
let _ = unsafe { Box::from_raw(self_ptr as *mut crate::cursor::Controller) };
}
/// Calculates the XXH3 hash for a given String.
#[no_mangle]
pub extern "system" fn Java_mp_code_data_TextChange_hash<'local>(
mut env: JNIEnv,
_class: JClass<'local>,
content: JString<'local>,
) -> jlong {
let content: String = env.get_string(&content)
.map(|s| s.into())
.jexcept(&mut env);
let hash = crate::ext::hash(content.as_bytes());
i64::from_ne_bytes(hash.to_ne_bytes())
}

View file

@ -2,6 +2,7 @@ pub mod client;
pub mod workspace; pub mod workspace;
pub mod cursor; pub mod cursor;
pub mod buffer; pub mod buffer;
pub mod utils;
lazy_static::lazy_static! { lazy_static::lazy_static! {
pub(crate) static ref RT: tokio::runtime::Runtime = tokio::runtime::Runtime::new().expect("could not create tokio runtime"); pub(crate) static ref RT: tokio::runtime::Runtime = tokio::runtime::Runtime::new().expect("could not create tokio runtime");

17
src/ffi/java/utils.rs Normal file
View file

@ -0,0 +1,17 @@
use jni::{objects::{JClass, JString}, sys::jlong, JNIEnv};
use super::JExceptable;
/// Calculates the XXH3 hash for a given String.
#[no_mangle]
pub extern "system" fn Java_mp_code_Utils_hash<'local>(
mut env: JNIEnv,
_class: JClass<'local>,
content: JString<'local>,
) -> jlong {
let content: String = env.get_string(&content)
.map(|s| s.into())
.jexcept(&mut env);
let hash = crate::ext::hash(content.as_bytes());
i64::from_ne_bytes(hash.to_ne_bytes())
}

View file

@ -1,11 +1,12 @@
use std::io::Write; use std::io::Write;
use std::sync::Mutex; use std::sync::Mutex;
use crate::api::controller::ControllerCallback;
use crate::api::Cursor; use crate::api::Cursor;
use crate::prelude::*; use crate::prelude::*;
use crate::workspace::worker::DetachResult; use crate::workspace::worker::DetachResult;
use mlua::prelude::*; use mlua::prelude::*;
use tokio::sync::broadcast; use tokio::sync::mpsc;
impl From::<CodempError> for LuaError { impl From::<CodempError> for LuaError {
fn from(value: CodempError) -> Self { fn from(value: CodempError) -> Self {
@ -18,16 +19,6 @@ impl From::<CodempError> for LuaError {
lazy_static::lazy_static!{ lazy_static::lazy_static!{
static ref RT : tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("could not create tokio runtime"); static ref RT : tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("could not create tokio runtime");
static ref LOG : broadcast::Sender<String> = broadcast::channel(32).0;
static ref STORE : dashmap::DashMap<String, CodempClient> = dashmap::DashMap::default();
}
#[derive(Debug, Clone)]
struct Driver(tokio::sync::mpsc::UnboundedSender<()>);
impl LuaUserData for Driver {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_method("stop", |_, this, ()| Ok(this.0.send(()).is_ok()));
}
} }
fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult<Driver> { fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult<Driver> {
@ -41,48 +32,35 @@ fn runtime_drive_forever(_: &Lua, ():()) -> LuaResult<Driver> {
Ok(Driver(tx)) Ok(Driver(tx))
} }
fn connect(_: &Lua, (host, username, password): (String, String, String)) -> LuaResult<CodempClient> { #[derive(Debug, Clone)]
let client = RT.block_on(CodempClient::new(host, username, password))?; struct Driver(tokio::sync::mpsc::UnboundedSender<()>);
STORE.insert(client.user_id().to_string(), client.clone()); impl LuaUserData for Driver {
Ok(client) fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
} methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("stop", |_, this, ()| Ok(this.0.send(()).is_ok()));
fn get_client(_: &Lua, (id,): (String,)) -> LuaResult<Option<CodempClient>> {
Ok(STORE.get(&id).map(|x| x.value().clone()))
}
fn close_client(_: &Lua, (id,): (String,)) -> LuaResult<bool> {
if let Some((_id, client)) = STORE.remove(&id) {
for ws in client.active_workspaces() {
if !client.leave_workspace(&ws) {
tracing::warn!("could not leave workspace {ws}");
}
}
Ok(true)
} else {
Ok(false)
} }
} }
impl LuaUserData for CodempClient { impl LuaUserData for CodempClient {
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("id", |_, this| Ok(this.user_id().to_string())); fields.add_field_method_get("id", |_, this| Ok(this.user_id().to_string()));
} }
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
// join a remote workspace and start processing cursor events // join a remote workspace and start processing cursor events
methods.add_method("join_workspace", |_, this, (session,):(String,)| { methods.add_method("join_workspace", |_, this, (session,):(String,)|
tracing::info!("joining workspace {}", session); Ok(RT.block_on(async { this.join_workspace(&session).await })?)
let ws = RT.block_on(async { this.join_workspace(&session).await })?; );
let cursor = ws.cursor();
Ok(cursor)
});
methods.add_method("leave_workspace", |_, this, (session,):(String,)| { methods.add_method("leave_workspace", |_, this, (session,):(String,)| {
Ok(this.leave_workspace(&session)) Ok(this.leave_workspace(&session))
}); });
methods.add_method("get_workspace", |_, this, (session,):(String,)| Ok(this.get_workspace(&session))); methods.add_method("get_workspace", |_, this, (session,):(String,)| Ok(this.get_workspace(&session)));
methods.add_method("active_workspaces", |_, this, ()| Ok(this.active_workspaces()));
} }
} }
@ -90,6 +68,7 @@ impl LuaUserData for CodempClient {
impl LuaUserData for CodempWorkspace { impl LuaUserData for CodempWorkspace {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("create_buffer", |_, this, (name,):(String,)| { methods.add_method("create_buffer", |_, this, (name,):(String,)| {
Ok(RT.block_on(async { this.create(&name).await })?) Ok(RT.block_on(async { this.create(&name).await })?)
}); });
@ -109,16 +88,25 @@ impl LuaUserData for CodempWorkspace {
methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name))); methods.add_method("get_buffer", |_, this, (name,):(String,)| Ok(this.buffer_by_name(&name)));
methods.add_method("event", |_, this, ()| Ok(RT.block_on(this.event())?)); methods.add_method("event", |_, this, ()| Ok(RT.block_on(this.event())?));
methods.add_method("fetch_buffers", |_, this, ()| Ok(RT.block_on(this.fetch_buffers())?));
methods.add_method("fetch_users", |_, this, ()| Ok(RT.block_on(this.fetch_users())?));
} }
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("id", |_, this| Ok(this.id()));
fields.add_field_method_get("cursor", |_, this| Ok(this.cursor())); fields.add_field_method_get("cursor", |_, this| Ok(this.cursor()));
fields.add_field_method_get("filetree", |_, this| Ok(this.filetree())); fields.add_field_method_get("filetree", |_, this| Ok(this.filetree()));
fields.add_field_method_get("active_buffers", |_, this| Ok(this.buffer_list()));
// fields.add_field_method_get("users", |_, this| Ok(this.0.users())); // TODO // fields.add_field_method_get("users", |_, this| Ok(this.0.users())); // TODO
} }
} }
impl LuaUserData for CodempEvent { impl LuaUserData for CodempEvent {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
}
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("type", |_, this| match this { fields.add_field_method_get("type", |_, this| match this {
CodempEvent::FileTreeUpdated => Ok("filetree"), CodempEvent::FileTreeUpdated => Ok("filetree"),
@ -134,23 +122,33 @@ impl LuaUserData for CodempEvent {
impl LuaUserData for CodempCursorController { impl LuaUserData for CodempCursorController {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)| { methods.add_method("send", |_, this, (buffer, start_row, start_col, end_row, end_col):(String, i32, i32, i32, i32)| {
Ok(RT.block_on(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }))?) Ok(RT.block_on(this.send(CodempCursor { buffer, start: (start_row, start_col), end: (end_row, end_col), user: None }))?)
}); });
methods.add_method("try_recv", |_, this, ()| { methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?));
match RT.block_on(this.try_recv())? { methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?));
Some(x) => Ok(Some(x)), methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?));
None => Ok(None),
} methods.add_method("stop", |_, this, ()| Ok(this.stop()));
});
methods.add_method("poll", |_, this, ()| { methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback()));
RT.block_on(this.poll())?; methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(ControllerCallback::from(move |controller| {
if let Err(e) = cb.call::<(CodempCursorController,), ()>((controller,)) {
tracing::error!("error running cursor callback: {e}");
}
}));
Ok(()) Ok(())
}); });
} }
} }
impl LuaUserData for Cursor { impl LuaUserData for Cursor {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
}
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("user", |_, this| Ok(this.user.map(|x| x.to_string()))); fields.add_field_method_get("user", |_, this| Ok(this.user.map(|x| x.to_string())));
fields.add_field_method_get("buffer", |_, this| Ok(this.buffer.clone())); fields.add_field_method_get("buffer", |_, this| Ok(this.buffer.clone()));
@ -172,6 +170,10 @@ impl From<(i32, i32)> for RowCol {
} }
impl LuaUserData for RowCol { impl LuaUserData for RowCol {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
}
fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) { fn add_fields<'lua, F: LuaUserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("row", |_, this| Ok(this.row)); fields.add_field_method_get("row", |_, this| Ok(this.row));
fields.add_field_method_get("col", |_, this| Ok(this.col)); fields.add_field_method_get("col", |_, this| Ok(this.col));
@ -181,6 +183,7 @@ impl LuaUserData for RowCol {
impl LuaUserData for CodempBufferController { impl LuaUserData for CodempBufferController {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("send", |_, this, (start, end, text): (usize, usize, String)| { methods.add_method("send", |_, this, (start, end, text): (usize, usize, String)| {
Ok( Ok(
RT.block_on(this.send( RT.block_on(this.send(
@ -193,19 +196,25 @@ impl LuaUserData for CodempBufferController {
))? ))?
) )
}); });
methods.add_method("try_recv", |_, this, ()| {
match RT.block_on(this.try_recv())? { methods.add_method("try_recv", |_, this, ()| Ok(RT.block_on(this.try_recv())?));
Some(x) => Ok(Some(x)), methods.add_method("recv", |_, this, ()| Ok(RT.block_on(this.recv())?));
None => Ok(None), methods.add_method("poll", |_, this, ()| Ok(RT.block_on(this.poll())?));
}
}); methods.add_method("stop", |_, this, ()| Ok(this.stop()));
methods.add_method("poll", |_, this, ()| {
RT.block_on(this.poll())?; methods.add_method("content", |_, this, ()| Ok(RT.block_on(this.content())?));
methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback()));
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
this.callback(move |controller: CodempBufferController| {
let _c = controller.clone();
if let Err(e) = cb.call::<(CodempBufferController,), ()>((controller,)) {
tracing::error!("error running buffer#{} callback: {e}", _c.name());
}
});
Ok(()) Ok(())
}); });
methods.add_method("content", |_, this, ()|
Ok(RT.block_on(this.content())?)
);
} }
} }
@ -218,100 +227,107 @@ impl LuaUserData for CodempTextChange {
} }
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_meta_function(LuaMetaMethod::Call, |_, (start, end, txt, hash): (usize, usize, String, Option<i64>)| {
Ok(CodempTextChange {
start: start as u32,
end: end as u32,
content: txt,
hash,
})
});
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this))); methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| Ok(format!("{:?}", this)));
methods.add_method("apply", |_, this, (txt,):(String,)| Ok(this.apply(&txt))); methods.add_method("apply", |_, this, (txt,):(String,)| Ok(this.apply(&txt)));
} }
} }
// setup library logging to file
#[derive(Debug)]
struct LuaLogger(broadcast::Receiver<String>);
impl LuaUserData for LuaLogger {
fn add_methods<'lua, M: LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_method_mut("recv", |_, this, ()| {
Ok(this.0.blocking_recv().expect("logger channel closed"))
});
}
}
#[derive(Debug, Clone)]
struct LuaLoggerProducer;
impl Write for LuaLoggerProducer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let _ = LOG.send(String::from_utf8_lossy(buf).to_string());
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> { Ok(()) }
}
fn setup_logger(_: &Lua, (debug, path): (Option<bool>, Option<String>)) -> LuaResult<bool> {
let format = tracing_subscriber::fmt::format()
.with_level(true)
.with_target(true)
.with_thread_ids(false)
.with_thread_names(false)
.with_ansi(false)
.with_file(false)
.with_line_number(false)
.with_source_location(false)
.compact();
let level = if debug.unwrap_or_default() { tracing::Level::DEBUG } else {tracing::Level::INFO };
let builder = tracing_subscriber::fmt()
.event_format(format)
.with_max_level(level);
let result = if let Some(path) = path {
let logfile = std::fs::File::create(path).expect("failed creating logfile");
builder.with_writer(Mutex::new(logfile)).try_init().is_ok()
} else {
builder.with_writer(Mutex::new(LuaLoggerProducer)).try_init().is_ok()
};
Ok(result)
}
fn get_logger(_: &Lua, (): ()) -> LuaResult<LuaLogger> {
let sub = LOG.subscribe();
Ok(LuaLogger(sub))
}
fn hash(_: &Lua, (txt,): (String,)) -> LuaResult<i64> {
Ok(crate::hash(txt))
}
// define module and exports // define module and exports
#[mlua::lua_module] #[mlua::lua_module]
fn codemp_lua(lua: &Lua) -> LuaResult<LuaTable> { fn codemp_lua(lua: &Lua) -> LuaResult<LuaTable> {
let exports = lua.create_table()?; let exports = lua.create_table()?;
// entrypoint // entrypoint
exports.set("connect", lua.create_function(connect)?)?; exports.set("connect", lua.create_function(|_, (host, username, password):(String,String,String)|
exports.set("get_client", lua.create_function(get_client)?)?; Ok(RT.block_on(CodempClient::new(host, username, password))?)
exports.set("close_client", lua.create_function(close_client)?)?; )?)?;
// utils // utils
exports.set("hash", lua.create_function(hash)?)?; exports.set("hash", lua.create_function(|_, (txt,):(String,)|
Ok(crate::hash(txt))
)?)?;
// runtime // runtime
exports.set("runtime_drive_forever", lua.create_function(runtime_drive_forever)?)?; exports.set("runtime_drive_forever", lua.create_function(runtime_drive_forever)?)?;
// logging // logging
exports.set("setup_logger", lua.create_function(setup_logger)?)?; exports.set("logger", lua.create_function(logger)?)?;
exports.set("get_logger", lua.create_function(get_logger)?)?;
Ok(exports) Ok(exports)
} }
#[derive(Debug, Clone)]
struct LuaLoggerProducer(mpsc::UnboundedSender<String>);
impl Write for LuaLoggerProducer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let _ = self.0.send(String::from_utf8_lossy(buf).to_string());
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> { Ok(()) }
}
// TODO can we make this less verbose?
fn logger(_: &Lua, (printer, debug): (LuaValue, Option<bool>)) -> LuaResult<bool> {
let level = if debug.unwrap_or_default() { tracing::Level::DEBUG } else {tracing::Level::INFO };
let success = match printer {
LuaNil
| LuaValue::Boolean(_)
| LuaValue::LightUserData(_)
| LuaValue::Integer(_)
| LuaValue::Number(_)
| LuaValue::Table(_)
| LuaValue::Thread(_)
| LuaValue::UserData(_)
| LuaValue::Error(_) => return Err(LuaError::BindError), // TODO full BadArgument type??
LuaValue::String(path) => {
let logfile = std::fs::File::create(path.to_string_lossy()).map_err(|e| LuaError::RuntimeError(e.to_string()))?;
let format = tracing_subscriber::fmt::format()
.with_level(true)
.with_target(true)
.with_thread_ids(true)
.with_thread_names(true)
.with_ansi(false)
.with_file(false)
.with_line_number(false)
.with_source_location(false);
tracing_subscriber::fmt()
.event_format(format)
.with_max_level(level)
.with_writer(Mutex::new(logfile))
.try_init()
.is_ok()
},
LuaValue::Function(cb) => {
let (tx, mut rx) = mpsc::unbounded_channel();
let format = tracing_subscriber::fmt::format()
.with_level(true)
.with_target(true)
.with_thread_ids(false)
.with_thread_names(false)
.with_ansi(false)
.with_file(false)
.with_line_number(false)
.with_source_location(false);
let res = tracing_subscriber::fmt()
.event_format(format)
.with_max_level(level)
.with_writer(Mutex::new(LuaLoggerProducer(tx)))
.try_init()
.is_ok();
if res {
RT.spawn(async move {
while let Some(msg) = rx.recv().await {
let _ = cb.call::<(String,),()>((msg,));
// if the logger fails logging who logs it?
}
});
}
res
},
};
Ok(success)
}

View file

@ -1,18 +1,26 @@
use codemp_proto::{auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, workspace::workspace_client::WorkspaceClient}; use codemp_proto::{
use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}}; auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient,
workspace::workspace_client::WorkspaceClient,
};
use tonic::{
service::{interceptor::InterceptedService, Interceptor},
transport::{Channel, Endpoint},
};
#[derive(Clone)] #[derive(Clone)]
pub struct WorkspaceInterceptor { pub struct WorkspaceInterceptor {
token: tokio::sync::watch::Receiver<Token> token: tokio::sync::watch::Receiver<Token>,
} }
impl Interceptor for WorkspaceInterceptor { impl Interceptor for WorkspaceInterceptor {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> { fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.token.borrow().token.parse() { if let Ok(token) = self.token.borrow().token.parse() {
request.metadata_mut().insert("auth", token); request.metadata_mut().insert("auth", token);
} }
Ok(request) Ok(request)
} }
} }
@ -29,9 +37,7 @@ pub struct Services {
impl Services { impl Services {
pub async fn try_new(dest: &str, token: Token) -> crate::Result<Self> { pub async fn try_new(dest: &str, token: Token) -> crate::Result<Self> {
let channel = Endpoint::from_shared(dest.to_string())? let channel = Endpoint::from_shared(dest.to_string())?.connect().await?;
.connect()
.await?;
let (token_tx, token_rx) = tokio::sync::watch::channel(token); let (token_tx, token_rx) = tokio::sync::watch::channel(token);
let inter = WorkspaceInterceptor { token: token_rx }; let inter = WorkspaceInterceptor { token: token_rx };
Ok(Self { Ok(Self {
@ -61,5 +67,4 @@ impl Services {
pub fn cur(&self) -> CursorClient<AuthedService> { pub fn cur(&self) -> CursorClient<AuthedService> {
self.cursor.clone() self.cursor.clone()
} }
} }

View file

@ -1,5 +1,5 @@
use crate::{ use crate::{
api::{controller::ControllerWorker, Controller, User}, api::{controller::ControllerWorker, Controller, Event, User},
buffer::{self, worker::BufferWorker}, buffer::{self, worker::BufferWorker},
cursor::{self, worker::CursorWorker}, cursor::{self, worker::CursorWorker},
workspace::service::Services, workspace::service::Services,
@ -220,7 +220,7 @@ impl Workspace {
} }
/// await next workspace [crate::api::Event] and return it /// await next workspace [crate::api::Event] and return it
pub async fn event(&self) -> crate::Result<crate::api::Event> { pub async fn event(&self) -> crate::Result<Event> {
self.0 self.0
.events .events
.lock() .lock()
@ -271,7 +271,7 @@ impl Workspace {
/// get a list of the users attached to a specific buffer /// get a list of the users attached to a specific buffer
/// ///
/// TODO: discuss implementation details /// TODO: discuss implementation details
pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<String>> { pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<User>> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
let buffer_users = workspace_client let buffer_users = workspace_client
.list_buffer_users(tonic::Request::new(BufferNode { .list_buffer_users(tonic::Request::new(BufferNode {
@ -281,7 +281,7 @@ impl Workspace {
.into_inner() .into_inner()
.users .users
.into_iter() .into_iter()
.map(|u| u.id) .map(|id| id.into())
.collect(); .collect();
Ok(buffer_users) Ok(buffer_users)