diff --git a/Cargo.lock b/Cargo.lock index 8d50945..bb7e87a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -231,7 +231,7 @@ dependencies = [ [[package]] name = "codemp" -version = "0.7.2" +version = "0.7.3" dependencies = [ "async-trait", "codemp-proto", diff --git a/Cargo.toml b/Cargo.toml index 4bbbc02..d9f73c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ tracing-subscriber = { version = "0.3", optional = true } # glue (java) lazy_static = { version = "1.5", optional = true } jni = { version = "0.21", features = ["invocation"], optional = true } -jni-toolbox = { version = "0.2.0", optional = true, features = ["uuid"] } +jni-toolbox = { version = "0.2", optional = true, features = ["uuid"] } # glue (lua) mlua-codemp-patch = { version = "0.10.0-beta.2", features = ["module", "send", "serialize"], optional = true } diff --git a/dist/java/src/mp/code/BufferController.java b/dist/java/src/mp/code/BufferController.java index 66f4495..99570fa 100644 --- a/dist/java/src/mp/code/BufferController.java +++ b/dist/java/src/mp/code/BufferController.java @@ -1,5 +1,6 @@ package mp.code; +import mp.code.data.BufferUpdate; import mp.code.data.TextChange; import mp.code.exceptions.ControllerException; @@ -42,26 +43,26 @@ public final class BufferController { return get_content(this.ptr); } - private static native TextChange try_recv(long self) throws ControllerException; + private static native BufferUpdate try_recv(long self) throws ControllerException; /** - * Tries to get a {@link TextChange} from the queue if any were present, and returns + * Tries to get a {@link BufferUpdate} from the queue if any were present, and returns * an empty optional otherwise. * @return the first text change in queue, if any are present * @throws ControllerException if the controller was stopped */ - public Optional tryRecv() throws ControllerException { + public Optional tryRecv() throws ControllerException { return Optional.ofNullable(try_recv(this.ptr)); } - private static native TextChange recv(long self) throws ControllerException; + private static native BufferUpdate recv(long self) throws ControllerException; /** - * Blocks until a {@link TextChange} is available and returns it. + * Blocks until a {@link BufferUpdate} is available and returns it. * @return the text change update that occurred * @throws ControllerException if the controller was stopped */ - public TextChange recv() throws ControllerException { + public BufferUpdate recv() throws ControllerException { return recv(this.ptr); } @@ -78,7 +79,7 @@ public final class BufferController { private static native void callback(long self, Consumer cb); /** - * Registers a callback to be invoked whenever a {@link TextChange} occurs. + * Registers a callback to be invoked whenever a {@link BufferUpdate} occurs. * This will not work unless a Java thread has been dedicated to the event loop. * @see Extensions#drive(boolean) */ @@ -106,6 +107,17 @@ public final class BufferController { poll(this.ptr); } + private static native void ack(long self, long[] version); + + /** + * Acknowledges that a certain CRDT version has been correctly applied. + * @param version the version to acknowledge + * @see BufferUpdate#version + */ + public void ack(long[] version) { + ack(this.ptr, version); + } + private static native void free(long self); static { diff --git a/dist/java/src/mp/code/CursorController.java b/dist/java/src/mp/code/CursorController.java index 159e548..6c2cae1 100644 --- a/dist/java/src/mp/code/CursorController.java +++ b/dist/java/src/mp/code/CursorController.java @@ -1,6 +1,7 @@ package mp.code; import mp.code.data.Cursor; +import mp.code.data.Selection; import mp.code.exceptions.ControllerException; import java.util.Optional; @@ -42,13 +43,13 @@ public final class CursorController { return recv(this.ptr); } - private static native void send(long self, Cursor cursor) throws ControllerException; + private static native void send(long self, Selection cursor) throws ControllerException; /** - * Tries to send a {@link Cursor} update. + * Tries to send a {@link Selection} update. * @throws ControllerException if the controller was stopped */ - public void send(Cursor cursor) throws ControllerException { + public void send(Selection cursor) throws ControllerException { send(this.ptr, cursor); } diff --git a/dist/java/src/mp/code/data/BufferUpdate.java b/dist/java/src/mp/code/data/BufferUpdate.java new file mode 100644 index 0000000..9979411 --- /dev/null +++ b/dist/java/src/mp/code/data/BufferUpdate.java @@ -0,0 +1,35 @@ +package mp.code.data; + +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import mp.code.Extensions; + +import java.util.OptionalLong; + +/** + * A data class holding information about a buffer update. + */ +@ToString +@EqualsAndHashCode +@RequiredArgsConstructor +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") +public class BufferUpdate { + /** + * The hash of the content after applying it (calculated with {@link Extensions#hash(String)}). + * It is generally meaningless to send, but when received it is an invitation to check the hash + * and forcefully re-sync if necessary. + */ + public final OptionalLong hash; // xxh3 hash + + /** + * The CRDT version after the associated change has been applied. + * You MUST acknowledge that it was applied with {@link mp.code.BufferController#ack(long[])}. + */ + public final long[] version; + + /** + * The {@link TextChange} contained in this buffer update. + */ + public final TextChange change; +} diff --git a/dist/java/src/mp/code/data/Cursor.java b/dist/java/src/mp/code/data/Cursor.java index e132fc9..d52ad58 100644 --- a/dist/java/src/mp/code/data/Cursor.java +++ b/dist/java/src/mp/code/data/Cursor.java @@ -11,37 +11,13 @@ import lombok.ToString; @EqualsAndHashCode @RequiredArgsConstructor public class Cursor { - /** - * The starting row of the cursor position. - * If negative, it is clamped to 0. - */ - public final int startRow; - - /** - * The starting column of the cursor position. - * If negative, it is clamped to 0. - */ - public final int startCol; - - /** - * The ending row of the cursor position. - * If negative, it is clamped to 0. - */ - public final int endRow; - - /** - * The ending column of the cursor position. - * If negative, it is clamped to 0. - */ - public final int endCol; - - /** - * The buffer the cursor is located on. - */ - public final String buffer; - /** * The user who controls the cursor. */ public final String user; + + /** + * The associated selection update. + */ + public final Selection selection; } diff --git a/dist/java/src/mp/code/data/Selection.java b/dist/java/src/mp/code/data/Selection.java new file mode 100644 index 0000000..cc31cd4 --- /dev/null +++ b/dist/java/src/mp/code/data/Selection.java @@ -0,0 +1,42 @@ +package mp.code.data; + +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +/** + * A data class holding information about a cursor selection. + */ +@ToString +@EqualsAndHashCode +@RequiredArgsConstructor +public class Selection { + /** + * The starting row of the cursor position. + * If negative, it is clamped to 0. + */ + public final int startRow; + + /** + * The starting column of the cursor position. + * If negative, it is clamped to 0. + */ + public final int startCol; + + /** + * The ending row of the cursor position. + * If negative, it is clamped to 0. + */ + public final int endRow; + + /** + * The ending column of the cursor position. + * If negative, it is clamped to 0. + */ + public final int endCol; + + /** + * The buffer the cursor is located on. + */ + public final String buffer; +} diff --git a/dist/java/src/mp/code/data/TextChange.java b/dist/java/src/mp/code/data/TextChange.java index 33e7561..3c0a0f9 100644 --- a/dist/java/src/mp/code/data/TextChange.java +++ b/dist/java/src/mp/code/data/TextChange.java @@ -22,7 +22,7 @@ public class TextChange { public final long start; /** - * The endomg position of the change. + * The ending position of the change. * If negative, it is clamped to 0. */ public final long end; @@ -33,13 +33,6 @@ public class TextChange { */ public final String content; - /** - * The hash of the content after applying it (calculated with {@link Extensions#hash(String)}). - * It is generally meaningless to send, but when received it is an invitation to check the hash - * and forcefully re-sync if necessary. - */ - public final OptionalLong hash; // xxh3 hash - /** * Checks if the change represents a deletion. * It does if the starting index is lower than the ending index. diff --git a/dist/lua/annotations.lua b/dist/lua/annotations.lua index 0e573fa..71b5ea3 100644 --- a/dist/lua/annotations.lua +++ b/dist/lua/annotations.lua @@ -135,28 +135,28 @@ function MaybeCursorPromise:cancel() end function MaybeCursorPromise:and_then(cb) end ----@class (exact) TextChangePromise : Promise -local TextChangePromise = {} +---@class (exact) BufferUpdatePromise : Promise +local BufferUpdatePromise = {} --- block until promise is ready and return value ---- @return TextChange -function TextChangePromise:await() end +--- @return BufferUpdate +function BufferUpdatePromise:await() end --- cancel promise execution -function TextChangePromise:cancel() end ----@param cb fun(x: TextChange) callback to invoke +function BufferUpdatePromise:cancel() end +---@param cb fun(x: BufferUpdate) callback to invoke ---invoke callback asynchronously as soon as promise is ready -function TextChangePromise:and_then(cb) end +function BufferUpdatePromise:and_then(cb) end ----@class (exact) MaybeTextChangePromise : Promise -local MaybeTextChangePromise = {} +---@class (exact) MaybeBufferUpdatePromise : Promise +local MaybeBufferUpdatePromise = {} --- block until promise is ready and return value ---- @return TextChange | nil -function MaybeTextChangePromise:await() end +--- @return BufferUpdate | nil +function MaybeBufferUpdatePromise:await() end --- cancel promise execution -function MaybeTextChangePromise:cancel() end ----@param cb fun(x: TextChange | nil) callback to invoke +function MaybeBufferUpdatePromise:cancel() end +---@param cb fun(x: BufferUpdate | nil) callback to invoke ---invoke callback asynchronously as soon as promise is ready -function MaybeTextChangePromise:and_then(cb) end +function MaybeBufferUpdatePromise:and_then(cb) end -- [[ END ASYNC STUFF ]] @@ -322,9 +322,14 @@ local BufferController = {} ---@field content string text content of change ---@field start integer start index of change ---@field finish integer end index of change ----@field hash integer? optional hash of text buffer after this change, for sync checks local TextChange = {} +---@class (exact) BufferUpdate +---@field change TextChange text change for this delta +---@field version table CRDT version after this change +---@field hash integer? optional hash of text buffer after this change, for sync checks +local BufferUpdate = {} + ---@param other string text to apply change to ---apply this text change to a string, returning the result function TextChange:apply(other) end @@ -336,13 +341,13 @@ function TextChange:apply(other) end ---update buffer with a text change; note that to delete content should be empty but not span, while to insert span should be empty but not content (can insert and delete at the same time) function BufferController:send(change) end ----@return MaybeTextChangePromise +---@return MaybeBufferUpdatePromise ---@async ---@nodiscard ---try to receive text changes, returning nil if none is available function BufferController:try_recv() end ----@return TextChangePromise +---@return BufferUpdatePromise ---@async ---@nodiscard ---block until next text change and return it @@ -367,6 +372,10 @@ function BufferController:callback(cb) end ---get current content of buffer controller, marking all pending changes as seen function BufferController:content() end +---@param version [integer] version to ack +---notify controller that this version's change has been correctly applied +function BufferController:ack(version) end + @@ -374,14 +383,19 @@ function BufferController:content() end ---handle to a workspace's cursor channel, allowing send/recv operations local CursorController = {} ----@class Cursor ----@field user string? id of user owning this cursor +---@class Selection ---@field buffer string relative path ("name") of buffer on which this cursor is ----@field start [integer, integer] cursor start position ----@field finish [integer, integer] cursor end position ----a cursor position +---@field start_row integer +---@field start_col integer +---@field end_row integer +---@field end_col integer +---a cursor selected region, as row-col indices ----@param cursor Cursor cursor event to broadcast +---@class Cursor +---@field user string id of user owning this cursor +---@field sel Selection selected region for this user + +---@param cursor Selection cursor position to broadcast ---@return NilPromise ---@async ---@nodiscard diff --git a/dist/py/src/codemp/codemp.pyi b/dist/py/src/codemp/codemp.pyi index 6c088c5..25b1ba8 100644 --- a/dist/py/src/codemp/codemp.pyi +++ b/dist/py/src/codemp/codemp.pyi @@ -92,6 +92,14 @@ class TextChange: def is_empty(self) -> bool: ... def apply(self, txt: str) -> str: ... +class BufferUpdate: + """ + A single editor delta event, wrapping a TextChange and the new version + """ + change: TextChange + hash: Optional[int] + version: list[int] + class BufferController: """ @@ -100,6 +108,7 @@ class BufferController: """ def path(self) -> str: ... def content(self) -> Promise[str]: ... + def ack(self, v: list[int]) -> None: ... def send(self, start: int, end: int, @@ -113,14 +122,20 @@ class BufferController: -class Cursor: +class Selection: """ An Editor agnostic cursor position representation """ start: Tuple[int, int] end: Tuple[int, int] buffer: str - user: Optional[str] # can be an empty string + +class Cursor: + """ + A remote cursor event + """ + user: str + sel: Selection class CursorController: diff --git a/src/api/change.rs b/src/api/change.rs index 5a2ebfe..6270e37 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -1,6 +1,25 @@ //! # TextChange //! A high-level representation of a change within a given buffer. +/// A [`TextChange`] event happening on a buffer. +/// +/// Contains the change itself, the new version after this change and an optional `hash` field. +/// This is used for error correction: if provided, it should match the hash of the buffer +/// content **after** applying this change. Note that the `hash` field will not necessarily +/// be provided every time. +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] +#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(get_all))] +#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] +pub struct BufferUpdate { + /// Optional content hash after applying this change. + pub hash: Option, + /// CRDT version after this change has been applied. + pub version: Vec, + /// The change that has occurred. + pub change: TextChange, +} + /// An editor-friendly representation of a text change in a given buffer. /// /// It's expressed with a range of characters and a string of content that should replace them, @@ -9,21 +28,22 @@ /// Bulky and large operations will result in a single [`TextChange`] effectively sending the whole /// new buffer, but smaller changes are efficient and easy to create or apply. /// -/// [`TextChange`] contains an optional `hash` field. This is used for error correction: if -/// provided, it should match the hash of the buffer content **after** applying this change. -/// Note that the `hash` field will not necessarily be provided every time. -/// /// ### Examples -/// To insert 'a' after 4th character we should send a. -/// `TextChange { start: 4, end: 4, content: "a".into(), hash: None }` +/// To insert 'a' after 4th character we should send: +/// ``` +/// codemp::api::TextChange { start: 4, end: 4, content: "a".into() }; +/// ``` /// -/// To delete a the fourth character we should send a. -/// `TextChange { start: 3, end: 4, content: "".into(), hash: None }` +/// To delete the fourth character we should send: +/// ``` +/// codemp::api::TextChange { start: 3, end: 4, content: "".into() }; +/// ``` /// -/// ```no_run +/// ``` /// let change = codemp::api::TextChange { -/// start: 6, end: 11, -/// content: "mom".to_string(), hash: None +/// start: 6, +/// end: 11, +/// content: "mom".to_string() /// }; /// let before = "hello world!"; /// let after = change.apply(before); @@ -41,8 +61,6 @@ pub struct TextChange { pub end: u32, /// New content of text inside span. pub content: String, - /// Optional content hash after applying this change. - pub hash: Option, } impl TextChange { @@ -90,7 +108,6 @@ mod tests { start: 5, end: 5, content: " cruel".to_string(), - hash: None, }; let result = change.apply("hello world!"); assert_eq!(result, "hello cruel world!"); @@ -102,7 +119,6 @@ mod tests { start: 5, end: 11, content: "".to_string(), - hash: None, }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello world!"); @@ -114,7 +130,6 @@ mod tests { start: 5, end: 11, content: " not very pleasant".to_string(), - hash: None, }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello not very pleasant world!"); @@ -126,7 +141,6 @@ mod tests { start: 100, end: 110, content: "a very long string \n which totally matters".to_string(), - hash: None, }; let result = change.apply("a short text"); assert_eq!( @@ -141,7 +155,6 @@ mod tests { start: 42, end: 42, content: "".to_string(), - hash: None, }; let result = change.apply("some important text"); assert_eq!(result, "some important text"); diff --git a/src/api/config.rs b/src/api/config.rs index 23dc7ef..b23d962 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -1,11 +1,11 @@ //! # Config //! Data structure defining clients configuration -/// Configuration struct for `codemp` client +/// Configuration struct for the `codemp` client. /// -/// username and password are required fields, while everything else is optional +/// `username` and `password` are required fields, everything else is optional. /// -/// host, port and tls affect all connections to all grpc services +/// `host`, `port` and `tls` affect all connections to all gRPC services; the /// resulting endpoint is composed like this: /// http{tls?'s':''}://{host}:{port} #[derive(Clone, Debug)] @@ -16,20 +16,20 @@ )] #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] pub struct Config { - /// user identifier used to register, possibly your email + /// User identifier used to register, possibly your email. pub username: String, - /// user password chosen upon registration + /// User password chosen upon registration. pub password: String, - /// address of server to connect to, default api.code.mp + /// Address of server to connect to, default api.code.mp. pub host: Option, - /// port to connect to, default 50053 + /// Port to connect to, default 50053. pub port: Option, - /// enable or disable tls, default true + /// Enable or disable tls, default true. pub tls: Option, } impl Config { - /// construct a new Config object, with given username and password + /// Construct a new Config object, with given username and password. pub fn new(username: impl ToString, password: impl ToString) -> Self { Self { username: username.to_string(), diff --git a/src/api/controller.rs b/src/api/controller.rs index dda3b3f..853f1e5 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -37,11 +37,9 @@ where /// See [`Controller`]'s documentation for details. /// /// Details about the receiving end are left to the implementor. -#[allow(async_fn_in_trait)] -#[cfg_attr(feature = "async-trait", async_trait::async_trait)] pub trait AsyncSender: Sized + Send + Sync { - /// Enqueue a new value to be sent to all other users. - async fn send(&self, x: T) -> ControllerResult<()>; + /// Enqueue a new value to be sent to all other users without blocking + fn send(&self, x: T) -> ControllerResult<()>; } /// Asynchronous and thread-safe handle to receive data from a stream. diff --git a/src/api/cursor.rs b/src/api/cursor.rs index 7c1e4b5..89ecd5e 100644 --- a/src/api/cursor.rs +++ b/src/api/cursor.rs @@ -4,19 +4,34 @@ #[cfg(any(feature = "py", feature = "py-noabi"))] use pyo3::prelude::*; -/// User cursor position in a buffer +/// An event that occurred about a user's cursor. #[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] // #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))] pub struct Cursor { - /// Cursor start position in buffer, as 0-indexed row-column tuple. - pub start: (i32, i32), - /// Cursor end position in buffer, as 0-indexed row-column tuple. - #[cfg_attr(feature = "serialize", serde(alias = "finish"))] // Lua uses `end` as keyword - pub end: (i32, i32), + /// User who sent the cursor. + pub user: String, + /// The updated cursor selection. + pub sel: Selection, +} + +/// A cursor selection span. +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] +#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] +#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] +// #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))] +pub struct Selection { + /// Cursor position starting row in buffer. + pub start_row: i32, + /// Cursor position starting column in buffer. + pub start_col: i32, + /// Cursor position final row in buffer. + pub end_row: i32, + /// Cursor position final column in buffer. + pub end_col: i32, /// Path of buffer this cursor is on. pub buffer: String, - /// User display name, if provided. - pub user: Option, } diff --git a/src/api/mod.rs b/src/api/mod.rs index 54be1da..158a4d0 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -19,9 +19,11 @@ pub mod event; /// data structure for remote users pub mod user; +pub use change::BufferUpdate; pub use change::TextChange; pub use config::Config; pub use controller::Controller; pub use cursor::Cursor; +pub use cursor::Selection; pub use event::Event; pub use user::User; diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index bcf115e..0065e38 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -7,11 +7,10 @@ use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; +use crate::api::BufferUpdate; use crate::api::TextChange; use crate::errors::ControllerResult; -use crate::ext::InternallyMutable; - -use super::worker::DeltaRequest; +use crate::ext::IgnorableError; /// A [Controller] to asynchronously interact with remote buffers. /// @@ -23,53 +22,59 @@ use super::worker::DeltaRequest; pub struct BufferController(pub(crate) Arc); impl BufferController { - /// Get the buffer path + /// Get the buffer path. pub fn path(&self) -> &str { &self.0.name } - /// Return buffer whole content, updating internal acknowledgement tracker + /// Return buffer whole content, updating internal acknowledgement tracker. pub async fn content(&self) -> ControllerResult { let (tx, rx) = oneshot::channel(); self.0.content_request.send(tx).await?; let content = rx.await?; - self.0 - .last_update - .set(self.0.latest_version.borrow().clone()); Ok(content) } + + /// Notify CRDT that changes up to the given version have been merged succesfully. + pub fn ack(&self, version: Vec) { + let version = version + .into_iter() + .map(|x| usize::from_ne_bytes(x.to_ne_bytes())) + .collect(); + self.0 + .ack_tx + .send(version) + .unwrap_or_warn("no worker to receive sent ack"); + } } #[derive(Debug)] pub(crate) struct BufferControllerInner { pub(crate) name: String, pub(crate) latest_version: watch::Receiver, - pub(crate) last_update: InternallyMutable, - pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender)>, + pub(crate) local_version: watch::Receiver, + pub(crate) ops_in: mpsc::UnboundedSender, pub(crate) poller: mpsc::UnboundedSender>, pub(crate) content_request: mpsc::Sender>, - pub(crate) delta_request: mpsc::Sender, + pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender>)>, pub(crate) callback: watch::Sender>>, + pub(crate) ack_tx: mpsc::UnboundedSender, } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for BufferController {} +impl Controller for BufferController {} -#[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl AsyncSender for BufferController { - async fn send(&self, op: TextChange) -> ControllerResult<()> { - // we let the worker do the updating to the last version and send it back. - let (tx, rx) = oneshot::channel(); - self.0.ops_in.send((op, tx))?; - self.0.last_update.set(rx.await?); + fn send(&self, op: TextChange) -> ControllerResult<()> { + self.0.ops_in.send(op)?; Ok(()) } } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncReceiver for BufferController { +impl AsyncReceiver for BufferController { async fn poll(&self) -> ControllerResult<()> { - if self.0.last_update.get() != *self.0.latest_version.borrow() { + if *self.0.local_version.borrow() != *self.0.latest_version.borrow() { return Ok(()); } @@ -79,8 +84,8 @@ impl AsyncReceiver for BufferController { Ok(()) } - async fn try_recv(&self) -> ControllerResult> { - let last_update = self.0.last_update.get(); + async fn try_recv(&self) -> ControllerResult> { + let last_update = self.0.local_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone(); if last_update == latest_version { @@ -89,16 +94,11 @@ impl AsyncReceiver for BufferController { let (tx, rx) = oneshot::channel(); self.0.delta_request.send((last_update, tx)).await?; - let (v, change) = rx.await?; - self.0.last_update.set(v); - Ok(change) + Ok(rx.await?) } fn callback(&self, cb: impl Into>) { - if self.0.callback.send(Some(cb.into())).is_err() { - // TODO should we panic? we failed what we were supposed to do - tracing::error!("no active buffer worker to run registered callback!"); - } + self.0.callback.send_replace(Some(cb.into())); } fn clear_callback(&self) { diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index a5a730f..a77fa98 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -7,25 +7,25 @@ use tonic::Streaming; use uuid::Uuid; use crate::api::controller::ControllerCallback; +use crate::api::BufferUpdate; use crate::api::TextChange; -use crate::ext::{IgnorableError, InternallyMutable}; +use crate::ext::IgnorableError; use codemp_proto::buffer::{BufferEvent, Operation}; use super::controller::{BufferController, BufferControllerInner}; -pub(crate) type DeltaOp = (LocalVersion, Option); -pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); - struct BufferWorker { user_id: Uuid, path: String, latest_version: watch::Sender, - ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender)>, + local_version: watch::Sender, + ack_rx: mpsc::UnboundedReceiver, + ops_in: mpsc::UnboundedReceiver, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, - delta_req: mpsc::Receiver, + delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender>)>, controller: std::sync::Weak, callback: watch::Receiver>>, oplog: OpLog, @@ -43,7 +43,9 @@ impl BufferController { let init = diamond_types::LocalVersion::default(); let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); + let (my_version_tx, my_version_rx) = watch::channel(init.clone()); let (opin_tx, opin_rx) = mpsc::unbounded_channel(); + let (ack_tx, ack_rx) = mpsc::unbounded_channel(); let (req_tx, req_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); @@ -54,12 +56,13 @@ impl BufferController { let controller = Arc::new(BufferControllerInner { name: path.to_string(), latest_version: latest_version_rx, - last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), + local_version: my_version_rx, ops_in: opin_tx, poller: poller_tx, content_request: req_tx, delta_request: recv_tx, callback: cb_tx, + ack_tx, }); let weak = Arc::downgrade(&controller); @@ -68,6 +71,8 @@ impl BufferController { user_id, path: path.to_string(), latest_version: latest_version_tx, + local_version: my_version_tx, + ack_rx, ops_in: opin_rx, poller: poller_rx, pollers: Vec::new(), @@ -106,10 +111,18 @@ impl BufferController { Some(tx) => worker.pollers.push(tx), }, + // received new change ack, merge editor branch up to that version + res = worker.ack_rx.recv() => match res { + None => break tracing::error!("ack channel closed"), + Some(v) => { + worker.branch.merge(&worker.oplog, &v) + }, + }, + // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await, + Some(change) => worker.handle_editor_change(change, &tx).await, }, // received a message from server: add to oplog and update latest version (+unlock pollers) @@ -142,12 +155,7 @@ impl BufferController { } impl BufferWorker { - async fn handle_editor_change( - &mut self, - change: TextChange, - ack: oneshot::Sender, - tx: &mpsc::Sender, - ) { + async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender) { let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); let last_ver = self.oplog.local_version(); // clip to buffer extents @@ -174,9 +182,10 @@ impl BufferWorker { self.latest_version .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); + self.local_version + .send(self.branch.local_version()) + .unwrap_or_warn("failed to update local version!"); } - ack.send(self.branch.local_version()) - .unwrap_or_warn("controller didn't wait for ack"); } async fn handle_server_change(&mut self, change: BufferEvent) -> bool { @@ -203,7 +212,11 @@ impl BufferWorker { } } - async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender) { + async fn handle_delta_request( + &mut self, + last_ver: LocalVersion, + tx: oneshot::Sender>, + ) { if let Some((lv, Some(dtop))) = self .oplog .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) @@ -228,25 +241,40 @@ impl BufferWorker { { tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); } - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.start() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), + crate::api::BufferUpdate { hash, + version: step_ver + .into_iter() + .map(|x| i64::from_ne_bytes(x.to_ne_bytes())) + .collect(), // TODO this is wasteful + change: crate::api::TextChange { + start: dtop.start() as u32, + end: dtop.start() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + }, } } - diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.end() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), + diamond_types::list::operation::OpKind::Del => crate::api::BufferUpdate { hash, + version: step_ver + .into_iter() + .map(|x| i64::from_ne_bytes(x.to_ne_bytes())) + .collect(), // TODO this is wasteful + change: crate::api::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + }, }, }; - tx.send((new_local_v, Some(tc))) + self.local_version + .send(new_local_v) + .unwrap_or_warn("could not update local version"); + tx.send(Some(tc)) .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { - tx.send((last_ver, None)) + tx.send(None) .unwrap_or_warn("could not update ops channel -- is controller dead?"); } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index a0bced9..fd22375 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -8,7 +8,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use crate::{ api::{ controller::{AsyncReceiver, AsyncSender, ControllerCallback}, - Controller, Cursor, + Controller, Cursor, Selection, }, errors::ControllerResult, }; @@ -27,38 +27,38 @@ pub struct CursorController(pub(crate) Arc); #[derive(Debug)] pub(crate) struct CursorControllerInner { - pub(crate) op: mpsc::Sender, + pub(crate) op: mpsc::UnboundedSender, pub(crate) stream: mpsc::Sender>>, pub(crate) poll: mpsc::UnboundedSender>, pub(crate) callback: watch::Sender>>, } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for CursorController {} +impl Controller for CursorController {} #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncSender for CursorController { - async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { - if cursor.start > cursor.end { - std::mem::swap(&mut cursor.start, &mut cursor.end); +impl AsyncSender for CursorController { + fn send(&self, mut cursor: Selection) -> ControllerResult<()> { + if cursor.start_row > cursor.end_row + || (cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col) + { + std::mem::swap(&mut cursor.start_row, &mut cursor.end_row); + std::mem::swap(&mut cursor.start_col, &mut cursor.end_col); } - Ok(self - .0 - .op - .send(CursorPosition { - buffer: BufferNode { - path: cursor.buffer, - }, - start: RowCol { - row: cursor.start.0, - col: cursor.start.1, - }, - end: RowCol { - row: cursor.end.0, - col: cursor.end.1, - }, - }) - .await?) + + Ok(self.0.op.send(CursorPosition { + buffer: BufferNode { + path: cursor.buffer, + }, + start: RowCol { + row: cursor.start_row, + col: cursor.start_col, + }, + end: RowCol { + row: cursor.end_row, + col: cursor.end_col, + }, + })?) } } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index aab8c34..a690a9a 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -5,7 +5,7 @@ use tonic::Streaming; use uuid::Uuid; use crate::{ - api::{controller::ControllerCallback, Cursor, User}, + api::{controller::ControllerCallback, Cursor, Selection, User}, ext::IgnorableError, }; use codemp_proto::cursor::{CursorEvent, CursorPosition}; @@ -13,7 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition}; use super::controller::{CursorController, CursorControllerInner}; struct CursorWorker { - op: mpsc::Receiver, + op: mpsc::UnboundedReceiver, map: Arc>, stream: mpsc::Receiver>>, poll: mpsc::UnboundedReceiver>, @@ -30,7 +30,7 @@ impl CursorController { rx: Streaming, ) -> Self { // TODO we should tweak the channel buffer size to better propagate backpressure - let (op_tx, op_rx) = mpsc::channel(64); + let (op_tx, op_rx) = mpsc::unbounded_channel(); let (stream_tx, stream_rx) = mpsc::channel(1); let (cb_tx, cb_rx) = watch::channel(None); let (poll_tx, poll_rx) = mpsc::unbounded_channel(); @@ -86,16 +86,18 @@ impl CursorController { None => break, // clean exit, just weird that we got it here Some(controller) => { tracing::debug!("received cursor from server"); - let mut cursor = Cursor { - buffer: cur.position.buffer.path, - start: (cur.position.start.row, cur.position.start.col), - end: (cur.position.end.row, cur.position.end.col), - user: None, - }; let user_id = Uuid::from(cur.user); - if let Some(user) = worker.map.get(&user_id) { - cursor.user = Some(user.name.clone()); - } + let cursor = Cursor { + user: worker.map.get(&user_id).map(|u| u.name.clone()).unwrap_or_default(), + sel: Selection { + buffer: cur.position.buffer.path, + start_row: cur.position.start.row, + start_col: cur.position.start.col, + end_row: cur.position.end.row, + end_col: cur.position.end.col + } + }; + worker.store.push_back(cursor); for tx in worker.pollers.drain(..) { tx.send(()).unwrap_or_warn("poller dropped before unblocking"); diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 639c187..5f06f8e 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -4,7 +4,7 @@ use jni_toolbox::jni; use crate::{ api::{ controller::{AsyncReceiver, AsyncSender}, - TextChange, + BufferUpdate, TextChange, }, errors::ControllerError, }; @@ -27,13 +27,13 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result Result, ControllerError> { +) -> Result, ControllerError> { super::tokio().block_on(controller.try_recv()) } /// Block until it receives a [TextChange]. #[jni(package = "mp.code", class = "BufferController")] -fn recv(controller: &mut crate::buffer::Controller) -> Result { +fn recv(controller: &mut crate::buffer::Controller) -> Result { super::tokio().block_on(controller.recv()) } @@ -43,7 +43,7 @@ fn send( controller: &mut crate::buffer::Controller, change: TextChange, ) -> Result<(), ControllerError> { - super::tokio().block_on(controller.send(change)) + controller.send(change) } /// Register a callback for buffer changes. @@ -99,6 +99,12 @@ fn poll(controller: &mut crate::buffer::Controller) -> Result<(), ControllerErro super::tokio().block_on(controller.poll()) } +/// Acknowledge that a change has been correctly applied. +#[jni(package = "mp.code", class = "BufferController")] +fn ack(controller: &mut crate::buffer::Controller, version: Vec) { + controller.ack(version) +} + /// Called by the Java GC to drop a [crate::buffer::Controller]. #[jni(package = "mp.code", class = "BufferController")] fn free(input: jni::sys::jlong) { diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 65cb0c4..e08a788 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -1,7 +1,7 @@ use crate::{ api::{ controller::{AsyncReceiver, AsyncSender}, - Cursor, + Cursor, Selection, }, errors::ControllerError, }; @@ -24,8 +24,8 @@ fn recv(controller: &mut crate::cursor::Controller) -> Result Result<(), ControllerError> { - super::tokio().block_on(controller.send(cursor)) +fn send(controller: &mut crate::cursor::Controller, sel: Selection) -> Result<(), ControllerError> { + controller.send(sel) } /// Register a callback for cursor changes. diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index cad7768..51bd3a5 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -79,6 +79,7 @@ macro_rules! null_check { } }; } + pub(crate) use null_check; impl jni_toolbox::JniToolboxError for crate::errors::ConnectionError { @@ -193,13 +194,13 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event { } } -impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { - const CLASS: &'static str = "mp/code/data/TextChange"; +impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::BufferUpdate { + const CLASS: &'static str = "mp/code/data/BufferUpdate"; fn into_java_object( self, env: &mut jni::JNIEnv<'j>, ) -> Result, jni::errors::Error> { - let content = env.new_string(self.content)?; + let class = env.find_class(Self::CLASS)?; let hash_class = env.find_class("java/util/OptionalLong")?; let hash = if let Some(h) = self.hash { @@ -214,15 +215,36 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { }? .l()?; + let version = self.version.into_java_object(env)?; + let change = self.change.into_java_object(env)?; + + env.new_object( + class, + "(Ljava/util/OptionalLong;[JLmp/code/data/TextChange;)V", + &[ + jni::objects::JValueGen::Object(&hash), + jni::objects::JValueGen::Object(&version), + jni::objects::JValueGen::Object(&change), + ], + ) + } +} + +impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { + const CLASS: &'static str = "mp/code/data/TextChange"; + fn into_java_object( + self, + env: &mut jni::JNIEnv<'j>, + ) -> Result, jni::errors::Error> { + let content = env.new_string(self.content)?; let class = env.find_class(Self::CLASS)?; env.new_object( class, - "(JJLjava/lang/String;Ljava/util/OptionalLong;)V", + "(JJLjava/lang/String;)V", &[ jni::objects::JValueGen::Long(self.start.into()), jni::objects::JValueGen::Long(self.end.into()), jni::objects::JValueGen::Object(&content), - jni::objects::JValueGen::Object(&hash), ], ) } @@ -234,24 +256,39 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Cursor { self, env: &mut jni::JNIEnv<'j>, ) -> Result, jni::errors::Error> { - let class = env.find_class("mp/code/data/Cursor")?; - let buffer = env.new_string(&self.buffer)?; - let user = if let Some(user) = self.user { - env.new_string(user)?.into() - } else { - jni::objects::JObject::null() - }; + let class = env.find_class(Self::CLASS)?; + let user = env.new_string(&self.user)?; + let sel = self.sel.into_java_object(env)?; env.new_object( class, - "(IIIILjava/lang/String;Ljava/lang/String;)V", + "(Ljava/lang/String;Lmp/code/data/Selection;)V", &[ - jni::objects::JValueGen::Int(self.start.0), - jni::objects::JValueGen::Int(self.start.1), - jni::objects::JValueGen::Int(self.end.0), - jni::objects::JValueGen::Int(self.end.1), - jni::objects::JValueGen::Object(&buffer), jni::objects::JValueGen::Object(&user), + jni::objects::JValueGen::Object(&sel), + ], + ) + } +} + +impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Selection { + const CLASS: &'static str = "mp/code/data/Selection"; + fn into_java_object( + self, + env: &mut jni::JNIEnv<'j>, + ) -> Result, jni::errors::Error> { + let class = env.find_class(Self::CLASS)?; + let buffer = env.new_string(&self.buffer)?; + + env.new_object( + class, + "(IIIILjava/lang/String;)V", + &[ + jni::objects::JValueGen::Int(self.start_row), + jni::objects::JValueGen::Int(self.start_col), + jni::objects::JValueGen::Int(self.end_row), + jni::objects::JValueGen::Int(self.end_col), + jni::objects::JValueGen::Object(&buffer), ], ) } @@ -363,7 +400,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config { } } -impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor { +impl<'j> jni_toolbox::FromJava<'j> for crate::api::Selection { type From = jni::objects::JObject<'j>; fn from_java( env: &mut jni::JNIEnv<'j>, @@ -384,20 +421,12 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor { unsafe { env.get_string_unchecked(&jfield.into()) }?.into() }; - let user = { - let jfield = env.get_field(&cursor, "user", "Ljava/lang/String;")?.l()?; - if jfield.is_null() { - None - } else { - Some(unsafe { env.get_string_unchecked(&jfield.into()) }?.into()) - } - }; - Ok(Self { - start: (start_row, start_col), - end: (end_row, end_col), + start_row, + start_col, + end_row, + end_col, buffer, - user, }) } } @@ -427,21 +456,10 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::TextChange { unsafe { env.get_string_unchecked(&jfield.into()) }?.into() }; - let hash = { - let jfield = env - .get_field(&change, "hash", "Ljava/util/OptionalLong;")? - .l()?; - if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? { - Some(env.call_method(&jfield, "getAsLong", "()J", &[])?.j()?) - } else { - None - } - }; Ok(Self { start, end, content, - hash, }) } } diff --git a/src/ffi/java/workspace.rs b/src/ffi/java/workspace.rs index 4bd3da8..acdc36f 100644 --- a/src/ffi/java/workspace.rs +++ b/src/ffi/java/workspace.rs @@ -91,7 +91,7 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr super::tokio().block_on(workspace.delete(&path)) } -/// Block and receive a workspace event +/// Block and receive a workspace event. #[jni(package = "mp.code", class = "Workspace")] fn recv(workspace: &mut Workspace) -> Result { super::tokio().block_on(workspace.recv()) @@ -103,13 +103,13 @@ fn try_recv(workspace: &mut Workspace) -> Result, Cont super::tokio().block_on(workspace.try_recv()) } -/// Block until a workspace event is available +/// Block until a workspace event is available. #[jni(package = "mp.code", class = "Workspace")] fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> { super::tokio().block_on(workspace.poll()) } -/// Clear previously registered callback +/// Clear previously registered callback. #[jni(package = "mp.code", class = "Workspace")] fn clear_callback(workspace: &mut Workspace) { workspace.clear_callback(); diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index df89f06..fc9b961 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,5 +1,5 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; -use crate::api::TextChange; +use crate::api::{BufferUpdate, TextChange}; use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, @@ -51,20 +51,20 @@ impl BufferController { /// Return next buffer event if present #[napi(js_name = "try_recv")] - pub async fn js_try_recv(&self) -> napi::Result> { + pub async fn js_try_recv(&self) -> napi::Result> { Ok(self.try_recv().await?) } /// Wait for next buffer event and return it #[napi(js_name = "recv")] - pub async fn js_recv(&self) -> napi::Result { + pub async fn js_recv(&self) -> napi::Result { Ok(self.recv().await?) } /// Send a buffer update to workspace #[napi(js_name = "send")] - pub async fn js_send(&self, op: TextChange) -> napi::Result<()> { - Ok(self.send(op).await?) + pub fn js_send(&self, op: TextChange) -> napi::Result<()> { + Ok(self.send(op)?) } /// Return buffer whole content diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index 2bb09e5..9131abc 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -6,41 +6,6 @@ use napi::threadsafe_function::{ }; use napi_derive::napi; -#[napi(object, js_name = "Cursor")] -pub struct JsCursor { - /// range of text change, as char indexes in buffer previous state - pub start_row: i32, - pub start_col: i32, - pub end_row: i32, - pub end_col: i32, - pub buffer: String, - pub user: Option, -} - -impl From for crate::api::Cursor { - fn from(value: JsCursor) -> Self { - crate::api::Cursor { - start: (value.start_row, value.start_col), - end: (value.end_row, value.end_col), - buffer: value.buffer, - user: value.user, - } - } -} - -impl From for JsCursor { - fn from(value: crate::api::Cursor) -> Self { - JsCursor { - start_row: value.start.0, - start_col: value.start.1, - end_row: value.end.0, - end_col: value.end.1, - buffer: value.buffer, - user: value.user.map(|x| x.to_string()), - } - } -} - #[napi] impl CursorController { /// Register a callback to be called on receive. @@ -74,19 +39,19 @@ impl CursorController { /// Send a new cursor event to remote #[napi(js_name = "send")] - pub async fn js_send(&self, pos: JsCursor) -> napi::Result<()> { - Ok(self.send(crate::api::Cursor::from(pos)).await?) + pub fn js_send(&self, sel: crate::api::Selection) -> napi::Result<()> { + Ok(self.send(sel)?) } /// Get next cursor event if available without blocking #[napi(js_name = "try_recv")] - pub async fn js_try_recv(&self) -> napi::Result> { - Ok(self.try_recv().await?.map(JsCursor::from)) + pub async fn js_try_recv(&self) -> napi::Result> { + Ok(self.try_recv().await?.map(crate::api::Cursor::from)) } /// Block until next #[napi(js_name = "recv")] - pub async fn js_recv(&self) -> napi::Result { + pub async fn js_recv(&self) -> napi::Result { Ok(self.recv().await?.into()) } } diff --git a/src/ffi/js/workspace.rs b/src/ffi/js/workspace.rs index e44fe81..bacde0e 100644 --- a/src/ffi/js/workspace.rs +++ b/src/ffi/js/workspace.rs @@ -8,8 +8,6 @@ use napi::threadsafe_function::{ }; use napi_derive::napi; -use super::client::JsUser; - #[napi(object, js_name = "Event")] pub struct JsEvent { pub r#type: String, @@ -148,12 +146,15 @@ impl Workspace { /// List users attached to a specific buffer #[napi(js_name = "list_buffer_users")] - pub async fn js_list_buffer_users(&self, path: String) -> napi::Result> { + pub async fn js_list_buffer_users( + &self, + path: String, + ) -> napi::Result> { Ok(self .list_buffer_users(&path) .await? .into_iter() - .map(JsUser::from) + .map(super::client::JsUser::from) .collect()) } } diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index 76c89cd..5bc0813 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -11,10 +11,9 @@ impl LuaUserData for CodempBufferController { Ok(format!("{:?}", this)) }); - methods.add_method( - "send", - |_, this, (change,): (CodempTextChange,)| a_sync! { this => this.send(change).await? }, - ); + methods.add_method("send", |_, this, (change,): (CodempTextChange,)| { + Ok(this.send(change)?) + }); methods.add_method( "try_recv", @@ -22,21 +21,20 @@ impl LuaUserData for CodempBufferController { ); methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); + methods.add_method_mut("ack", |_, this, (version,): (Vec,)| { + Ok(this.ack(version)) + }); methods.add_method( "content", |_, this, ()| a_sync! { this => this.content().await? }, ); - methods.add_method("clear_callback", |_, this, ()| { - this.clear_callback(); - Ok(()) - }); + methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { - this.callback(move |controller: CodempBufferController| { + Ok(this.callback(move |controller: CodempBufferController| { super::ext::callback().invoke(cb.clone(), controller) - }); - Ok(()) + })) }); } } @@ -47,7 +45,6 @@ impl LuaUserData for CodempTextChange { fields.add_field_method_get("content", |_, this| Ok(this.content.clone())); fields.add_field_method_get("start", |_, this| Ok(this.start)); fields.add_field_method_get("end", |_, this| Ok(this.end)); - fields.add_field_method_get("hash", |_, this| Ok(this.hash)); // add a 'finish' accessor too because in Lua 'end' is reserved fields.add_field_method_get("finish", |_, this| Ok(this.end)); } @@ -59,3 +56,18 @@ impl LuaUserData for CodempTextChange { methods.add_method("apply", |_, this, (txt,): (String,)| Ok(this.apply(&txt))); } } + +from_lua_serde! { CodempBufferUpdate } +impl LuaUserData for CodempBufferUpdate { + fn add_fields>(fields: &mut F) { + fields.add_field_method_get("hash", |_, this| Ok(this.hash)); + fields.add_field_method_get("version", |_, this| Ok(this.version.clone())); + fields.add_field_method_get("change", |_, this| Ok(this.change.clone())); + } + + fn add_methods>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| { + Ok(format!("{:?}", this)) + }); + } +} diff --git a/src/ffi/lua/cursor.rs b/src/ffi/lua/cursor.rs index 3deea7c..86a3a33 100644 --- a/src/ffi/lua/cursor.rs +++ b/src/ffi/lua/cursor.rs @@ -4,7 +4,6 @@ use mlua_codemp_patch as mlua; use super::ext::a_sync::a_sync; use super::ext::from_lua_serde; -use super::ext::lua_tuple; impl LuaUserData for CodempCursorController { fn add_methods>(methods: &mut M) { @@ -12,10 +11,9 @@ impl LuaUserData for CodempCursorController { Ok(format!("{:?}", this)) }); - methods.add_method( - "send", - |_, this, (cursor,): (CodempCursor,)| a_sync! { this => this.send(cursor).await? }, - ); + methods.add_method("send", |_, this, (cursor,): (CodempSelection,)| { + Ok(this.send(cursor)?) + }); methods.add_method( "try_recv", |_, this, ()| a_sync! { this => this.try_recv().await? }, @@ -23,33 +21,42 @@ impl LuaUserData for CodempCursorController { methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method("clear_callback", |_, this, ()| { - this.clear_callback(); - Ok(()) - }); + methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback())); methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { - this.callback(move |controller: CodempCursorController| { + Ok(this.callback(move |controller: CodempCursorController| { super::ext::callback().invoke(cb.clone(), controller) - }); - Ok(()) + })) }); } } from_lua_serde! { CodempCursor } impl LuaUserData for CodempCursor { + fn add_fields>(fields: &mut F) { + fields.add_field_method_get("user", |_, this| Ok(this.user.clone())); + fields.add_field_method_get("sel", |_, this| Ok(this.sel.clone())); + } + fn add_methods>(methods: &mut M) { methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| { Ok(format!("{:?}", this)) }); } +} +from_lua_serde! { CodempSelection } +impl LuaUserData for CodempSelection { fn add_fields>(fields: &mut F) { - fields.add_field_method_get("user", |_, this| Ok(this.user.clone())); fields.add_field_method_get("buffer", |_, this| Ok(this.buffer.clone())); - fields.add_field_method_get("start", |lua, this| lua_tuple(lua, this.start)); - fields.add_field_method_get("end", |lua, this| lua_tuple(lua, this.end)); - // add a 'finish' accessor too because in Lua 'end' is reserved - fields.add_field_method_get("finish", |lua, this| lua_tuple(lua, this.end)); + fields.add_field_method_get("start_row", |_, this| Ok(this.start_row)); + fields.add_field_method_get("start_col", |_, this| Ok(this.start_col)); + fields.add_field_method_get("end_row", |_, this| Ok(this.end_row)); + fields.add_field_method_get("end_col", |_, this| Ok(this.end_col)); + } + + fn add_methods>(methods: &mut M) { + methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| { + Ok(format!("{:?}", this)) + }); } } diff --git a/src/ffi/lua/ext/a_sync.rs b/src/ffi/lua/ext/a_sync.rs index 02c672b..2e10776 100644 --- a/src/ffi/lua/ext/a_sync.rs +++ b/src/ffi/lua/ext/a_sync.rs @@ -51,10 +51,7 @@ impl LuaUserData for Promise { }); methods.add_method_mut("cancel", |_, this, ()| match this.0.take() { None => Err(LuaError::runtime("Promise already awaited")), - Some(x) => { - x.abort(); - Ok(()) - } + Some(x) => Ok(x.abort()), }); methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| { match this.0.take() { diff --git a/src/ffi/lua/ext/callback.rs b/src/ffi/lua/ext/callback.rs index dd84098..7c5f360 100644 --- a/src/ffi/lua/ext/callback.rs +++ b/src/ffi/lua/ext/callback.rs @@ -62,55 +62,57 @@ pub(crate) enum LuaCallback { Invoke(LuaFunction, CallbackArg), } -pub(crate) enum CallbackArg { - Nil, - Str(String), - VecStr(Vec), - Client(CodempClient), - CursorController(CodempCursorController), - BufferController(CodempBufferController), - Workspace(CodempWorkspace), - Event(CodempEvent), - MaybeEvent(Option), - Cursor(CodempCursor), - MaybeCursor(Option), - TextChange(CodempTextChange), - MaybeTextChange(Option), -} - -impl IntoLua for CallbackArg { - // TODO this basically calls .into_lua() on all enum variants - // i wish i could do this with a Box or an impl IntoLua - // but IntoLua requires Sized so it can't be made into an object - fn into_lua(self, lua: &Lua) -> LuaResult { - match self { - CallbackArg::Nil => Ok(LuaValue::Nil), - CallbackArg::Str(x) => x.into_lua(lua), - CallbackArg::Client(x) => x.into_lua(lua), - CallbackArg::CursorController(x) => x.into_lua(lua), - CallbackArg::BufferController(x) => x.into_lua(lua), - CallbackArg::Workspace(x) => x.into_lua(lua), - CallbackArg::VecStr(x) => x.into_lua(lua), - CallbackArg::Event(x) => x.into_lua(lua), - CallbackArg::MaybeEvent(x) => x.into_lua(lua), - CallbackArg::Cursor(x) => x.into_lua(lua), - CallbackArg::MaybeCursor(x) => x.into_lua(lua), - CallbackArg::TextChange(x) => x.into_lua(lua), - CallbackArg::MaybeTextChange(x) => x.into_lua(lua), +macro_rules! callback_args { + ($($name:ident : $t:ty ,)*) => { + pub(crate) enum CallbackArg { + Nil, + $( + $name($t), + )* } - } + + impl IntoLua for CallbackArg { + fn into_lua(self, lua: &Lua) -> LuaResult { + match self { + Self::Nil => Ok(LuaValue::Nil), + $( + Self::$name(x) => x.into_lua(lua), + )* + } + } + } + + impl From<()> for CallbackArg { + fn from(_value: ()) -> Self { + Self::Nil + } + } + + $( + impl From<$t> for CallbackArg { + fn from(value: $t) -> Self { + Self::$name(value) + } + } + )* + }; } -impl From<()> for CallbackArg { fn from(_: ()) -> Self { CallbackArg::Nil } } -impl From for CallbackArg { fn from(value: String) -> Self { CallbackArg::Str(value) } } -impl From for CallbackArg { fn from(value: CodempClient) -> Self { CallbackArg::Client(value) } } -impl From for CallbackArg { fn from(value: CodempCursorController) -> Self { CallbackArg::CursorController(value) } } -impl From for CallbackArg { fn from(value: CodempBufferController) -> Self { CallbackArg::BufferController(value) } } -impl From for CallbackArg { fn from(value: CodempWorkspace) -> Self { CallbackArg::Workspace(value) } } -impl From> for CallbackArg { fn from(value: Vec) -> Self { CallbackArg::VecStr(value) } } -impl From for CallbackArg { fn from(value: CodempEvent) -> Self { CallbackArg::Event(value) } } -impl From for CallbackArg { fn from(value: CodempCursor) -> Self { CallbackArg::Cursor(value) } } -impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeCursor(value) } } -impl From for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } } -impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeTextChange(value) } } -impl From> for CallbackArg { fn from(value: Option) -> Self { CallbackArg::MaybeEvent(value) } } +callback_args! { + Str: String, + VecStr: Vec, + Client: CodempClient, + CursorController: CodempCursorController, + BufferController: CodempBufferController, + Workspace: CodempWorkspace, + Event: CodempEvent, + MaybeEvent: Option, + Cursor: CodempCursor, + MaybeCursor: Option, + Selection: CodempSelection, + MaybeSelection: Option, + TextChange: CodempTextChange, + MaybeTextChange: Option, + BufferUpdate: CodempBufferUpdate, + MaybeBufferUpdate: Option, +} diff --git a/src/ffi/lua/ext/mod.rs b/src/ffi/lua/ext/mod.rs index c30a742..1cb07c6 100644 --- a/src/ffi/lua/ext/mod.rs +++ b/src/ffi/lua/ext/mod.rs @@ -2,19 +2,9 @@ pub mod a_sync; pub mod callback; pub mod log; -use mlua::prelude::*; -use mlua_codemp_patch as mlua; - pub(crate) use a_sync::tokio; pub(crate) use callback::callback; -pub(crate) fn lua_tuple(lua: &Lua, (a, b): (T, T)) -> LuaResult { - let table = lua.create_table()?; - table.set(1, a)?; - table.set(2, b)?; - Ok(table) -} - macro_rules! from_lua_serde { ($($t:ty)*) => { $( diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index 15167c9..13d7e84 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -61,16 +61,12 @@ impl LuaUserData for CodempWorkspace { methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { - this.callback(move |controller: CodempWorkspace| { + Ok(this.callback(move |controller: CodempWorkspace| { super::ext::callback().invoke(cb.clone(), controller) - }); - Ok(()) + })) }); - methods.add_method("clear_callbacl", |_, this, ()| { - this.clear_callback(); - Ok(()) - }); + methods.add_method("clear_callbacl", |_, this, ()| Ok(this.clear_callback())); } fn add_fields>(fields: &mut F) { diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index fa03449..bc92147 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -43,6 +43,8 @@ //! `JNIException`s are however unchecked: there is nothing you can do to recover from them, as they usually represent a severe error in the glue code. If they arise, it's probably a bug. //! +#![allow(clippy::unit_arg)] + /// java bindings, built with [jni] #[cfg(feature = "java")] pub mod java; diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index 6ee1e27..9d9d5f3 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,6 +1,6 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; -use crate::api::Cursor; use crate::api::TextChange; +use crate::api::{Cursor, Selection}; use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; use pyo3::exceptions::PyValueError; @@ -15,19 +15,21 @@ impl CursorController { #[pyo3(name = "send")] fn pysend( &self, - py: Python, + _py: Python, path: String, start: (i32, i32), end: (i32, i32), - ) -> PyResult { - let pos = Cursor { - start, - end, + ) -> PyResult<()> { + let pos = Selection { + start_row: start.0, + start_col: start.1, + end_row: end.0, + end_col: end.1, buffer: path, - user: None, }; let this = self.clone(); - a_sync_allow_threads!(py, this.send(pos).await) + this.send(pos)?; + Ok(()) } #[pyo3(name = "try_recv")] @@ -84,15 +86,15 @@ impl BufferController { } #[pyo3(name = "send")] - fn pysend(&self, py: Python, start: u32, end: u32, txt: String) -> PyResult { + fn pysend(&self, _py: Python, start: u32, end: u32, txt: String) -> PyResult<()> { let op = TextChange { start, end, content: txt, - hash: None, }; let this = self.clone(); - a_sync_allow_threads!(py, this.send(op).await) + this.send(op)?; + Ok(()) } #[pyo3(name = "try_recv")] @@ -141,21 +143,21 @@ impl BufferController { impl Cursor { #[getter(start)] fn pystart(&self) -> (i32, i32) { - self.start + (self.sel.start_row, self.sel.start_col) } #[getter(end)] fn pyend(&self) -> (i32, i32) { - self.end + (self.sel.end_row, self.sel.end_col) } #[getter(buffer)] fn pybuffer(&self) -> String { - self.buffer.clone() + self.sel.buffer.clone() } #[getter(user)] fn pyuser(&self) -> Option { - self.user.clone() + Some(self.user.clone()) } } diff --git a/src/lib.rs b/src/lib.rs index 40ad39f..40c6905 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,7 +53,7 @@ //! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods //! let cursor = workspace.cursor(); //! let event = cursor.recv().await.expect("disconnected while waiting for event!"); -//! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer); +//! println!("user {} moved on buffer {}", event.user, event.sel.buffer); //! # }; //! ``` //! @@ -69,8 +69,12 @@ //! # use codemp::api::controller::{AsyncSender, AsyncReceiver}; //! let buffer = workspace.attach("/some/file.txt").await.expect("failed to attach"); //! buffer.content(); // force-sync -//! if let Some(change) = buffer.try_recv().await.unwrap() { -//! println!("content: {}, span: {}-{}", change.content, change.start, change.end); +//! if let Some(mut update) = buffer.try_recv().await.unwrap() { +//! println!( +//! "content: {}, span: {}-{}", +//! update.change.content, update.change.start, update.change.end +//! ); +//! buffer.ack(update.version); //! } // if None, no changes are currently available //! # }; //! ``` diff --git a/src/prelude.rs b/src/prelude.rs index 0fec44d..71bb813 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -3,8 +3,9 @@ pub use crate::api::{ controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, - Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, - Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, + BufferUpdate as CodempBufferUpdate, Config as CodempConfig, Controller as CodempController, + Cursor as CodempCursor, Event as CodempEvent, Selection as CodempSelection, + TextChange as CodempTextChange, User as CodempUser, }; pub use crate::{