From eafbc41bd1c61f28d5f01fb5b26374e0ecb7f276 Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 17 Apr 2023 14:56:00 +0200 Subject: [PATCH] chore: split op factory into processor and factory --- src/lib/lib.rs | 3 +- src/lib/operation/factory.rs | 52 ++++++++++ src/lib/operation/mod.rs | 5 + src/lib/operation/processor.rs | 79 +++++++++++++++ src/lib/opfactory.rs | 171 --------------------------------- 5 files changed, 138 insertions(+), 172 deletions(-) create mode 100644 src/lib/operation/factory.rs create mode 100644 src/lib/operation/mod.rs create mode 100644 src/lib/operation/processor.rs delete mode 100644 src/lib/opfactory.rs diff --git a/src/lib/lib.rs b/src/lib/lib.rs index 82b6ce5..1ae3d88 100644 --- a/src/lib/lib.rs +++ b/src/lib/lib.rs @@ -1,6 +1,7 @@ pub mod proto; -pub mod opfactory; pub mod client; +pub mod operation; +pub mod cursor; pub use tonic; pub use tokio; diff --git a/src/lib/operation/factory.rs b/src/lib/operation/factory.rs new file mode 100644 index 0000000..8035520 --- /dev/null +++ b/src/lib/operation/factory.rs @@ -0,0 +1,52 @@ +use operational_transform::OperationSeq; +use similar::{TextDiff, ChangeTag}; + +pub trait OperationFactory { + fn content(&self) -> String; + + fn replace(&self, txt: &str) -> OperationSeq { + let mut out = OperationSeq::default(); + if self.content() == txt { + return out; // TODO this won't work, should we return a noop instead? + } + + let diff = TextDiff::from_chars(self.content().as_str(), txt); + + for change in diff.iter_all_changes() { + match change.tag() { + ChangeTag::Equal => out.retain(1), + ChangeTag::Delete => out.delete(1), + ChangeTag::Insert => out.insert(change.value()), + } + } + + out + } + + fn insert(&self, txt: &str, pos: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let total = self.content().len() as u64; + out.retain(pos); + out.insert(txt); + out.retain(total - pos); + out + } + + fn delete(&self, pos: u64, count: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let len = self.content().len() as u64; + out.retain(pos - count); + out.delete(count); + out.retain(len - pos); + out + } + + fn cancel(&self, pos: u64, count: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let len = self.content().len() as u64; + out.retain(pos); + out.delete(count); + out.retain(len - (pos+count)); + out + } +} diff --git a/src/lib/operation/mod.rs b/src/lib/operation/mod.rs new file mode 100644 index 0000000..8796b59 --- /dev/null +++ b/src/lib/operation/mod.rs @@ -0,0 +1,5 @@ +pub mod factory; +pub mod processor; + +pub use processor::{OperationController, OperationProcessor}; +pub use factory::OperationFactory; diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs new file mode 100644 index 0000000..a363cf8 --- /dev/null +++ b/src/lib/operation/processor.rs @@ -0,0 +1,79 @@ +use std::{sync::{Mutex, Arc}, collections::VecDeque}; + +use operational_transform::{OperationSeq, OTError}; +use tokio::sync::{watch, oneshot, mpsc}; +use tracing::error; + +use crate::operation::factory::OperationFactory; + + +#[tonic::async_trait] +pub trait OperationProcessor : OperationFactory{ + async fn apply(&self, op: OperationSeq) -> Result; + async fn process(&self, op: OperationSeq) -> Result; + + async fn poll(&self) -> Option; + async fn ack(&self) -> Option; +} + + +pub struct OperationController { + text: Mutex, + queue: Mutex>, + last: Mutex>, + notifier: watch::Sender, +} + +impl OperationController { + pub fn new(content: String) -> Self { + let (tx, rx) = watch::channel(OperationSeq::default()); + OperationController { + text: Mutex::new(content), + queue: Mutex::new(VecDeque::new()), + last: Mutex::new(rx), + notifier: tx, + } + } +} + +impl OperationFactory for OperationController { + fn content(&self) -> String { + self.text.lock().unwrap().clone() + } +} + +#[tonic::async_trait] +impl OperationProcessor for OperationController { + async fn apply(&self, op: OperationSeq) -> Result { + let txt = self.content(); + let res = op.apply(&txt)?; + *self.text.lock().unwrap() = res.clone(); + self.queue.lock().unwrap().push_back(op.clone()); + self.notifier.send(op).unwrap(); + Ok(res) + } + + async fn process(&self, mut op: OperationSeq) -> Result { + let mut queue = self.queue.lock().unwrap(); + for el in queue.iter_mut() { + (op, *el) = op.transform(el)?; + } + let txt = self.content(); + let res = op.apply(&txt)?; + *self.text.lock().unwrap() = res.clone(); + Ok(res) + } + + async fn poll(&self) -> Option { + let len = self.queue.lock().unwrap().len(); + if len <= 0 { + let mut recv = self.last.lock().unwrap().clone(); + recv.changed().await.unwrap(); + } + Some(self.queue.lock().unwrap().get(0)?.clone()) + } + + async fn ack(&self) -> Option { + self.queue.lock().unwrap().pop_front() + } +} diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs deleted file mode 100644 index f336df4..0000000 --- a/src/lib/opfactory.rs +++ /dev/null @@ -1,171 +0,0 @@ -use std::sync::Arc; - -use operational_transform::{OperationSeq, OTError}; -use similar::TextDiff; -use tokio::sync::{mpsc, watch, oneshot}; -use tracing::{error, warn}; - -#[tonic::async_trait] -pub trait OperationFactory { - fn content(&self) -> String; - async fn apply(&self, op: OperationSeq) -> Result; - async fn process(&self, op: OperationSeq) -> Result; - async fn acknowledge(&self, op: OperationSeq) -> Result<(), OTError>; - - fn replace(&self, txt: &str) -> OperationSeq { - let mut out = OperationSeq::default(); - if self.content() == txt { - return out; // TODO this won't work, should we return a noop instead? - } - - let diff = TextDiff::from_chars(self.content().as_str(), txt); - - for change in diff.iter_all_changes() { - match change.tag() { - similar::ChangeTag::Equal => out.retain(1), - similar::ChangeTag::Delete => out.delete(1), - similar::ChangeTag::Insert => out.insert(change.value()), - } - } - - out - } - - fn insert(&self, txt: &str, pos: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let total = self.content().len() as u64; - out.retain(pos); - out.insert(txt); - out.retain(total - pos); - out - } - - fn delete(&self, pos: u64, count: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let len = self.content().len() as u64; - out.retain(pos - count); - out.delete(count); - out.retain(len - pos); - out - } - - fn cancel(&self, pos: u64, count: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let len = self.content().len() as u64; - out.retain(pos); - out.delete(count); - out.retain(len - (pos+count)); - out - } -} - -pub struct AsyncFactory { - run: watch::Sender, - ops: mpsc::Sender, - #[allow(unused)] // TODO is this necessary? - content: watch::Receiver, -} - -impl Drop for AsyncFactory { - fn drop(&mut self) { - self.run.send(false).unwrap_or(()); - } -} - -#[tonic::async_trait] -impl OperationFactory for AsyncFactory { - fn content(&self) -> String { - return self.content.borrow().clone(); - } - - async fn apply(&self, op: OperationSeq) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Apply(op, tx)).await.map_err(|_| OTError)?; - Ok(rx.await.map_err(|_| OTError)?) - } - - async fn process(&self, op: OperationSeq) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Process(op, tx)).await.map_err(|_| OTError)?; - Ok(rx.await.map_err(|_| OTError)?) - } - - async fn acknowledge(&self, op: OperationSeq) -> Result<(), OTError> { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Acknowledge(op, tx)).await.map_err(|_| OTError)?; - Ok(rx.await.map_err(|_| OTError)?) - } - -} - -impl AsyncFactory { - pub fn new(init: Option) -> Self { - let (run_tx, run_rx) = watch::channel(true); - let (ops_tx, ops_rx) = mpsc::channel(64); // TODO hardcoded size - let (txt_tx, txt_rx) = watch::channel("".into()); - - let worker = AsyncFactoryWorker { - text: init.unwrap_or("".into()), - ops: ops_rx, - run: run_rx, - content: txt_tx, - }; - - tokio::spawn(async move { worker.work().await }); - - AsyncFactory { run: run_tx, ops: ops_tx, content: txt_rx } - } -} - - -#[derive(Debug)] -enum OpMsg { - Apply(OperationSeq, oneshot::Sender), - Process(OperationSeq, oneshot::Sender), - Acknowledge(OperationSeq, oneshot::Sender<()>) -} - -struct AsyncFactoryWorker { - text: String, - ops: mpsc::Receiver, - run: watch::Receiver, - content: watch::Sender -} - -impl AsyncFactoryWorker { - async fn work(mut self) { - while *self.run.borrow() { - tokio::select! { // periodically check run so that we stop cleanly - - recv = self.ops.recv() => { - match recv { - Some(msg) => { - match msg { - OpMsg::Apply(op, tx) => tx.send(self.exec(op)).unwrap_or(()), - OpMsg::Process(opseq, tx) => tx.send(self.factory.process(opseq)).unwrap_or(()), - OpMsg::Ack(opseq, tx) => tx.send(self.factory.ack(opseq)).unwrap_or(()), - } - if let Err(e) = self.content.send(self.factory.content()) { - error!("error updating content: {}", e); - break; - } - }, - None => break, - } - }, - - _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, - - }; - } - } - - fn exec(&mut self, op: OpWrapper) -> Result { - match op { - OpWrapper::Insert(txt, pos) => Ok(self.factory.insert(&txt, pos)?), - OpWrapper::Delete(pos, count) => Ok(self.factory.delete(pos, count)?), - OpWrapper::Cancel(pos, count) => Ok(self.factory.cancel(pos, count)?), - OpWrapper::Replace(txt) => Ok(self.factory.replace(&txt)?), - } - } -}