diff --git a/Cargo.toml b/Cargo.toml index fc279d7..aaec6d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,8 @@ name = "codemp" [dependencies] # core tracing = "0.1" -# ot -operational-transform = { version = "0.6", features = ["serde"], optional = true } +# woot +codemp-woot = { path = "../woot", optional = true } # proto tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true } prost = { version = "0.11.8", optional = true } @@ -31,8 +31,8 @@ tonic-build = "0.9" [features] default = ["client"] -api = ["ot", "dep:similar", "dep:tokio", "dep:async-trait"] -ot = ["dep:operational-transform"] +api = ["woot", "dep:similar", "dep:tokio", "dep:async-trait"] +woot = ["dep:codemp-woot"] proto = ["dep:prost", "dep:tonic"] client = ["proto", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "dep:md5", "dep:serde_json"] global = ["client", "dep:lazy_static"] diff --git a/src/api/factory.rs b/src/api/factory.rs deleted file mode 100644 index 693c179..0000000 --- a/src/api/factory.rs +++ /dev/null @@ -1,165 +0,0 @@ -//! ### factory -//! -//! a helper trait that any string container can implement, which generates opseqs -//! -//! an OperationFactory trait implementation is provided for `String` and `Arc`, but plugin developers -//! should implement their own operation factory interfacing directly with the editor -//! buffer when possible. - -use std::ops::Range; - -use operational_transform::{OperationSeq, Operation}; -use similar::{TextDiff, ChangeTag}; - -/// calculate leading no-ops in given opseq -pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } - -/// calculate tailing no-ops in given opseq -pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } - -const fn count_noop(op: Option<&Operation>) -> u64 { - match op { - None => 0, - Some(Operation::Retain(n)) => *n, - Some(_) => 0, - } -} - -/// return the range on which the operation seq is actually applying its changes -pub fn op_effective_range(op: &OperationSeq) -> Range { - let first = leading_noop(op.ops()); - let last = op.base_len() as u64 - tailing_noop(op.ops()); - first..last -} - -/// a helper trait that any string container can implement, which generates opseqs -/// -/// all operations are to be considered mutating current state, obtainable with -/// [OperationFactory::content]. generating an operation has no effect on internal state -/// -/// ### examples -/// -/// ```rust -/// use codemp::api::OperationFactory; -/// -/// let mut factory = String::new(); -/// let operation = factory.ins("asd", 0); -/// factory = operation.apply(&factory)?; -/// assert_eq!(factory, "asd"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -/// -/// use [OperationFactory::ins] to add new characters at a specific index -/// -/// ```rust -/// # use codemp::api::OperationFactory; -/// # let mut factory = String::from("asd"); -/// factory = factory.ins(" dsa", 3).apply(&factory)?; -/// assert_eq!(factory, "asd dsa"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -/// -/// use [OperationFactory::diff] to arbitrarily change text at any position -/// -/// ```rust -/// # use codemp::api::OperationFactory; -/// # let mut factory = String::from("asd dsa"); -/// factory = factory -/// .diff(2, " xxx ", 5) -/// .expect("replaced region is equal to origin") -/// .apply(&factory)?; -/// assert_eq!(factory, "as xxx sa"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -/// -/// use [OperationFactory::del] to remove characters from given index -/// -/// ```rust -/// # use codemp::api::OperationFactory; -/// # let mut factory = String::from("as xxx sa"); -/// factory = factory.del(2, 5).apply(&factory)?; -/// assert_eq!(factory, "assa"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -/// -/// use [OperationFactory::replace] to completely replace buffer content -/// -/// ```rust -/// # use codemp::api::OperationFactory; -/// # let mut factory = String::from("assa"); -/// factory = factory.replace("from scratch") -/// .expect("replace is equal to origin") -/// .apply(&factory)?; -/// assert_eq!(factory, "from scratch"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -pub trait OperationFactory { - /// the current content of the buffer - fn content(&self) -> String; - - /// completely replace the buffer with given text - fn replace(&self, txt: &str) -> Option { - self.diff(0, txt, self.content().len()) - } - - /// transform buffer in range [start..end] with given text - fn diff(&self, start: usize, txt: &str, end: usize) -> Option { - let mut out = OperationSeq::default(); - let content = self.content(); - let tail_skip = content.len() - end; // TODO len is number of bytes, not chars - let content_slice = &content[start..end]; - - if content_slice == txt { - // if slice equals given text, no operation should be taken - return None; - } - - out.retain(start as u64); - - let diff = TextDiff::from_chars(content_slice, txt); - - for change in diff.iter_all_changes() { - match change.tag() { - ChangeTag::Equal => out.retain(1), - ChangeTag::Delete => out.delete(1), - ChangeTag::Insert => out.insert(change.value()), - } - } - - out.retain(tail_skip as u64); - - Some(out) - } - - /// insert given chars at target position - fn ins(&self, txt: &str, pos: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let total = self.content().len() as u64; - out.retain(pos); - out.insert(txt); - out.retain(total - pos); - out - } - - /// delete n characters forward at given position - fn del(&self, pos: u64, count: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let len = self.content().len() as u64; - out.retain(pos); - out.delete(count); - out.retain(len - (pos+count)); - out - } -} - -impl OperationFactory for String { - fn content(&self) -> String { - self.clone() - } -} - -impl OperationFactory for std::sync::Arc { - fn content(&self) -> String { - self.to_string() - } -} diff --git a/src/api/mod.rs b/src/api/mod.rs index 4a69729..12a6070 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -7,8 +7,4 @@ /// a generic async provider for bidirectional communication pub mod controller; -/// a helper trait to generate operation sequences -pub mod factory; - pub use controller::Controller; -pub use factory::OperationFactory; diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 4dcad19..1f02299 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -3,8 +3,9 @@ //! a controller implementation for buffer actions -use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, Mutex, oneshot}; +use std::sync::Arc; + +use tokio::sync::{watch, mpsc}; use tonic::async_trait; use crate::errors::IgnorableError; @@ -28,67 +29,50 @@ use super::TextChange; /// Operation Sequences easily /// /// upon dropping this handle will stop the associated worker -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BufferController { content: watch::Receiver, - operations: mpsc::UnboundedSender, - last_op: Mutex>, - stream: mpsc::UnboundedSender>>, - stop: mpsc::UnboundedSender<()>, + operations: mpsc::UnboundedSender, + _stop: Arc, // just exist } impl BufferController { pub(crate) fn new( content: watch::Receiver, - operations: mpsc::UnboundedSender, - stream: mpsc::UnboundedSender>>, + operations: mpsc::UnboundedSender, stop: mpsc::UnboundedSender<()>, - last_op: Mutex>, ) -> Self { - BufferController { - last_op, content, operations, stream, stop, - } - } - - pub fn content(&self) -> String { - self.content.borrow().clone() + BufferController { content, operations, _stop: Arc::new(StopOnDrop(stop)) } } } -impl Drop for BufferController { +#[derive(Debug)] +struct StopOnDrop(mpsc::UnboundedSender<()>); + +impl Drop for StopOnDrop { fn drop(&mut self) { - self.stop.send(()).unwrap_or_warn("could not send stop message to worker"); + self.0.send(()).unwrap_or_warn("could not send stop message to worker"); } } #[async_trait] -impl Controller for BufferController { - type Input = OperationSeq; +impl Controller for BufferController { + type Input = TextChange; async fn poll(&self) -> Result<(), Error> { - Ok(self.last_op.lock().await.changed().await?) + Ok(self.content.clone().changed().await?) } - fn try_recv(&self) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - self.stream.send(tx)?; - rx.blocking_recv() - .map_err(|_| Error::Channel { send: false }) + fn try_recv(&self) -> Result, Error> { + Ok(Some(self.content.borrow().clone())) } - async fn recv(&self) -> Result { - self.poll().await?; - let (tx, rx) = oneshot::channel(); - self.stream.send(tx)?; - Ok( - rx.await - .map_err(|_| Error::Channel { send: false })? - .expect("empty channel after polling") - ) + async fn recv(&self) -> Result { + Ok(self.content.borrow().clone()) } /// enqueue an opseq for processing - fn send(&self, op: OperationSeq) -> Result<(), Error> { + fn send(&self, op: TextChange) -> Result<(), Error> { Ok(self.operations.send(op)?) } } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 72accfa..3ac4317 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,14 +1,13 @@ -use std::collections::VecDeque; - -use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, oneshot, Mutex}; +use similar::{TextDiff, ChangeTag}; +use tokio::sync::{watch, mpsc}; use tonic::transport::Channel; use tonic::{async_trait, Streaming}; +use woot::crdt::{Op, CRDT, TextEditor}; +use woot::woot::Woot; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; use crate::api::controller::ControllerWorker; -use crate::api::factory::{leading_noop, tailing_noop}; use super::TextChange; use super::controller::BufferController; @@ -17,48 +16,38 @@ use super::controller::BufferController; pub(crate) struct BufferControllerWorker { uid: String, content: watch::Sender, - operations: mpsc::UnboundedReceiver, - stream: mpsc::UnboundedReceiver>>, - stream_requestor: mpsc::UnboundedSender>>, + operations: mpsc::UnboundedReceiver, receiver: watch::Receiver, - sender: mpsc::UnboundedSender, - buffer: String, + sender: mpsc::UnboundedSender, + buffer: Woot, path: String, stop: mpsc::UnboundedReceiver<()>, stop_control: mpsc::UnboundedSender<()>, - new_op_tx: watch::Sender<()>, - new_op_rx: watch::Receiver<()>, } impl BufferControllerWorker { pub fn new(uid: String, buffer: &str, path: &str) -> Self { let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (op_tx, op_rx) = mpsc::unbounded_channel(); - let (s_tx, s_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel(); - let (notx, norx) = watch::channel(()); BufferControllerWorker { uid, content: txt_tx, operations: op_rx, - stream: s_rx, - stream_requestor: s_tx, receiver: txt_rx, sender: op_tx, - buffer: buffer.to_string(), + buffer: Woot::new(42069), // TODO initialize with buffer! path: path.to_string(), stop: end_rx, stop_control: end_tx, - new_op_tx: notx, - new_op_rx: norx, } } - async fn send_op(&self, tx: &mut BufferClient, outbound: &OperationSeq) -> crate::Result<()> { + async fn send_op(&self, tx: &mut BufferClient, outbound: &Op) -> crate::Result<()> { let opseq = serde_json::to_string(outbound).expect("could not serialize opseq"); let req = OperationRequest { path: self.path.clone(), - hash: format!("{:x}", md5::compute(&self.buffer)), + hash: format!("{:x}", md5::compute(self.buffer.view())), op: Some(RawOp { opseq, user: self.uid.clone(), }), @@ -69,7 +58,7 @@ impl BufferControllerWorker { } #[async_trait] -impl ControllerWorker for BufferControllerWorker { +impl ControllerWorker for BufferControllerWorker { type Controller = BufferController; type Tx = BufferClient; type Rx = Streaming; @@ -78,123 +67,62 @@ impl ControllerWorker for BufferControllerWorker { BufferController::new( self.receiver.clone(), self.sender.clone(), - self.stream_requestor.clone(), self.stop_control.clone(), - Mutex::new(self.new_op_rx.clone()), ) } async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { - let mut clientside : VecDeque = VecDeque::new(); - let mut serverside : VecDeque = VecDeque::new(); - loop { - // block until one of these is ready tokio::select! { biased; - // received a stop request (or channel got closed) - res = self.stop.recv() => { - tracing::info!("received stop signal"); - match res { - None => return tracing::warn!("stop channel closed, stopping worker"), - Some(()) => return tracing::debug!("buffer worker stopping cleanly"), - } - } + // received stop signal + _ = self.stop.recv() => break, - // received a new message from server (or an error) - res = rx.message() => { - tracing::info!("received msg from server"); - let inbound : OperationSeq = match res { - Err(e) => return tracing::error!("error receiving op from server: {}", e), - Ok(None) => return tracing::warn!("server closed operation stream"), - Ok(Some(msg)) => serde_json::from_str(&msg.opseq) - .expect("could not deserialize server opseq"), - }; - self.buffer = inbound.apply(&self.buffer).expect("could not apply remote opseq???"); - serverside.push_back(inbound); - while let Some(mut outbound) = clientside.get(0).cloned() { - let mut serverside_tmp = serverside.clone(); - for server_op in serverside_tmp.iter_mut() { - tracing::info!("transforming {:?} <-> {:?}", outbound, server_op); - (outbound, *server_op) = outbound.transform(server_op) - .expect("could not transform enqueued out with just received"); - } - match self.send_op(&mut tx, &outbound).await { - Err(e) => { tracing::warn!("could not send op even after transforming: {}", e); break; }, - Ok(()) => { - tracing::info!("back in sync"); - serverside = serverside_tmp; - self.buffer = outbound.apply(&self.buffer).expect("could not apply op after synching back"); - clientside.pop_front(); - }, - } - } - self.content.send(self.buffer.clone()).expect("could not broadcast buffer update"); - self.new_op_tx.send(()).expect("could not activate client after new server event"); - }, + // received a text change from editor + res = self.operations.recv() => match res { + None => break, + Some(change) => { + let span = &self.buffer.view()[change.span.clone()]; + let diff = TextDiff::from_chars(span, &change.content); - // received a new operation from client (or channel got closed) - res = self.operations.recv() => { - tracing::info!("received op from client"); - match res { - None => return tracing::warn!("client closed operation stream"), - Some(op) => { - if clientside.is_empty() { - match self.send_op(&mut tx, &op).await { - Ok(()) => { - self.buffer = op.apply(&self.buffer).expect("could not apply op"); - self.content.send(self.buffer.clone()).expect("could not update buffer view"); - }, - Err(e) => { - tracing::warn!("server rejected op: {}", e); - clientside.push_back(op); - }, - } - } else { // I GET STUCK IN THIS BRANCH AND NOTHING HAPPENS AAAAAAAAAA - clientside.push_back(op); + let mut i = 0; + let mut ops = Vec::new(); + for diff in diff.iter_all_changes() { + match diff.tag() { + ChangeTag::Equal => i += 1, + ChangeTag::Delete => ops.push(self.buffer.delete(change.span.start + i).unwrap()), + ChangeTag::Insert => { + for c in diff.value().chars() { + ops.push(self.buffer.insert(change.span.start + i, c).unwrap()); + i += 1; + } + }, + } + } + + for op in ops { + match self.send_op(&mut tx, &op).await { + Ok(()) => self.buffer.enqueue(op), + Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e), } } } }, - - // client requested a server operation, transform it and send it - res = self.stream.recv() => { - tracing::info!("received op REQUEST from client"); - match res { - None => return tracing::error!("client closed requestor stream"), - Some(tx) => tx.send(match serverside.pop_front() { - None => { - tracing::warn!("requested change but none is available"); - None - }, - Some(mut operation) => { - let mut after = self.buffer.clone(); - for op in clientside.iter_mut() { - (*op, operation) = match op.transform(&operation) { - Err(e) => return tracing::warn!("could not transform enqueued operation: {}", e), - Ok((x, y)) => (x, y), - }; - after = match op.apply(&after) { - Err(_) => return tracing::error!("could not apply outgoing enqueued opseq to current buffer?"), - Ok(x) => x, - }; - } - let skip = leading_noop(operation.ops()) as usize; - let tail = tailing_noop(operation.ops()) as usize; - let span = skip..(operation.base_len() - tail); - let content = if after.len() - tail < skip { "".into() } else { after[skip..after.len()-tail].to_string() }; - let change = TextChange { span, content, after }; - - Some(change) - }, - }).expect("client did not wait????"), - } + // received a stop request (or channel got closed) + res = rx.message() => match res { + Err(_e) => break, + Ok(None) => break, + Ok(Some(change)) => { + let op : Op = serde_json::from_str(&change.opseq).unwrap(); + self.buffer.enqueue(op); + self.content.send(self.buffer.view()).unwrap(); + }, }, - } + } } } diff --git a/src/lib.rs b/src/lib.rs index 23e87a9..a3b47c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -172,8 +172,8 @@ pub mod instance; pub mod prelude; /// underlying OperationalTransform library used, re-exported -#[cfg(feature = "ot")] -pub use operational_transform as ot; +#[cfg(feature = "woot")] +pub use woot; /// protocol types and services auto-generated by grpc #[cfg(feature = "proto")] diff --git a/src/prelude.rs b/src/prelude.rs index 2c3e5fa..e49e505 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -11,10 +11,7 @@ pub use crate::{ pub use crate::ot::OperationSeq as CodempOperationSeq; #[cfg(feature = "api")] -pub use crate::{ - api::Controller as CodempController, - api::OperationFactory as CodempOperationFactory, -}; +pub use crate::api::Controller as CodempController; #[cfg(feature = "client")] pub use crate::{