chore: split TextChange and Cursor

so that sending/receiving parts are different rather than Option<?>

Co-authored-by: zaaarf <me@zaaarf.foo>
This commit is contained in:
əlemi 2024-10-10 02:10:02 +02:00 committed by alemi.dev
parent 1f2c0708d6
commit 560a634499
7 changed files with 110 additions and 95 deletions

View file

@ -1,6 +1,25 @@
//! # TextChange //! # TextChange
//! A high-level representation of a change within a given buffer. //! A high-level representation of a change within a given buffer.
/// A [`TextChange`] event happening on a buffer.
///
/// Contains the change itself, the new version after this change and an optional `hash` field.
/// This is used for error correction: if provided, it should match the hash of the buffer
/// content **after** applying this change. Note that the `hash` field will not necessarily
/// be provided every time.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(get_all))]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
pub struct BufferUpdate {
/// Optional content hash after applying this change.
pub hash: Option<i64>,
/// CRDT version after this change has been applied.
pub version: Vec<i64>,
/// The change that has occurred.
pub change: TextChange,
}
/// An editor-friendly representation of a text change in a given buffer. /// An editor-friendly representation of a text change in a given buffer.
/// ///
/// It's expressed with a range of characters and a string of content that should replace them, /// It's expressed with a range of characters and a string of content that should replace them,
@ -9,18 +28,18 @@
/// Bulky and large operations will result in a single [`TextChange`] effectively sending the whole /// Bulky and large operations will result in a single [`TextChange`] effectively sending the whole
/// new buffer, but smaller changes are efficient and easy to create or apply. /// new buffer, but smaller changes are efficient and easy to create or apply.
/// ///
/// [`TextChange`] contains an optional `hash` field. This is used for error correction: if
/// provided, it should match the hash of the buffer content **after** applying this change.
/// Note that the `hash` field will not necessarily be provided every time.
///
/// ### Examples /// ### Examples
/// To insert 'a' after 4th character we should send a. /// To insert 'a' after 4th character we should send a.
/// `TextChange { start: 4, end: 4, content: "a".into(), hash: None }` /// ```
/// TextChange { start: 4, end: 4, content: "a".into(), hash: None }
/// ```
/// ///
/// To delete a the fourth character we should send a. /// To delete a the fourth character we should send a.
/// `TextChange { start: 3, end: 4, content: "".into(), hash: None }` /// ```
/// TextChange { start: 3, end: 4, content: "".into(), hash: None }
/// ```
/// ///
/// ```no_run /// ```
/// let change = codemp::api::TextChange { /// let change = codemp::api::TextChange {
/// start: 6, end: 11, /// start: 6, end: 11,
/// content: "mom".to_string(), hash: None /// content: "mom".to_string(), hash: None
@ -41,8 +60,6 @@ pub struct TextChange {
pub end: u32, pub end: u32,
/// New content of text inside span. /// New content of text inside span.
pub content: String, pub content: String,
/// Optional content hash after applying this change.
pub hash: Option<i64>,
} }
impl TextChange { impl TextChange {
@ -90,7 +107,6 @@ mod tests {
start: 5, start: 5,
end: 5, end: 5,
content: " cruel".to_string(), content: " cruel".to_string(),
hash: None,
}; };
let result = change.apply("hello world!"); let result = change.apply("hello world!");
assert_eq!(result, "hello cruel world!"); assert_eq!(result, "hello cruel world!");
@ -102,7 +118,6 @@ mod tests {
start: 5, start: 5,
end: 11, end: 11,
content: "".to_string(), content: "".to_string(),
hash: None,
}; };
let result = change.apply("hello cruel world!"); let result = change.apply("hello cruel world!");
assert_eq!(result, "hello world!"); assert_eq!(result, "hello world!");
@ -114,7 +129,6 @@ mod tests {
start: 5, start: 5,
end: 11, end: 11,
content: " not very pleasant".to_string(), content: " not very pleasant".to_string(),
hash: None,
}; };
let result = change.apply("hello cruel world!"); let result = change.apply("hello cruel world!");
assert_eq!(result, "hello not very pleasant world!"); assert_eq!(result, "hello not very pleasant world!");
@ -126,7 +140,6 @@ mod tests {
start: 100, start: 100,
end: 110, end: 110,
content: "a very long string \n which totally matters".to_string(), content: "a very long string \n which totally matters".to_string(),
hash: None,
}; };
let result = change.apply("a short text"); let result = change.apply("a short text");
assert_eq!( assert_eq!(
@ -141,7 +154,6 @@ mod tests {
start: 42, start: 42,
end: 42, end: 42,
content: "".to_string(), content: "".to_string(),
hash: None,
}; };
let result = change.apply("some important text"); let result = change.apply("some important text");
assert_eq!(result, "some important text"); assert_eq!(result, "some important text");

View file

@ -6,17 +6,32 @@ use pyo3::prelude::*;
/// User cursor position in a buffer /// User cursor position in a buffer
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)] #[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
// #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))] // #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))]
pub struct Cursor { pub struct Cursor {
/// Cursor start position in buffer, as 0-indexed row-column tuple. /// User who sent the cursor.
pub start: (i32, i32), pub user: String,
/// Cursor end position in buffer, as 0-indexed row-column tuple. /// Cursor selection
#[cfg_attr(feature = "serialize", serde(alias = "finish"))] // Lua uses `end` as keyword pub sel: Selection,
pub end: (i32, i32), }
/// A cursor selection span, with row-column tuples
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
// #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))]
pub struct Selection {
/// Cursor position starting row in buffer.
pub start_row: i32,
/// Cursor position starting column in buffer.
pub start_col: i32,
/// Cursor position final row in buffer.
pub end_row: i32,
/// Cursor position final column in buffer.
pub end_col: i32,
/// Path of buffer this cursor is on. /// Path of buffer this cursor is on.
pub buffer: String, pub buffer: String,
/// User display name, if provided.
pub user: Option<String>,
} }

View file

@ -6,6 +6,7 @@ use std::sync::Arc;
use diamond_types::LocalVersion; use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use crate::api::change::BufferUpdate;
use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback};
use crate::api::TextChange; use crate::api::TextChange;
use crate::errors::ControllerResult; use crate::errors::ControllerResult;
@ -13,34 +14,6 @@ use crate::ext::IgnorableError;
use super::worker::DeltaRequest; use super::worker::DeltaRequest;
/// This wrapper around a [`TextChange`] contains a handle to Acknowledge correct change
/// application
#[derive(Debug)]
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass)]
#[cfg_attr(feature = "js", napi_derive::napi)]
pub struct Delta {
/// The change received
pub change: TextChange,
/// The ack handle, must be called after correctly applying this change
pub(crate) ack: BufferAck,
}
#[derive(Clone, Debug)]
pub(crate) struct BufferAck {
pub(crate) tx: mpsc::UnboundedSender<LocalVersion>,
pub(crate) version: LocalVersion,
}
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pymethods)]
#[cfg_attr(feature = "js", napi_derive::napi)]
impl Delta {
pub fn ack(&mut self) {
self.ack.tx
.send(self.ack.version.clone())
.unwrap_or_warn("no worker to receive sent ack");
}
}
/// A [Controller] to asynchronously interact with remote buffers. /// A [Controller] to asynchronously interact with remote buffers.
/// ///
/// Each buffer controller internally tracks the last acknowledged state, remaining always in sync /// Each buffer controller internally tracks the last acknowledged state, remaining always in sync
@ -51,18 +24,26 @@ impl Delta {
pub struct BufferController(pub(crate) Arc<BufferControllerInner>); pub struct BufferController(pub(crate) Arc<BufferControllerInner>);
impl BufferController { impl BufferController {
/// Get the buffer path /// Get the buffer path.
pub fn path(&self) -> &str { pub fn path(&self) -> &str {
&self.0.name &self.0.name
} }
/// Return buffer whole content, updating internal acknowledgement tracker /// Return buffer whole content, updating internal acknowledgement tracker.
pub async fn content(&self) -> ControllerResult<String> { pub async fn content(&self) -> ControllerResult<String> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.content_request.send(tx).await?; self.0.content_request.send(tx).await?;
let content = rx.await?; let content = rx.await?;
Ok(content) Ok(content)
} }
/// Notify CRDT that changes up to the given version have been merged succesfully.
pub fn ack(&mut self, version: Vec<i64>) {
let version = version.into_iter().map(|x| usize::from_ne_bytes(x.to_ne_bytes())).collect();
self.0.ack_tx
.send(version)
.unwrap_or_warn("no worker to receive sent ack");
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -75,10 +56,11 @@ pub(crate) struct BufferControllerInner {
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>, pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: mpsc::Sender<DeltaRequest>, pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>, pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
pub(crate) ack_tx: mpsc::UnboundedSender<LocalVersion>,
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<TextChange, Delta> for BufferController {} impl Controller<TextChange, BufferUpdate> for BufferController {}
impl AsyncSender<TextChange> for BufferController { impl AsyncSender<TextChange> for BufferController {
fn send(&self, op: TextChange) -> ControllerResult<()> { fn send(&self, op: TextChange) -> ControllerResult<()> {
@ -88,7 +70,7 @@ impl AsyncSender<TextChange> for BufferController {
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncReceiver<Delta> for BufferController { impl AsyncReceiver<BufferUpdate> for BufferController {
async fn poll(&self) -> ControllerResult<()> { async fn poll(&self) -> ControllerResult<()> {
if *self.0.local_version.borrow() != *self.0.latest_version.borrow() { if *self.0.local_version.borrow() != *self.0.latest_version.borrow() {
return Ok(()); return Ok(());
@ -100,7 +82,7 @@ impl AsyncReceiver<Delta> for BufferController {
Ok(()) Ok(())
} }
async fn try_recv(&self) -> ControllerResult<Option<Delta>> { async fn try_recv(&self) -> ControllerResult<Option<BufferUpdate>> {
let last_update = self.0.local_version.borrow().clone(); let last_update = self.0.local_version.borrow().clone();
let latest_version = self.0.latest_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone();

View file

@ -6,15 +6,16 @@ use tokio::sync::{mpsc, oneshot, watch};
use tonic::Streaming; use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
use crate::api::change::BufferUpdate;
use crate::api::controller::ControllerCallback; use crate::api::controller::ControllerCallback;
use crate::api::TextChange; use crate::api::TextChange;
use crate::ext::IgnorableError; use crate::ext::IgnorableError;
use codemp_proto::buffer::{BufferEvent, Operation}; use codemp_proto::buffer::{BufferEvent, Operation};
use super::controller::{BufferAck, BufferController, BufferControllerInner, Delta}; use super::controller::{BufferController, BufferControllerInner};
pub(crate) type DeltaOp = Option<Delta>; pub(crate) type DeltaOp = Option<BufferUpdate>;
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>); pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);
struct BufferWorker { struct BufferWorker {
@ -23,7 +24,6 @@ struct BufferWorker {
latest_version: watch::Sender<diamond_types::LocalVersion>, latest_version: watch::Sender<diamond_types::LocalVersion>,
local_version: watch::Sender<diamond_types::LocalVersion>, local_version: watch::Sender<diamond_types::LocalVersion>,
ack_rx: mpsc::UnboundedReceiver<LocalVersion>, ack_rx: mpsc::UnboundedReceiver<LocalVersion>,
ack_tx: mpsc::UnboundedSender<LocalVersion>,
ops_in: mpsc::UnboundedReceiver<TextChange>, ops_in: mpsc::UnboundedReceiver<TextChange>,
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
@ -65,6 +65,7 @@ impl BufferController {
content_request: req_tx, content_request: req_tx,
delta_request: recv_tx, delta_request: recv_tx,
callback: cb_tx, callback: cb_tx,
ack_tx,
}); });
let weak = Arc::downgrade(&controller); let weak = Arc::downgrade(&controller);
@ -74,7 +75,6 @@ impl BufferController {
path: path.to_string(), path: path.to_string(),
latest_version: latest_version_tx, latest_version: latest_version_tx,
local_version: my_version_tx, local_version: my_version_tx,
ack_tx,
ack_rx, ack_rx,
ops_in: opin_rx, ops_in: opin_rx,
poller: poller_rx, poller: poller_rx,
@ -240,32 +240,31 @@ impl BufferWorker {
{ {
tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)");
} }
crate::api::change::TextChange { crate::api::change::BufferUpdate {
hash,
version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful
change: crate::api::change::TextChange {
start: dtop.start() as u32, start: dtop.start() as u32,
end: dtop.start() as u32, end: dtop.start() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(), content: dtop.content_as_str().unwrap_or_default().to_string(),
hash, }
} }
} }
diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange { diamond_types::list::operation::OpKind::Del => crate::api::change::BufferUpdate {
hash,
version: step_ver.into_iter().map(|x| i64::from_ne_bytes(x.to_ne_bytes())).collect(), // TODO this is wasteful
change: crate::api::change::TextChange {
start: dtop.start() as u32, start: dtop.start() as u32,
end: dtop.end() as u32, end: dtop.end() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(), content: dtop.content_as_str().unwrap_or_default().to_string(),
hash,
}, },
};
let delta = Delta {
change: tc,
ack: BufferAck {
tx: self.ack_tx.clone(),
version: step_ver,
}, },
}; };
self.local_version self.local_version
.send(new_local_v) .send(new_local_v)
.unwrap_or_warn("could not update local version"); .unwrap_or_warn("could not update local version");
tx.send(Some(delta)) tx.send(Some(tc))
.unwrap_or_warn("could not update ops channel -- is controller dead?"); .unwrap_or_warn("could not update ops channel -- is controller dead?");
} else { } else {
tx.send(None) tx.send(None)

View file

@ -7,8 +7,7 @@ use tokio::sync::{mpsc, oneshot, watch};
use crate::{ use crate::{
api::{ api::{
controller::{AsyncReceiver, AsyncSender, ControllerCallback}, controller::{AsyncReceiver, AsyncSender, ControllerCallback}, cursor::Selection, Controller, Cursor
Controller, Cursor,
}, },
errors::ControllerResult, errors::ControllerResult,
}; };
@ -34,25 +33,29 @@ pub(crate) struct CursorControllerInner {
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<Cursor> for CursorController {} impl Controller<Selection, Cursor> for CursorController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<Cursor> for CursorController { impl AsyncSender<Selection> for CursorController {
fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { fn send(&self, mut cursor: Selection) -> ControllerResult<()> {
if cursor.start > cursor.end { if cursor.start_row > cursor.end_row || (
std::mem::swap(&mut cursor.start, &mut cursor.end); cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col
) {
std::mem::swap(&mut cursor.start_row, &mut cursor.end_row);
std::mem::swap(&mut cursor.start_col, &mut cursor.end_col);
} }
Ok(self.0.op.send(CursorPosition { Ok(self.0.op.send(CursorPosition {
buffer: BufferNode { buffer: BufferNode {
path: cursor.buffer, path: cursor.buffer,
}, },
start: RowCol { start: RowCol {
row: cursor.start.0, row: cursor.start_row,
col: cursor.start.1, col: cursor.start_col,
}, },
end: RowCol { end: RowCol {
row: cursor.end.0, row: cursor.end_row,
col: cursor.end.1, col: cursor.end_col,
}, },
})?) })?)
} }

View file

@ -5,7 +5,7 @@ use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
api::{controller::ControllerCallback, Cursor, User}, api::{controller::ControllerCallback, cursor::Selection, Cursor, User},
ext::IgnorableError, ext::IgnorableError,
}; };
use codemp_proto::cursor::{CursorEvent, CursorPosition}; use codemp_proto::cursor::{CursorEvent, CursorPosition};
@ -86,16 +86,18 @@ impl CursorController {
None => break, // clean exit, just weird that we got it here None => break, // clean exit, just weird that we got it here
Some(controller) => { Some(controller) => {
tracing::debug!("received cursor from server"); tracing::debug!("received cursor from server");
let mut cursor = Cursor {
buffer: cur.position.buffer.path,
start: (cur.position.start.row, cur.position.start.col),
end: (cur.position.end.row, cur.position.end.col),
user: None,
};
let user_id = Uuid::from(cur.user); let user_id = Uuid::from(cur.user);
if let Some(user) = worker.map.get(&user_id) { let cursor = Cursor {
cursor.user = Some(user.name.clone()); user: worker.map.get(&user_id).map(|u| u.name.clone()).unwrap_or_default(),
sel: Selection {
buffer: cur.position.buffer.path,
start_row: cur.position.start.row,
start_col: cur.position.start.col,
end_row: cur.position.end.row,
end_col: cur.position.end.col
} }
};
worker.store.push_back(cursor); worker.store.push_back(cursor);
for tx in worker.pollers.drain(..) { for tx in worker.pollers.drain(..) {
tx.send(()).unwrap_or_warn("poller dropped before unblocking"); tx.send(()).unwrap_or_warn("poller dropped before unblocking");

View file

@ -5,6 +5,8 @@ pub use crate::api::{
controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender,
Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor,
Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser,
change::BufferUpdate as CodempBufferUpdate,
cursor::Selection as CodempSelection,
}; };
pub use crate::{ pub use crate::{