From 560a634499b724e03ad00c25e6b9cb82c098cdf5 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 10 Oct 2024 02:10:02 +0200 Subject: [PATCH] chore: split TextChange and Cursor so that sending/receiving parts are different rather than Option Co-authored-by: zaaarf --- src/api/change.rs | 40 +++++++++++++++++++++------------ src/api/cursor.rs | 29 ++++++++++++++++++------ src/buffer/controller.rs | 48 +++++++++++++--------------------------- src/buffer/worker.rs | 39 ++++++++++++++++---------------- src/cursor/controller.rs | 25 ++++++++++++--------- src/cursor/worker.rs | 22 +++++++++--------- src/prelude.rs | 2 ++ 7 files changed, 110 insertions(+), 95 deletions(-) diff --git a/src/api/change.rs b/src/api/change.rs index 5a2ebfe..d4b342e 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -1,6 +1,25 @@ //! # TextChange //! A high-level representation of a change within a given buffer. +/// A [`TextChange`] event happening on a buffer. +/// +/// Contains the change itself, the new version after this change and an optional `hash` field. +/// This is used for error correction: if provided, it should match the hash of the buffer +/// content **after** applying this change. Note that the `hash` field will not necessarily +/// be provided every time. +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] +#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(get_all))] +#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] +pub struct BufferUpdate { + /// Optional content hash after applying this change. + pub hash: Option, + /// CRDT version after this change has been applied. + pub version: Vec, + /// The change that has occurred. + pub change: TextChange, +} + /// An editor-friendly representation of a text change in a given buffer. /// /// It's expressed with a range of characters and a string of content that should replace them, @@ -9,18 +28,18 @@ /// Bulky and large operations will result in a single [`TextChange`] effectively sending the whole /// new buffer, but smaller changes are efficient and easy to create or apply. /// -/// [`TextChange`] contains an optional `hash` field. This is used for error correction: if -/// provided, it should match the hash of the buffer content **after** applying this change. -/// Note that the `hash` field will not necessarily be provided every time. -/// /// ### Examples /// To insert 'a' after 4th character we should send a. -/// `TextChange { start: 4, end: 4, content: "a".into(), hash: None }` +/// ``` +/// TextChange { start: 4, end: 4, content: "a".into(), hash: None } +/// ``` /// /// To delete a the fourth character we should send a. -/// `TextChange { start: 3, end: 4, content: "".into(), hash: None }` +/// ``` +/// TextChange { start: 3, end: 4, content: "".into(), hash: None } +/// ``` /// -/// ```no_run +/// ``` /// let change = codemp::api::TextChange { /// start: 6, end: 11, /// content: "mom".to_string(), hash: None @@ -41,8 +60,6 @@ pub struct TextChange { pub end: u32, /// New content of text inside span. pub content: String, - /// Optional content hash after applying this change. - pub hash: Option, } impl TextChange { @@ -90,7 +107,6 @@ mod tests { start: 5, end: 5, content: " cruel".to_string(), - hash: None, }; let result = change.apply("hello world!"); assert_eq!(result, "hello cruel world!"); @@ -102,7 +118,6 @@ mod tests { start: 5, end: 11, content: "".to_string(), - hash: None, }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello world!"); @@ -114,7 +129,6 @@ mod tests { start: 5, end: 11, content: " not very pleasant".to_string(), - hash: None, }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello not very pleasant world!"); @@ -126,7 +140,6 @@ mod tests { start: 100, end: 110, content: "a very long string \n which totally matters".to_string(), - hash: None, }; let result = change.apply("a short text"); assert_eq!( @@ -141,7 +154,6 @@ mod tests { start: 42, end: 42, content: "".to_string(), - hash: None, }; let result = change.apply("some important text"); assert_eq!(result, "some important text"); diff --git a/src/api/cursor.rs b/src/api/cursor.rs index 7c1e4b5..93226f5 100644 --- a/src/api/cursor.rs +++ b/src/api/cursor.rs @@ -6,17 +6,32 @@ use pyo3::prelude::*; /// User cursor position in a buffer #[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] // #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))] pub struct Cursor { - /// Cursor start position in buffer, as 0-indexed row-column tuple. - pub start: (i32, i32), - /// Cursor end position in buffer, as 0-indexed row-column tuple. - #[cfg_attr(feature = "serialize", serde(alias = "finish"))] // Lua uses `end` as keyword - pub end: (i32, i32), + /// User who sent the cursor. + pub user: String, + /// Cursor selection + pub sel: Selection, +} + +/// A cursor selection span, with row-column tuples +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "js", napi_derive::napi(object))] +#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] +#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] +// #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))] +pub struct Selection { + /// Cursor position starting row in buffer. + pub start_row: i32, + /// Cursor position starting column in buffer. + pub start_col: i32, + /// Cursor position final row in buffer. + pub end_row: i32, + /// Cursor position final column in buffer. + pub end_col: i32, /// Path of buffer this cursor is on. pub buffer: String, - /// User display name, if provided. - pub user: Option, } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 4c55297..e452b2a 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; +use crate::api::change::BufferUpdate; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::TextChange; use crate::errors::ControllerResult; @@ -13,34 +14,6 @@ use crate::ext::IgnorableError; use super::worker::DeltaRequest; -/// This wrapper around a [`TextChange`] contains a handle to Acknowledge correct change -/// application -#[derive(Debug)] -#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass)] -#[cfg_attr(feature = "js", napi_derive::napi)] -pub struct Delta { - /// The change received - pub change: TextChange, - /// The ack handle, must be called after correctly applying this change - pub(crate) ack: BufferAck, -} - -#[derive(Clone, Debug)] -pub(crate) struct BufferAck { - pub(crate) tx: mpsc::UnboundedSender, - pub(crate) version: LocalVersion, -} - -#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pymethods)] -#[cfg_attr(feature = "js", napi_derive::napi)] -impl Delta { - pub fn ack(&mut self) { - self.ack.tx - .send(self.ack.version.clone()) - .unwrap_or_warn("no worker to receive sent ack"); - } -} - /// A [Controller] to asynchronously interact with remote buffers. /// /// Each buffer controller internally tracks the last acknowledged state, remaining always in sync @@ -51,18 +24,26 @@ impl Delta { pub struct BufferController(pub(crate) Arc); impl BufferController { - /// Get the buffer path + /// Get the buffer path. pub fn path(&self) -> &str { &self.0.name } - /// Return buffer whole content, updating internal acknowledgement tracker + /// Return buffer whole content, updating internal acknowledgement tracker. pub async fn content(&self) -> ControllerResult { let (tx, rx) = oneshot::channel(); self.0.content_request.send(tx).await?; let content = rx.await?; Ok(content) } + + /// Notify CRDT that changes up to the given version have been merged succesfully. + pub fn ack(&mut self, version: Vec) { + let version = version.into_iter().map(|x| usize::from_ne_bytes(x.to_ne_bytes())).collect(); + self.0.ack_tx + .send(version) + .unwrap_or_warn("no worker to receive sent ack"); + } } #[derive(Debug)] @@ -75,10 +56,11 @@ pub(crate) struct BufferControllerInner { pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender, pub(crate) callback: watch::Sender>>, + pub(crate) ack_tx: mpsc::UnboundedSender, } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for BufferController {} +impl Controller for BufferController {} impl AsyncSender for BufferController { fn send(&self, op: TextChange) -> ControllerResult<()> { @@ -88,7 +70,7 @@ impl AsyncSender for BufferController { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncReceiver for BufferController { +impl AsyncReceiver for BufferController { async fn poll(&self) -> ControllerResult<()> { if *self.0.local_version.borrow() != *self.0.latest_version.borrow() { return Ok(()); @@ -100,7 +82,7 @@ impl AsyncReceiver for BufferController { Ok(()) } - async fn try_recv(&self) -> ControllerResult> { + async fn try_recv(&self) -> ControllerResult> { let last_update = self.0.local_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone(); diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 610abf1..a021174 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -6,15 +6,16 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; +use crate::api::change::BufferUpdate; use crate::api::controller::ControllerCallback; use crate::api::TextChange; use crate::ext::IgnorableError; use codemp_proto::buffer::{BufferEvent, Operation}; -use super::controller::{BufferAck, BufferController, BufferControllerInner, Delta}; +use super::controller::{BufferController, BufferControllerInner}; -pub(crate) type DeltaOp = Option; +pub(crate) type DeltaOp = Option; pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); struct BufferWorker { @@ -23,7 +24,6 @@ struct BufferWorker { latest_version: watch::Sender, local_version: watch::Sender, ack_rx: mpsc::UnboundedReceiver, - ack_tx: mpsc::UnboundedSender, ops_in: mpsc::UnboundedReceiver, poller: mpsc::UnboundedReceiver>, pollers: Vec>, @@ -65,6 +65,7 @@ impl BufferController { content_request: req_tx, delta_request: recv_tx, callback: cb_tx, + ack_tx, }); let weak = Arc::downgrade(&controller); @@ -74,7 +75,6 @@ impl BufferController { path: path.to_string(), latest_version: latest_version_tx, local_version: my_version_tx, - ack_tx, ack_rx, ops_in: opin_rx, poller: poller_rx, @@ -240,32 +240,31 @@ impl BufferWorker { { tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); } - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.start() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), + crate::api::change::BufferUpdate { hash, + version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful + change: crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.start() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + } } } - diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.end() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), + diamond_types::list::operation::OpKind::Del => crate::api::change::BufferUpdate { hash, - }, - }; - let delta = Delta { - change: tc, - ack: BufferAck { - tx: self.ack_tx.clone(), - version: step_ver, + version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful + change: crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + }, }, }; self.local_version .send(new_local_v) .unwrap_or_warn("could not update local version"); - tx.send(Some(delta)) + tx.send(Some(tc)) .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { tx.send(None) diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 63a5483..aa57fe9 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -7,8 +7,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use crate::{ api::{ - controller::{AsyncReceiver, AsyncSender, ControllerCallback}, - Controller, Cursor, + controller::{AsyncReceiver, AsyncSender, ControllerCallback}, cursor::Selection, Controller, Cursor }, errors::ControllerResult, }; @@ -34,25 +33,29 @@ pub(crate) struct CursorControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for CursorController {} +impl Controller for CursorController {} #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncSender for CursorController { - fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { - if cursor.start > cursor.end { - std::mem::swap(&mut cursor.start, &mut cursor.end); +impl AsyncSender for CursorController { + fn send(&self, mut cursor: Selection) -> ControllerResult<()> { + if cursor.start_row > cursor.end_row || ( + cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col + ) { + std::mem::swap(&mut cursor.start_row, &mut cursor.end_row); + std::mem::swap(&mut cursor.start_col, &mut cursor.end_col); } + Ok(self.0.op.send(CursorPosition { buffer: BufferNode { path: cursor.buffer, }, start: RowCol { - row: cursor.start.0, - col: cursor.start.1, + row: cursor.start_row, + col: cursor.start_col, }, end: RowCol { - row: cursor.end.0, - col: cursor.end.1, + row: cursor.end_row, + col: cursor.end_col, }, })?) } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index bd6b35b..91f07d5 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -5,7 +5,7 @@ use tonic::Streaming; use uuid::Uuid; use crate::{ - api::{controller::ControllerCallback, Cursor, User}, + api::{controller::ControllerCallback, cursor::Selection, Cursor, User}, ext::IgnorableError, }; use codemp_proto::cursor::{CursorEvent, CursorPosition}; @@ -86,16 +86,18 @@ impl CursorController { None => break, // clean exit, just weird that we got it here Some(controller) => { tracing::debug!("received cursor from server"); - let mut cursor = Cursor { - buffer: cur.position.buffer.path, - start: (cur.position.start.row, cur.position.start.col), - end: (cur.position.end.row, cur.position.end.col), - user: None, - }; let user_id = Uuid::from(cur.user); - if let Some(user) = worker.map.get(&user_id) { - cursor.user = Some(user.name.clone()); - } + let cursor = Cursor { + user: worker.map.get(&user_id).map(|u| u.name.clone()).unwrap_or_default(), + sel: Selection { + buffer: cur.position.buffer.path, + start_row: cur.position.start.row, + start_col: cur.position.start.col, + end_row: cur.position.end.row, + end_col: cur.position.end.col + } + }; + worker.store.push_back(cursor); for tx in worker.pollers.drain(..) { tx.send(()).unwrap_or_warn("poller dropped before unblocking"); diff --git a/src/prelude.rs b/src/prelude.rs index 0fec44d..f7d6fc1 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -5,6 +5,8 @@ pub use crate::api::{ controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, + change::BufferUpdate as CodempBufferUpdate, + cursor::Selection as CodempSelection, }; pub use crate::{