diff --git a/Cargo.toml b/Cargo.toml index 4ea1276..b08e72c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,17 +12,15 @@ crate-type = ["cdylib"] tracing = "0.1" thiserror = "1.0" async-trait = "0.1" -# woot -codemp-woot = { git = "ssh://git@github.com/hexedtech/woot.git", features = ["serde"], tag = "v0.1.2" } -diamond-types="1.0" +# crdt +diamond-types = "1.0" # proto codemp-proto = { git = "ssh://git@github.com/hexedtech/codemp-proto.git", tag = "v0.6.1" } uuid = { version = "1.7", features = ["v4"] } tonic = { version = "0.11", features = ["tls", "tls-roots"] } -postcard = "1.0" # api -similar = { version = "2.2", features = ["inline"] } tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync"] } +xxhash-rust = { version = "0.8", features = ["xxh3"] } # client tokio-stream = "0.1" dashmap = "5.5" diff --git a/src/api/change.rs b/src/api/change.rs index 4278d84..8a6c33c 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -3,13 +3,6 @@ //! an editor-friendly representation of a text change in a buffer //! to easily interface with codemp from various editors -/// an atomic and orderable operation -/// -/// this under the hood thinly wraps our CRDT operation -#[derive(Debug, Clone)] -pub struct Op(pub(crate) diamond_types::list::operation::Operation); -// Do we need this in the api? why not just have a TextChange, which already covers as operation. - /// an editor-friendly representation of a text change in a buffer /// /// this represent a range in the previous state of the string and a new content which should be @@ -25,7 +18,6 @@ pub struct Op(pub(crate) diamond_types::list::operation::Operation); /// to delete a the fourth character we should send a /// `TextChange { span: 3..4, content: "".into() }` /// - #[derive(Clone, Debug, Default)] #[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(feature = "python", pyo3::pyclass)] @@ -37,6 +29,8 @@ pub struct TextChange { pub end: u32, /// new content of text inside span pub content: String, + /// optional content hash after applying this change + pub hash: Option, } impl TextChange { @@ -58,93 +52,7 @@ impl TextChange { pub fn is_empty(&self) -> bool { !self.is_delete() && !self.is_insert() } -} - -/* -impl TextChange { - /// create a new TextChange from the difference of given strings - pub fn from_diff(before: &str, after: &str) -> TextChange { - let diff = similar::TextDiff::from_chars(before, after); - let mut start = 0; - let mut end = 0; - let mut from_beginning = true; - for op in diff.ops() { - match op { - similar::DiffOp::Equal { len, .. } => { - if from_beginning { - start += len - } else { - end += len - } - } - _ => { - end = 0; - from_beginning = false; - } - } - } - let end_before = before.len() - end; - let end_after = after.len() - end; - - TextChange { - start: start as u32, - end: end_before as u32, - content: after[start..end_after].to_string(), - } - } - - pub fn span(&self) -> std::ops::Range { - self.start as usize..self.end as usize - } - - /// consume the [TextChange], transforming it into a Vec of [Op] - pub fn transform(&self, woot: &Woot) -> WootResult> { - let mut out = Vec::new(); - if self.is_empty() { - return Ok(out); - } // no-op - let view = woot.view(); - let Some(span) = view.get(self.span()) else { - return Err(crate::woot::WootError::OutOfBounds); - }; - let diff = similar::TextDiff::from_chars(span, &self.content); - for (i, diff) in diff.iter_all_changes().enumerate() { - match diff.tag() { - similar::ChangeTag::Equal => {} - similar::ChangeTag::Delete => match woot.delete_one(self.span().start + i) { - Err(e) => tracing::error!("could not create deletion: {}", e), - Ok(op) => out.push(Op(op)), - }, - similar::ChangeTag::Insert => { - match woot.insert(self.span().start + i, diff.value()) { - Ok(ops) => { - for op in ops { - out.push(Op(op)) - } - } - Err(e) => tracing::error!("could not create insertion: {}", e), - } - } - } - } - Ok(out) - } - - /// returns true if this TextChange deletes existing text - pub fn is_deletion(&self) -> bool { - !self.span().is_empty() - } - - /// returns true if this TextChange adds new text - pub fn is_addition(&self) -> bool { - !self.content.is_empty() - } - - /// returns true if this TextChange is effectively as no-op - pub fn is_empty(&self) -> bool { - !self.is_deletion() && !self.is_addition() - } - + /// applies this text change to given text, returning a new string pub fn apply(&self, txt: &str) -> String { let pre_index = std::cmp::min(self.span().start, txt.len()); @@ -152,55 +60,17 @@ impl TextChange { let post = txt.get(self.span().end..).unwrap_or("").to_string(); format!("{}{}{}", pre, self.content, post) } - - /// convert from byte index to row and column - /// txt must be the whole content of the buffer, in order to count lines - pub fn index_to_rowcol(txt: &str, index: usize) -> codemp_proto::cursor::RowCol { - // FIXME might panic, use .get() - let row = txt[..index].matches('\n').count() as i32; - let col = txt[..index].split('\n').last().unwrap_or("").len() as i32; - codemp_proto::cursor::RowCol { row, col } - } } #[cfg(test)] mod tests { - #[test] - fn textchange_diff_works_for_deletions() { - let change = super::TextChange::from_diff( - "sphinx of black quartz, judge my vow", - "sphinx of quartz, judge my vow", - ); - assert_eq!(change.span(), 10..16); - assert_eq!(change.content, ""); - } - - #[test] - fn textchange_diff_works_for_insertions() { - let change = super::TextChange::from_diff( - "sphinx of quartz, judge my vow", - "sphinx of black quartz, judge my vow", - ); - assert_eq!(change.span(), 10..10); - assert_eq!(change.content, "black "); - } - - #[test] - fn textchange_diff_works_for_changes() { - let change = super::TextChange::from_diff( - "sphinx of black quartz, judge my vow", - "sphinx who watches the desert, judge my vow", - ); - assert_eq!(change.span(), 7..22); - assert_eq!(change.content, "who watches the desert"); - } - #[test] fn textchange_apply_works_for_insertions() { let change = super::TextChange { start: 5, end: 5, content: " cruel".to_string(), + hash: None }; let result = change.apply("hello world!"); assert_eq!(result, "hello cruel world!"); @@ -212,6 +82,7 @@ mod tests { start: 5, end: 11, content: "".to_string(), + hash: None }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello world!"); @@ -223,6 +94,7 @@ mod tests { start: 5, end: 11, content: " not very pleasant".to_string(), + hash: None }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello not very pleasant world!"); @@ -234,6 +106,7 @@ mod tests { start: 100, end: 110, content: "a very long string \n which totally matters".to_string(), + hash: None }; let result = change.apply("a short text"); assert_eq!( @@ -242,31 +115,15 @@ mod tests { ); } - #[test] - fn empty_diff_produces_empty_textchange() { - let change = super::TextChange::from_diff("same \n\n text", "same \n\n text"); - assert!(change.is_empty()); - } - #[test] fn empty_textchange_doesnt_alter_buffer() { let change = super::TextChange { start: 42, end: 42, content: "".to_string(), + hash: None }; let result = change.apply("some important text"); assert_eq!(result, "some important text"); } -}*/ - -// TODO: properly implement this for diamond types directly -impl From for TextChange { - fn from(value: Op) -> Self { - Self { - start: value.0.start() as u32, - end: value.0.end() as u32, - content: value.0.content_as_str().unwrap_or_default().to_string(), - } - } } diff --git a/src/api/controller.rs b/src/api/controller.rs index 10509ff..65fd49c 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -37,12 +37,12 @@ pub trait Controller : Sized + Send + Sync { /// /// `async fn recv(&self) -> codemp::Result;` async fn recv(&self) -> Result { - if let Some(x) = self.try_recv()? { - return Ok(x); // short circuit if already available + loop { + self.poll().await?; + if let Some(x) = self.try_recv().await? { + break Ok(x); + } } - - self.poll().await?; - Ok(self.try_recv()?.expect("no message available after polling")) } /// block until next value is available without consuming it @@ -53,7 +53,7 @@ pub trait Controller : Sized + Send + Sync { async fn poll(&self) -> Result<()>; /// attempt to receive a value without blocking, return None if nothing is available - fn try_recv(&self) -> Result>; + async fn try_recv(&self) -> Result>; /// stop underlying worker /// diff --git a/src/api/mod.rs b/src/api/mod.rs index b696632..a52da0a 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -21,7 +21,6 @@ pub mod user; pub use controller::Controller; pub use change::TextChange; -pub use change::Op; pub use cursor::Cursor; pub use event::Event; pub use user::User; diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index da3db6e..3250e7e 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -5,16 +5,13 @@ use std::sync::Arc; use diamond_types::LocalVersion; -use tokio::sync::{oneshot, Mutex}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{oneshot, mpsc, watch}; use tonic::async_trait; use crate::api::Controller; use crate::api::TextChange; -use crate::api::Op; - use crate::ext::InternallyMutable; /// the buffer controller implementation @@ -45,39 +42,14 @@ impl BufferController { #[derive(Debug)] pub(crate) struct BufferControllerInner { - name: String, - latest_version: watch::Receiver, - last_update: InternallyMutable, - ops_in: mpsc::UnboundedSender, - ops_out: Mutex)>>, - poller: mpsc::UnboundedSender>, - stopper: mpsc::UnboundedSender<()>, // just exist - content_request: mpsc::Sender>, -} - -impl BufferControllerInner { - pub(crate) fn new( - name: String, - latest_version: watch::Receiver, - ops_in: mpsc::UnboundedSender, - ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option)>, - poller: mpsc::UnboundedSender>, - stopper: mpsc::UnboundedSender<()>, - content_request: mpsc::Sender>, - // TODO we're getting too much stuff via constructor, maybe make everything pub(crate) - // instead?? or maybe builder, or maybe defaults - ) -> Self { - Self { - name, - latest_version, - last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), - ops_in, - ops_out: Mutex::new(ops_out), - poller, - stopper, - content_request, - } - } + pub(crate) name: String, + pub(crate) latest_version: watch::Receiver, + pub(crate) last_update: InternallyMutable, + pub(crate) ops_in: mpsc::UnboundedSender, + pub(crate) poller: mpsc::UnboundedSender>, + pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist + pub(crate) content_request: mpsc::Sender>, + pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, } #[async_trait] @@ -85,9 +57,6 @@ impl Controller for BufferController { /// block until a text change is available /// this returns immediately if one is already available async fn poll(&self) -> crate::Result<()> { - // TODO there might be some extra logic we can play with using `seen` and `not seen` yet - // mechanics, not just the comparison. nevermind, the `has_changed` etc stuff needs mut self, yuk. - if self.0.last_update.get() != *self.0.latest_version.borrow() { return Ok(()); } @@ -100,7 +69,7 @@ impl Controller for BufferController { } /// if a text change is available, return it immediately - fn try_recv(&self) -> crate::Result> { + async fn try_recv(&self) -> crate::Result> { let last_update = self.0.last_update.get(); let latest_version = self.0.latest_version.borrow().clone(); @@ -108,29 +77,11 @@ impl Controller for BufferController { return Ok(None); } - match self.0.ops_out.try_lock() { - Err(_) => Ok(None), - Ok(mut ops) => match ops.try_recv() { - Ok((lv, Some(op))) => { - self.0.last_update.set(lv); - Ok(Some(TextChange::from(op))) - }, - Ok((_lv, None)) => Ok(None), // TODO what is going on here? - Err(mpsc::error::TryRecvError::Empty) => Ok(None), - Err(mpsc::error::TryRecvError::Disconnected) => - Err(crate::Error::Channel { send: false }), - }, - } - } - - /// block until a new text change is available, and return it - async fn recv(&self) -> crate::Result { - if let Some((lv, Some(op))) = self.0.ops_out.lock().await.recv().await { - self.0.last_update.set(lv); - Ok(TextChange::from(op)) - } else { - Err(crate::Error::Channel { send: false }) - } + let (tx, rx) = oneshot::channel(); + self.0.delta_request.send((last_update, tx)).await?; + let (v, change) = rx.await?; + self.0.last_update.set(v); + Ok(Some(change)) } /// enqueue a text change for processing diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index b87e682..0d67e31 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -6,10 +6,10 @@ use tonic::{async_trait, Streaming}; use uuid::Uuid; use crate::api::controller::ControllerWorker; -use crate::api::Op; use crate::api::TextChange; use crate::errors::IgnorableError; +use crate::ext::InternallyMutable; use codemp_proto::buffer::{BufferEvent, Operation}; use super::controller::{BufferController, BufferControllerInner}; @@ -18,10 +18,10 @@ pub(crate) struct BufferWorker { user_id: Uuid, latest_version: watch::Sender, ops_in: mpsc::UnboundedReceiver, - ops_out: mpsc::UnboundedSender<(LocalVersion, Option)>, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, + delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, stop: mpsc::UnboundedReceiver<()>, controller: BufferController, } @@ -32,34 +32,35 @@ impl BufferWorker { let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); let (opin_tx, opin_rx) = mpsc::unbounded_channel(); - let (opout_tx, opout_rx) = mpsc::unbounded_channel(); let (req_tx, req_rx) = mpsc::channel(1); + let (recv_tx, recv_rx) = mpsc::channel(1); let (poller_tx, poller_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel(); - let controller = BufferControllerInner::new( - path.to_string(), - latest_version_rx, - opin_tx, - opout_rx, - poller_tx, - end_tx, - req_tx, - ); + let controller = BufferControllerInner { + name: path.to_string(), + latest_version: latest_version_rx, + last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), + ops_in: opin_tx, + poller: poller_tx, + stopper: end_tx, + content_request: req_tx, + delta_request: recv_tx, + }; BufferWorker { user_id, latest_version: latest_version_tx, ops_in: opin_rx, - ops_out: opout_tx, poller: poller_rx, pollers: Vec::new(), stop: end_rx, controller: BufferController(Arc::new(controller)), content_checkout: req_rx, + delta_req: recv_rx, } } } @@ -77,6 +78,7 @@ impl ControllerWorker for BufferWorker { async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { let mut branch = diamond_types::list::Branch::new(); let mut oplog = diamond_types::list::OpLog::new(); + let mut timer = Timer::new(10); // TODO configurable!! loop { // block until one of these is ready tokio::select! { @@ -112,31 +114,15 @@ impl ControllerWorker for BufferWorker { }, }, - // received a message from server + // received a message from server: add to oplog and update latest version (+unlock pollers) res = rx.message() => match res { Err(_e) => break, Ok(None) => break, Ok(Some(change)) => { - let last_ver = oplog.local_version(); match oplog.decode_and_add(&change.op.data) { Ok(local_version) => { - // give all the changes needed to the controller in a channel. - for (lv, dtop) in oplog.iter_xf_operations_from(&last_ver, &local_version) { - if let Some(dtop) = dtop { - // x.0.start should always be after lastver! - // this step_ver will be the version after we apply the operation - // we give it to the controller so that he knows where it's at. - let step_ver = oplog.version_union(&[lv.start], &last_ver); - let opout = (step_ver, Some(Op(dtop))); - - self.ops_out.send(opout).unwrap_or_warn("could not update ops channel -- is controller dead?"); - } - } - - // finally we send the self.latest_version.send(local_version) .unwrap_or_warn("failed to update latest version!"); - for tx in self.pollers.drain(..) { tx.send(()).unwrap_or_warn("could not wake up poller"); } @@ -146,8 +132,35 @@ impl ControllerWorker for BufferWorker { }, }, + // controller is ready to apply change and recv(), calculate it and send it back + res = self.delta_req.recv() => match res { + None => break tracing::error!("no more active controllers: can't send changes"), + Some((last_ver, tx)) => { + if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() { + // x.0.start should always be after lastver! + // this step_ver will be the version after we apply the operation + // we give it to the controller so that he knows where it's at. + let step_ver = oplog.version_union(&[lv.start], &last_ver); + + branch.merge(&oplog, oplog.local_version_ref()); + let hash = if timer.step() { + let hash = xxhash_rust::xxh3::xxh3_64(branch.content().to_string().as_bytes()); + Some(i64::from_ne_bytes(hash.to_ne_bytes())) + } else { None }; + let tc = crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash + }; + tx.send((step_ver, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + } + }, + }, + + // received a request for full CRDT content res = self.content_checkout.recv() => match res { - None => break tracing::error!("no more active controllers"), + None => break tracing::error!("no more active controllers: can't update content"), Some(tx) => { branch.merge(&oplog, oplog.local_version_ref()); let content = branch.content().to_string(); @@ -158,3 +171,17 @@ impl ControllerWorker for BufferWorker { } } } + +struct Timer(u32, u32); +impl Timer { + fn new(period: u32) -> Self { Timer(0, period) } + fn step(&mut self) -> bool { + self.0 += 1; + if self.0 >= self.1 { + self.0 = 0; + true + } else { + false + } + } +} diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 3b1253f..a9cb6b4 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -3,13 +3,7 @@ //! a controller implementation for cursor actions use std::sync::Arc; -use tokio::sync::{ - broadcast::{ - self, - error::{RecvError, TryRecvError}, - }, - mpsc, watch, Mutex, -}; +use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex}; use tonic::async_trait; use crate::api::{Controller, Cursor}; @@ -66,36 +60,15 @@ impl Controller for CursorController { } /// try to receive without blocking, but will still block on stream mutex - fn try_recv(&self) -> crate::Result> { - match self.0.stream.try_lock() { - Err(_) => Ok(None), - Ok(mut stream) => match stream.try_recv() { - Ok(x) => Ok(Some(x.into())), - Err(TryRecvError::Empty) => Ok(None), - Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }), - Err(TryRecvError::Lagged(n)) => { - tracing::warn!("cursor channel lagged, skipping {} events", n); - Ok(stream.try_recv().map(|x| x.into()).ok()) - } - } - } - } - - // TODO is this cancelable? so it can be used in tokio::select! - // TODO is the result type overkill? should be an option? - /// get next cursor event from current workspace, or block until one is available - async fn recv(&self) -> crate::Result { + async fn try_recv(&self) -> crate::Result> { let mut stream = self.0.stream.lock().await; - match stream.recv().await { - Ok(x) => Ok(x.into()), - Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }), - Err(RecvError::Lagged(n)) => { - tracing::error!("cursor channel lagged behind, skipping {} events", n); - Ok(stream - .recv() - .await - .expect("could not receive after lagging") - .into()) + match stream.try_recv() { + Ok(x) => Ok(Some(x.into())), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }), + Err(TryRecvError::Lagged(n)) => { + tracing::warn!("cursor channel lagged, skipping {} events", n); + Ok(stream.try_recv().map(|x| x.into()).ok()) } } } diff --git a/src/ffi/js/mod.rs b/src/ffi/js/mod.rs index ed23845..e64036e 100644 --- a/src/ffi/js/mod.rs +++ b/src/ffi/js/mod.rs @@ -1,5 +1,3 @@ -#![deny(clippy::all)] - pub mod client; pub mod workspace; pub mod cursor; @@ -67,4 +65,4 @@ impl std::io::Write for JsLoggerProducer { } fn flush(&mut self) -> std::io::Result<()> { Ok(()) } -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index 99390df..47107bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -143,9 +143,6 @@ pub mod ffi; /// common utils used in this library and re-exposed pub mod ext; -/// underlying OperationalTransform library used, re-exported -pub use woot; - pub use errors::Error; pub use errors::Result; diff --git a/src/prelude.rs b/src/prelude.rs index a90c04f..2c66a29 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -11,7 +11,6 @@ pub use crate::api::{ Controller as CodempController, TextChange as CodempTextChange, Cursor as CodempCursor, - Op as CodempOp, User as CodempUser, Event as CodempEvent, };