From eeb72545c6714399c68e60ce8176a2a8b34bc6bb Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 13 Apr 2023 02:19:31 +0200 Subject: [PATCH] feat: async opseq queuing and transforming --- src/lib/client.rs | 9 ++++++++ src/lib/opfactory.rs | 53 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/src/lib/client.rs b/src/lib/client.rs index ade4aca..6b9d179 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -66,6 +66,9 @@ impl CodempClient { .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, }; let res = self.client.edit(req).await?.into_inner(); + if let Err(e) = factory.ack(op.clone()).await { + error!("could not ack op '{:?}' : {}", op, e); + } Ok(res.accepted) }, } @@ -84,6 +87,9 @@ impl CodempClient { .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, }; let res = self.client.edit(req).await?.into_inner(); + if let Err(e) = factory.ack(op.clone()).await { + error!("could not ack op '{:?}' : {}", op, e); + } Ok(res.accepted) }, } @@ -102,6 +108,9 @@ impl CodempClient { .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, }; let res = self.client.edit(req).await?.into_inner(); + if let Err(e) = factory.ack(op.clone()).await { + error!("could not ack op '{:?}' : {}", op, e); + } Ok(res.accepted) }, } diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs index 659511d..1e31bbe 100644 --- a/src/lib/opfactory.rs +++ b/src/lib/opfactory.rs @@ -1,16 +1,28 @@ +use std::collections::VecDeque; + use operational_transform::{OperationSeq, OTError}; use similar::TextDiff; use tokio::sync::{mpsc, watch, oneshot}; -use tracing::error; +use tracing::{error, warn}; #[derive(Clone)] pub struct OperationFactory { content: String, + queue: VecDeque, } impl OperationFactory { pub fn new(init: Option) -> Self { - OperationFactory { content: init.unwrap_or(String::new()) } + OperationFactory { + content: init.unwrap_or(String::new()), + queue: VecDeque::new(), + } + } + + fn apply(&mut self, op: OperationSeq) -> Result { + self.content = op.apply(&self.content)?; + self.queue.push_back(op.clone()); + Ok(op) } // TODO remove the need for this @@ -25,8 +37,7 @@ impl OperationFactory { pub fn replace(&mut self, txt: &str) -> Result { let mut out = OperationSeq::default(); if self.content == txt { // TODO throw and error rather than wasting everyone's resources - out.retain(txt.len() as u64); - return Ok(out); // nothing to do + return Err(OTError); // nothing to do } let diff = TextDiff::from_chars(self.content.as_str(), txt); @@ -49,8 +60,7 @@ impl OperationFactory { out.retain(pos); out.insert(txt); out.retain(total - pos); - self.content = out.apply(&self.content)?; - Ok(out) + Ok(self.apply(out)?) } pub fn delete(&mut self, pos: u64, count: u64) -> Result { @@ -59,8 +69,7 @@ impl OperationFactory { out.retain(pos - count); out.delete(count); out.retain(len - pos); - self.content = out.apply(&self.content)?; - Ok(out) + Ok(self.apply(out)?) } pub fn cancel(&mut self, pos: u64, count: u64) -> Result { @@ -69,11 +78,25 @@ impl OperationFactory { out.retain(pos); out.delete(count); out.retain(len - (pos+count)); - self.content = out.apply(&self.content)?; - Ok(out) + Ok(self.apply(out)?) } - pub fn process(&mut self, op: OperationSeq) -> Result { + pub fn ack(&mut self, op: OperationSeq) -> Result<(), OTError> { // TODO use a different error? + // TODO is manually iterating from behind worth the manual search boilerplate? + for (i, o) in self.queue.iter().enumerate().rev() { + if o == &op { + self.queue.remove(i); + return Ok(()); + } + } + warn!("could not ack op {:?} from {:?}", op, self.queue); + Err(OTError) + } + + pub fn process(&mut self, mut op: OperationSeq) -> Result { + for o in self.queue.iter_mut() { + (op, *o) = op.transform(o)?; + } self.content = op.apply(&self.content)?; Ok(self.content.clone()) } @@ -140,6 +163,12 @@ impl AsyncFactory { self.ops.send(OpMsg::Process(opseq, tx)).await.map_err(|_| OTError)?; rx.await.map_err(|_| OTError)? } + + pub async fn ack(&self, opseq: OperationSeq) -> Result<(), OTError> { + let (tx, rx) = oneshot::channel(); + self.ops.send(OpMsg::Ack(opseq, tx)).await.map_err(|_| OTError)?; + rx.await.map_err(|_| OTError)? + } } @@ -147,6 +176,7 @@ impl AsyncFactory { enum OpMsg { Exec(OpWrapper, oneshot::Sender>), Process(OperationSeq, oneshot::Sender>), + Ack(OperationSeq, oneshot::Sender>), } #[derive(Debug)] @@ -175,6 +205,7 @@ impl AsyncFactoryWorker { match msg { OpMsg::Exec(op, tx) => tx.send(self.exec(op)).unwrap_or(()), OpMsg::Process(opseq, tx) => tx.send(self.factory.process(opseq)).unwrap_or(()), + OpMsg::Ack(opseq, tx) => tx.send(self.factory.ack(opseq)).unwrap_or(()), } if let Err(e) = self.content.send(self.factory.content()) { error!("error updating content: {}", e);