From 75e397854b829136434526c07e6e3e270b73e9bd Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 16 Apr 2023 03:24:18 +0200 Subject: [PATCH 1/6] chore!: initial work on refactoring client+factory --- src/lib/client.rs | 218 +++++++++++-------------------------------- src/lib/opfactory.rs | 169 +++++++++++---------------------- 2 files changed, 105 insertions(+), 282 deletions(-) diff --git a/src/lib/client.rs b/src/lib/client.rs index 6b9d179..d92e6b5 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -1,194 +1,80 @@ -/// TODO better name for this file +use std::{future::Future, sync::Arc, pin::Pin}; -use std::{sync::{Arc, RwLock}, collections::BTreeMap}; -use tracing::{error, warn, info}; -use uuid::Uuid; +use operational_transform::OperationSeq; +use tokio::sync::mpsc; +use tonic::{transport::{Channel, Error}, Status}; +use tracing::error; -use crate::{ - opfactory::AsyncFactory, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, - tonic::{transport::Channel, Status, Streaming}, -}; +use crate::{proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, opfactory::AsyncFactory}; -pub type FactoryStore = Arc>>>; - -impl From::> for CodempClient { - fn from(x: BufferClient) -> CodempClient { - CodempClient { - id: Uuid::new_v4(), - client:x, - factories: Arc::new(RwLock::new(BTreeMap::new())), - } - } +pub trait EditorDriver : Clone { + fn id(&self) -> String; } #[derive(Clone)] -pub struct CodempClient { - id: Uuid, +pub struct CodempClient { client: BufferClient, - factories: FactoryStore, + driver: T, } -impl CodempClient { - fn get_factory(&self, path: &String) -> Result, Status> { - match self.factories.read().unwrap().get(path) { - Some(f) => Ok(f.clone()), - None => Err(Status::not_found("no active buffer for given path")), - } +impl CodempClient { // TODO wrap tonic 'connect' to allow multiple types + pub async fn new(addr: &str, driver: T) -> Result { + let client = BufferClient::connect(addr.to_string()).await?; + + Ok(CodempClient { client, driver }) } - pub fn add_factory(&self, path: String, factory:Arc) { - self.factories.write().unwrap().insert(path, factory); - } - - pub async fn create(&mut self, path: String, content: Option) -> Result { + pub async fn create_buffer(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { - path: path.clone(), - content: content.clone(), - user: self.id.to_string(), + path, content, + user: self.driver.id(), }; - let res = self.client.create(req).await?.into_inner(); + let res = self.client.create(req).await?; - Ok(res.accepted) + Ok(res.into_inner().accepted) } - pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result { - let factory = self.get_factory(&path)?; - match factory.insert(txt, pos).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn delete(&mut self, path: String, pos: u64, count: u64) -> Result { - let factory = self.get_factory(&path)?; - match factory.delete(pos, count).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn replace(&mut self, path: String, txt: String) -> Result { - let factory = self.get_factory(&path)?; - match factory.replace(txt).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<(), Status> { - let req = CursorMov { - path, user: self.id.to_string(), - row, col, - }; - let _res = self.client.cursor(req).await?.into_inner(); - Ok(()) - } - - pub async fn listen(&mut self, path: String, callback: F) -> Result<(), Status> - where F : Fn(CursorMov) -> () + Send + 'static { + pub async fn attach_buffer(&mut self, path: String) -> Result, Status> { let req = BufferPayload { - path, - content: None, - user: self.id.to_string(), + path, content: None, + user: self.driver.id(), }; - let mut stream = self.client.listen(req).await?.into_inner(); + + let content = self.client.sync(req.clone()) + .await? + .into_inner() + .content; + + let mut stream = self.client.attach(req).await?.into_inner(); + + let factory = Arc::new(AsyncFactory::new(content)); + + let (tx, mut rx) = mpsc::channel(64); + tokio::spawn(async move { - // TODO catch some errors - while let Ok(Some(x)) = stream.message().await { - callback(x) + loop { + match stream.message().await { + Err(e) => break error!("error receiving update: {}", e), + Ok(None) => break, + Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { + Err(e) => break error!("error deserializing opseq: {}", e), + Ok(v) => match factory.process(v).await { + Err(e) => break error!("could not apply operation from server: {}", e), + Ok(txt) => { // send back txt + } + } + }, + } } }); - Ok(()) - } - pub async fn attach(&mut self, path: String, callback: F) -> Result - where F : Fn(String) -> () + Send + 'static { - let content = self.sync(path.clone()).await?; - let factory = Arc::new(AsyncFactory::new(Some(content.clone()))); - self.add_factory(path.clone(), factory.clone()); - let req = BufferPayload { - path, - content: None, - user: self.id.to_string(), - }; - let stream = self.client.attach(req).await?.into_inner(); - tokio::spawn(async move { Self::worker(stream, factory, callback).await } ); - Ok(content) - } + tokio::spawn(async move { + while let Some(op) = rx.recv().await { - pub fn detach(&mut self, path: String) { - self.factories.write().unwrap().remove(&path); - info!("|| detached from buffer"); - } - - async fn sync(&mut self, path: String) -> Result { - let res = self.client.sync( - BufferPayload { - path, content: None, user: self.id.to_string(), } - ).await?; - Ok(res.into_inner().content.unwrap_or("".into())) - } + }); - async fn worker(mut stream: Streaming, factory: Arc, callback: F) - where F : Fn(String) -> () { - info!("|> buffer worker started"); - loop { - match stream.message().await { - Err(e) => break error!("error receiving change: {}", e), - Ok(v) => match v { - None => break warn!("stream closed"), - Some(operation) => match serde_json::from_str(&operation.opseq) { - Err(e) => break error!("could not deserialize opseq: {}", e), - Ok(op) => match factory.process(op).await { - Err(e) => break error!("desynched: {}", e), - Ok(x) => callback(x), - }, - } - }, - } - } - info!("[] buffer worker stopped"); + Ok(tx) } } diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs index 320bd84..f336df4 100644 --- a/src/lib/opfactory.rs +++ b/src/lib/opfactory.rs @@ -1,47 +1,24 @@ -use std::collections::VecDeque; +use std::sync::Arc; use operational_transform::{OperationSeq, OTError}; use similar::TextDiff; use tokio::sync::{mpsc, watch, oneshot}; use tracing::{error, warn}; -#[derive(Clone)] -pub struct OperationFactory { - content: String, - queue: VecDeque, -} +#[tonic::async_trait] +pub trait OperationFactory { + fn content(&self) -> String; + async fn apply(&self, op: OperationSeq) -> Result; + async fn process(&self, op: OperationSeq) -> Result; + async fn acknowledge(&self, op: OperationSeq) -> Result<(), OTError>; -impl OperationFactory { - pub fn new(init: Option) -> Self { - OperationFactory { - content: init.unwrap_or(String::new()), - queue: VecDeque::new(), - } - } - - fn apply(&mut self, op: OperationSeq) -> Result { - if op.is_noop() { return Err(OTError) } - self.content = op.apply(&self.content)?; - self.queue.push_back(op.clone()); - Ok(op) - } - - // TODO remove the need for this - pub fn content(&self) -> String { - self.content.clone() - } - - pub fn check(&self, txt: &str) -> bool { - self.content == txt - } - - pub fn replace(&mut self, txt: &str) -> Result { + fn replace(&self, txt: &str) -> OperationSeq { let mut out = OperationSeq::default(); - if self.content == txt { // TODO throw and error rather than wasting everyone's resources - return Err(OTError); // nothing to do + if self.content() == txt { + return out; // TODO this won't work, should we return a noop instead? } - let diff = TextDiff::from_chars(self.content.as_str(), txt); + let diff = TextDiff::from_chars(self.content().as_str(), txt); for change in diff.iter_all_changes() { match change.tag() { @@ -51,59 +28,37 @@ impl OperationFactory { } } - self.content = out.apply(&self.content)?; - Ok(out) + out } - pub fn insert(&mut self, txt: &str, pos: u64) -> Result { + fn insert(&self, txt: &str, pos: u64) -> OperationSeq { let mut out = OperationSeq::default(); - let total = self.content.len() as u64; + let total = self.content().len() as u64; out.retain(pos); out.insert(txt); out.retain(total - pos); - Ok(self.apply(out)?) + out } - pub fn delete(&mut self, pos: u64, count: u64) -> Result { + fn delete(&self, pos: u64, count: u64) -> OperationSeq { let mut out = OperationSeq::default(); - let len = self.content.len() as u64; + let len = self.content().len() as u64; out.retain(pos - count); out.delete(count); out.retain(len - pos); - Ok(self.apply(out)?) + out } - pub fn cancel(&mut self, pos: u64, count: u64) -> Result { + fn cancel(&self, pos: u64, count: u64) -> OperationSeq { let mut out = OperationSeq::default(); - let len = self.content.len() as u64; + let len = self.content().len() as u64; out.retain(pos); out.delete(count); out.retain(len - (pos+count)); - Ok(self.apply(out)?) - } - - pub fn ack(&mut self, op: OperationSeq) -> Result<(), OTError> { // TODO use a different error? - // TODO is manually iterating from behind worth the manual search boilerplate? - for (i, o) in self.queue.iter().enumerate().rev() { - if o == &op { - self.queue.remove(i); - return Ok(()); - } - } - warn!("could not ack op {:?} from {:?}", op, self.queue); - Err(OTError) - } - - pub fn process(&mut self, mut op: OperationSeq) -> Result { - for o in self.queue.iter_mut() { - (op, *o) = op.transform(o)?; - } - self.content = op.apply(&self.content)?; - Ok(self.content.clone()) + out } } - pub struct AsyncFactory { run: watch::Sender, ops: mpsc::Sender, @@ -117,6 +72,32 @@ impl Drop for AsyncFactory { } } +#[tonic::async_trait] +impl OperationFactory for AsyncFactory { + fn content(&self) -> String { + return self.content.borrow().clone(); + } + + async fn apply(&self, op: OperationSeq) -> Result { + let (tx, rx) = oneshot::channel(); + self.ops.send(OpMsg::Apply(op, tx)).await.map_err(|_| OTError)?; + Ok(rx.await.map_err(|_| OTError)?) + } + + async fn process(&self, op: OperationSeq) -> Result { + let (tx, rx) = oneshot::channel(); + self.ops.send(OpMsg::Process(op, tx)).await.map_err(|_| OTError)?; + Ok(rx.await.map_err(|_| OTError)?) + } + + async fn acknowledge(&self, op: OperationSeq) -> Result<(), OTError> { + let (tx, rx) = oneshot::channel(); + self.ops.send(OpMsg::Acknowledge(op, tx)).await.map_err(|_| OTError)?; + Ok(rx.await.map_err(|_| OTError)?) + } + +} + impl AsyncFactory { pub fn new(init: Option) -> Self { let (run_tx, run_rx) = watch::channel(true); @@ -124,7 +105,7 @@ impl AsyncFactory { let (txt_tx, txt_rx) = watch::channel("".into()); let worker = AsyncFactoryWorker { - factory: OperationFactory::new(init), + text: init.unwrap_or("".into()), ops: ops_rx, run: run_rx, content: txt_tx, @@ -134,62 +115,18 @@ impl AsyncFactory { AsyncFactory { run: run_tx, ops: ops_tx, content: txt_rx } } - - pub async fn insert(&self, txt: String, pos: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Insert(txt, pos), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn delete(&self, pos: u64, count: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Delete(pos, count), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn cancel(&self, pos: u64, count: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Cancel(pos, count), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn replace(&self, txt: String) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Replace(txt), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn process(&self, opseq: OperationSeq) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Process(opseq, tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn ack(&self, opseq: OperationSeq) -> Result<(), OTError> { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Ack(opseq, tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } } #[derive(Debug)] enum OpMsg { - Exec(OpWrapper, oneshot::Sender>), - Process(OperationSeq, oneshot::Sender>), - Ack(OperationSeq, oneshot::Sender>), -} - -#[derive(Debug)] -enum OpWrapper { - Insert(String, u64), - Delete(u64, u64), - Cancel(u64, u64), - Replace(String), + Apply(OperationSeq, oneshot::Sender), + Process(OperationSeq, oneshot::Sender), + Acknowledge(OperationSeq, oneshot::Sender<()>) } struct AsyncFactoryWorker { - factory: OperationFactory, + text: String, ops: mpsc::Receiver, run: watch::Receiver, content: watch::Sender @@ -204,7 +141,7 @@ impl AsyncFactoryWorker { match recv { Some(msg) => { match msg { - OpMsg::Exec(op, tx) => tx.send(self.exec(op)).unwrap_or(()), + OpMsg::Apply(op, tx) => tx.send(self.exec(op)).unwrap_or(()), OpMsg::Process(opseq, tx) => tx.send(self.factory.process(opseq)).unwrap_or(()), OpMsg::Ack(opseq, tx) => tx.send(self.factory.ack(opseq)).unwrap_or(()), } From eafbc41bd1c61f28d5f01fb5b26374e0ecb7f276 Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 17 Apr 2023 14:56:00 +0200 Subject: [PATCH 2/6] chore: split op factory into processor and factory --- src/lib/lib.rs | 3 +- src/lib/operation/factory.rs | 52 ++++++++++ src/lib/operation/mod.rs | 5 + src/lib/operation/processor.rs | 79 +++++++++++++++ src/lib/opfactory.rs | 171 --------------------------------- 5 files changed, 138 insertions(+), 172 deletions(-) create mode 100644 src/lib/operation/factory.rs create mode 100644 src/lib/operation/mod.rs create mode 100644 src/lib/operation/processor.rs delete mode 100644 src/lib/opfactory.rs diff --git a/src/lib/lib.rs b/src/lib/lib.rs index 82b6ce5..1ae3d88 100644 --- a/src/lib/lib.rs +++ b/src/lib/lib.rs @@ -1,6 +1,7 @@ pub mod proto; -pub mod opfactory; pub mod client; +pub mod operation; +pub mod cursor; pub use tonic; pub use tokio; diff --git a/src/lib/operation/factory.rs b/src/lib/operation/factory.rs new file mode 100644 index 0000000..8035520 --- /dev/null +++ b/src/lib/operation/factory.rs @@ -0,0 +1,52 @@ +use operational_transform::OperationSeq; +use similar::{TextDiff, ChangeTag}; + +pub trait OperationFactory { + fn content(&self) -> String; + + fn replace(&self, txt: &str) -> OperationSeq { + let mut out = OperationSeq::default(); + if self.content() == txt { + return out; // TODO this won't work, should we return a noop instead? + } + + let diff = TextDiff::from_chars(self.content().as_str(), txt); + + for change in diff.iter_all_changes() { + match change.tag() { + ChangeTag::Equal => out.retain(1), + ChangeTag::Delete => out.delete(1), + ChangeTag::Insert => out.insert(change.value()), + } + } + + out + } + + fn insert(&self, txt: &str, pos: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let total = self.content().len() as u64; + out.retain(pos); + out.insert(txt); + out.retain(total - pos); + out + } + + fn delete(&self, pos: u64, count: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let len = self.content().len() as u64; + out.retain(pos - count); + out.delete(count); + out.retain(len - pos); + out + } + + fn cancel(&self, pos: u64, count: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let len = self.content().len() as u64; + out.retain(pos); + out.delete(count); + out.retain(len - (pos+count)); + out + } +} diff --git a/src/lib/operation/mod.rs b/src/lib/operation/mod.rs new file mode 100644 index 0000000..8796b59 --- /dev/null +++ b/src/lib/operation/mod.rs @@ -0,0 +1,5 @@ +pub mod factory; +pub mod processor; + +pub use processor::{OperationController, OperationProcessor}; +pub use factory::OperationFactory; diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs new file mode 100644 index 0000000..a363cf8 --- /dev/null +++ b/src/lib/operation/processor.rs @@ -0,0 +1,79 @@ +use std::{sync::{Mutex, Arc}, collections::VecDeque}; + +use operational_transform::{OperationSeq, OTError}; +use tokio::sync::{watch, oneshot, mpsc}; +use tracing::error; + +use crate::operation::factory::OperationFactory; + + +#[tonic::async_trait] +pub trait OperationProcessor : OperationFactory{ + async fn apply(&self, op: OperationSeq) -> Result; + async fn process(&self, op: OperationSeq) -> Result; + + async fn poll(&self) -> Option; + async fn ack(&self) -> Option; +} + + +pub struct OperationController { + text: Mutex, + queue: Mutex>, + last: Mutex>, + notifier: watch::Sender, +} + +impl OperationController { + pub fn new(content: String) -> Self { + let (tx, rx) = watch::channel(OperationSeq::default()); + OperationController { + text: Mutex::new(content), + queue: Mutex::new(VecDeque::new()), + last: Mutex::new(rx), + notifier: tx, + } + } +} + +impl OperationFactory for OperationController { + fn content(&self) -> String { + self.text.lock().unwrap().clone() + } +} + +#[tonic::async_trait] +impl OperationProcessor for OperationController { + async fn apply(&self, op: OperationSeq) -> Result { + let txt = self.content(); + let res = op.apply(&txt)?; + *self.text.lock().unwrap() = res.clone(); + self.queue.lock().unwrap().push_back(op.clone()); + self.notifier.send(op).unwrap(); + Ok(res) + } + + async fn process(&self, mut op: OperationSeq) -> Result { + let mut queue = self.queue.lock().unwrap(); + for el in queue.iter_mut() { + (op, *el) = op.transform(el)?; + } + let txt = self.content(); + let res = op.apply(&txt)?; + *self.text.lock().unwrap() = res.clone(); + Ok(res) + } + + async fn poll(&self) -> Option { + let len = self.queue.lock().unwrap().len(); + if len <= 0 { + let mut recv = self.last.lock().unwrap().clone(); + recv.changed().await.unwrap(); + } + Some(self.queue.lock().unwrap().get(0)?.clone()) + } + + async fn ack(&self) -> Option { + self.queue.lock().unwrap().pop_front() + } +} diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs deleted file mode 100644 index f336df4..0000000 --- a/src/lib/opfactory.rs +++ /dev/null @@ -1,171 +0,0 @@ -use std::sync::Arc; - -use operational_transform::{OperationSeq, OTError}; -use similar::TextDiff; -use tokio::sync::{mpsc, watch, oneshot}; -use tracing::{error, warn}; - -#[tonic::async_trait] -pub trait OperationFactory { - fn content(&self) -> String; - async fn apply(&self, op: OperationSeq) -> Result; - async fn process(&self, op: OperationSeq) -> Result; - async fn acknowledge(&self, op: OperationSeq) -> Result<(), OTError>; - - fn replace(&self, txt: &str) -> OperationSeq { - let mut out = OperationSeq::default(); - if self.content() == txt { - return out; // TODO this won't work, should we return a noop instead? - } - - let diff = TextDiff::from_chars(self.content().as_str(), txt); - - for change in diff.iter_all_changes() { - match change.tag() { - similar::ChangeTag::Equal => out.retain(1), - similar::ChangeTag::Delete => out.delete(1), - similar::ChangeTag::Insert => out.insert(change.value()), - } - } - - out - } - - fn insert(&self, txt: &str, pos: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let total = self.content().len() as u64; - out.retain(pos); - out.insert(txt); - out.retain(total - pos); - out - } - - fn delete(&self, pos: u64, count: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let len = self.content().len() as u64; - out.retain(pos - count); - out.delete(count); - out.retain(len - pos); - out - } - - fn cancel(&self, pos: u64, count: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let len = self.content().len() as u64; - out.retain(pos); - out.delete(count); - out.retain(len - (pos+count)); - out - } -} - -pub struct AsyncFactory { - run: watch::Sender, - ops: mpsc::Sender, - #[allow(unused)] // TODO is this necessary? - content: watch::Receiver, -} - -impl Drop for AsyncFactory { - fn drop(&mut self) { - self.run.send(false).unwrap_or(()); - } -} - -#[tonic::async_trait] -impl OperationFactory for AsyncFactory { - fn content(&self) -> String { - return self.content.borrow().clone(); - } - - async fn apply(&self, op: OperationSeq) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Apply(op, tx)).await.map_err(|_| OTError)?; - Ok(rx.await.map_err(|_| OTError)?) - } - - async fn process(&self, op: OperationSeq) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Process(op, tx)).await.map_err(|_| OTError)?; - Ok(rx.await.map_err(|_| OTError)?) - } - - async fn acknowledge(&self, op: OperationSeq) -> Result<(), OTError> { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Acknowledge(op, tx)).await.map_err(|_| OTError)?; - Ok(rx.await.map_err(|_| OTError)?) - } - -} - -impl AsyncFactory { - pub fn new(init: Option) -> Self { - let (run_tx, run_rx) = watch::channel(true); - let (ops_tx, ops_rx) = mpsc::channel(64); // TODO hardcoded size - let (txt_tx, txt_rx) = watch::channel("".into()); - - let worker = AsyncFactoryWorker { - text: init.unwrap_or("".into()), - ops: ops_rx, - run: run_rx, - content: txt_tx, - }; - - tokio::spawn(async move { worker.work().await }); - - AsyncFactory { run: run_tx, ops: ops_tx, content: txt_rx } - } -} - - -#[derive(Debug)] -enum OpMsg { - Apply(OperationSeq, oneshot::Sender), - Process(OperationSeq, oneshot::Sender), - Acknowledge(OperationSeq, oneshot::Sender<()>) -} - -struct AsyncFactoryWorker { - text: String, - ops: mpsc::Receiver, - run: watch::Receiver, - content: watch::Sender -} - -impl AsyncFactoryWorker { - async fn work(mut self) { - while *self.run.borrow() { - tokio::select! { // periodically check run so that we stop cleanly - - recv = self.ops.recv() => { - match recv { - Some(msg) => { - match msg { - OpMsg::Apply(op, tx) => tx.send(self.exec(op)).unwrap_or(()), - OpMsg::Process(opseq, tx) => tx.send(self.factory.process(opseq)).unwrap_or(()), - OpMsg::Ack(opseq, tx) => tx.send(self.factory.ack(opseq)).unwrap_or(()), - } - if let Err(e) = self.content.send(self.factory.content()) { - error!("error updating content: {}", e); - break; - } - }, - None => break, - } - }, - - _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, - - }; - } - } - - fn exec(&mut self, op: OpWrapper) -> Result { - match op { - OpWrapper::Insert(txt, pos) => Ok(self.factory.insert(&txt, pos)?), - OpWrapper::Delete(pos, count) => Ok(self.factory.delete(pos, count)?), - OpWrapper::Cancel(pos, count) => Ok(self.factory.cancel(pos, count)?), - OpWrapper::Replace(txt) => Ok(self.factory.replace(&txt)?), - } - } -} From 35935d88a45d39ba39208107f9ba44af24b78951 Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 17 Apr 2023 14:56:25 +0200 Subject: [PATCH 3/6] chore: rewrote the codemp client using new traits --- src/client/nvim/main.rs | 11 ++-- src/lib/client.rs | 110 +++++++++++++++++++++++++++++----------- 2 files changed, 86 insertions(+), 35 deletions(-) diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 0b6662e..f5651a6 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -1,6 +1,8 @@ -use std::{net::TcpStream, sync::Mutex}; +use std::{net::TcpStream, sync::Mutex, collections::BTreeMap}; -use codemp::client::CodempClient; +use codemp::cursor::CursorController; +use codemp::operation::OperationController; +use codemp::{client::CodempClient, operation::OperationProcessor}; use codemp::proto::buffer_client::BufferClient; use rmpv::Value; @@ -9,12 +11,13 @@ use tokio::io::Stdout; use clap::Parser; use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim}; -use tonic::async_trait; use tracing::{error, warn, debug, info}; #[derive(Clone)] struct NeovimHandler { client: CodempClient, + factories: BTreeMap, + cursor: Option, } fn nullable_optional_str(args: &Vec, index: usize) -> Option { @@ -33,7 +36,7 @@ fn default_zero_number(args: &Vec, index: usize) -> i64 { nullable_optional_number(args, index).unwrap_or(0) } -#[async_trait] +#[tonic::async_trait] impl Handler for NeovimHandler { type Writer = Compat; diff --git a/src/lib/client.rs b/src/lib/client.rs index d92e6b5..0ab4938 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -1,33 +1,38 @@ -use std::{future::Future, sync::Arc, pin::Pin}; +use std::sync::Arc; use operational_transform::OperationSeq; -use tokio::sync::mpsc; -use tonic::{transport::{Channel, Error}, Status}; -use tracing::error; +use tonic::{transport::Channel, Status}; +use tracing::{error, warn}; +use uuid::Uuid; -use crate::{proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, opfactory::AsyncFactory}; - -pub trait EditorDriver : Clone { - fn id(&self) -> String; -} +use crate::{ + cursor::{CursorController, CursorStorage}, + operation::{OperationController, OperationProcessor}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest}, +}; #[derive(Clone)] -pub struct CodempClient { +pub struct CodempClient { + id: String, client: BufferClient, - driver: T, } -impl CodempClient { // TODO wrap tonic 'connect' to allow multiple types - pub async fn new(addr: &str, driver: T) -> Result { - let client = BufferClient::connect(addr.to_string()).await?; +impl From::> for CodempClient { + fn from(value: BufferClient) -> Self { + CodempClient { id: Uuid::new_v4().to_string(), client: value } + } +} + +impl CodempClient { + pub fn new(id: String, client: BufferClient) -> Self { + CodempClient { id, client } - Ok(CodempClient { client, driver }) } - pub async fn create_buffer(&mut self, path: String, content: Option) -> Result { + pub async fn create(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { path, content, - user: self.driver.id(), + user: self.id.clone(), }; let res = self.client.create(req).await?; @@ -35,10 +40,36 @@ impl CodempClient { // TODO wrap tonic 'connect' to allow m Ok(res.into_inner().accepted) } - pub async fn attach_buffer(&mut self, path: String) -> Result, Status> { + pub async fn listen(&mut self) -> Result, Status> { let req = BufferPayload { - path, content: None, - user: self.driver.id(), + path: "".into(), + content: None, + user: self.id.clone(), + }; + + let mut stream = self.client.listen(req).await?.into_inner(); + + let controller = Arc::new(CursorController::new()); + + let _controller = controller.clone(); + tokio::spawn(async move { + loop { + match stream.message().await { + Err(e) => break error!("error receiving cursor: {}", e), + Ok(None) => break, + Ok(Some(x)) => { _controller.update(x); }, + } + } + }); + + Ok(controller) + } + + pub async fn attach(&mut self, path: String) -> Result, Status> { + let req = BufferPayload { + path: path.clone(), + content: None, + user: self.id.clone(), }; let content = self.client.sync(req.clone()) @@ -48,33 +79,50 @@ impl CodempClient { // TODO wrap tonic 'connect' to allow m let mut stream = self.client.attach(req).await?.into_inner(); - let factory = Arc::new(AsyncFactory::new(content)); - - let (tx, mut rx) = mpsc::channel(64); + let factory = Arc::new(OperationController::new(content.unwrap_or("".into()))); + let _factory = factory.clone(); tokio::spawn(async move { loop { match stream.message().await { - Err(e) => break error!("error receiving update: {}", e), - Ok(None) => break, + Err(e) => break error!("error receiving update: {}", e), + Ok(None) => break, // clean exit Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { Err(e) => break error!("error deserializing opseq: {}", e), - Ok(v) => match factory.process(v).await { + Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), - Ok(txt) => { // send back txt - } + Ok(_txt) => { } } }, } } }); + let mut _client = self.client.clone(); + let _uid = self.id.clone(); + let _factory = factory.clone(); + let _path = path.clone(); tokio::spawn(async move { - while let Some(op) = rx.recv().await { - + while let Some(op) = _factory.poll().await { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path: _path.clone(), + user: _uid.clone(), + }; + match _client.edit(req).await { + Ok(res) => match res.into_inner().accepted { + true => { _factory.ack().await; }, + false => { + warn!("server rejected operation, retrying in 1s"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }, + Err(e) => error!("could not send edit: {}", e), + } } }); - Ok(tx) + Ok(factory) } } From ead5ffc49c849540bae8abda4da41797f89a7ccb Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 17 Apr 2023 14:57:06 +0200 Subject: [PATCH 4/6] chore: struct for storing cursor states --- src/lib/cursor.rs | 56 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/lib/cursor.rs diff --git a/src/lib/cursor.rs b/src/lib/cursor.rs new file mode 100644 index 0000000..c6ed07a --- /dev/null +++ b/src/lib/cursor.rs @@ -0,0 +1,56 @@ +use std::{collections::HashMap, sync::Mutex}; + +use crate::proto::CursorMov; + +/// Note that this differs from any hashmap in its put method: no &mut! +pub trait CursorStorage { + fn get(&self, id: &String) -> Option; + fn put(&self, id: String, val: Cursor); + + fn update(&self, event: CursorMov) -> Option { + let mut cur = self.get(&event.user)?; + cur.buffer = event.path; + cur.start = (event.row, event.col).into(); + self.put(event.user, cur.clone()); + Some(cur) + } +} + +#[derive(Copy, Clone)] +pub struct Position { + row: i64, + col: i64, +} + +impl From::<(i64, i64)> for Position { + fn from((row, col): (i64, i64)) -> Self { + Position { row, col } + } +} + +#[derive(Clone)] +pub struct Cursor { + buffer: String, + start: Position, + end: Position, +} + +pub struct CursorController { + users: Mutex>, +} + +impl CursorController { + pub fn new() -> Self { + CursorController { users: Mutex::new(HashMap::new()) } + } +} + +impl CursorStorage for CursorController { + fn get(&self, id: &String) -> Option { + Some(self.users.lock().unwrap().get(id)?.clone()) + } + + fn put(&self, id: String, val: Cursor) { + self.users.lock().unwrap().insert(id, val); + } +} From b8aa7d5fce1501eab41da9e2d8abe989fe6dba39 Mon Sep 17 00:00:00 2001 From: alemi Date: Tue, 18 Apr 2023 21:46:19 +0200 Subject: [PATCH 5/6] fix: temp value was dropped --- src/lib/operation/factory.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lib/operation/factory.rs b/src/lib/operation/factory.rs index 8035520..b8fa014 100644 --- a/src/lib/operation/factory.rs +++ b/src/lib/operation/factory.rs @@ -6,11 +6,12 @@ pub trait OperationFactory { fn replace(&self, txt: &str) -> OperationSeq { let mut out = OperationSeq::default(); - if self.content() == txt { + let content = self.content(); + if content == txt { return out; // TODO this won't work, should we return a noop instead? } - let diff = TextDiff::from_chars(self.content().as_str(), txt); + let diff = TextDiff::from_chars(content.as_str(), txt); for change in diff.iter_all_changes() { match change.tag() { From 3609dbfa84a348df511c080803677d842294e722 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 19 Apr 2023 04:18:22 +0200 Subject: [PATCH 6/6] chore: finished reimplementing features modularly now everything that worked in 0.2 seems to work again, and should actually be better. plus, merging differences is done properly and thus should be way more reliable --- src/client/nvim/codemp.lua | 3 +- src/client/nvim/main.rs | 153 ++++++++++++++++++--------------- src/lib/client.rs | 16 +++- src/lib/cursor.rs | 47 ++++++++-- src/lib/operation/processor.rs | 26 ++++-- 5 files changed, 158 insertions(+), 87 deletions(-) diff --git a/src/client/nvim/codemp.lua b/src/client/nvim/codemp.lua index 3be8f8e..55e0a1d 100644 --- a/src/client/nvim/codemp.lua +++ b/src/client/nvim/codemp.lua @@ -49,13 +49,13 @@ local function hook_callbacks(path, buffer) { callback = function(args) local cursor = vim.api.nvim_win_get_cursor(0) + pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors if cursor[1] == last_line then return end last_line = cursor[1] local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false) pcall(M.replace, path, vim.fn.join(lines, "\n")) -- TODO log errors - pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors end, buffer = buffer, group = codemp_autocmds, @@ -111,6 +111,7 @@ vim.api.nvim_create_user_command('Connect', -- print(vim.fn.join(data, "\n")) end, stderr_buffered = false, + env = { RUST_BACKTRACE = 1 } } ) if M.jobid <= 0 then diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index f5651a6..f7e9125 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -1,7 +1,7 @@ +use std::sync::Arc; use std::{net::TcpStream, sync::Mutex, collections::BTreeMap}; -use codemp::cursor::CursorController; -use codemp::operation::OperationController; +use codemp::operation::{OperationController, OperationFactory}; use codemp::{client::CodempClient, operation::OperationProcessor}; use codemp::proto::buffer_client::BufferClient; use rmpv::Value; @@ -16,8 +16,7 @@ use tracing::{error, warn, debug, info}; #[derive(Clone)] struct NeovimHandler { client: CodempClient, - factories: BTreeMap, - cursor: Option, + factories: Arc>>>, } fn nullable_optional_str(args: &Vec, index: usize) -> Option { @@ -36,6 +35,12 @@ fn default_zero_number(args: &Vec, index: usize) -> i64 { nullable_optional_number(args, index).unwrap_or(0) } +impl NeovimHandler { + fn buffer_controller(&self, path: &String) -> Option> { + Some(self.factories.lock().unwrap().get(path)?.clone()) + } +} + #[tonic::async_trait] impl Handler for NeovimHandler { type Writer = Compat; @@ -72,16 +77,18 @@ impl Handler for NeovimHandler { } let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let pos = default_zero_number(&args, 2) as u64; - let mut c = self.client.clone(); - match c.insert(path, txt, pos).await { - Ok(res) => { - match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), + let mut pos = default_zero_number(&args, 2) as i64; + + if pos <= 0 { pos = 0 } // TODO wtf vim?? + + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => { + match controller.apply(controller.insert(&txt, pos as u64)).await { + Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + Ok(_res) => Ok(Value::Nil), } - }, - Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + } } }, @@ -93,13 +100,12 @@ impl Handler for NeovimHandler { let pos = default_zero_number(&args, 1) as u64; let count = default_zero_number(&args, 2) as u64; - let mut c = self.client.clone(); - match c.delete(path, pos, count).await { - Ok(res) => match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), - }, - Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => match controller.apply(controller.delete(pos, count)).await { + Err(e) => Err(Value::from(format!("could not send delete: {}", e))), + Ok(_res) => Ok(Value::Nil), + } } }, @@ -110,13 +116,12 @@ impl Handler for NeovimHandler { let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let mut c = self.client.clone(); - match c.replace(path, txt).await { - Ok(res) => match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), - }, - Err(e) => Err(Value::from(format!("could not send replace: {}", e))), + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => match controller.apply(controller.replace(&txt)).await { + Err(e) => Err(Value::from(format!("could not send replace: {}", e))), + Ok(_res) => Ok(Value::Nil), + } } }, @@ -132,63 +137,72 @@ impl Handler for NeovimHandler { let mut c = self.client.clone(); - let buf = buffer.clone(); - match c.attach(path, move |x| { - let lines : Vec = x.split("\n").map(|x| x.to_string()).collect(); - let b = buf.clone(); - tokio::spawn(async move { - if let Err(e) = b.set_lines(0, -1, false, lines).await { - error!("could not update buffer: {}", e); - } - }); - }).await { + match c.attach(path.clone()).await { Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))), - Ok(content) => { - let lines : Vec = content.split("\n").map(|x| x.to_string()).collect(); - if let Err(e) = buffer.set_lines(0, -1, false, lines).await { - error!("could not update buffer: {}", e); + Ok(controller) => { + let _controller = controller.clone(); + let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); + match buffer.set_lines(0, -1, false, lines).await { + Err(e) => Err(Value::from(format!("could not sync buffer: {}", e))), + Ok(()) => { + tokio::spawn(async move { + loop { + _controller.wait().await; + let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); + if let Err(e) = buffer.set_lines(0, -1, false, lines).await { + error!("could not update buffer: {}", e); + } + } + }); + self.factories.lock().unwrap().insert(path, controller); + Ok(Value::Nil) + } } - Ok(Value::Nil) }, } }, "detach" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - let mut c = self.client.clone(); - c.detach(path); - Ok(Value::Nil) + Err(Value::from("unimplemented! try with :q!")) + // if args.len() < 1 { + // return Err(Value::from("no path given")); + // } + // let path = default_empty_str(&args, 0); + // let mut c = self.client.clone(); + // c.detach(path); + // Ok(Value::Nil) }, "listen" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - let mut c = self.client.clone(); - let ns = nvim.create_namespace("Cursor").await .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; let buf = nvim.get_current_buf().await .map_err(|e| Value::from(format!("could not get current buf: {}", e)))?; - match c.listen(path, move |cur| { - let _b = buf.clone(); - tokio::spawn(async move { - if let Err(e) = _b.clear_namespace(ns, 0, -1).await { - error!("could not clear previous cursor highlight: {}", e); - } - if let Err(e) = _b.add_highlight(ns, "ErrorMsg", cur.row-1, cur.col, cur.col+1).await { - error!("could not create highlight for cursor: {}", e); - } - }); - }).await { - Ok(()) => Ok(Value::Nil), + let mut c = self.client.clone(); + match c.listen().await { Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))), + Ok(cursor) => { + let mut sub = cursor.sub(); + debug!("spawning cursor processing worker"); + tokio::spawn(async move { + loop { + match sub.recv().await { + Err(e) => return error!("error receiving cursor update from controller: {}", e), + Ok((_usr, cur)) => { + if let Err(e) = buf.clear_namespace(ns, 0, -1).await { + error!("could not clear previous cursor highlight: {}", e); + } + if let Err(e) = buf.add_highlight(ns, "ErrorMsg", cur.start.row-1, cur.start.col, cur.start.col+1).await { + error!("could not create highlight for cursor: {}", e); + } + } + } + } + }); + Ok(Value::Nil) + }, } }, @@ -202,8 +216,8 @@ impl Handler for NeovimHandler { let mut c = self.client.clone(); match c.cursor(path, row, col).await { - Ok(()) => Ok(Value::Nil), - Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))), + Ok(_) => Ok(Value::Nil), + Err(e) => Err(Value:: from(format!("could not update cursor: {}", e))), } }, @@ -262,6 +276,7 @@ async fn main() -> Result<(), Box> { let handler: NeovimHandler = NeovimHandler { client: client.into(), + factories: Arc::new(Mutex::new(BTreeMap::new())), }; let (_nvim, io_handler) = create::new_parent(handler).await; diff --git a/src/lib/client.rs b/src/lib/client.rs index 0ab4938..f16ce15 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -8,7 +8,7 @@ use uuid::Uuid; use crate::{ cursor::{CursorController, CursorStorage}, operation::{OperationController, OperationProcessor}, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, }; #[derive(Clone)] @@ -26,7 +26,6 @@ impl From::> for CodempClient { impl CodempClient { pub fn new(id: String, client: BufferClient) -> Self { CodempClient { id, client } - } pub async fn create(&mut self, path: String, content: Option) -> Result { @@ -91,7 +90,9 @@ impl CodempClient { Err(e) => break error!("error deserializing opseq: {}", e), Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), - Ok(_txt) => { } + Ok(_txt) => { + // send event containing where the change happened + } } }, } @@ -125,4 +126,13 @@ impl CodempClient { Ok(factory) } + + pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result { + let req = CursorMov { + path, row, col, + user: self.id.clone(), + }; + let res = self.client.cursor(req).await?.into_inner(); + Ok(res.accepted) + } } diff --git a/src/lib/cursor.rs b/src/lib/cursor.rs index c6ed07a..9c3a2b4 100644 --- a/src/lib/cursor.rs +++ b/src/lib/cursor.rs @@ -1,5 +1,8 @@ use std::{collections::HashMap, sync::Mutex}; +use tokio::sync::broadcast; +use tracing::{info, error, debug, warn}; + use crate::proto::CursorMov; /// Note that this differs from any hashmap in its put method: no &mut! @@ -16,10 +19,10 @@ pub trait CursorStorage { } } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug, Default)] pub struct Position { - row: i64, - col: i64, + pub row: i64, + pub col: i64, } impl From::<(i64, i64)> for Position { @@ -28,24 +31,52 @@ impl From::<(i64, i64)> for Position { } } -#[derive(Clone)] +#[derive(Clone, Debug, Default)] pub struct Cursor { - buffer: String, - start: Position, - end: Position, + pub buffer: String, + pub start: Position, + pub end: Position, } pub struct CursorController { users: Mutex>, + bus: broadcast::Sender<(String, Cursor)>, + _bus_keepalive: Mutex>, } impl CursorController { pub fn new() -> Self { - CursorController { users: Mutex::new(HashMap::new()) } + let (tx, _rx) = broadcast::channel(64); + CursorController { + users: Mutex::new(HashMap::new()), + bus: tx, + _bus_keepalive: Mutex::new(_rx), + } + } + + pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> { + self.bus.subscribe() } } impl CursorStorage for CursorController { + fn update(&self, event: CursorMov) -> Option { + debug!("processing cursor event: {:?}", event); + let mut cur = self.get(&event.user).unwrap_or(Cursor::default()); + cur.buffer = event.path; + cur.start = (event.row, event.col).into(); + cur.end = (event.row, event.col).into(); + self.put(event.user.clone(), cur.clone()); + if let Err(e) = self.bus.send((event.user, cur.clone())) { + error!("could not broadcast cursor event: {}", e); + } else { // this is because once there are no receivers, nothing else can be sent + if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() { + warn!("could not consume event: {}", e); + } + } + Some(cur) + } + fn get(&self, id: &String) -> Option { Some(self.users.lock().unwrap().get(id)?.clone()) } diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs index a363cf8..a585b80 100644 --- a/src/lib/operation/processor.rs +++ b/src/lib/operation/processor.rs @@ -1,19 +1,19 @@ -use std::{sync::{Mutex, Arc}, collections::VecDeque}; +use std::{sync::Mutex, collections::VecDeque}; use operational_transform::{OperationSeq, OTError}; -use tokio::sync::{watch, oneshot, mpsc}; -use tracing::error; +use tokio::sync::watch; use crate::operation::factory::OperationFactory; #[tonic::async_trait] -pub trait OperationProcessor : OperationFactory{ +pub trait OperationProcessor : OperationFactory { async fn apply(&self, op: OperationSeq) -> Result; async fn process(&self, op: OperationSeq) -> Result; async fn poll(&self) -> Option; async fn ack(&self) -> Option; + async fn wait(&self); } @@ -22,16 +22,21 @@ pub struct OperationController { queue: Mutex>, last: Mutex>, notifier: watch::Sender, + changed: Mutex>, + changed_notifier: watch::Sender<()>, } impl OperationController { pub fn new(content: String) -> Self { let (tx, rx) = watch::channel(OperationSeq::default()); + let (done, wait) = watch::channel(()); OperationController { text: Mutex::new(content), queue: Mutex::new(VecDeque::new()), last: Mutex::new(rx), notifier: tx, + changed: Mutex::new(wait), + changed_notifier: done, } } } @@ -49,10 +54,16 @@ impl OperationProcessor for OperationController { let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); self.queue.lock().unwrap().push_back(op.clone()); - self.notifier.send(op).unwrap(); + self.notifier.send(op); Ok(res) } + async fn wait(&self) { + let mut blocker = self.changed.lock().unwrap().clone(); + blocker.changed().await; + blocker.changed().await; + } + async fn process(&self, mut op: OperationSeq) -> Result { let mut queue = self.queue.lock().unwrap(); for el in queue.iter_mut() { @@ -61,6 +72,7 @@ impl OperationProcessor for OperationController { let txt = self.content(); let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); + self.changed_notifier.send(()); Ok(res) } @@ -68,7 +80,9 @@ impl OperationProcessor for OperationController { let len = self.queue.lock().unwrap().len(); if len <= 0 { let mut recv = self.last.lock().unwrap().clone(); - recv.changed().await.unwrap(); + // TODO this is not 100% reliable + recv.changed().await; // acknowledge current state + recv.changed().await; // wait for a change in state } Some(self.queue.lock().unwrap().get(0)?.clone()) }