From 907a0329d30c77405a8509d097b635ed1f0b6ab6 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 02:30:46 +0200 Subject: [PATCH 01/18] feat!: ackable changes for buffer cntrls, sync send --- src/api/change.rs | 16 ++++++++++++ src/api/controller.rs | 8 +++--- src/buffer/controller.rs | 50 +++++++++++++++++++------------------ src/buffer/worker.rs | 53 ++++++++++++++++++++++++++-------------- src/cursor/controller.rs | 7 +++--- src/cursor/worker.rs | 4 +-- 6 files changed, 85 insertions(+), 53 deletions(-) diff --git a/src/api/change.rs b/src/api/change.rs index 5a2ebfe..f41e342 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -45,6 +45,22 @@ pub struct TextChange { pub hash: Option, } +/// This wrapper around a [`TextChange`] contains a handle to Acknowledge correct change +/// application +#[derive(Debug)] +pub struct Delta { + /// The change received + pub change: TextChange, + /// The ack handle, must be called after correctly applying this change + pub ack: T, +} + +/// A token which can be used to acknowledge changes +pub trait Acknowledgeable { + /// Send Acknowledgement. This action is idempotent + fn send(&mut self); +} + impl TextChange { /// Returns the [`std::ops::Range`] representing this change's span. pub fn span(&self) -> std::ops::Range { diff --git a/src/api/controller.rs b/src/api/controller.rs index dda3b3f..0f19d70 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -37,11 +37,9 @@ where /// See [`Controller`]'s documentation for details. /// /// Details about the receiving end are left to the implementor. -#[allow(async_fn_in_trait)] -#[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait AsyncSender: Sized + Send + Sync { - /// Enqueue a new value to be sent to all other users. - async fn send(&self, x: T) -> ControllerResult<()>; +pub trait AsyncSender : Sized + Send + Sync { + /// Enqueue a new value to be sent to all other users without blocking + fn send(&self, x: T) -> ControllerResult<()>; } /// Asynchronous and thread-safe handle to receive data from a stream. diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index bcf115e..d52b472 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,13 +6,27 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; +use crate::api::change::{Acknowledgeable, Delta}; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::TextChange; use crate::errors::ControllerResult; -use crate::ext::InternallyMutable; +use crate::ext::IgnorableError; use super::worker::DeltaRequest; +#[derive(Debug)] +pub(crate) struct BufferAck { + pub(crate) tx: mpsc::UnboundedSender, + pub(crate) version: LocalVersion, +} + +impl Acknowledgeable for BufferAck { + fn send(&mut self) { + self.tx.send(self.version.clone()) + .unwrap_or_warn("no worker to receive sent ack"); + } +} + /// A [Controller] to asynchronously interact with remote buffers. /// /// Each buffer controller internally tracks the last acknowledged state, remaining always in sync @@ -33,9 +47,6 @@ impl BufferController { let (tx, rx) = oneshot::channel(); self.0.content_request.send(tx).await?; let content = rx.await?; - self.0 - .last_update - .set(self.0.latest_version.borrow().clone()); Ok(content) } } @@ -44,8 +55,8 @@ impl BufferController { pub(crate) struct BufferControllerInner { pub(crate) name: String, pub(crate) latest_version: watch::Receiver, - pub(crate) last_update: InternallyMutable, - pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender)>, + pub(crate) local_version: watch::Receiver, + pub(crate) ops_in: mpsc::UnboundedSender, pub(crate) poller: mpsc::UnboundedSender>, pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender, @@ -53,23 +64,19 @@ pub(crate) struct BufferControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for BufferController {} +impl Controller> for BufferController {} -#[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl AsyncSender for BufferController { - async fn send(&self, op: TextChange) -> ControllerResult<()> { - // we let the worker do the updating to the last version and send it back. - let (tx, rx) = oneshot::channel(); - self.0.ops_in.send((op, tx))?; - self.0.last_update.set(rx.await?); + fn send(&self, op: TextChange) -> ControllerResult<()> { + self.0.ops_in.send(op)?; Ok(()) } } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncReceiver for BufferController { +impl AsyncReceiver> for BufferController { async fn poll(&self) -> ControllerResult<()> { - if self.0.last_update.get() != *self.0.latest_version.borrow() { + if *self.0.local_version.borrow() != *self.0.latest_version.borrow() { return Ok(()); } @@ -79,8 +86,8 @@ impl AsyncReceiver for BufferController { Ok(()) } - async fn try_recv(&self) -> ControllerResult> { - let last_update = self.0.last_update.get(); + async fn try_recv(&self) -> ControllerResult>> { + let last_update = self.0.local_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone(); if last_update == latest_version { @@ -89,16 +96,11 @@ impl AsyncReceiver for BufferController { let (tx, rx) = oneshot::channel(); self.0.delta_request.send((last_update, tx)).await?; - let (v, change) = rx.await?; - self.0.last_update.set(v); - Ok(change) + Ok(rx.await?) } fn callback(&self, cb: impl Into>) { - if self.0.callback.send(Some(cb.into())).is_err() { - // TODO should we panic? we failed what we were supposed to do - tracing::error!("no active buffer worker to run registered callback!"); - } + self.0.callback.send_replace(Some(cb.into())); } fn clear_callback(&self) { diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index a5a730f..ba1a318 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -6,22 +6,26 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; +use crate::api::change::Delta; use crate::api::controller::ControllerCallback; use crate::api::TextChange; -use crate::ext::{IgnorableError, InternallyMutable}; +use crate::ext::IgnorableError; use codemp_proto::buffer::{BufferEvent, Operation}; -use super::controller::{BufferController, BufferControllerInner}; +use super::controller::{BufferAck, BufferController, BufferControllerInner}; -pub(crate) type DeltaOp = (LocalVersion, Option); +pub(crate) type DeltaOp = Option>; pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); struct BufferWorker { user_id: Uuid, path: String, latest_version: watch::Sender, - ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender)>, + local_version: watch::Sender, + ack_rx: mpsc::UnboundedReceiver, + ack_tx: mpsc::UnboundedSender, + ops_in: mpsc::UnboundedReceiver, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, @@ -43,7 +47,9 @@ impl BufferController { let init = diamond_types::LocalVersion::default(); let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); + let (my_version_tx, my_version_rx) = watch::channel(init.clone()); let (opin_tx, opin_rx) = mpsc::unbounded_channel(); + let (ack_tx, ack_rx) = mpsc::unbounded_channel(); let (req_tx, req_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); @@ -54,7 +60,7 @@ impl BufferController { let controller = Arc::new(BufferControllerInner { name: path.to_string(), latest_version: latest_version_rx, - last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), + local_version: my_version_rx, ops_in: opin_tx, poller: poller_tx, content_request: req_tx, @@ -68,6 +74,8 @@ impl BufferController { user_id, path: path.to_string(), latest_version: latest_version_tx, + local_version: my_version_tx, + ack_tx, ack_rx, ops_in: opin_rx, poller: poller_rx, pollers: Vec::new(), @@ -106,10 +114,18 @@ impl BufferController { Some(tx) => worker.pollers.push(tx), }, + // received new change ack, merge editor branch up to that version + res = worker.ack_rx.recv() => match res { + None => break tracing::error!("ack channel closed"), + Some(v) => { + worker.branch.merge(&worker.oplog, &v) + }, + }, + // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await, + Some(change) => worker.handle_editor_change(change, &tx).await, }, // received a message from server: add to oplog and update latest version (+unlock pollers) @@ -142,12 +158,7 @@ impl BufferController { } impl BufferWorker { - async fn handle_editor_change( - &mut self, - change: TextChange, - ack: oneshot::Sender, - tx: &mpsc::Sender, - ) { + async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender) { let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); let last_ver = self.oplog.local_version(); // clip to buffer extents @@ -174,9 +185,9 @@ impl BufferWorker { self.latest_version .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); + self.local_version.send(self.branch.local_version()) + .unwrap_or_warn("failed to update local version!"); } - ack.send(self.branch.local_version()) - .unwrap_or_warn("controller didn't wait for ack"); } async fn handle_server_change(&mut self, change: BufferEvent) -> bool { @@ -243,11 +254,17 @@ impl BufferWorker { hash, }, }; - tx.send((new_local_v, Some(tc))) - .unwrap_or_warn("could not update ops channel -- is controller dead?"); + let delta = Delta { + change: tc, + ack: BufferAck { + tx: self.ack_tx.clone(), + version: step_ver, + }, + }; + self.local_version.send(new_local_v).unwrap_or_warn("could not update local version"); + tx.send(Some(delta)).unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { - tx.send((last_ver, None)) - .unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send(None).unwrap_or_warn("could not update ops channel -- is controller dead?"); } } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index a0bced9..bac20cf 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -27,7 +27,7 @@ pub struct CursorController(pub(crate) Arc); #[derive(Debug)] pub(crate) struct CursorControllerInner { - pub(crate) op: mpsc::Sender, + pub(crate) op: mpsc::UnboundedSender, pub(crate) stream: mpsc::Sender>>, pub(crate) poll: mpsc::UnboundedSender>, pub(crate) callback: watch::Sender>>, @@ -38,7 +38,7 @@ impl Controller for CursorController {} #[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl AsyncSender for CursorController { - async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { + fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } @@ -57,8 +57,7 @@ impl AsyncSender for CursorController { row: cursor.end.0, col: cursor.end.1, }, - }) - .await?) + })?) } } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index aab8c34..bd6b35b 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -13,7 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition}; use super::controller::{CursorController, CursorControllerInner}; struct CursorWorker { - op: mpsc::Receiver, + op: mpsc::UnboundedReceiver, map: Arc>, stream: mpsc::Receiver>>, poll: mpsc::UnboundedReceiver>, @@ -30,7 +30,7 @@ impl CursorController { rx: Streaming, ) -> Self { // TODO we should tweak the channel buffer size to better propagate backpressure - let (op_tx, op_rx) = mpsc::channel(64); + let (op_tx, op_rx) = mpsc::unbounded_channel(); let (stream_tx, stream_rx) = mpsc::channel(1); let (cb_tx, cb_rx) = watch::channel(None); let (poll_tx, poll_rx) = mpsc::unbounded_channel(); From d5518a7b48307ac80ffafdb08d93bff871a57155 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 03:17:30 +0200 Subject: [PATCH 02/18] chore: updated send methods (+format) --- Cargo.lock | 2 +- src/api/controller.rs | 2 +- src/buffer/controller.rs | 3 ++- src/buffer/worker.rs | 16 +++++++++++----- src/cursor/controller.rs | 29 +++++++++++++---------------- src/ffi/java/buffer.rs | 8 +++++--- src/ffi/java/cursor.rs | 6 +++++- src/ffi/js/buffer.rs | 13 +++++++++---- src/ffi/js/cursor.rs | 9 +++++++-- src/ffi/js/workspace.rs | 2 -- src/ffi/lua/buffer.rs | 8 ++++---- src/ffi/lua/cursor.rs | 8 ++++---- src/ffi/python/controllers.rs | 12 +++++++----- 13 files changed, 69 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d50945..bb7e87a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -231,7 +231,7 @@ dependencies = [ [[package]] name = "codemp" -version = "0.7.2" +version = "0.7.3" dependencies = [ "async-trait", "codemp-proto", diff --git a/src/api/controller.rs b/src/api/controller.rs index 0f19d70..853f1e5 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -37,7 +37,7 @@ where /// See [`Controller`]'s documentation for details. /// /// Details about the receiving end are left to the implementor. -pub trait AsyncSender : Sized + Send + Sync { +pub trait AsyncSender: Sized + Send + Sync { /// Enqueue a new value to be sent to all other users without blocking fn send(&self, x: T) -> ControllerResult<()>; } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index d52b472..efb5a7d 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -22,7 +22,8 @@ pub(crate) struct BufferAck { impl Acknowledgeable for BufferAck { fn send(&mut self) { - self.tx.send(self.version.clone()) + self.tx + .send(self.version.clone()) .unwrap_or_warn("no worker to receive sent ack"); } } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index ba1a318..01826b8 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -75,7 +75,8 @@ impl BufferController { path: path.to_string(), latest_version: latest_version_tx, local_version: my_version_tx, - ack_tx, ack_rx, + ack_tx, + ack_rx, ops_in: opin_rx, poller: poller_rx, pollers: Vec::new(), @@ -185,7 +186,8 @@ impl BufferWorker { self.latest_version .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); - self.local_version.send(self.branch.local_version()) + self.local_version + .send(self.branch.local_version()) .unwrap_or_warn("failed to update local version!"); } } @@ -261,10 +263,14 @@ impl BufferWorker { version: step_ver, }, }; - self.local_version.send(new_local_v).unwrap_or_warn("could not update local version"); - tx.send(Some(delta)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + self.local_version + .send(new_local_v) + .unwrap_or_warn("could not update local version"); + tx.send(Some(delta)) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { - tx.send(None).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send(None) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); } } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index bac20cf..63a5483 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -42,22 +42,19 @@ impl AsyncSender for CursorController { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } - Ok(self - .0 - .op - .send(CursorPosition { - buffer: BufferNode { - path: cursor.buffer, - }, - start: RowCol { - row: cursor.start.0, - col: cursor.start.1, - }, - end: RowCol { - row: cursor.end.0, - col: cursor.end.1, - }, - })?) + Ok(self.0.op.send(CursorPosition { + buffer: BufferNode { + path: cursor.buffer, + }, + start: RowCol { + row: cursor.start.0, + col: cursor.start.1, + }, + end: RowCol { + row: cursor.end.0, + col: cursor.end.1, + }, + })?) } } diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 639c187..1a5e2d8 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -3,9 +3,11 @@ use jni_toolbox::jni; use crate::{ api::{ + change::Delta, controller::{AsyncReceiver, AsyncSender}, TextChange, }, + buffer::controller::BufferAck, errors::ControllerError, }; @@ -27,13 +29,13 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result Result, ControllerError> { +) -> Result>, ControllerError> { super::tokio().block_on(controller.try_recv()) } /// Block until it receives a [TextChange]. #[jni(package = "mp.code", class = "BufferController")] -fn recv(controller: &mut crate::buffer::Controller) -> Result { +fn recv(controller: &mut crate::buffer::Controller) -> Result, ControllerError> { super::tokio().block_on(controller.recv()) } @@ -43,7 +45,7 @@ fn send( controller: &mut crate::buffer::Controller, change: TextChange, ) -> Result<(), ControllerError> { - super::tokio().block_on(controller.send(change)) + controller.send(change) } /// Register a callback for buffer changes. diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 65cb0c4..60f2810 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -5,6 +5,10 @@ use crate::{ }, errors::ControllerError, }; +use crate::{ + api::{Controller, Cursor}, + errors::ControllerError, +}; use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; @@ -25,7 +29,7 @@ fn recv(controller: &mut crate::cursor::Controller) -> Result Result<(), ControllerError> { - super::tokio().block_on(controller.send(cursor)) + controller.send(cursor) } /// Register a callback for cursor changes. diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index df89f06..945b25c 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,9 +1,14 @@ +use crate::api::change::Delta; use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::TextChange; use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, }; +use napi::threadsafe_function::{ + ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, +}; +use napi_derive::napi; use napi_derive::napi; #[napi] @@ -51,20 +56,20 @@ impl BufferController { /// Return next buffer event if present #[napi(js_name = "try_recv")] - pub async fn js_try_recv(&self) -> napi::Result> { + pub async fn js_try_recv(&self) -> napi::Result>> { Ok(self.try_recv().await?) } /// Wait for next buffer event and return it #[napi(js_name = "recv")] - pub async fn js_recv(&self) -> napi::Result { + pub async fn js_recv(&self) -> napi::Result> { Ok(self.recv().await?) } /// Send a buffer update to workspace #[napi(js_name = "send")] - pub async fn js_send(&self, op: TextChange) -> napi::Result<()> { - Ok(self.send(op).await?) + pub fn js_send(&self, op: TextChange) -> napi::Result<()> { + Ok(self.send(op)?) } /// Return buffer whole content diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index 2bb09e5..f7fbf4f 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -1,9 +1,14 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::cursor::controller::CursorController; use napi::threadsafe_function::ErrorStrategy::Fatal; +use napi::threadsafe_function::ErrorStrategy::Fatal; use napi::threadsafe_function::{ ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, }; +use napi::threadsafe_function::{ + ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, +}; +use napi_derive::napi; use napi_derive::napi; #[napi(object, js_name = "Cursor")] @@ -74,8 +79,8 @@ impl CursorController { /// Send a new cursor event to remote #[napi(js_name = "send")] - pub async fn js_send(&self, pos: JsCursor) -> napi::Result<()> { - Ok(self.send(crate::api::Cursor::from(pos)).await?) + pub fn js_send(&self, pos: JsCursor) -> napi::Result<()> { + Ok(self.send(crate::api::Cursor::from(pos))?) } /// Get next cursor event if available without blocking diff --git a/src/ffi/js/workspace.rs b/src/ffi/js/workspace.rs index e44fe81..294dd98 100644 --- a/src/ffi/js/workspace.rs +++ b/src/ffi/js/workspace.rs @@ -8,8 +8,6 @@ use napi::threadsafe_function::{ }; use napi_derive::napi; -use super::client::JsUser; - #[napi(object, js_name = "Event")] pub struct JsEvent { pub r#type: String, diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index 76c89cd..2a46dc6 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -11,10 +11,10 @@ impl LuaUserData for CodempBufferController { Ok(format!("{:?}", this)) }); - methods.add_method( - "send", - |_, this, (change,): (CodempTextChange,)| a_sync! { this => this.send(change).await? }, - ); + methods.add_method("send", |_, this, (change,): (CodempTextChange,)| { + this.send(change)?; + Ok(()) + }); methods.add_method( "try_recv", diff --git a/src/ffi/lua/cursor.rs b/src/ffi/lua/cursor.rs index 3deea7c..2e93fc8 100644 --- a/src/ffi/lua/cursor.rs +++ b/src/ffi/lua/cursor.rs @@ -12,10 +12,10 @@ impl LuaUserData for CodempCursorController { Ok(format!("{:?}", this)) }); - methods.add_method( - "send", - |_, this, (cursor,): (CodempCursor,)| a_sync! { this => this.send(cursor).await? }, - ); + methods.add_method("send", |_, this, (cursor,): (CodempCursor,)| { + this.send(cursor)?; + Ok(()) + }); methods.add_method( "try_recv", |_, this, ()| a_sync! { this => this.try_recv().await? }, diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index 6ee1e27..5b0381c 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -15,11 +15,11 @@ impl CursorController { #[pyo3(name = "send")] fn pysend( &self, - py: Python, + _py: Python, path: String, start: (i32, i32), end: (i32, i32), - ) -> PyResult { + ) -> PyResult<()> { let pos = Cursor { start, end, @@ -27,7 +27,8 @@ impl CursorController { user: None, }; let this = self.clone(); - a_sync_allow_threads!(py, this.send(pos).await) + this.send(pos)?; + Ok(()) } #[pyo3(name = "try_recv")] @@ -84,7 +85,7 @@ impl BufferController { } #[pyo3(name = "send")] - fn pysend(&self, py: Python, start: u32, end: u32, txt: String) -> PyResult { + fn pysend(&self, _py: Python, start: u32, end: u32, txt: String) -> PyResult<()> { let op = TextChange { start, end, @@ -92,7 +93,8 @@ impl BufferController { hash: None, }; let this = self.clone(); - a_sync_allow_threads!(py, this.send(op).await) + this.send(op)?; + Ok(()) } #[pyo3(name = "try_recv")] From 75a0df5e7c3a7a43865f2c0741ba173caa5ce3fb Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 03:40:55 +0200 Subject: [PATCH 03/18] chore(lua): auto callback args macro --- src/ffi/lua/ext/callback.rs | 96 ++++++++++++++++++------------------- 1 file changed, 47 insertions(+), 49 deletions(-) diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index dd84098..ce96c31 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -62,55 +62,53 @@ pub(crate) enum LuaCallback { Invoke(LuaFunction, CallbackArg), } -pub(crate) enum CallbackArg { - Nil, - Str(String), - VecStr(Vec), - Client(CodempClient), - CursorController(CodempCursorController), - BufferController(CodempBufferController), - Workspace(CodempWorkspace), - Event(CodempEvent), - MaybeEvent(Option), - Cursor(CodempCursor), - MaybeCursor(Option), - TextChange(CodempTextChange), - MaybeTextChange(Option), -} - -impl IntoLua for CallbackArg { - // TODO this basically calls .into_lua() on all enum variants - // i wish i could do this with a Box or an impl IntoLua - // but IntoLua requires Sized so it can't be made into an object - fn into_lua(self, lua: &Lua) -> LuaResult { - match self { - CallbackArg::Nil => Ok(LuaValue::Nil), - CallbackArg::Str(x) => x.into_lua(lua), - CallbackArg::Client(x) => x.into_lua(lua), - CallbackArg::CursorController(x) => x.into_lua(lua), - CallbackArg::BufferController(x) => x.into_lua(lua), - CallbackArg::Workspace(x) => x.into_lua(lua), - CallbackArg::VecStr(x) => x.into_lua(lua), - CallbackArg::Event(x) => x.into_lua(lua), - CallbackArg::MaybeEvent(x) => x.into_lua(lua), - CallbackArg::Cursor(x) => x.into_lua(lua), - CallbackArg::MaybeCursor(x) => x.into_lua(lua), - CallbackArg::TextChange(x) => x.into_lua(lua), - CallbackArg::MaybeTextChange(x) => x.into_lua(lua), +macro_rules! callback_args { + ($($name:ident : $t:ty ,)*) => { + pub(crate) enum CallbackArg { + Nil, + $( + $name($t), + )* } - } + + impl IntoLua for CallbackArg { + fn into_lua(self, lua: &Lua) -> LuaResult { + match self { + Self::Nil => Ok(LuaValue::Nil), + $( + Self::$name(x) => x.into_lua(lua), + )* + } + } + } + + impl From<()> for CallbackArg { + fn from(_value: ()) -> Self { + Self::Nil + } + } + + $( + impl From<$t> for CallbackArg { + fn from(value: $t) -> Self { + Self::$name(value) + } + } + )* + }; } -impl From<()> for CallbackArg { fn from(_: ()) -> Self { CallbackArg::Nil } } -impl From for CallbackArg { fn from(value: String) -> Self { CallbackArg::Str(value) } } -impl From for CallbackArg { fn from(value: CodempClient) -> Self { CallbackArg::Client(value) } } -impl From for CallbackArg { fn from(value: CodempCursorController) -> Self { CallbackArg::CursorController(value) } } -impl From for CallbackArg { fn from(value: CodempBufferController) -> Self { CallbackArg::BufferController(value) } } -impl From for CallbackArg { fn from(value: CodempWorkspace) -> Self { CallbackArg::Workspace(value) } } -impl From> for CallbackArg { fn from(value: Vec) -> Self { CallbackArg::VecStr(value) } } -impl From for CallbackArg { fn from(value: CodempEvent) -> Self { CallbackArg::Event(value) } } -impl From for CallbackArg { fn from(value: CodempCursor) -> Self { CallbackArg::Cursor(value) } } -impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeCursor(value) } } -impl From for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } } -impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeTextChange(value) } } -impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeEvent(value) } } +callback_args! { + Str: String, + VecStr: Vec, + Client: CodempClient, + CursorController: CodempCursorController, + BufferController: CodempBufferController, + Workspace: CodempWorkspace, + Event: CodempEvent, + MaybeEvent: Option, + Cursor: CodempCursor, + MaybeCursor: Option, + TextChange: CodempTextChange, + MaybeTextChange: Option, +} From 29fde1ad24df5f92329738f1e5ded5a62b1cac64 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 03:41:20 +0200 Subject: [PATCH 04/18] fix: BufferAck is clonable --- src/buffer/controller.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index efb5a7d..5ff432c 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -14,7 +14,7 @@ use crate::ext::IgnorableError; use super::worker::DeltaRequest; -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) struct BufferAck { pub(crate) tx: mpsc::UnboundedSender, pub(crate) version: LocalVersion, From b821cdc15295f5a37ac058e27f045331d9377779 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 03:41:28 +0200 Subject: [PATCH 05/18] fix(lua): add Delta object --- src/ffi/lua/buffer.rs | 15 +++++++++++++++ src/ffi/lua/ext/callback.rs | 4 ++++ 2 files changed, 19 insertions(+) diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index 2a46dc6..17d3627 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -1,4 +1,6 @@ use crate::prelude::*; +use crate::api::change::{Acknowledgeable, Delta}; +use crate::buffer::controller::BufferAck; use mlua::prelude::*; use mlua_codemp_patch as mlua; @@ -59,3 +61,16 @@ impl LuaUserData for CodempTextChange { methods.add_method("apply", |_, this, (txt,): (String,)| Ok(this.apply(&txt))); } } + +impl LuaUserData for Delta { + fn add_fields>(fields: &mut F) { + fields.add_field_method_get("change", |_, this| Ok(this.change.clone())); + fields.add_field_method_get("ack", |_, this| Ok(this.ack.clone())); + } +} + +impl LuaUserData for BufferAck { + fn add_methods>(methods: &mut M) { + methods.add_method_mut("send", |_, this, ()| Ok(this.send())); + } +} diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index ce96c31..7deb8a0 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -1,5 +1,7 @@ use crate::ext::IgnorableError; use crate::prelude::*; +use crate::api::change::Delta; +use crate::buffer::controller::BufferAck; use mlua::prelude::*; use mlua_codemp_patch as mlua; @@ -111,4 +113,6 @@ callback_args! { MaybeCursor: Option, TextChange: CodempTextChange, MaybeTextChange: Option, + Delta: Delta, + MaybeDelta: Option>, } From 039a54b8e218a02f869b050ea0c2274c5e29bee8 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 04:10:52 +0200 Subject: [PATCH 06/18] chore: last fmt touches --- src/ffi/lua/buffer.rs | 2 +- src/ffi/lua/ext/callback.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index 17d3627..945fa83 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -1,6 +1,6 @@ -use crate::prelude::*; use crate::api::change::{Acknowledgeable, Delta}; use crate::buffer::controller::BufferAck; +use crate::prelude::*; use mlua::prelude::*; use mlua_codemp_patch as mlua; diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index 7deb8a0..d4baa91 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -1,7 +1,7 @@ -use crate::ext::IgnorableError; -use crate::prelude::*; use crate::api::change::Delta; use crate::buffer::controller::BufferAck; +use crate::ext::IgnorableError; +use crate::prelude::*; use mlua::prelude::*; use mlua_codemp_patch as mlua; From a318e3bc2839e12f37e900702f81ba8ac7ad9744 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 04:14:23 +0200 Subject: [PATCH 07/18] fix: imports and types --- src/ffi/java/cursor.rs | 4 ---- src/ffi/js/buffer.rs | 10 ++++------ src/ffi/js/cursor.rs | 5 ----- src/ffi/js/workspace.rs | 7 +++++-- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 60f2810..57fa1d5 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -5,10 +5,6 @@ use crate::{ }, errors::ControllerError, }; -use crate::{ - api::{Controller, Cursor}, - errors::ControllerError, -}; use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index 945b25c..e3a9f48 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -5,10 +5,6 @@ use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, }; -use napi::threadsafe_function::{ - ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, -}; -use napi_derive::napi; use napi_derive::napi; #[napi] @@ -56,13 +52,15 @@ impl BufferController { /// Return next buffer event if present #[napi(js_name = "try_recv")] - pub async fn js_try_recv(&self) -> napi::Result>> { + pub async fn js_try_recv( + &self, + ) -> napi::Result>> { Ok(self.try_recv().await?) } /// Wait for next buffer event and return it #[napi(js_name = "recv")] - pub async fn js_recv(&self) -> napi::Result> { + pub async fn js_recv(&self) -> napi::Result> { Ok(self.recv().await?) } diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index f7fbf4f..79af392 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -1,14 +1,9 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::cursor::controller::CursorController; use napi::threadsafe_function::ErrorStrategy::Fatal; -use napi::threadsafe_function::ErrorStrategy::Fatal; use napi::threadsafe_function::{ ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, }; -use napi::threadsafe_function::{ - ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, -}; -use napi_derive::napi; use napi_derive::napi; #[napi(object, js_name = "Cursor")] diff --git a/src/ffi/js/workspace.rs b/src/ffi/js/workspace.rs index 294dd98..bacde0e 100644 --- a/src/ffi/js/workspace.rs +++ b/src/ffi/js/workspace.rs @@ -146,12 +146,15 @@ impl Workspace { /// List users attached to a specific buffer #[napi(js_name = "list_buffer_users")] - pub async fn js_list_buffer_users(&self, path: String) -> napi::Result> { + pub async fn js_list_buffer_users( + &self, + path: String, + ) -> napi::Result> { Ok(self .list_buffer_users(&path) .await? .into_iter() - .map(JsUser::from) + .map(super::client::JsUser::from) .collect()) } } From 45864e19f6336b69d69813f4840fb373078d457c Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 6 Oct 2024 10:18:58 +0200 Subject: [PATCH 08/18] feat: added Delta object to ffis --- dist/java/src/mp/code/data/Delta.java | 37 +++++++++++++++++++++++++ dist/lua/annotations.lua | 39 ++++++++++++++++----------- dist/py/src/codemp/codemp.pyi | 8 ++++++ src/api/change.rs | 16 ----------- src/buffer/controller.rs | 29 ++++++++++++++------ src/buffer/worker.rs | 5 ++-- src/ffi/java/buffer.rs | 7 +++-- src/ffi/java/mod.rs | 2 ++ src/ffi/js/buffer.rs | 7 +++-- src/ffi/lua/buffer.rs | 10 +++---- src/ffi/lua/ext/callback.rs | 7 +++-- src/ffi/mod.rs | 2 ++ 12 files changed, 107 insertions(+), 62 deletions(-) create mode 100644 dist/java/src/mp/code/data/Delta.java diff --git a/dist/java/src/mp/code/data/Delta.java b/dist/java/src/mp/code/data/Delta.java new file mode 100644 index 0000000..e04af1c --- /dev/null +++ b/dist/java/src/mp/code/data/Delta.java @@ -0,0 +1,37 @@ +package mp.code.data; + +import lombok.Getter; +import mp.code.data.Config; +import mp.code.data.User; +import mp.code.exceptions.ConnectionException; +import mp.code.exceptions.ConnectionRemoteException; + +import java.util.Optional; + +@Getter +public final class Delta { + private final long ptr; + + Delta(long ptr) { + this.ptr = ptr; + Extensions.CLEANER.register(this, () -> free(ptr)); + } + + private static native TextChange get_text_change(long self); + + public mp.code.data.TextChange getTextChange() { + return get_text_change(this.ptr); + } + + private static native void ack_native(long self, boolean success) throws ConnectionException; + + public void ack(boolean success) throws ConnectionException { + return ack_native(this.ptr, success); + } + + private static native void free(long self); + + static { + NativeUtils.loadLibraryIfNeeded(); + } +} diff --git a/dist/lua/annotations.lua b/dist/lua/annotations.lua index 0e573fa..895e324 100644 --- a/dist/lua/annotations.lua +++ b/dist/lua/annotations.lua @@ -135,28 +135,28 @@ function MaybeCursorPromise:cancel() end function MaybeCursorPromise:and_then(cb) end ----@class (exact) TextChangePromise : Promise -local TextChangePromise = {} +---@class (exact) DeltaPromise : Promise +local DeltaPromise = {} --- block until promise is ready and return value ---- @return TextChange -function TextChangePromise:await() end +--- @return Delta +function DeltaPromise:await() end --- cancel promise execution -function TextChangePromise:cancel() end ----@param cb fun(x: TextChange) callback to invoke +function DeltaPromise:cancel() end +---@param cb fun(x: Delta) callback to invoke ---invoke callback asynchronously as soon as promise is ready -function TextChangePromise:and_then(cb) end +function DeltaPromise:and_then(cb) end ----@class (exact) MaybeTextChangePromise : Promise -local MaybeTextChangePromise = {} +---@class (exact) MaybeDeltaPromise : Promise +local MaybeDeltaPromise = {} --- block until promise is ready and return value ---- @return TextChange | nil -function MaybeTextChangePromise:await() end +--- @return Delta | nil +function MaybeDeltaPromise:await() end --- cancel promise execution -function MaybeTextChangePromise:cancel() end ----@param cb fun(x: TextChange | nil) callback to invoke +function MaybeDeltaPromise:cancel() end +---@param cb fun(x: Delta | nil) callback to invoke ---invoke callback asynchronously as soon as promise is ready -function MaybeTextChangePromise:and_then(cb) end +function MaybeDeltaPromise:and_then(cb) end -- [[ END ASYNC STUFF ]] @@ -325,6 +325,13 @@ local BufferController = {} ---@field hash integer? optional hash of text buffer after this change, for sync checks local TextChange = {} +---@class (exact) Delta +---@field change TextChange text change for this delta +local Delta = {} + +---notify controller that this change has been correctly applied +function Delta:ack() end + ---@param other string text to apply change to ---apply this text change to a string, returning the result function TextChange:apply(other) end @@ -336,13 +343,13 @@ function TextChange:apply(other) end ---update buffer with a text change; note that to delete content should be empty but not span, while to insert span should be empty but not content (can insert and delete at the same time) function BufferController:send(change) end ----@return MaybeTextChangePromise +---@return MaybeDeltaPromise ---@async ---@nodiscard ---try to receive text changes, returning nil if none is available function BufferController:try_recv() end ----@return TextChangePromise +---@return DeltaPromise ---@async ---@nodiscard ---block until next text change and return it diff --git a/dist/py/src/codemp/codemp.pyi b/dist/py/src/codemp/codemp.pyi index 6c088c5..ead8f3c 100644 --- a/dist/py/src/codemp/codemp.pyi +++ b/dist/py/src/codemp/codemp.pyi @@ -92,6 +92,14 @@ class TextChange: def is_empty(self) -> bool: ... def apply(self, txt: str) -> str: ... +class Delta: + """ + A single editor delta event, wrapping a TextChange and the corresponding ACK channel + """ + change: TextChange + + def ack(self,) -> str: ... + class BufferController: """ diff --git a/src/api/change.rs b/src/api/change.rs index f41e342..5a2ebfe 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -45,22 +45,6 @@ pub struct TextChange { pub hash: Option, } -/// This wrapper around a [`TextChange`] contains a handle to Acknowledge correct change -/// application -#[derive(Debug)] -pub struct Delta { - /// The change received - pub change: TextChange, - /// The ack handle, must be called after correctly applying this change - pub ack: T, -} - -/// A token which can be used to acknowledge changes -pub trait Acknowledgeable { - /// Send Acknowledgement. This action is idempotent - fn send(&mut self); -} - impl TextChange { /// Returns the [`std::ops::Range`] representing this change's span. pub fn span(&self) -> std::ops::Range { diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 5ff432c..4c55297 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; -use crate::api::change::{Acknowledgeable, Delta}; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::TextChange; use crate::errors::ControllerResult; @@ -14,16 +13,30 @@ use crate::ext::IgnorableError; use super::worker::DeltaRequest; +/// This wrapper around a [`TextChange`] contains a handle to Acknowledge correct change +/// application +#[derive(Debug)] +#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass)] +#[cfg_attr(feature = "js", napi_derive::napi)] +pub struct Delta { + /// The change received + pub change: TextChange, + /// The ack handle, must be called after correctly applying this change + pub(crate) ack: BufferAck, +} + #[derive(Clone, Debug)] pub(crate) struct BufferAck { pub(crate) tx: mpsc::UnboundedSender, pub(crate) version: LocalVersion, } -impl Acknowledgeable for BufferAck { - fn send(&mut self) { - self.tx - .send(self.version.clone()) +#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pymethods)] +#[cfg_attr(feature = "js", napi_derive::napi)] +impl Delta { + pub fn ack(&mut self) { + self.ack.tx + .send(self.ack.version.clone()) .unwrap_or_warn("no worker to receive sent ack"); } } @@ -65,7 +78,7 @@ pub(crate) struct BufferControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller> for BufferController {} +impl Controller for BufferController {} impl AsyncSender for BufferController { fn send(&self, op: TextChange) -> ControllerResult<()> { @@ -75,7 +88,7 @@ impl AsyncSender for BufferController { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncReceiver> for BufferController { +impl AsyncReceiver for BufferController { async fn poll(&self) -> ControllerResult<()> { if *self.0.local_version.borrow() != *self.0.latest_version.borrow() { return Ok(()); @@ -87,7 +100,7 @@ impl AsyncReceiver> for BufferController { Ok(()) } - async fn try_recv(&self) -> ControllerResult>> { + async fn try_recv(&self) -> ControllerResult> { let last_update = self.0.local_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone(); diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 01826b8..610abf1 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -6,16 +6,15 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; -use crate::api::change::Delta; use crate::api::controller::ControllerCallback; use crate::api::TextChange; use crate::ext::IgnorableError; use codemp_proto::buffer::{BufferEvent, Operation}; -use super::controller::{BufferAck, BufferController, BufferControllerInner}; +use super::controller::{BufferAck, BufferController, BufferControllerInner, Delta}; -pub(crate) type DeltaOp = Option>; +pub(crate) type DeltaOp = Option; pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); struct BufferWorker { diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 1a5e2d8..c700fde 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -3,11 +3,10 @@ use jni_toolbox::jni; use crate::{ api::{ - change::Delta, controller::{AsyncReceiver, AsyncSender}, TextChange, }, - buffer::controller::BufferAck, + buffer::controller::Delta, errors::ControllerError, }; @@ -29,13 +28,13 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result Result>, ControllerError> { +) -> Result, ControllerError> { super::tokio().block_on(controller.try_recv()) } /// Block until it receives a [TextChange]. #[jni(package = "mp.code", class = "BufferController")] -fn recv(controller: &mut crate::buffer::Controller) -> Result, ControllerError> { +fn recv(controller: &mut crate::buffer::Controller) -> Result { super::tokio().block_on(controller.recv()) } diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index cad7768..0fb0980 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -141,6 +141,7 @@ into_java_ptr_class!(crate::Client, "mp/code/Client"); into_java_ptr_class!(crate::Workspace, "mp/code/Workspace"); into_java_ptr_class!(crate::cursor::Controller, "mp/code/CursorController"); into_java_ptr_class!(crate::buffer::Controller, "mp/code/BufferController"); +into_java_ptr_class!(crate::buffer::controller::Delta, "mp/code/data/Delta"); impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::User { const CLASS: &'static str = "mp/code/data/User"; @@ -275,6 +276,7 @@ from_java_ptr!(crate::Client); from_java_ptr!(crate::Workspace); from_java_ptr!(crate::cursor::Controller); from_java_ptr!(crate::buffer::Controller); +from_java_ptr!(crate::buffer::controller::Delta); impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config { type From = jni::objects::JObject<'j>; diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index e3a9f48..ea28eb0 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,7 +1,6 @@ -use crate::api::change::Delta; use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::TextChange; -use crate::buffer::controller::BufferController; +use crate::buffer::controller::{Delta, BufferController}; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, }; @@ -54,13 +53,13 @@ impl BufferController { #[napi(js_name = "try_recv")] pub async fn js_try_recv( &self, - ) -> napi::Result>> { + ) -> napi::Result> { Ok(self.try_recv().await?) } /// Wait for next buffer event and return it #[napi(js_name = "recv")] - pub async fn js_recv(&self) -> napi::Result> { + pub async fn js_recv(&self) -> napi::Result { Ok(self.recv().await?) } diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index 945fa83..c7d4ec7 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -1,5 +1,4 @@ -use crate::api::change::{Acknowledgeable, Delta}; -use crate::buffer::controller::BufferAck; +use crate::buffer::controller::Delta; use crate::prelude::*; use mlua::prelude::*; use mlua_codemp_patch as mlua; @@ -62,15 +61,12 @@ impl LuaUserData for CodempTextChange { } } -impl LuaUserData for Delta { +impl LuaUserData for Delta { fn add_fields>(fields: &mut F) { fields.add_field_method_get("change", |_, this| Ok(this.change.clone())); - fields.add_field_method_get("ack", |_, this| Ok(this.ack.clone())); } -} -impl LuaUserData for BufferAck { fn add_methods>(methods: &mut M) { - methods.add_method_mut("send", |_, this, ()| Ok(this.send())); + methods.add_method_mut("ack", |_, this, ()| Ok(this.ack())); } } diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index d4baa91..b67757d 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -1,5 +1,4 @@ -use crate::api::change::Delta; -use crate::buffer::controller::BufferAck; +use crate::buffer::controller::Delta; use crate::ext::IgnorableError; use crate::prelude::*; use mlua::prelude::*; @@ -113,6 +112,6 @@ callback_args! { MaybeCursor: Option, TextChange: CodempTextChange, MaybeTextChange: Option, - Delta: Delta, - MaybeDelta: Option>, + Delta: Delta, + MaybeDelta: Option, } diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index fa03449..bc92147 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -43,6 +43,8 @@ //! `JNIException`s are however unchecked: there is nothing you can do to recover from them, as they usually represent a severe error in the glue code. If they arise, it's probably a bug. //! +#![allow(clippy::unit_arg)] + /// java bindings, built with [jni] #[cfg(feature = "java")] pub mod java; From d66b25deb4ef901418935b3e6f546d2427a2a21e Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 6 Oct 2024 10:19:22 +0200 Subject: [PATCH 09/18] chore(lua): style --- src/ffi/lua/buffer.rs | 21 ++++++++------------- src/ffi/lua/cursor.rs | 17 ++++++----------- src/ffi/lua/ext/a_sync.rs | 5 +---- src/ffi/lua/workspace.rs | 16 +++++++--------- 4 files changed, 22 insertions(+), 37 deletions(-) diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index c7d4ec7..635b10a 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -12,10 +12,9 @@ impl LuaUserData for CodempBufferController { Ok(format!("{:?}", this)) }); - methods.add_method("send", |_, this, (change,): (CodempTextChange,)| { - this.send(change)?; - Ok(()) - }); + methods.add_method("send", |_, this, (change,): (CodempTextChange,)| + Ok(this.send(change)?) + ); methods.add_method( "try_recv", @@ -29,16 +28,12 @@ impl LuaUserData for CodempBufferController { |_, this, ()| a_sync! { this => this.content().await? }, ); - methods.add_method("clear_callback", |_, this, ()| { - this.clear_callback(); - Ok(()) - }); - methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { - this.callback(move |controller: CodempBufferController| { + methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| + Ok(this.callback(move |controller: CodempBufferController| super::ext::callback().invoke(cb.clone(), controller) - }); - Ok(()) - }); + )) + ); } } diff --git a/src/ffi/lua/cursor.rs b/src/ffi/lua/cursor.rs index 2e93fc8..40df670 100644 --- a/src/ffi/lua/cursor.rs +++ b/src/ffi/lua/cursor.rs @@ -13,8 +13,7 @@ impl LuaUserData for CodempCursorController { }); methods.add_method("send", |_, this, (cursor,): (CodempCursor,)| { - this.send(cursor)?; - Ok(()) + Ok(this.send(cursor)?) }); methods.add_method( "try_recv", @@ -23,16 +22,12 @@ impl LuaUserData for CodempCursorController { methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method("clear_callback", |_, this, ()| { - this.clear_callback(); - Ok(()) - }); - methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { - this.callback(move |controller: CodempCursorController| { + methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| + Ok(this.callback(move |controller: CodempCursorController| super::ext::callback().invoke(cb.clone(), controller) - }); - Ok(()) - }); + )) + ); } } diff --git a/src/ffi/lua/ext/a_sync.rs b/src/ffi/lua/ext/a_sync.rs index 02c672b..2e10776 100644 --- a/src/ffi/lua/ext/a_sync.rs +++ b/src/ffi/lua/ext/a_sync.rs @@ -51,10 +51,7 @@ impl LuaUserData for Promise { }); methods.add_method_mut("cancel", |_, this, ()| match this.0.take() { None => Err(LuaError::runtime("Promise already awaited")), - Some(x) => { - x.abort(); - Ok(()) - } + Some(x) => Ok(x.abort()), }); methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| { match this.0.take() { diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index 15167c9..5aa4b82 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -60,17 +60,15 @@ impl LuaUserData for CodempWorkspace { methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { - this.callback(move |controller: CodempWorkspace| { + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| + Ok(this.callback(move |controller: CodempWorkspace| super::ext::callback().invoke(cb.clone(), controller) - }); - Ok(()) - }); + )) + ); - methods.add_method("clear_callbacl", |_, this, ()| { - this.clear_callback(); - Ok(()) - }); + methods.add_method("clear_callbacl", |_, this, ()| + Ok(this.clear_callback()) + ); } fn add_fields>(fields: &mut F) { From 1f2c0708d67b1a0892420285f7a9ed75bbc29bdf Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 6 Oct 2024 10:24:23 +0200 Subject: [PATCH 10/18] test: fix doctest --- src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 40ad39f..edfda76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,8 +69,12 @@ //! # use codemp::api::controller::{AsyncSender, AsyncReceiver}; //! let buffer = workspace.attach("/some/file.txt").await.expect("failed to attach"); //! buffer.content(); // force-sync -//! if let Some(change) = buffer.try_recv().await.unwrap() { -//! println!("content: {}, span: {}-{}", change.content, change.start, change.end); +//! if let Some(mut delta) = buffer.try_recv().await.unwrap() { +//! println!( +//! "content: {}, span: {}-{}", +//! delta.change.content, delta.change.start, delta.change.end +//! ); +//! delta.ack(); //! } // if None, no changes are currently available //! # }; //! ``` From 560a634499b724e03ad00c25e6b9cb82c098cdf5 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 10 Oct 2024 02:10:02 +0200 Subject: [PATCH 11/18] chore: split TextChange and Cursor so that sending/receiving parts are different rather than Option Co-authored-by: zaaarf --- src/api/change.rs | 40 +++++++++++++++++++++------------ src/api/cursor.rs | 29 ++++++++++++++++++------ src/buffer/controller.rs | 48 +++++++++++++--------------------------- src/buffer/worker.rs | 39 ++++++++++++++++---------------- src/cursor/controller.rs | 25 ++++++++++++--------- src/cursor/worker.rs | 22 +++++++++--------- src/prelude.rs | 2 ++ 7 files changed, 110 insertions(+), 95 deletions(-) diff --git a/src/api/change.rs b/src/api/change.rs index 5a2ebfe..d4b342e 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -1,6 +1,25 @@ //! # TextChange //! A high-level representation of a change within a given buffer. +/// A [`TextChange`] event happening on a buffer. +/// +/// Contains the change itself, the new version after this change and an optional `hash` field. +/// This is used for error correction: if provided, it should match the hash of the buffer +/// content **after** applying this change. Note that the `hash` field will not necessarily +/// be provided every time. +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] +#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(get_all))] +#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] +pub struct BufferUpdate { + /// Optional content hash after applying this change. + pub hash: Option, + /// CRDT version after this change has been applied. + pub version: Vec, + /// The change that has occurred. + pub change: TextChange, +} + /// An editor-friendly representation of a text change in a given buffer. /// /// It's expressed with a range of characters and a string of content that should replace them, @@ -9,18 +28,18 @@ /// Bulky and large operations will result in a single [`TextChange`] effectively sending the whole /// new buffer, but smaller changes are efficient and easy to create or apply. /// -/// [`TextChange`] contains an optional `hash` field. This is used for error correction: if -/// provided, it should match the hash of the buffer content **after** applying this change. -/// Note that the `hash` field will not necessarily be provided every time. -/// /// ### Examples /// To insert 'a' after 4th character we should send a. -/// `TextChange { start: 4, end: 4, content: "a".into(), hash: None }` +/// ``` +/// TextChange { start: 4, end: 4, content: "a".into(), hash: None } +/// ``` /// /// To delete a the fourth character we should send a. -/// `TextChange { start: 3, end: 4, content: "".into(), hash: None }` +/// ``` +/// TextChange { start: 3, end: 4, content: "".into(), hash: None } +/// ``` /// -/// ```no_run +/// ``` /// let change = codemp::api::TextChange { /// start: 6, end: 11, /// content: "mom".to_string(), hash: None @@ -41,8 +60,6 @@ pub struct TextChange { pub end: u32, /// New content of text inside span. pub content: String, - /// Optional content hash after applying this change. - pub hash: Option, } impl TextChange { @@ -90,7 +107,6 @@ mod tests { start: 5, end: 5, content: " cruel".to_string(), - hash: None, }; let result = change.apply("hello world!"); assert_eq!(result, "hello cruel world!"); @@ -102,7 +118,6 @@ mod tests { start: 5, end: 11, content: "".to_string(), - hash: None, }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello world!"); @@ -114,7 +129,6 @@ mod tests { start: 5, end: 11, content: " not very pleasant".to_string(), - hash: None, }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello not very pleasant world!"); @@ -126,7 +140,6 @@ mod tests { start: 100, end: 110, content: "a very long string \n which totally matters".to_string(), - hash: None, }; let result = change.apply("a short text"); assert_eq!( @@ -141,7 +154,6 @@ mod tests { start: 42, end: 42, content: "".to_string(), - hash: None, }; let result = change.apply("some important text"); assert_eq!(result, "some important text"); diff --git a/src/api/cursor.rs b/src/api/cursor.rs index 7c1e4b5..93226f5 100644 --- a/src/api/cursor.rs +++ b/src/api/cursor.rs @@ -6,17 +6,32 @@ use pyo3::prelude::*; /// User cursor position in a buffer #[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] // #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))] pub struct Cursor { - /// Cursor start position in buffer, as 0-indexed row-column tuple. - pub start: (i32, i32), - /// Cursor end position in buffer, as 0-indexed row-column tuple. - #[cfg_attr(feature = "serialize", serde(alias = "finish"))] // Lua uses `end` as keyword - pub end: (i32, i32), + /// User who sent the cursor. + pub user: String, + /// Cursor selection + pub sel: Selection, +} + +/// A cursor selection span, with row-column tuples +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] +#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] +#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] +// #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))] +pub struct Selection { + /// Cursor position starting row in buffer. + pub start_row: i32, + /// Cursor position starting column in buffer. + pub start_col: i32, + /// Cursor position final row in buffer. + pub end_row: i32, + /// Cursor position final column in buffer. + pub end_col: i32, /// Path of buffer this cursor is on. pub buffer: String, - /// User display name, if provided. - pub user: Option, } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 4c55297..e452b2a 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; +use crate::api::change::BufferUpdate; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::TextChange; use crate::errors::ControllerResult; @@ -13,34 +14,6 @@ use crate::ext::IgnorableError; use super::worker::DeltaRequest; -/// This wrapper around a [`TextChange`] contains a handle to Acknowledge correct change -/// application -#[derive(Debug)] -#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass)] -#[cfg_attr(feature = "js", napi_derive::napi)] -pub struct Delta { - /// The change received - pub change: TextChange, - /// The ack handle, must be called after correctly applying this change - pub(crate) ack: BufferAck, -} - -#[derive(Clone, Debug)] -pub(crate) struct BufferAck { - pub(crate) tx: mpsc::UnboundedSender, - pub(crate) version: LocalVersion, -} - -#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pymethods)] -#[cfg_attr(feature = "js", napi_derive::napi)] -impl Delta { - pub fn ack(&mut self) { - self.ack.tx - .send(self.ack.version.clone()) - .unwrap_or_warn("no worker to receive sent ack"); - } -} - /// A [Controller] to asynchronously interact with remote buffers. /// /// Each buffer controller internally tracks the last acknowledged state, remaining always in sync @@ -51,18 +24,26 @@ impl Delta { pub struct BufferController(pub(crate) Arc); impl BufferController { - /// Get the buffer path + /// Get the buffer path. pub fn path(&self) -> &str { &self.0.name } - /// Return buffer whole content, updating internal acknowledgement tracker + /// Return buffer whole content, updating internal acknowledgement tracker. pub async fn content(&self) -> ControllerResult { let (tx, rx) = oneshot::channel(); self.0.content_request.send(tx).await?; let content = rx.await?; Ok(content) } + + /// Notify CRDT that changes up to the given version have been merged succesfully. + pub fn ack(&mut self, version: Vec) { + let version = version.into_iter().map(|x| usize::from_ne_bytes(x.to_ne_bytes())).collect(); + self.0.ack_tx + .send(version) + .unwrap_or_warn("no worker to receive sent ack"); + } } #[derive(Debug)] @@ -75,10 +56,11 @@ pub(crate) struct BufferControllerInner { pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender, pub(crate) callback: watch::Sender>>, + pub(crate) ack_tx: mpsc::UnboundedSender, } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for BufferController {} +impl Controller for BufferController {} impl AsyncSender for BufferController { fn send(&self, op: TextChange) -> ControllerResult<()> { @@ -88,7 +70,7 @@ impl AsyncSender for BufferController { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncReceiver for BufferController { +impl AsyncReceiver for BufferController { async fn poll(&self) -> ControllerResult<()> { if *self.0.local_version.borrow() != *self.0.latest_version.borrow() { return Ok(()); @@ -100,7 +82,7 @@ impl AsyncReceiver for BufferController { Ok(()) } - async fn try_recv(&self) -> ControllerResult> { + async fn try_recv(&self) -> ControllerResult> { let last_update = self.0.local_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone(); diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 610abf1..a021174 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -6,15 +6,16 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; +use crate::api::change::BufferUpdate; use crate::api::controller::ControllerCallback; use crate::api::TextChange; use crate::ext::IgnorableError; use codemp_proto::buffer::{BufferEvent, Operation}; -use super::controller::{BufferAck, BufferController, BufferControllerInner, Delta}; +use super::controller::{BufferController, BufferControllerInner}; -pub(crate) type DeltaOp = Option; +pub(crate) type DeltaOp = Option; pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); struct BufferWorker { @@ -23,7 +24,6 @@ struct BufferWorker { latest_version: watch::Sender, local_version: watch::Sender, ack_rx: mpsc::UnboundedReceiver, - ack_tx: mpsc::UnboundedSender, ops_in: mpsc::UnboundedReceiver, poller: mpsc::UnboundedReceiver>, pollers: Vec>, @@ -65,6 +65,7 @@ impl BufferController { content_request: req_tx, delta_request: recv_tx, callback: cb_tx, + ack_tx, }); let weak = Arc::downgrade(&controller); @@ -74,7 +75,6 @@ impl BufferController { path: path.to_string(), latest_version: latest_version_tx, local_version: my_version_tx, - ack_tx, ack_rx, ops_in: opin_rx, poller: poller_rx, @@ -240,32 +240,31 @@ impl BufferWorker { { tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); } - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.start() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), + crate::api::change::BufferUpdate { hash, + version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful + change: crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.start() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + } } } - diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.end() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), + diamond_types::list::operation::OpKind::Del => crate::api::change::BufferUpdate { hash, - }, - }; - let delta = Delta { - change: tc, - ack: BufferAck { - tx: self.ack_tx.clone(), - version: step_ver, + version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful + change: crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + }, }, }; self.local_version .send(new_local_v) .unwrap_or_warn("could not update local version"); - tx.send(Some(delta)) + tx.send(Some(tc)) .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { tx.send(None) diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 63a5483..aa57fe9 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -7,8 +7,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use crate::{ api::{ - controller::{AsyncReceiver, AsyncSender, ControllerCallback}, - Controller, Cursor, + controller::{AsyncReceiver, AsyncSender, ControllerCallback}, cursor::Selection, Controller, Cursor }, errors::ControllerResult, }; @@ -34,25 +33,29 @@ pub(crate) struct CursorControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for CursorController {} +impl Controller for CursorController {} #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncSender for CursorController { - fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { - if cursor.start > cursor.end { - std::mem::swap(&mut cursor.start, &mut cursor.end); +impl AsyncSender for CursorController { + fn send(&self, mut cursor: Selection) -> ControllerResult<()> { + if cursor.start_row > cursor.end_row || ( + cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col + ) { + std::mem::swap(&mut cursor.start_row, &mut cursor.end_row); + std::mem::swap(&mut cursor.start_col, &mut cursor.end_col); } + Ok(self.0.op.send(CursorPosition { buffer: BufferNode { path: cursor.buffer, }, start: RowCol { - row: cursor.start.0, - col: cursor.start.1, + row: cursor.start_row, + col: cursor.start_col, }, end: RowCol { - row: cursor.end.0, - col: cursor.end.1, + row: cursor.end_row, + col: cursor.end_col, }, })?) } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index bd6b35b..91f07d5 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -5,7 +5,7 @@ use tonic::Streaming; use uuid::Uuid; use crate::{ - api::{controller::ControllerCallback, Cursor, User}, + api::{controller::ControllerCallback, cursor::Selection, Cursor, User}, ext::IgnorableError, }; use codemp_proto::cursor::{CursorEvent, CursorPosition}; @@ -86,16 +86,18 @@ impl CursorController { None => break, // clean exit, just weird that we got it here Some(controller) => { tracing::debug!("received cursor from server"); - let mut cursor = Cursor { - buffer: cur.position.buffer.path, - start: (cur.position.start.row, cur.position.start.col), - end: (cur.position.end.row, cur.position.end.col), - user: None, - }; let user_id = Uuid::from(cur.user); - if let Some(user) = worker.map.get(&user_id) { - cursor.user = Some(user.name.clone()); - } + let cursor = Cursor { + user: worker.map.get(&user_id).map(|u| u.name.clone()).unwrap_or_default(), + sel: Selection { + buffer: cur.position.buffer.path, + start_row: cur.position.start.row, + start_col: cur.position.start.col, + end_row: cur.position.end.row, + end_col: cur.position.end.col + } + }; + worker.store.push_back(cursor); for tx in worker.pollers.drain(..) { tx.send(()).unwrap_or_warn("poller dropped before unblocking"); diff --git a/src/prelude.rs b/src/prelude.rs index 0fec44d..f7d6fc1 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -5,6 +5,8 @@ pub use crate::api::{ controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, + change::BufferUpdate as CodempBufferUpdate, + cursor::Selection as CodempSelection, }; pub use crate::{ From 6035c448fa325e73299b4f42ff454d64c01bb3e7 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 10 Oct 2024 02:12:24 +0200 Subject: [PATCH 12/18] chore: update glue code Co-authored-by: zaaarf --- dist/lua/annotations.lua | 46 ++++++++++++++++++----------------- dist/py/src/codemp/codemp.pyi | 19 ++++++++++----- src/ffi/java/buffer.rs | 8 +++--- src/ffi/java/mod.rs | 15 ------------ src/ffi/js/buffer.rs | 8 +++--- src/ffi/js/cursor.rs | 45 ++++------------------------------ src/ffi/lua/buffer.rs | 12 ++++++--- src/ffi/lua/cursor.rs | 26 ++++++++++++++------ src/ffi/lua/ext/callback.rs | 7 +++--- src/ffi/lua/ext/mod.rs | 10 -------- src/ffi/python/controllers.rs | 20 +++++++-------- 11 files changed, 90 insertions(+), 126 deletions(-) diff --git a/dist/lua/annotations.lua b/dist/lua/annotations.lua index 895e324..4b2d317 100644 --- a/dist/lua/annotations.lua +++ b/dist/lua/annotations.lua @@ -135,28 +135,28 @@ function MaybeCursorPromise:cancel() end function MaybeCursorPromise:and_then(cb) end ----@class (exact) DeltaPromise : Promise -local DeltaPromise = {} +---@class (exact) BufferUpdatePromise : Promise +local BufferUpdatePromise = {} --- block until promise is ready and return value ---- @return Delta -function DeltaPromise:await() end +--- @return BufferUpdate +function BufferUpdatePromise:await() end --- cancel promise execution -function DeltaPromise:cancel() end ----@param cb fun(x: Delta) callback to invoke +function BufferUpdatePromise:cancel() end +---@param cb fun(x: BufferUpdate) callback to invoke ---invoke callback asynchronously as soon as promise is ready -function DeltaPromise:and_then(cb) end +function BufferUpdatePromise:and_then(cb) end ----@class (exact) MaybeDeltaPromise : Promise -local MaybeDeltaPromise = {} +---@class (exact) MaybeBufferUpdatePromise : Promise +local MaybeBufferUpdatePromise = {} --- block until promise is ready and return value ---- @return Delta | nil -function MaybeDeltaPromise:await() end +--- @return BufferUpdate | nil +function MaybeBufferUpdatePromise:await() end --- cancel promise execution -function MaybeDeltaPromise:cancel() end ----@param cb fun(x: Delta | nil) callback to invoke +function MaybeBufferUpdatePromise:cancel() end +---@param cb fun(x: BufferUpdate | nil) callback to invoke ---invoke callback asynchronously as soon as promise is ready -function MaybeDeltaPromise:and_then(cb) end +function MaybeBufferUpdatePromise:and_then(cb) end -- [[ END ASYNC STUFF ]] @@ -322,15 +322,13 @@ local BufferController = {} ---@field content string text content of change ---@field start integer start index of change ---@field finish integer end index of change ----@field hash integer? optional hash of text buffer after this change, for sync checks local TextChange = {} ----@class (exact) Delta +---@class (exact) BufferUpdate ---@field change TextChange text change for this delta -local Delta = {} - ----notify controller that this change has been correctly applied -function Delta:ack() end +---@field version [integer] CRDT version after this change +---@field hash integer? optional hash of text buffer after this change, for sync checks +local BufferUpdate = {} ---@param other string text to apply change to ---apply this text change to a string, returning the result @@ -343,13 +341,13 @@ function TextChange:apply(other) end ---update buffer with a text change; note that to delete content should be empty but not span, while to insert span should be empty but not content (can insert and delete at the same time) function BufferController:send(change) end ----@return MaybeDeltaPromise +---@return MaybeBufferUpdatePromise ---@async ---@nodiscard ---try to receive text changes, returning nil if none is available function BufferController:try_recv() end ----@return DeltaPromise +---@return BufferUpdatePromise ---@async ---@nodiscard ---block until next text change and return it @@ -374,6 +372,10 @@ function BufferController:callback(cb) end ---get current content of buffer controller, marking all pending changes as seen function BufferController:content() end +---@param version [integer] version to ack +---notify controller that this version's change has been correctly applied +function BufferController:ack(version) end + diff --git a/dist/py/src/codemp/codemp.pyi b/dist/py/src/codemp/codemp.pyi index ead8f3c..25b1ba8 100644 --- a/dist/py/src/codemp/codemp.pyi +++ b/dist/py/src/codemp/codemp.pyi @@ -92,13 +92,13 @@ class TextChange: def is_empty(self) -> bool: ... def apply(self, txt: str) -> str: ... -class Delta: +class BufferUpdate: """ - A single editor delta event, wrapping a TextChange and the corresponding ACK channel + A single editor delta event, wrapping a TextChange and the new version """ change: TextChange - - def ack(self,) -> str: ... + hash: Optional[int] + version: list[int] class BufferController: @@ -108,6 +108,7 @@ class BufferController: """ def path(self) -> str: ... def content(self) -> Promise[str]: ... + def ack(self, v: list[int]) -> None: ... def send(self, start: int, end: int, @@ -121,14 +122,20 @@ class BufferController: -class Cursor: +class Selection: """ An Editor agnostic cursor position representation """ start: Tuple[int, int] end: Tuple[int, int] buffer: str - user: Optional[str] # can be an empty string + +class Cursor: + """ + A remote cursor event + """ + user: str + sel: Selection class CursorController: diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index c700fde..e65f2b0 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -3,10 +3,8 @@ use jni_toolbox::jni; use crate::{ api::{ - controller::{AsyncReceiver, AsyncSender}, - TextChange, + change::BufferUpdate, controller::{AsyncReceiver, AsyncSender}, TextChange }, - buffer::controller::Delta, errors::ControllerError, }; @@ -28,13 +26,13 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result Result, ControllerError> { +) -> Result, ControllerError> { super::tokio().block_on(controller.try_recv()) } /// Block until it receives a [TextChange]. #[jni(package = "mp.code", class = "BufferController")] -fn recv(controller: &mut crate::buffer::Controller) -> Result { +fn recv(controller: &mut crate::buffer::Controller) -> Result { super::tokio().block_on(controller.recv()) } diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index 0fb0980..7d4021e 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -141,7 +141,6 @@ into_java_ptr_class!(crate::Client, "mp/code/Client"); into_java_ptr_class!(crate::Workspace, "mp/code/Workspace"); into_java_ptr_class!(crate::cursor::Controller, "mp/code/CursorController"); into_java_ptr_class!(crate::buffer::Controller, "mp/code/BufferController"); -into_java_ptr_class!(crate::buffer::controller::Delta, "mp/code/data/Delta"); impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::User { const CLASS: &'static str = "mp/code/data/User"; @@ -202,19 +201,6 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { ) -> Result, jni::errors::Error> { let content = env.new_string(self.content)?; - let hash_class = env.find_class("java/util/OptionalLong")?; - let hash = if let Some(h) = self.hash { - env.call_static_method( - hash_class, - "of", - "(J)Ljava/util/OptionalLong;", - &[jni::objects::JValueGen::Long(h)], - ) - } else { - env.call_static_method(hash_class, "empty", "()Ljava/util/OptionalLong;", &[]) - }? - .l()?; - let class = env.find_class(Self::CLASS)?; env.new_object( class, @@ -223,7 +209,6 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { jni::objects::JValueGen::Long(self.start.into()), jni::objects::JValueGen::Long(self.end.into()), jni::objects::JValueGen::Object(&content), - jni::objects::JValueGen::Object(&hash), ], ) } diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index ea28eb0..5bb8f21 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,6 +1,6 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; -use crate::api::TextChange; -use crate::buffer::controller::{Delta, BufferController}; +use crate::api::change::{TextChange, BufferUpdate}; +use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, }; @@ -53,13 +53,13 @@ impl BufferController { #[napi(js_name = "try_recv")] pub async fn js_try_recv( &self, - ) -> napi::Result> { + ) -> napi::Result> { Ok(self.try_recv().await?) } /// Wait for next buffer event and return it #[napi(js_name = "recv")] - pub async fn js_recv(&self) -> napi::Result { + pub async fn js_recv(&self) -> napi::Result { Ok(self.recv().await?) } diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index 79af392..d1ae476 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -6,41 +6,6 @@ use napi::threadsafe_function::{ }; use napi_derive::napi; -#[napi(object, js_name = "Cursor")] -pub struct JsCursor { - /// range of text change, as char indexes in buffer previous state - pub start_row: i32, - pub start_col: i32, - pub end_row: i32, - pub end_col: i32, - pub buffer: String, - pub user: Option, -} - -impl From for crate::api::Cursor { - fn from(value: JsCursor) -> Self { - crate::api::Cursor { - start: (value.start_row, value.start_col), - end: (value.end_row, value.end_col), - buffer: value.buffer, - user: value.user, - } - } -} - -impl From for JsCursor { - fn from(value: crate::api::Cursor) -> Self { - JsCursor { - start_row: value.start.0, - start_col: value.start.1, - end_row: value.end.0, - end_col: value.end.1, - buffer: value.buffer, - user: value.user.map(|x| x.to_string()), - } - } -} - #[napi] impl CursorController { /// Register a callback to be called on receive. @@ -74,19 +39,19 @@ impl CursorController { /// Send a new cursor event to remote #[napi(js_name = "send")] - pub fn js_send(&self, pos: JsCursor) -> napi::Result<()> { - Ok(self.send(crate::api::Cursor::from(pos))?) + pub fn js_send(&self, sel: crate::api::cursor::Selection) -> napi::Result<()> { + Ok(self.send(sel)?) } /// Get next cursor event if available without blocking #[napi(js_name = "try_recv")] - pub async fn js_try_recv(&self) -> napi::Result> { - Ok(self.try_recv().await?.map(JsCursor::from)) + pub async fn js_try_recv(&self) -> napi::Result> { + Ok(self.try_recv().await?.map(crate::api::Cursor::from)) } /// Block until next #[napi(js_name = "recv")] - pub async fn js_recv(&self) -> napi::Result { + pub async fn js_recv(&self) -> napi::Result { Ok(self.recv().await?.into()) } } diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index 635b10a..85f4971 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -1,4 +1,3 @@ -use crate::buffer::controller::Delta; use crate::prelude::*; use mlua::prelude::*; use mlua_codemp_patch as mlua; @@ -22,6 +21,7 @@ impl LuaUserData for CodempBufferController { ); methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); + methods.add_method_mut("ack", |_, this, (version,):(Vec,)| Ok(this.ack(version))); methods.add_method( "content", @@ -43,7 +43,6 @@ impl LuaUserData for CodempTextChange { fields.add_field_method_get("content", |_, this| Ok(this.content.clone())); fields.add_field_method_get("start", |_, this| Ok(this.start)); fields.add_field_method_get("end", |_, this| Ok(this.end)); - fields.add_field_method_get("hash", |_, this| Ok(this.hash)); // add a 'finish' accessor too because in Lua 'end' is reserved fields.add_field_method_get("finish", |_, this| Ok(this.end)); } @@ -56,12 +55,17 @@ impl LuaUserData for CodempTextChange { } } -impl LuaUserData for Delta { +from_lua_serde! { CodempBufferUpdate } +impl LuaUserData for CodempBufferUpdate { fn add_fields>(fields: &mut F) { + fields.add_field_method_get("hash", |_, this| Ok(this.hash)); + fields.add_field_method_get("version", |_, this| Ok(this.version.clone())); fields.add_field_method_get("change", |_, this| Ok(this.change.clone())); } fn add_methods>(methods: &mut M) { - methods.add_method_mut("ack", |_, this, ()| Ok(this.ack())); + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| { + Ok(format!("{:?}", this)) + }); } } diff --git a/src/ffi/lua/cursor.rs b/src/ffi/lua/cursor.rs index 40df670..3215beb 100644 --- a/src/ffi/lua/cursor.rs +++ b/src/ffi/lua/cursor.rs @@ -4,7 +4,6 @@ use mlua_codemp_patch as mlua; use super::ext::a_sync::a_sync; use super::ext::from_lua_serde; -use super::ext::lua_tuple; impl LuaUserData for CodempCursorController { fn add_methods>(methods: &mut M) { @@ -12,7 +11,7 @@ impl LuaUserData for CodempCursorController { Ok(format!("{:?}", this)) }); - methods.add_method("send", |_, this, (cursor,): (CodempCursor,)| { + methods.add_method("send", |_, this, (cursor,): (CodempSelection,)| { Ok(this.send(cursor)?) }); methods.add_method( @@ -33,18 +32,31 @@ impl LuaUserData for CodempCursorController { from_lua_serde! { CodempCursor } impl LuaUserData for CodempCursor { + fn add_fields>(fields: &mut F) { + fields.add_field_method_get("user", |_, this| Ok(this.user.clone())); + fields.add_field_method_get("sel", |_, this| Ok(this.sel.clone())); + } + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| { Ok(format!("{:?}", this)) }); } +} +from_lua_serde! { CodempSelection } +impl LuaUserData for CodempSelection { fn add_fields>(fields: &mut F) { - fields.add_field_method_get("user", |_, this| Ok(this.user.clone())); fields.add_field_method_get("buffer", |_, this| Ok(this.buffer.clone())); - fields.add_field_method_get("start", |lua, this| lua_tuple(lua, this.start)); - fields.add_field_method_get("end", |lua, this| lua_tuple(lua, this.end)); - // add a 'finish' accessor too because in Lua 'end' is reserved - fields.add_field_method_get("finish", |lua, this| lua_tuple(lua, this.end)); + fields.add_field_method_get("start_row", |_, this| Ok(this.start_row)); + fields.add_field_method_get("start_col", |_, this| Ok(this.start_col)); + fields.add_field_method_get("end_row", |_, this| Ok(this.end_row)); + fields.add_field_method_get("end_col", |_, this| Ok(this.end_col)); + } + + fn add_methods>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| { + Ok(format!("{:?}", this)) + }); } } diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index b67757d..7c5f360 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -1,4 +1,3 @@ -use crate::buffer::controller::Delta; use crate::ext::IgnorableError; use crate::prelude::*; use mlua::prelude::*; @@ -110,8 +109,10 @@ callback_args! { MaybeEvent: Option, Cursor: CodempCursor, MaybeCursor: Option, + Selection: CodempSelection, + MaybeSelection: Option, TextChange: CodempTextChange, MaybeTextChange: Option, - Delta: Delta, - MaybeDelta: Option, + BufferUpdate: CodempBufferUpdate, + MaybeBufferUpdate: Option, } diff --git a/src/ffi/lua/ext/mod.rs b/src/ffi/lua/ext/mod.rs index c30a742..1cb07c6 100644 --- a/src/ffi/lua/ext/mod.rs +++ b/src/ffi/lua/ext/mod.rs @@ -2,19 +2,9 @@ pub mod a_sync; pub mod callback; pub mod log; -use mlua::prelude::*; -use mlua_codemp_patch as mlua; - pub(crate) use a_sync::tokio; pub(crate) use callback::callback; -pub(crate) fn lua_tuple(lua: &Lua, (a, b): (T, T)) -> LuaResult { - let table = lua.create_table()?; - table.set(1, a)?; - table.set(2, b)?; - Ok(table) -} - macro_rules! from_lua_serde { ($($t:ty)*) => { $( diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index 5b0381c..0d2ccf3 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,5 +1,5 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; -use crate::api::Cursor; +use crate::api::cursor::{Cursor, Selection}; use crate::api::TextChange; use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; @@ -20,11 +20,12 @@ impl CursorController { start: (i32, i32), end: (i32, i32), ) -> PyResult<()> { - let pos = Cursor { - start, - end, + let pos = Selection { + start_row: start.0, + start_col: start.1, + end_row: end.0, + end_col: end.1, buffer: path, - user: None, }; let this = self.clone(); this.send(pos)?; @@ -90,7 +91,6 @@ impl BufferController { start, end, content: txt, - hash: None, }; let this = self.clone(); this.send(op)?; @@ -143,21 +143,21 @@ impl BufferController { impl Cursor { #[getter(start)] fn pystart(&self) -> (i32, i32) { - self.start + (self.sel.start_row, self.sel.start_col) } #[getter(end)] fn pyend(&self) -> (i32, i32) { - self.end + (self.sel.end_row, self.sel.end_col) } #[getter(buffer)] fn pybuffer(&self) -> String { - self.buffer.clone() + self.sel.buffer.clone() } #[getter(user)] fn pyuser(&self) -> Option { - self.user.clone() + Some(self.user.clone()) } } From 51cff040ed41245a4eccf44f2734ab9d8195c4c3 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 10 Oct 2024 02:12:51 +0200 Subject: [PATCH 13/18] chore: unpin jni toolbox --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4bbbc02..d9f73c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ tracing-subscriber = { version = "0.3", optional = true } # glue (java) lazy_static = { version = "1.5", optional = true } jni = { version = "0.21", features = ["invocation"], optional = true } -jni-toolbox = { version = "0.2.0", optional = true, features = ["uuid"] } +jni-toolbox = { version = "0.2", optional = true, features = ["uuid"] } # glue (lua) mlua-codemp-patch = { version = "0.10.0-beta.2", features = ["module", "send", "serialize"], optional = true } From ae66f282d453c4bff1aab7a232e303e62f4cfc7a Mon Sep 17 00:00:00 2001 From: zaaarf Date: Thu, 10 Oct 2024 12:04:20 +0200 Subject: [PATCH 14/18] feat(java): implemented acking and send/recv separation --- dist/java/src/mp/code/BufferController.java | 26 +++-- dist/java/src/mp/code/CursorController.java | 7 +- dist/java/src/mp/code/data/BufferUpdate.java | 35 ++++++ dist/java/src/mp/code/data/Cursor.java | 34 +----- dist/java/src/mp/code/data/Delta.java | 37 ------ dist/java/src/mp/code/data/Selection.java | 42 +++++++ dist/java/src/mp/code/data/TextChange.java | 9 +- src/api/mod.rs | 2 + src/ffi/java/buffer.rs | 14 ++- src/ffi/java/cursor.rs | 5 +- src/ffi/java/mod.rs | 113 +++++++++++-------- src/ffi/java/workspace.rs | 6 +- 12 files changed, 193 insertions(+), 137 deletions(-) create mode 100644 dist/java/src/mp/code/data/BufferUpdate.java delete mode 100644 dist/java/src/mp/code/data/Delta.java create mode 100644 dist/java/src/mp/code/data/Selection.java diff --git a/dist/java/src/mp/code/BufferController.java b/dist/java/src/mp/code/BufferController.java index 66f4495..99570fa 100644 --- a/dist/java/src/mp/code/BufferController.java +++ b/dist/java/src/mp/code/BufferController.java @@ -1,5 +1,6 @@ package mp.code; +import mp.code.data.BufferUpdate; import mp.code.data.TextChange; import mp.code.exceptions.ControllerException; @@ -42,26 +43,26 @@ public final class BufferController { return get_content(this.ptr); } - private static native TextChange try_recv(long self) throws ControllerException; + private static native BufferUpdate try_recv(long self) throws ControllerException; /** - * Tries to get a {@link TextChange} from the queue if any were present, and returns + * Tries to get a {@link BufferUpdate} from the queue if any were present, and returns * an empty optional otherwise. * @return the first text change in queue, if any are present * @throws ControllerException if the controller was stopped */ - public Optional tryRecv() throws ControllerException { + public Optional tryRecv() throws ControllerException { return Optional.ofNullable(try_recv(this.ptr)); } - private static native TextChange recv(long self) throws ControllerException; + private static native BufferUpdate recv(long self) throws ControllerException; /** - * Blocks until a {@link TextChange} is available and returns it. + * Blocks until a {@link BufferUpdate} is available and returns it. * @return the text change update that occurred * @throws ControllerException if the controller was stopped */ - public TextChange recv() throws ControllerException { + public BufferUpdate recv() throws ControllerException { return recv(this.ptr); } @@ -78,7 +79,7 @@ public final class BufferController { private static native void callback(long self, Consumer cb); /** - * Registers a callback to be invoked whenever a {@link TextChange} occurs. + * Registers a callback to be invoked whenever a {@link BufferUpdate} occurs. * This will not work unless a Java thread has been dedicated to the event loop. * @see Extensions#drive(boolean) */ @@ -106,6 +107,17 @@ public final class BufferController { poll(this.ptr); } + private static native void ack(long self, long[] version); + + /** + * Acknowledges that a certain CRDT version has been correctly applied. + * @param version the version to acknowledge + * @see BufferUpdate#version + */ + public void ack(long[] version) { + ack(this.ptr, version); + } + private static native void free(long self); static { diff --git a/dist/java/src/mp/code/CursorController.java b/dist/java/src/mp/code/CursorController.java index 159e548..6c2cae1 100644 --- a/dist/java/src/mp/code/CursorController.java +++ b/dist/java/src/mp/code/CursorController.java @@ -1,6 +1,7 @@ package mp.code; import mp.code.data.Cursor; +import mp.code.data.Selection; import mp.code.exceptions.ControllerException; import java.util.Optional; @@ -42,13 +43,13 @@ public final class CursorController { return recv(this.ptr); } - private static native void send(long self, Cursor cursor) throws ControllerException; + private static native void send(long self, Selection cursor) throws ControllerException; /** - * Tries to send a {@link Cursor} update. + * Tries to send a {@link Selection} update. * @throws ControllerException if the controller was stopped */ - public void send(Cursor cursor) throws ControllerException { + public void send(Selection cursor) throws ControllerException { send(this.ptr, cursor); } diff --git a/dist/java/src/mp/code/data/BufferUpdate.java b/dist/java/src/mp/code/data/BufferUpdate.java new file mode 100644 index 0000000..9979411 --- /dev/null +++ b/dist/java/src/mp/code/data/BufferUpdate.java @@ -0,0 +1,35 @@ +package mp.code.data; + +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import mp.code.Extensions; + +import java.util.OptionalLong; + +/** + * A data class holding information about a buffer update. + */ +@ToString +@EqualsAndHashCode +@RequiredArgsConstructor +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") +public class BufferUpdate { + /** + * The hash of the content after applying it (calculated with {@link Extensions#hash(String)}). + * It is generally meaningless to send, but when received it is an invitation to check the hash + * and forcefully re-sync if necessary. + */ + public final OptionalLong hash; // xxh3 hash + + /** + * The CRDT version after the associated change has been applied. + * You MUST acknowledge that it was applied with {@link mp.code.BufferController#ack(long[])}. + */ + public final long[] version; + + /** + * The {@link TextChange} contained in this buffer update. + */ + public final TextChange change; +} diff --git a/dist/java/src/mp/code/data/Cursor.java b/dist/java/src/mp/code/data/Cursor.java index e132fc9..d52ad58 100644 --- a/dist/java/src/mp/code/data/Cursor.java +++ b/dist/java/src/mp/code/data/Cursor.java @@ -11,37 +11,13 @@ import lombok.ToString; @EqualsAndHashCode @RequiredArgsConstructor public class Cursor { - /** - * The starting row of the cursor position. - * If negative, it is clamped to 0. - */ - public final int startRow; - - /** - * The starting column of the cursor position. - * If negative, it is clamped to 0. - */ - public final int startCol; - - /** - * The ending row of the cursor position. - * If negative, it is clamped to 0. - */ - public final int endRow; - - /** - * The ending column of the cursor position. - * If negative, it is clamped to 0. - */ - public final int endCol; - - /** - * The buffer the cursor is located on. - */ - public final String buffer; - /** * The user who controls the cursor. */ public final String user; + + /** + * The associated selection update. + */ + public final Selection selection; } diff --git a/dist/java/src/mp/code/data/Delta.java b/dist/java/src/mp/code/data/Delta.java deleted file mode 100644 index e04af1c..0000000 --- a/dist/java/src/mp/code/data/Delta.java +++ /dev/null @@ -1,37 +0,0 @@ -package mp.code.data; - -import lombok.Getter; -import mp.code.data.Config; -import mp.code.data.User; -import mp.code.exceptions.ConnectionException; -import mp.code.exceptions.ConnectionRemoteException; - -import java.util.Optional; - -@Getter -public final class Delta { - private final long ptr; - - Delta(long ptr) { - this.ptr = ptr; - Extensions.CLEANER.register(this, () -> free(ptr)); - } - - private static native TextChange get_text_change(long self); - - public mp.code.data.TextChange getTextChange() { - return get_text_change(this.ptr); - } - - private static native void ack_native(long self, boolean success) throws ConnectionException; - - public void ack(boolean success) throws ConnectionException { - return ack_native(this.ptr, success); - } - - private static native void free(long self); - - static { - NativeUtils.loadLibraryIfNeeded(); - } -} diff --git a/dist/java/src/mp/code/data/Selection.java b/dist/java/src/mp/code/data/Selection.java new file mode 100644 index 0000000..cc31cd4 --- /dev/null +++ b/dist/java/src/mp/code/data/Selection.java @@ -0,0 +1,42 @@ +package mp.code.data; + +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +/** + * A data class holding information about a cursor selection. + */ +@ToString +@EqualsAndHashCode +@RequiredArgsConstructor +public class Selection { + /** + * The starting row of the cursor position. + * If negative, it is clamped to 0. + */ + public final int startRow; + + /** + * The starting column of the cursor position. + * If negative, it is clamped to 0. + */ + public final int startCol; + + /** + * The ending row of the cursor position. + * If negative, it is clamped to 0. + */ + public final int endRow; + + /** + * The ending column of the cursor position. + * If negative, it is clamped to 0. + */ + public final int endCol; + + /** + * The buffer the cursor is located on. + */ + public final String buffer; +} diff --git a/dist/java/src/mp/code/data/TextChange.java b/dist/java/src/mp/code/data/TextChange.java index 33e7561..3c0a0f9 100644 --- a/dist/java/src/mp/code/data/TextChange.java +++ b/dist/java/src/mp/code/data/TextChange.java @@ -22,7 +22,7 @@ public class TextChange { public final long start; /** - * The endomg position of the change. + * The ending position of the change. * If negative, it is clamped to 0. */ public final long end; @@ -33,13 +33,6 @@ public class TextChange { */ public final String content; - /** - * The hash of the content after applying it (calculated with {@link Extensions#hash(String)}). - * It is generally meaningless to send, but when received it is an invitation to check the hash - * and forcefully re-sync if necessary. - */ - public final OptionalLong hash; // xxh3 hash - /** * Checks if the change represents a deletion. * It does if the starting index is lower than the ending index. diff --git a/src/api/mod.rs b/src/api/mod.rs index 54be1da..158a4d0 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -19,9 +19,11 @@ pub mod event; /// data structure for remote users pub mod user; +pub use change::BufferUpdate; pub use change::TextChange; pub use config::Config; pub use controller::Controller; pub use cursor::Cursor; +pub use cursor::Selection; pub use event::Event; pub use user::User; diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index e65f2b0..ef70838 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -3,7 +3,9 @@ use jni_toolbox::jni; use crate::{ api::{ - change::BufferUpdate, controller::{AsyncReceiver, AsyncSender}, TextChange + controller::{AsyncReceiver, AsyncSender}, + BufferUpdate, + TextChange }, errors::ControllerError, }; @@ -24,9 +26,7 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result Result, ControllerError> { +fn try_recv(controller: &mut crate::buffer::Controller) -> Result, ControllerError> { super::tokio().block_on(controller.try_recv()) } @@ -98,6 +98,12 @@ fn poll(controller: &mut crate::buffer::Controller) -> Result<(), ControllerErro super::tokio().block_on(controller.poll()) } +/// Acknowledge that a change has been correctly applied. +#[jni(package = "mp.code", class = "BufferController")] +fn ack(controller: &mut crate::buffer::Controller, version: Vec) { + controller.ack(version) +} + /// Called by the Java GC to drop a [crate::buffer::Controller]. #[jni(package = "mp.code", class = "BufferController")] fn free(input: jni::sys::jlong) { diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 57fa1d5..2878391 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -2,6 +2,7 @@ use crate::{ api::{ controller::{AsyncReceiver, AsyncSender}, Cursor, + Selection }, errors::ControllerError, }; @@ -24,8 +25,8 @@ fn recv(controller: &mut crate::cursor::Controller) -> Result Result<(), ControllerError> { - controller.send(cursor) +fn send(controller: &mut crate::cursor::Controller, sel: Selection) -> Result<(), ControllerError> { + controller.send(sel) } /// Register a callback for cursor changes. diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index 7d4021e..2472cea 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -79,6 +79,7 @@ macro_rules! null_check { } }; } + pub(crate) use null_check; impl jni_toolbox::JniToolboxError for crate::errors::ConnectionError { @@ -193,6 +194,42 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event { } } +impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::BufferUpdate { + const CLASS: &'static str = "mp/code/data/BufferUpdate"; + fn into_java_object( + self, + env: &mut jni::JNIEnv<'j>, + ) -> Result, jni::errors::Error> { + let class = env.find_class(Self::CLASS)?; + + let hash_class = env.find_class("java/util/OptionalLong")?; + let hash = if let Some(h) = self.hash { + env.call_static_method( + hash_class, + "of", + "(J)Ljava/util/OptionalLong;", + &[jni::objects::JValueGen::Long(h)], + ) + } else { + env.call_static_method(hash_class, "empty", "()Ljava/util/OptionalLong;", &[]) + }? + .l()?; + + let version = self.version.into_java_object(env)?; + let change = self.change.into_java_object(env)?; + + env.new_object( + class, + "(Ljava/util/OptionalLong;[JLmp/code/data/TextChange;)V", + &[ + jni::objects::JValueGen::Object(&hash), + jni::objects::JValueGen::Object(&version), + jni::objects::JValueGen::Object(&change), + ], + ) + } +} + impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { const CLASS: &'static str = "mp/code/data/TextChange"; fn into_java_object( @@ -200,11 +237,10 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { env: &mut jni::JNIEnv<'j>, ) -> Result, jni::errors::Error> { let content = env.new_string(self.content)?; - let class = env.find_class(Self::CLASS)?; env.new_object( class, - "(JJLjava/lang/String;Ljava/util/OptionalLong;)V", + "(JJLjava/lang/String;)V", &[ jni::objects::JValueGen::Long(self.start.into()), jni::objects::JValueGen::Long(self.end.into()), @@ -220,24 +256,39 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Cursor { self, env: &mut jni::JNIEnv<'j>, ) -> Result, jni::errors::Error> { - let class = env.find_class("mp/code/data/Cursor")?; - let buffer = env.new_string(&self.buffer)?; - let user = if let Some(user) = self.user { - env.new_string(user)?.into() - } else { - jni::objects::JObject::null() - }; + let class = env.find_class(Self::CLASS)?; + let user = env.new_string(&self.user)?; + let sel = self.sel.into_java_object(env)?; env.new_object( class, - "(IIIILjava/lang/String;Ljava/lang/String;)V", + "(Ljava/lang/String;Lmp/code/data/Selection;)V", &[ - jni::objects::JValueGen::Int(self.start.0), - jni::objects::JValueGen::Int(self.start.1), - jni::objects::JValueGen::Int(self.end.0), - jni::objects::JValueGen::Int(self.end.1), - jni::objects::JValueGen::Object(&buffer), jni::objects::JValueGen::Object(&user), + jni::objects::JValueGen::Object(&sel), + ], + ) + } +} + +impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Selection { + const CLASS: &'static str = "mp/code/data/Selection"; + fn into_java_object( + self, + env: &mut jni::JNIEnv<'j>, + ) -> Result, jni::errors::Error> { + let class = env.find_class(Self::CLASS)?; + let buffer = env.new_string(&self.buffer)?; + + env.new_object( + class, + "(IIIILjava/lang/String;)V", + &[ + jni::objects::JValueGen::Int(self.start_row), + jni::objects::JValueGen::Int(self.start_col), + jni::objects::JValueGen::Int(self.end_row), + jni::objects::JValueGen::Int(self.end_col), + jni::objects::JValueGen::Object(&buffer), ], ) } @@ -261,7 +312,6 @@ from_java_ptr!(crate::Client); from_java_ptr!(crate::Workspace); from_java_ptr!(crate::cursor::Controller); from_java_ptr!(crate::buffer::Controller); -from_java_ptr!(crate::buffer::controller::Delta); impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config { type From = jni::objects::JObject<'j>; @@ -350,7 +400,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config { } } -impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor { +impl<'j> jni_toolbox::FromJava<'j> for crate::api::Selection { type From = jni::objects::JObject<'j>; fn from_java( env: &mut jni::JNIEnv<'j>, @@ -371,21 +421,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor { unsafe { env.get_string_unchecked(&jfield.into()) }?.into() }; - let user = { - let jfield = env.get_field(&cursor, "user", "Ljava/lang/String;")?.l()?; - if jfield.is_null() { - None - } else { - Some(unsafe { env.get_string_unchecked(&jfield.into()) }?.into()) - } - }; - - Ok(Self { - start: (start_row, start_col), - end: (end_row, end_col), - buffer, - user, - }) + Ok(Self { start_row, start_col, end_row, end_col, buffer }) } } @@ -414,21 +450,10 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::TextChange { unsafe { env.get_string_unchecked(&jfield.into()) }?.into() }; - let hash = { - let jfield = env - .get_field(&change, "hash", "Ljava/util/OptionalLong;")? - .l()?; - if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? { - Some(env.call_method(&jfield, "getAsLong", "()J", &[])?.j()?) - } else { - None - } - }; Ok(Self { start, end, - content, - hash, + content }) } } diff --git a/src/ffi/java/workspace.rs b/src/ffi/java/workspace.rs index 4bd3da8..acdc36f 100644 --- a/src/ffi/java/workspace.rs +++ b/src/ffi/java/workspace.rs @@ -91,7 +91,7 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr super::tokio().block_on(workspace.delete(&path)) } -/// Block and receive a workspace event +/// Block and receive a workspace event. #[jni(package = "mp.code", class = "Workspace")] fn recv(workspace: &mut Workspace) -> Result { super::tokio().block_on(workspace.recv()) @@ -103,13 +103,13 @@ fn try_recv(workspace: &mut Workspace) -> Result, Cont super::tokio().block_on(workspace.try_recv()) } -/// Block until a workspace event is available +/// Block until a workspace event is available. #[jni(package = "mp.code", class = "Workspace")] fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> { super::tokio().block_on(workspace.poll()) } -/// Clear previously registered callback +/// Clear previously registered callback. #[jni(package = "mp.code", class = "Workspace")] fn clear_callback(workspace: &mut Workspace) { workspace.clear_callback(); From ce8dcc8b8cfb63e06eb8b5ea6a1588f9d3a055d3 Mon Sep 17 00:00:00 2001 From: zaaarf Date: Thu, 10 Oct 2024 12:14:44 +0200 Subject: [PATCH 15/18] chore: cleanup --- src/api/config.rs | 18 +++++++++--------- src/api/cursor.rs | 6 +++--- src/buffer/controller.rs | 6 ++---- src/buffer/worker.rs | 17 +++++++---------- src/cursor/controller.rs | 5 ++++- src/cursor/worker.rs | 2 +- src/ffi/js/buffer.rs | 2 +- src/ffi/js/cursor.rs | 2 +- src/ffi/python/controllers.rs | 2 +- src/prelude.rs | 4 ++-- 10 files changed, 31 insertions(+), 33 deletions(-) diff --git a/src/api/config.rs b/src/api/config.rs index 23dc7ef..b23d962 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -1,11 +1,11 @@ //! # Config //! Data structure defining clients configuration -/// Configuration struct for `codemp` client +/// Configuration struct for the `codemp` client. /// -/// username and password are required fields, while everything else is optional +/// `username` and `password` are required fields, everything else is optional. /// -/// host, port and tls affect all connections to all grpc services +/// `host`, `port` and `tls` affect all connections to all gRPC services; the /// resulting endpoint is composed like this: /// http{tls?'s':''}://{host}:{port} #[derive(Clone, Debug)] @@ -16,20 +16,20 @@ )] #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] pub struct Config { - /// user identifier used to register, possibly your email + /// User identifier used to register, possibly your email. pub username: String, - /// user password chosen upon registration + /// User password chosen upon registration. pub password: String, - /// address of server to connect to, default api.code.mp + /// Address of server to connect to, default api.code.mp. pub host: Option, - /// port to connect to, default 50053 + /// Port to connect to, default 50053. pub port: Option, - /// enable or disable tls, default true + /// Enable or disable tls, default true. pub tls: Option, } impl Config { - /// construct a new Config object, with given username and password + /// Construct a new Config object, with given username and password. pub fn new(username: impl ToString, password: impl ToString) -> Self { Self { username: username.to_string(), diff --git a/src/api/cursor.rs b/src/api/cursor.rs index 93226f5..89ecd5e 100644 --- a/src/api/cursor.rs +++ b/src/api/cursor.rs @@ -4,7 +4,7 @@ #[cfg(any(feature = "py", feature = "py-noabi"))] use pyo3::prelude::*; -/// User cursor position in a buffer +/// An event that occurred about a user's cursor. #[derive(Clone, Debug, Default)] #[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] @@ -13,11 +13,11 @@ use pyo3::prelude::*; pub struct Cursor { /// User who sent the cursor. pub user: String, - /// Cursor selection + /// The updated cursor selection. pub sel: Selection, } -/// A cursor selection span, with row-column tuples +/// A cursor selection span. #[derive(Clone, Debug, Default)] #[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index e452b2a..bd860d7 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,14 +6,12 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; -use crate::api::change::BufferUpdate; +use crate::api::BufferUpdate; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::TextChange; use crate::errors::ControllerResult; use crate::ext::IgnorableError; -use super::worker::DeltaRequest; - /// A [Controller] to asynchronously interact with remote buffers. /// /// Each buffer controller internally tracks the last acknowledged state, remaining always in sync @@ -54,7 +52,7 @@ pub(crate) struct BufferControllerInner { pub(crate) ops_in: mpsc::UnboundedSender, pub(crate) poller: mpsc::UnboundedSender>, pub(crate) content_request: mpsc::Sender>, - pub(crate) delta_request: mpsc::Sender, + pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender>)>, pub(crate) callback: watch::Sender>>, pub(crate) ack_tx: mpsc::UnboundedSender, } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index a021174..bfa372c 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -6,7 +6,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; -use crate::api::change::BufferUpdate; +use crate::api::BufferUpdate; use crate::api::controller::ControllerCallback; use crate::api::TextChange; use crate::ext::IgnorableError; @@ -15,9 +15,6 @@ use codemp_proto::buffer::{BufferEvent, Operation}; use super::controller::{BufferController, BufferControllerInner}; -pub(crate) type DeltaOp = Option; -pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); - struct BufferWorker { user_id: Uuid, path: String, @@ -28,7 +25,7 @@ struct BufferWorker { poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, - delta_req: mpsc::Receiver, + delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender>)>, controller: std::sync::Weak, callback: watch::Receiver>>, oplog: OpLog, @@ -215,7 +212,7 @@ impl BufferWorker { } } - async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender) { + async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender>) { if let Some((lv, Some(dtop))) = self .oplog .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) @@ -240,10 +237,10 @@ impl BufferWorker { { tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); } - crate::api::change::BufferUpdate { + crate::api::BufferUpdate { hash, version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful - change: crate::api::change::TextChange { + change: crate::api::TextChange { start: dtop.start() as u32, end: dtop.start() as u32, content: dtop.content_as_str().unwrap_or_default().to_string(), @@ -251,10 +248,10 @@ impl BufferWorker { } } - diamond_types::list::operation::OpKind::Del => crate::api::change::BufferUpdate { + diamond_types::list::operation::OpKind::Del => crate::api::BufferUpdate { hash, version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful - change: crate::api::change::TextChange { + change: crate::api::TextChange { start: dtop.start() as u32, end: dtop.end() as u32, content: dtop.content_as_str().unwrap_or_default().to_string(), diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index aa57fe9..211fbc2 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -7,7 +7,10 @@ use tokio::sync::{mpsc, oneshot, watch}; use crate::{ api::{ - controller::{AsyncReceiver, AsyncSender, ControllerCallback}, cursor::Selection, Controller, Cursor + controller::{AsyncReceiver, AsyncSender, ControllerCallback}, + Controller, + Cursor, + Selection }, errors::ControllerResult, }; diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 91f07d5..a690a9a 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -5,7 +5,7 @@ use tonic::Streaming; use uuid::Uuid; use crate::{ - api::{controller::ControllerCallback, cursor::Selection, Cursor, User}, + api::{controller::ControllerCallback, Cursor, Selection, User}, ext::IgnorableError, }; use codemp_proto::cursor::{CursorEvent, CursorPosition}; diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index 5bb8f21..8499a45 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,5 +1,5 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; -use crate::api::change::{TextChange, BufferUpdate}; +use crate::api::{TextChange, BufferUpdate}; use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index d1ae476..9131abc 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -39,7 +39,7 @@ impl CursorController { /// Send a new cursor event to remote #[napi(js_name = "send")] - pub fn js_send(&self, sel: crate::api::cursor::Selection) -> napi::Result<()> { + pub fn js_send(&self, sel: crate::api::Selection) -> napi::Result<()> { Ok(self.send(sel)?) } diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index 0d2ccf3..6c1f188 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,5 +1,5 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; -use crate::api::cursor::{Cursor, Selection}; +use crate::api::{Cursor, Selection}; use crate::api::TextChange; use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; diff --git a/src/prelude.rs b/src/prelude.rs index f7d6fc1..35003a8 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -5,8 +5,8 @@ pub use crate::api::{ controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, - change::BufferUpdate as CodempBufferUpdate, - cursor::Selection as CodempSelection, + BufferUpdate as CodempBufferUpdate, + Selection as CodempSelection, }; pub use crate::{ From 721d71dd18dd3bdf9c544c09e2a0bd1fe9738faa Mon Sep 17 00:00:00 2001 From: zaaarf Date: Thu, 10 Oct 2024 12:25:06 +0200 Subject: [PATCH 16/18] fix: doctests --- src/api/change.rs | 13 +++++++------ src/buffer/controller.rs | 2 +- src/lib.rs | 8 ++++---- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/api/change.rs b/src/api/change.rs index d4b342e..6270e37 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -29,20 +29,21 @@ pub struct BufferUpdate { /// new buffer, but smaller changes are efficient and easy to create or apply. /// /// ### Examples -/// To insert 'a' after 4th character we should send a. +/// To insert 'a' after 4th character we should send: /// ``` -/// TextChange { start: 4, end: 4, content: "a".into(), hash: None } +/// codemp::api::TextChange { start: 4, end: 4, content: "a".into() }; /// ``` /// -/// To delete a the fourth character we should send a. +/// To delete the fourth character we should send: /// ``` -/// TextChange { start: 3, end: 4, content: "".into(), hash: None } +/// codemp::api::TextChange { start: 3, end: 4, content: "".into() }; /// ``` /// /// ``` /// let change = codemp::api::TextChange { -/// start: 6, end: 11, -/// content: "mom".to_string(), hash: None +/// start: 6, +/// end: 11, +/// content: "mom".to_string() /// }; /// let before = "hello world!"; /// let after = change.apply(before); diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index bd860d7..a3cd3cf 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -36,7 +36,7 @@ impl BufferController { } /// Notify CRDT that changes up to the given version have been merged succesfully. - pub fn ack(&mut self, version: Vec) { + pub fn ack(&self, version: Vec) { let version = version.into_iter().map(|x| usize::from_ne_bytes(x.to_ne_bytes())).collect(); self.0.ack_tx .send(version) diff --git a/src/lib.rs b/src/lib.rs index edfda76..40c6905 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,7 +53,7 @@ //! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods //! let cursor = workspace.cursor(); //! let event = cursor.recv().await.expect("disconnected while waiting for event!"); -//! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer); +//! println!("user {} moved on buffer {}", event.user, event.sel.buffer); //! # }; //! ``` //! @@ -69,12 +69,12 @@ //! # use codemp::api::controller::{AsyncSender, AsyncReceiver}; //! let buffer = workspace.attach("/some/file.txt").await.expect("failed to attach"); //! buffer.content(); // force-sync -//! if let Some(mut delta) = buffer.try_recv().await.unwrap() { +//! if let Some(mut update) = buffer.try_recv().await.unwrap() { //! println!( //! "content: {}, span: {}-{}", -//! delta.change.content, delta.change.start, delta.change.end +//! update.change.content, update.change.start, update.change.end //! ); -//! delta.ack(); +//! buffer.ack(update.version); //! } // if None, no changes are currently available //! # }; //! ``` From 9e977adcdd51ee08ea1bdd34431c2d68da30856f Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 10 Oct 2024 12:46:56 +0200 Subject: [PATCH 17/18] chore: cargo fmt --- src/buffer/controller.rs | 12 ++++++++---- src/buffer/worker.rs | 20 +++++++++++++++----- src/cursor/controller.rs | 10 ++++------ src/ffi/java/buffer.rs | 7 ++++--- src/ffi/java/cursor.rs | 3 +-- src/ffi/java/mod.rs | 10 ++++++++-- src/ffi/js/buffer.rs | 6 ++---- src/ffi/lua/buffer.rs | 16 +++++++++------- src/ffi/lua/cursor.rs | 8 ++++---- src/ffi/lua/workspace.rs | 12 +++++------- src/ffi/python/controllers.rs | 2 +- src/prelude.rs | 7 +++---- 12 files changed, 64 insertions(+), 49 deletions(-) diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index a3cd3cf..0065e38 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,8 +6,8 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; -use crate::api::BufferUpdate; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; +use crate::api::BufferUpdate; use crate::api::TextChange; use crate::errors::ControllerResult; use crate::ext::IgnorableError; @@ -35,10 +35,14 @@ impl BufferController { Ok(content) } - /// Notify CRDT that changes up to the given version have been merged succesfully. + /// Notify CRDT that changes up to the given version have been merged succesfully. pub fn ack(&self, version: Vec) { - let version = version.into_iter().map(|x| usize::from_ne_bytes(x.to_ne_bytes())).collect(); - self.0.ack_tx + let version = version + .into_iter() + .map(|x| usize::from_ne_bytes(x.to_ne_bytes())) + .collect(); + self.0 + .ack_tx .send(version) .unwrap_or_warn("no worker to receive sent ack"); } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index bfa372c..a77fa98 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -6,8 +6,8 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; -use crate::api::BufferUpdate; use crate::api::controller::ControllerCallback; +use crate::api::BufferUpdate; use crate::api::TextChange; use crate::ext::IgnorableError; @@ -212,7 +212,11 @@ impl BufferWorker { } } - async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender>) { + async fn handle_delta_request( + &mut self, + last_ver: LocalVersion, + tx: oneshot::Sender>, + ) { if let Some((lv, Some(dtop))) = self .oplog .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) @@ -239,18 +243,24 @@ impl BufferWorker { } crate::api::BufferUpdate { hash, - version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful + version: step_ver + .into_iter() + .map(|x| i64::from_ne_bytes(x.to_ne_bytes())) + .collect(), // TODO this is wasteful change: crate::api::TextChange { start: dtop.start() as u32, end: dtop.start() as u32, content: dtop.content_as_str().unwrap_or_default().to_string(), - } + }, } } diamond_types::list::operation::OpKind::Del => crate::api::BufferUpdate { hash, - version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful + version: step_ver + .into_iter() + .map(|x| i64::from_ne_bytes(x.to_ne_bytes())) + .collect(), // TODO this is wasteful change: crate::api::TextChange { start: dtop.start() as u32, end: dtop.end() as u32, diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 211fbc2..fd22375 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -8,9 +8,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use crate::{ api::{ controller::{AsyncReceiver, AsyncSender, ControllerCallback}, - Controller, - Cursor, - Selection + Controller, Cursor, Selection, }, errors::ControllerResult, }; @@ -41,9 +39,9 @@ impl Controller for CursorController {} #[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl AsyncSender for CursorController { fn send(&self, mut cursor: Selection) -> ControllerResult<()> { - if cursor.start_row > cursor.end_row || ( - cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col - ) { + if cursor.start_row > cursor.end_row + || (cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col) + { std::mem::swap(&mut cursor.start_row, &mut cursor.end_row); std::mem::swap(&mut cursor.start_col, &mut cursor.end_col); } diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index ef70838..5f06f8e 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -4,8 +4,7 @@ use jni_toolbox::jni; use crate::{ api::{ controller::{AsyncReceiver, AsyncSender}, - BufferUpdate, - TextChange + BufferUpdate, TextChange, }, errors::ControllerError, }; @@ -26,7 +25,9 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result Result, ControllerError> { +fn try_recv( + controller: &mut crate::buffer::Controller, +) -> Result, ControllerError> { super::tokio().block_on(controller.try_recv()) } diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 2878391..e08a788 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -1,8 +1,7 @@ use crate::{ api::{ controller::{AsyncReceiver, AsyncSender}, - Cursor, - Selection + Cursor, Selection, }, errors::ControllerError, }; diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index 2472cea..51bd3a5 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -421,7 +421,13 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Selection { unsafe { env.get_string_unchecked(&jfield.into()) }?.into() }; - Ok(Self { start_row, start_col, end_row, end_col, buffer }) + Ok(Self { + start_row, + start_col, + end_row, + end_col, + buffer, + }) } } @@ -453,7 +459,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::TextChange { Ok(Self { start, end, - content + content, }) } } diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index 8499a45..fc9b961 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,5 +1,5 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; -use crate::api::{TextChange, BufferUpdate}; +use crate::api::{BufferUpdate, TextChange}; use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, @@ -51,9 +51,7 @@ impl BufferController { /// Return next buffer event if present #[napi(js_name = "try_recv")] - pub async fn js_try_recv( - &self, - ) -> napi::Result> { + pub async fn js_try_recv(&self) -> napi::Result> { Ok(self.try_recv().await?) } diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index 85f4971..5bc0813 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -11,9 +11,9 @@ impl LuaUserData for CodempBufferController { Ok(format!("{:?}", this)) }); - methods.add_method("send", |_, this, (change,): (CodempTextChange,)| + methods.add_method("send", |_, this, (change,): (CodempTextChange,)| { Ok(this.send(change)?) - ); + }); methods.add_method( "try_recv", @@ -21,7 +21,9 @@ impl LuaUserData for CodempBufferController { ); methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method_mut("ack", |_, this, (version,):(Vec,)| Ok(this.ack(version))); + methods.add_method_mut("ack", |_, this, (version,): (Vec,)| { + Ok(this.ack(version)) + }); methods.add_method( "content", @@ -29,11 +31,11 @@ impl LuaUserData for CodempBufferController { ); methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); - methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| - Ok(this.callback(move |controller: CodempBufferController| + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { + Ok(this.callback(move |controller: CodempBufferController| { super::ext::callback().invoke(cb.clone(), controller) - )) - ); + })) + }); } } diff --git a/src/ffi/lua/cursor.rs b/src/ffi/lua/cursor.rs index 3215beb..86a3a33 100644 --- a/src/ffi/lua/cursor.rs +++ b/src/ffi/lua/cursor.rs @@ -22,11 +22,11 @@ impl LuaUserData for CodempCursorController { methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); - methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| - Ok(this.callback(move |controller: CodempCursorController| + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { + Ok(this.callback(move |controller: CodempCursorController| { super::ext::callback().invoke(cb.clone(), controller) - )) - ); + })) + }); } } diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index 5aa4b82..13d7e84 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -60,15 +60,13 @@ impl LuaUserData for CodempWorkspace { methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| - Ok(this.callback(move |controller: CodempWorkspace| + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { + Ok(this.callback(move |controller: CodempWorkspace| { super::ext::callback().invoke(cb.clone(), controller) - )) - ); + })) + }); - methods.add_method("clear_callbacl", |_, this, ()| - Ok(this.clear_callback()) - ); + methods.add_method("clear_callbacl", |_, this, ()| Ok(this.clear_callback())); } fn add_fields>(fields: &mut F) { diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index 6c1f188..9d9d5f3 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,6 +1,6 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; -use crate::api::{Cursor, Selection}; use crate::api::TextChange; +use crate::api::{Cursor, Selection}; use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; use pyo3::exceptions::PyValueError; diff --git a/src/prelude.rs b/src/prelude.rs index 35003a8..71bb813 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -3,10 +3,9 @@ pub use crate::api::{ controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, - Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, - Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, - BufferUpdate as CodempBufferUpdate, - Selection as CodempSelection, + BufferUpdate as CodempBufferUpdate, Config as CodempConfig, Controller as CodempController, + Cursor as CodempCursor, Event as CodempEvent, Selection as CodempSelection, + TextChange as CodempTextChange, User as CodempUser, }; pub use crate::{ From 0005a797972cd5932313b87ffb96e33ec3eb5fb1 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 10 Oct 2024 13:01:12 +0200 Subject: [PATCH 18/18] chore: fix lua annotations --- dist/lua/annotations.lua | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/dist/lua/annotations.lua b/dist/lua/annotations.lua index 4b2d317..71b5ea3 100644 --- a/dist/lua/annotations.lua +++ b/dist/lua/annotations.lua @@ -326,7 +326,7 @@ local TextChange = {} ---@class (exact) BufferUpdate ---@field change TextChange text change for this delta ----@field version [integer] CRDT version after this change +---@field version table CRDT version after this change ---@field hash integer? optional hash of text buffer after this change, for sync checks local BufferUpdate = {} @@ -383,14 +383,19 @@ function BufferController:ack(version) end ---handle to a workspace's cursor channel, allowing send/recv operations local CursorController = {} ----@class Cursor ----@field user string? id of user owning this cursor +---@class Selection ---@field buffer string relative path ("name") of buffer on which this cursor is ----@field start [integer, integer] cursor start position ----@field finish [integer, integer] cursor end position ----a cursor position +---@field start_row integer +---@field start_col integer +---@field end_row integer +---@field end_col integer +---a cursor selected region, as row-col indices ----@param cursor Cursor cursor event to broadcast +---@class Cursor +---@field user string id of user owning this cursor +---@field sel Selection selected region for this user + +---@param cursor Selection cursor position to broadcast ---@return NilPromise ---@async ---@nodiscard