diff --git a/src/client.rs b/src/client.rs index 1ad72f0..a4ad8eb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -78,17 +78,18 @@ impl CodempClient { let _factory = factory.clone(); let _path = path.clone(); + tokio::spawn(async move { loop { if !_factory.run() { break debug!("downstream worker clean exit") } match stream.message().await { - Err(e) => break error!("error receiving update: {}", e), - Ok(None) => break warn!("stream closed for buffer {}", _path), + Err(e) => break error!("error receiving update: {}", e), + Ok(None) => break warn!("stream closed for buffer {}", _path), Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { - Err(e) => error!("error deserializing opseq: {}", e), - Ok(v) => match _factory.process(v).await { - Err(e) => break error!("could not apply operation from server: {}", e), - Ok(_range) => { } // user gets this range by awaiting wait() so we can drop it here + Err(e) => error!("error deserializing opseq: {}", e), + Ok(v) => match _factory.process(v).await { + Err(e) => break error!("could not apply operation from server: {}", e), + Ok(_range) => { } // range is obtained awaiting wait(), need to pass the OpSeq itself } }, } diff --git a/src/errors.rs b/src/errors.rs index 2b89441..b8dd254 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,12 +1,12 @@ use tracing::warn; pub trait IgnorableError { - fn unwrap_or_log(self, msg: &str); + fn unwrap_or_warn(self, msg: &str); } impl IgnorableError for Result where E : std::fmt::Display { - fn unwrap_or_log(self, msg: &str) { + fn unwrap_or_warn(self, msg: &str) { match self { Ok(_) => {}, Err(e) => warn!("{}: {}", msg, e), diff --git a/src/operation/controller.rs b/src/operation/controller.rs index 8628eb3..5cc16e9 100644 --- a/src/operation/controller.rs +++ b/src/operation/controller.rs @@ -38,8 +38,8 @@ impl OperationController { pub async fn wait(&self) -> Range { let mut blocker = self.changed.lock().unwrap().clone(); // TODO less jank way - blocker.changed().await.unwrap_or_log("waiting for changed content #1"); - blocker.changed().await.unwrap_or_log("waiting for changed content #2"); + blocker.changed().await.unwrap_or_warn("waiting for changed content #1"); + blocker.changed().await.unwrap_or_warn("waiting for changed content #2"); let span = blocker.borrow().clone(); span } @@ -49,8 +49,8 @@ impl OperationController { if len <= 0 { let mut recv = self.last.lock().unwrap().clone(); // TODO less jank way - recv.changed().await.unwrap_or_log("wairing for op changes #1"); // acknowledge current state - recv.changed().await.unwrap_or_log("wairing for op changes #2"); // wait for a change in state + recv.changed().await.unwrap_or_warn("wairing for op changes #1"); // acknowledge current state + recv.changed().await.unwrap_or_warn("wairing for op changes #2"); // wait for a change in state } Some(self.queue.lock().unwrap().get(0)?.clone()) } @@ -62,8 +62,8 @@ impl OperationController { pub fn stop(&self) -> bool { match self.stop.send(false) { Ok(()) => { - self.changed_notifier.send(0..0).unwrap_or_log("unlocking downstream for stop"); - self.notifier.send(OperationSeq::default()).unwrap_or_log("unlocking upstream for stop"); + self.changed_notifier.send(0..0).unwrap_or_warn("unlocking downstream for stop"); + self.notifier.send(OperationSeq::default()).unwrap_or_warn("unlocking upstream for stop"); true }, Err(e) => { @@ -96,7 +96,7 @@ impl OperationProcessor for OperationController { async fn apply(&self, op: OperationSeq) -> Result, OTError> { let span = self.operation(&op).await?; self.queue.lock().unwrap().push_back(op.clone()); - self.notifier.send(op.clone()).unwrap_or_log("notifying of applied change"); + self.notifier.send(op.clone()).unwrap_or_warn("notifying of applied change"); Ok(span) } @@ -109,7 +109,7 @@ impl OperationProcessor for OperationController { } } let span = self.operation(&op).await?; - self.changed_notifier.send(span.clone()).unwrap_or_log("notifying of changed content"); + self.changed_notifier.send(span.clone()).unwrap_or_warn("notifying of changed content"); Ok(span) } }