diff --git a/Cargo.toml b/Cargo.toml index 9307a29..42c8980 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ thiserror = "1.0" async-trait = "0.1" # woot codemp-woot = { git = "ssh://git@github.com/hexedtech/woot.git", features = ["serde"], tag = "v0.1.2" } +diamond-types="1.0" # proto codemp-proto = { git = "ssh://git@github.com/hexedtech/codemp-proto.git", tag = "v0.6.1" } uuid = { version = "1.7", features = ["v4"] } diff --git a/src/api/change.rs b/src/api/change.rs index 9754f8a..4278d84 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -3,17 +3,12 @@ //! an editor-friendly representation of a text change in a buffer //! to easily interface with codemp from various editors -use crate::woot::{ - crdt::{TextEditor, CRDT}, - woot::Woot, - WootResult, -}; - /// an atomic and orderable operation /// /// this under the hood thinly wraps our CRDT operation #[derive(Debug, Clone)] -pub struct Op(pub(crate) woot::crdt::Op); +pub struct Op(pub(crate) diamond_types::list::operation::Operation); +// Do we need this in the api? why not just have a TextChange, which already covers as operation. /// an editor-friendly representation of a text change in a buffer /// @@ -44,6 +39,28 @@ pub struct TextChange { pub content: String, } +impl TextChange { + pub fn span(&self) -> std::ops::Range { + self.start as usize..self.end as usize + } + + /// returns true if this TextChange deletes existing text + pub fn is_delete(&self) -> bool { + self.start < self.end + } + + /// returns true if this TextChange adds new text + pub fn is_insert(&self) -> bool { + !self.content.is_empty() + } + + /// returns true if this TextChange is effectively as no-op + pub fn is_empty(&self) -> bool { + !self.is_delete() && !self.is_insert() + } +} + +/* impl TextChange { /// create a new TextChange from the difference of given strings pub fn from_diff(before: &str, after: &str) -> TextChange { @@ -241,4 +258,15 @@ mod tests { let result = change.apply("some important text"); assert_eq!(result, "some important text"); } +}*/ + +// TODO: properly implement this for diamond types directly +impl From for TextChange { + fn from(value: Op) -> Self { + Self { + start: value.0.start() as u32, + end: value.0.end() as u32, + content: value.0.content_as_str().unwrap_or_default().to_string(), + } + } } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 057bda6..e751bbb 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -4,6 +4,7 @@ use std::sync::Arc; +use diamond_types::LocalVersion; use tokio::sync::oneshot; use tokio::sync::{mpsc, watch}; use tonic::async_trait; @@ -12,6 +13,8 @@ use crate::api::Controller; use crate::api::TextChange; +use crate::api::Op; + use crate::ext::InternallyMutable; /// the buffer controller implementation @@ -34,17 +37,20 @@ impl BufferController { /// return buffer whole content, updating internal buffer previous state pub fn content(&self) -> String { - self.0.seen.set(self.0.content.borrow().clone()); - self.0.content.borrow().clone() + // this function either needs a ref to the worker + // or needs to basically mirror all the operations that go through it. + // yikes. + todo!() // TODO ouch } } #[derive(Debug)] pub(crate) struct BufferControllerInner { name: String, - content: watch::Receiver, - seen: InternallyMutable, // internal buffer previous state - operations: mpsc::UnboundedSender, + latest_version: watch::Receiver, + last_update: InternallyMutable, + ops_in: mpsc::UnboundedSender, + ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option)>, poller: mpsc::UnboundedSender>, stopper: mpsc::UnboundedSender<()>, // just exist } @@ -52,18 +58,20 @@ pub(crate) struct BufferControllerInner { impl BufferControllerInner { pub(crate) fn new( name: String, - content: watch::Receiver, - operations: mpsc::UnboundedSender, + latest_version: watch::Receiver, + ops_in: mpsc::UnboundedSender, + ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option)>, poller: mpsc::UnboundedSender>, - stop: mpsc::UnboundedSender<()>, + stopper: mpsc::UnboundedSender<()>, ) -> Self { Self { name, - content, - operations, + latest_version, + last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), + ops_in, + ops_out, poller, - seen: InternallyMutable::default(), - stopper: stop, + stopper, } } } @@ -73,9 +81,13 @@ impl Controller for BufferController { /// block until a text change is available /// this returns immediately if one is already available async fn poll(&self) -> crate::Result<()> { - if self.0.seen.get() != *self.0.content.borrow() { - return Ok(()); // short circuit: already available! + // TODO there might be some extra logic we can play with using `seen` and `not seen` yet + // mechanics, not just the comparison. nevermind, the `has_changed` etc stuff needs mut self, yuk. + + if self.0.last_update.get() != *self.0.latest_version.borrow() { + return Ok(()); } + let (tx, rx) = oneshot::channel::<()>(); self.0.poller.send(tx)?; rx.await @@ -85,32 +97,40 @@ impl Controller for BufferController { /// if a text change is available, return it immediately fn try_recv(&self) -> crate::Result> { - let seen = self.0.seen.get(); - let actual = self.0.content.borrow().clone(); - if seen == actual { + let last_update = self.0.last_update.get(); + let latest_version = self.0.latest_version.borrow().clone(); + + if last_update == latest_version { return Ok(None); } - let change = TextChange::from_diff(&seen, &actual); - self.0.seen.set(actual); - Ok(Some(change)) + + if let Ok((lv, Some(op))) = self.0.ops_out.try_recv() { + // if the current change has + self.0.last_update.set(lv); + return Ok(Some(TextChange::from(op))); + } + + return Err(crate::Error::Channel { send: false }); } /// block until a new text change is available, and return it async fn recv(&self) -> crate::Result { - self.poll().await?; - let seen = self.0.seen.get(); - let actual = self.0.content.borrow().clone(); - let change = TextChange::from_diff(&seen, &actual); - self.0.seen.set(actual); - Ok(change) + let last_update = self.0.last_update.get(); + + // no need to poll here? as soon as we have new changes we return them! + if let Some((lv, Some(op))) = self.0.ops_out.recv().await { + self.0.last_update.set(lv); + Ok(TextChange::from(op)) + } else { + Err(crate::Error::Channel { send: false }) + } } /// enqueue a text change for processing /// this also updates internal buffer previous state fn send(&self, op: TextChange) -> crate::Result<()> { - let before = self.0.seen.get(); - self.0.seen.set(op.apply(&before)); - Ok(self.0.operations.send(op)?) + // we let the worker do the updating to the last version and send it back. + Ok(self.0.ops_in.send(op)?) } fn stop(&self) -> bool { diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 8fb0b0c..be4f37a 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -2,24 +2,26 @@ use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use tokio::sync::{watch, mpsc, oneshot}; +use diamond_types::LocalVersion; +use tokio::sync::{mpsc, oneshot, watch}; use tonic::{async_trait, Streaming}; use uuid::Uuid; -use woot::crdt::CRDT; -use woot::woot::Woot; + +use crate::api::controller::ControllerWorker; +use crate::api::Op; +use crate::api::TextChange; use crate::errors::IgnorableError; -use crate::api::controller::ControllerWorker; -use crate::api::TextChange; use codemp_proto::buffer::{BufferEvent, Operation}; use super::controller::{BufferController, BufferControllerInner}; pub(crate) struct BufferWorker { - _user_id: Uuid, - buffer: Woot, - content: watch::Sender, - operations: mpsc::UnboundedReceiver, + user_id: Uuid, + buffer: diamond_types::list::ListCRDT, + latest_version: watch::Sender, + ops_in: mpsc::UnboundedReceiver, + ops_out: mpsc::UnboundedSender<(LocalVersion, Option)>, poller: mpsc::UnboundedReceiver>, pollers: Vec>, stop: mpsc::UnboundedReceiver<()>, @@ -28,25 +30,37 @@ pub(crate) struct BufferWorker { impl BufferWorker { pub fn new(user_id: Uuid, path: &str) -> Self { - let (txt_tx, txt_rx) = watch::channel("".to_string()); - let (op_tx, op_rx) = mpsc::unbounded_channel(); - let (end_tx, end_rx) = mpsc::unbounded_channel(); + //let (txt_tx, txt_rx) = watch::channel("".to_string()); + let init = diamond_types::LocalVersion::default(); + let buffer = diamond_types::list::ListCRDT::default(); + + let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); + let (opin_tx, opin_rx) = mpsc::unbounded_channel(); + let (opout_tx, opout_rx) = mpsc::unbounded_channel(); + let (poller_tx, poller_rx) = mpsc::unbounded_channel(); + let mut hasher = DefaultHasher::new(); user_id.hash(&mut hasher); - let site_id = hasher.finish() as usize; + let _site_id = hasher.finish() as usize; + + let (end_tx, end_rx) = mpsc::unbounded_channel(); + let controller = BufferControllerInner::new( path.to_string(), - txt_rx, - op_tx, + latest_version_rx, + opin_tx, + opout_rx, poller_tx, end_tx, ); + BufferWorker { - _user_id: user_id, - buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging! - content: txt_tx, - operations: op_rx, + user_id, + buffer, + latest_version: latest_version_tx, + ops_in: opin_rx, + ops_out: opout_tx, poller: poller_rx, pollers: Vec::new(), stop: end_rx, @@ -81,41 +95,56 @@ impl ControllerWorker for BufferWorker { }, // received a text change from editor - res = self.operations.recv() => match res { + res = self.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some(change) => match change.transform(&self.buffer) { - Err(e) => break tracing::error!("could not apply operation from client: {}", e), - Ok(ops) => { - for op in ops { - self.buffer.merge(op.0.clone()); - let operation = Operation { - data: postcard::to_extend(&op.0, Vec::new()).unwrap(), - }; - if let Err(e) = tx.send(operation).await { - tracing::error!("server refused to broadcast {}: {}", op.0, e); - } - } - self.content.send(self.buffer.view()) - .unwrap_or_warn("could not send buffer update"); - }, - } + Some(change) => { + + let agent_id = self.buffer.get_or_create_agent_id(&self.user_id.to_string()); + let lastver = self.buffer.oplog.local_version_ref(); + + if change.is_insert() { + self.buffer.insert(agent_id, change.start as usize, &change.content) // TODO da vedere il cast + } else if change.is_delete() { + self.buffer.delete_without_content(1, change.span()) + } else { continue; }; + + tx.send(Operation { data: self.buffer.oplog.encode_from(Default::default(), lastver) }); + self.latest_version.send(self.buffer.oplog.local_version()); + + }, }, // received a message from server res = rx.message() => match res { Err(_e) => break, Ok(None) => break, - Ok(Some(change)) => match postcard::from_bytes::(&change.op.data) { - Ok(op) => { // TODO here in change we receive info about the author, maybe propagate? - self.buffer.merge(op); - self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); - for tx in self.pollers.drain(..) { - tx.send(()).unwrap_or_warn("could not wake up poller"); - } - }, - Err(e) => tracing::error!("could not deserialize operation from server: {}", e), + Ok(Some(change)) => { + let lastver = self.buffer.oplog.local_version_ref(); + + match self.buffer.merge_data_and_ff(&change.op.data) { + Ok(local_version) => { + + // give all the changes needed to the controller in a channel. + for (lv, Some(dtop)) in self.buffer.oplog.iter_xf_operations_from(lastver, &local_version) { + // x.0.start should always be after lastver! + // this step_ver will be the version after we apply the operation + // we give it to the controller so that he knows where it's at. + let step_ver = self.buffer.oplog.version_union(&[lv.start], lastver); + let opout = (step_ver, Some(Op(dtop))); + + self.ops_out.send(opout).unwrap(); //TODO ERRORS + } + + // finally we send the + self.latest_version.send(local_version); + for tx in self.pollers.drain(..) { + tx.send(()).unwrap_or_warn("could not wake up poller"); + } + }, + Err(e) => tracing::error!("could not deserialize operation from server: {}", e), + } }, - }, + } } } } diff --git a/src/ext.rs b/src/ext.rs index 2bfc362..bee1cb4 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -82,10 +82,11 @@ impl InternallyMutable { } } +/* pub(crate) struct CallbackHandleWatch(pub(crate) tokio::sync::watch::Sender>); impl crate::api::controller::CallbackHandle for CallbackHandleWatch { fn unregister(self) { self.0.send_replace(None); } -} +}*/