diff --git a/src/lib/operation/controller.rs b/src/lib/operation/controller.rs new file mode 100644 index 0000000..be36712 --- /dev/null +++ b/src/lib/operation/controller.rs @@ -0,0 +1,98 @@ +use std::{sync::Mutex, collections::VecDeque, ops::Range}; + +use operational_transform::{OperationSeq, OTError}; +use tokio::sync::watch; +use tracing::warn; + +use super::{OperationFactory, OperationProcessor, op_effective_range}; + + +pub struct OperationController { + text: Mutex, + queue: Mutex>, + last: Mutex>, + notifier: watch::Sender, + changed: Mutex>>, + changed_notifier: watch::Sender>, +} + +impl OperationController { + pub fn new(content: String) -> Self { + let (tx, rx) = watch::channel(OperationSeq::default()); + let (done, wait) = watch::channel(0..0); + OperationController { + text: Mutex::new(content), + queue: Mutex::new(VecDeque::new()), + last: Mutex::new(rx), + notifier: tx, + changed: Mutex::new(wait), + changed_notifier: done, + } + } + + pub async fn wait(&self) -> Range { + let mut blocker = self.changed.lock().unwrap().clone(); + // TODO less jank way + ignore_and_log(blocker.changed().await, "waiting for changed content #1"); + ignore_and_log(blocker.changed().await, "waiting for changed content #2"); + let span = blocker.borrow().clone(); + span + } + + pub async fn poll(&self) -> Option { + let len = self.queue.lock().unwrap().len(); + if len <= 0 { + let mut recv = self.last.lock().unwrap().clone(); + // TODO less jank way + ignore_and_log(recv.changed().await, "wairing for op changes #1"); // acknowledge current state + ignore_and_log(recv.changed().await, "wairing for op changes #2"); // wait for a change in state + } + Some(self.queue.lock().unwrap().get(0)?.clone()) + } + + pub async fn ack(&self) -> Option { + self.queue.lock().unwrap().pop_front() + } +} + +impl OperationFactory for OperationController { + fn content(&self) -> String { + self.text.lock().unwrap().clone() + } +} + +/// TODO properly handle errors rather than sinking them all in here! +fn ignore_and_log(x: Result, msg: &str) { + match x { + Ok(_) => {}, + Err(e) => { + warn!("ignored error {}: {}", msg, e); + } + } +} + +#[tonic::async_trait] +impl OperationProcessor for OperationController { + async fn apply(&self, op: OperationSeq) -> Result, OTError> { + let txt = self.content(); + let res = op.apply(&txt)?; + *self.text.lock().unwrap() = res.clone(); + self.queue.lock().unwrap().push_back(op.clone()); + ignore_and_log(self.notifier.send(op.clone()), "notifying of applied change"); + Ok(op_effective_range(&op)) + } + + + async fn process(&self, mut op: OperationSeq) -> Result, OTError> { + let mut queue = self.queue.lock().unwrap(); + for el in queue.iter_mut() { + (op, *el) = op.transform(el)?; + } + let txt = self.content(); + let res = op.apply(&txt)?; + let span = op_effective_range(&op); + *self.text.lock().unwrap() = res.clone(); + ignore_and_log(self.changed_notifier.send(span.clone()), "notifying of changed content"); + Ok(span) + } +} diff --git a/src/lib/operation/mod.rs b/src/lib/operation/mod.rs index 39f22da..6ed85bc 100644 --- a/src/lib/operation/mod.rs +++ b/src/lib/operation/mod.rs @@ -1,10 +1,12 @@ pub mod factory; pub mod processor; +pub mod controller; use std::ops::Range; use operational_transform::{Operation, OperationSeq}; -pub use processor::{OperationController, OperationProcessor}; +pub use processor::OperationProcessor; +pub use controller::OperationController; pub use factory::OperationFactory; pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs index 2832dfe..a37dfa2 100644 --- a/src/lib/operation/processor.rs +++ b/src/lib/operation/processor.rs @@ -1,110 +1,11 @@ -use std::{sync::Mutex, collections::VecDeque, ops::Range}; +use std::ops::Range; use operational_transform::{OperationSeq, OTError}; -use tokio::sync::watch; -use tracing::warn; use crate::operation::factory::OperationFactory; -use super::op_effective_range; - - #[tonic::async_trait] pub trait OperationProcessor : OperationFactory { async fn apply(&self, op: OperationSeq) -> Result, OTError>; async fn process(&self, op: OperationSeq) -> Result, OTError>; - - async fn poll(&self) -> Option; - async fn ack(&self) -> Option; - async fn wait(&self) -> Range; -} - - -pub struct OperationController { - text: Mutex, - queue: Mutex>, - last: Mutex>, - notifier: watch::Sender, - changed: Mutex>>, - changed_notifier: watch::Sender>, -} - -impl OperationController { - pub fn new(content: String) -> Self { - let (tx, rx) = watch::channel(OperationSeq::default()); - let (done, wait) = watch::channel(0..0); - OperationController { - text: Mutex::new(content), - queue: Mutex::new(VecDeque::new()), - last: Mutex::new(rx), - notifier: tx, - changed: Mutex::new(wait), - changed_notifier: done, - } - } -} - -impl OperationFactory for OperationController { - fn content(&self) -> String { - self.text.lock().unwrap().clone() - } -} - -/// TODO properly handle errors rather than sinking them all in here! -fn ignore_and_log(x: Result, msg: &str) { - match x { - Ok(_) => {}, - Err(e) => { - warn!("ignored error {}: {}", msg, e); - } - } -} - -#[tonic::async_trait] -impl OperationProcessor for OperationController { - async fn apply(&self, op: OperationSeq) -> Result, OTError> { - let txt = self.content(); - let res = op.apply(&txt)?; - *self.text.lock().unwrap() = res.clone(); - self.queue.lock().unwrap().push_back(op.clone()); - ignore_and_log(self.notifier.send(op.clone()), "notifying of applied change"); - Ok(op_effective_range(&op)) - } - - async fn wait(&self) -> Range { - let mut blocker = self.changed.lock().unwrap().clone(); - // TODO less jank way - ignore_and_log(blocker.changed().await, "waiting for changed content #1"); - ignore_and_log(blocker.changed().await, "waiting for changed content #2"); - let span = blocker.borrow().clone(); - span - } - - async fn process(&self, mut op: OperationSeq) -> Result, OTError> { - let mut queue = self.queue.lock().unwrap(); - for el in queue.iter_mut() { - (op, *el) = op.transform(el)?; - } - let txt = self.content(); - let res = op.apply(&txt)?; - let span = op_effective_range(&op); - *self.text.lock().unwrap() = res.clone(); - ignore_and_log(self.changed_notifier.send(span.clone()), "notifying of changed content"); - Ok(span) - } - - async fn poll(&self) -> Option { - let len = self.queue.lock().unwrap().len(); - if len <= 0 { - let mut recv = self.last.lock().unwrap().clone(); - // TODO less jank way - ignore_and_log(recv.changed().await, "wairing for op changes #1"); // acknowledge current state - ignore_and_log(recv.changed().await, "wairing for op changes #2"); // wait for a change in state - } - Some(self.queue.lock().unwrap().get(0)?.clone()) - } - - async fn ack(&self) -> Option { - self.queue.lock().unwrap().pop_front() - } }