diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs index a585b80..9786341 100644 --- a/src/lib/operation/processor.rs +++ b/src/lib/operation/processor.rs @@ -2,6 +2,7 @@ use std::{sync::Mutex, collections::VecDeque}; use operational_transform::{OperationSeq, OTError}; use tokio::sync::watch; +use tracing::warn; use crate::operation::factory::OperationFactory; @@ -47,6 +48,16 @@ impl OperationFactory for OperationController { } } +/// TODO properly handle errors rather than sinking them all in here! +fn ignore_and_log(x: Result, msg: &str) { + match x { + Ok(_) => {}, + Err(e) => { + warn!("ignored error {}: {}", msg, e); + } + } +} + #[tonic::async_trait] impl OperationProcessor for OperationController { async fn apply(&self, op: OperationSeq) -> Result { @@ -54,14 +65,15 @@ impl OperationProcessor for OperationController { let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); self.queue.lock().unwrap().push_back(op.clone()); - self.notifier.send(op); + ignore_and_log(self.notifier.send(op), "notifying of applied change"); Ok(res) } async fn wait(&self) { let mut blocker = self.changed.lock().unwrap().clone(); - blocker.changed().await; - blocker.changed().await; + // TODO less jank way + ignore_and_log(blocker.changed().await, "waiting for changed content #1"); + ignore_and_log(blocker.changed().await, "waiting for changed content #2"); } async fn process(&self, mut op: OperationSeq) -> Result { @@ -72,7 +84,7 @@ impl OperationProcessor for OperationController { let txt = self.content(); let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); - self.changed_notifier.send(()); + ignore_and_log(self.changed_notifier.send(()), "notifying of changed content"); Ok(res) } @@ -80,9 +92,9 @@ impl OperationProcessor for OperationController { let len = self.queue.lock().unwrap().len(); if len <= 0 { let mut recv = self.last.lock().unwrap().clone(); - // TODO this is not 100% reliable - recv.changed().await; // acknowledge current state - recv.changed().await; // wait for a change in state + // TODO less jank way + ignore_and_log(recv.changed().await, "wairing for op changes #1"); // acknowledge current state + ignore_and_log(recv.changed().await, "wairing for op changes #2"); // wait for a change in state } Some(self.queue.lock().unwrap().get(0)?.clone()) }