From 2c695a41f6656856ab24e3041cb9c42c0b0a62b8 Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 1 Jul 2023 13:54:34 +0200 Subject: [PATCH] fix: ignore errors better, unified op processing --- src/errors.rs | 15 ++++++++++ src/lib.rs | 2 ++ src/operation/controller.rs | 55 +++++++++++++++++-------------------- 3 files changed, 42 insertions(+), 30 deletions(-) create mode 100644 src/errors.rs diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..2b89441 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,15 @@ +use tracing::warn; + +pub trait IgnorableError { + fn unwrap_or_log(self, msg: &str); +} + +impl IgnorableError for Result +where E : std::fmt::Display { + fn unwrap_or_log(self, msg: &str) { + match self { + Ok(_) => {}, + Err(e) => warn!("{}: {}", msg, e), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 1ae3d88..4e12e2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ pub mod proto; pub mod client; pub mod operation; pub mod cursor; +pub mod errors; pub use tonic; pub use tokio; + diff --git a/src/operation/controller.rs b/src/operation/controller.rs index 3dfaad8..8628eb3 100644 --- a/src/operation/controller.rs +++ b/src/operation/controller.rs @@ -2,9 +2,10 @@ use std::{sync::Mutex, collections::VecDeque, ops::Range}; use operational_transform::{OperationSeq, OTError}; use tokio::sync::watch; -use tracing::{warn, error}; +use tracing::error; use super::{OperationFactory, OperationProcessor, op_effective_range}; +use crate::errors::IgnorableError; pub struct OperationController { @@ -37,8 +38,8 @@ impl OperationController { pub async fn wait(&self) -> Range { let mut blocker = self.changed.lock().unwrap().clone(); // TODO less jank way - ignore_and_log(blocker.changed().await, "waiting for changed content #1"); - ignore_and_log(blocker.changed().await, "waiting for changed content #2"); + blocker.changed().await.unwrap_or_log("waiting for changed content #1"); + blocker.changed().await.unwrap_or_log("waiting for changed content #2"); let span = blocker.borrow().clone(); span } @@ -48,8 +49,8 @@ impl OperationController { if len <= 0 { let mut recv = self.last.lock().unwrap().clone(); // TODO less jank way - ignore_and_log(recv.changed().await, "wairing for op changes #1"); // acknowledge current state - ignore_and_log(recv.changed().await, "wairing for op changes #2"); // wait for a change in state + recv.changed().await.unwrap_or_log("wairing for op changes #1"); // acknowledge current state + recv.changed().await.unwrap_or_log("wairing for op changes #2"); // wait for a change in state } Some(self.queue.lock().unwrap().get(0)?.clone()) } @@ -61,8 +62,8 @@ impl OperationController { pub fn stop(&self) -> bool { match self.stop.send(false) { Ok(()) => { - ignore_and_log(self.changed_notifier.send(0..0), "unlocking downstream for stop"); - ignore_and_log(self.notifier.send(OperationSeq::default()), "unlocking upstream for stop"); + self.changed_notifier.send(0..0).unwrap_or_log("unlocking downstream for stop"); + self.notifier.send(OperationSeq::default()).unwrap_or_log("unlocking upstream for stop"); true }, Err(e) => { @@ -75,6 +76,13 @@ impl OperationController { pub fn run(&self) -> bool { *self.run.borrow() } + + async fn operation(&self, op: &OperationSeq) -> Result, OTError> { + let txt = self.content(); + let res = op.apply(&txt)?; + *self.text.lock().unwrap() = res.clone(); + Ok(op_effective_range(op)) + } } impl OperationFactory for OperationController { @@ -83,38 +91,25 @@ impl OperationFactory for OperationController { } } -/// TODO properly handle errors rather than sinking them all in here! -fn ignore_and_log(x: Result, msg: &str) { - match x { - Ok(_) => {}, - Err(e) => { - warn!("ignored error {}: {}", msg, e); - } - } -} - #[tonic::async_trait] impl OperationProcessor for OperationController { async fn apply(&self, op: OperationSeq) -> Result, OTError> { - let txt = self.content(); - let res = op.apply(&txt)?; - *self.text.lock().unwrap() = res.clone(); + let span = self.operation(&op).await?; self.queue.lock().unwrap().push_back(op.clone()); - ignore_and_log(self.notifier.send(op.clone()), "notifying of applied change"); - Ok(op_effective_range(&op)) + self.notifier.send(op.clone()).unwrap_or_log("notifying of applied change"); + Ok(span) } async fn process(&self, mut op: OperationSeq) -> Result, OTError> { - let mut queue = self.queue.lock().unwrap(); - for el in queue.iter_mut() { - (op, *el) = op.transform(el)?; + { + let mut queue = self.queue.lock().unwrap(); + for el in queue.iter_mut() { + (op, *el) = op.transform(el)?; + } } - let txt = self.content(); - let res = op.apply(&txt)?; - let span = op_effective_range(&op); - *self.text.lock().unwrap() = res.clone(); - ignore_and_log(self.changed_notifier.send(span.clone()), "notifying of changed content"); + let span = self.operation(&op).await?; + self.changed_notifier.send(span.clone()).unwrap_or_log("notifying of changed content"); Ok(span) } }