diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 26f540c..b805017 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -9,7 +9,7 @@ use tonic::async_trait; use crate::errors::IgnorableError; use crate::{api::Controller, Error}; -use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; +use crate::buffer::factory::OperationFactory; use super::TextChange; @@ -34,7 +34,7 @@ pub struct BufferController { content: watch::Receiver, operations: mpsc::UnboundedSender, last_op: Mutex>, - stream: Mutex>, + stream: Mutex>, stop: mpsc::UnboundedSender<()>, } @@ -42,7 +42,7 @@ impl BufferController { pub(crate) fn new( content: watch::Receiver, operations: mpsc::UnboundedSender, - stream: Mutex>, + stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { BufferController { @@ -50,16 +50,6 @@ impl BufferController { content, operations, stream, stop, } } - - fn op_to_change(&self, op: OperationSeq) -> TextChange { - let after = self.content.borrow().clone(); - let skip = leading_noop(op.ops()) as usize; - let before_len = op.base_len(); - let tail = tailing_noop(op.ops()) as usize; - let span = skip..before_len-tail; - let content = after[skip..after.len()-tail].to_string(); - TextChange { span, content } - } } impl Drop for BufferController { @@ -85,7 +75,7 @@ impl Controller for BufferController { fn try_recv(&self) -> Result, Error> { match self.stream.blocking_lock().try_recv() { - Ok(op) => Ok(Some(self.op_to_change(op))), + Ok(op) => Ok(Some(op)), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Closed) => Err(Error::Channel { send: false }), Err(TryRecvError::Lagged(n)) => { @@ -98,7 +88,7 @@ impl Controller for BufferController { /// receive an operation seq and transform it into a TextChange from buffer content async fn recv(&self) -> Result { let op = self.stream.lock().await.recv().await?; - Ok(self.op_to_change(op)) + Ok(op) } /// enqueue an opseq for processing diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 243e6ae..c3a7e21 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -6,7 +6,7 @@ //! this module contains buffer-related operations and helpers to create Operation Sequences //! (the underlying chunks of changes sent over the wire) -use std::ops::Range; +use std::{ops::Range, sync::Arc}; pub(crate) mod worker; @@ -23,10 +23,15 @@ pub use controller::BufferController as Controller; /// an editor-friendly representation of a text change in a buffer /// /// TODO move in proto -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct TextChange { /// range of text change, as byte indexes in buffer pub span: Range, /// content of text change, as string pub content: String, + /// reference to previous content of buffer + pub before: Arc, + /// reference to current content of buffer + pub after: Arc, +} } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index ef3b92a..25279bc 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,24 +1,25 @@ use std::{sync::Arc, collections::VecDeque}; -use operational_transform::OperationSeq; +use operational_transform::{OperationSeq, OTError}; use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tonic::transport::Channel; use tonic::{async_trait, Streaming}; -use crate::errors::IgnorableError; +use crate::errors::{IgnorableError, IgnorableDefaultableError}; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; use crate::api::ControllerWorker; use super::TextChange; use super::controller::BufferController; +use super::factory::{leading_noop, tailing_noop}; pub(crate) struct BufferControllerWorker { uid: String, pub(crate) content: watch::Sender, pub(crate) operations: mpsc::UnboundedReceiver, - pub(crate) stream: Arc>, + pub(crate) stream: Arc>, pub(crate) queue: VecDeque, receiver: watch::Receiver, sender: mpsc::UnboundedSender, @@ -48,6 +49,21 @@ impl BufferControllerWorker { stop_control: end_tx, } } + + fn update(&mut self, op: OperationSeq) -> Result { + let before = Arc::new(self.buffer.clone()); + let res = op.apply(&before)?; + self.content.send(res.clone()) + .unwrap_or_warn("error showing updated buffer"); + let after = Arc::new(res.clone()); + self.buffer = res; + let skip = leading_noop(op.ops()) as usize; + let before_len = op.base_len(); + let tail = tailing_noop(op.ops()) as usize; + let span = skip..before_len-tail; + let content = after[skip..after.len()-tail].to_string(); + Ok(TextChange { span, content, before, after }) + } } #[async_trait] @@ -67,7 +83,8 @@ impl ControllerWorker for BufferControllerWorker { async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { loop { - let op = tokio::select! { + tokio::select! { + Some(operation) = recv_opseq(&mut rx) => { let mut out = operation; for op in self.queue.iter_mut() { @@ -79,27 +96,28 @@ impl ControllerWorker for BufferControllerWorker { }, } } - self.stream.send(out.clone()).unwrap_or_warn("could not send operation to server"); - out + let change = self.update(out) + .unwrap_or_warn_default("coult not update with (transformed) remote operation"); + self.stream.send(change) + .unwrap_or_warn("could not send operation to server"); }, + Some(op) = self.operations.recv() => { self.queue.push_back(op.clone()); - op + self.update(op) + .unwrap_or_warn("could not apply enqueued operation to current buffer"); + while let Some(op) = self.queue.get(0) { + if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break } + self.queue.pop_front(); + } }, + Some(()) = self.stop.recv() => { break; } - else => break - }; - self.buffer = op.apply(&self.buffer).unwrap_or_else(|e| { - tracing::error!("could not update buffer string: {}", e); - self.buffer - }); - self.content.send(self.buffer.clone()).unwrap_or_warn("error showing updated buffer"); - while let Some(op) = self.queue.get(0) { - if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break } - self.queue.pop_front(); + else => break + } } }