From c1b7073e89523b61ccc388e7788d0a4b12037fa5 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 12 Apr 2023 00:33:14 +0200 Subject: [PATCH] fix: better error handling --- src/server/buffer/actor.rs | 20 ++++++++++++-------- src/server/buffer/service.rs | 6 ++++-- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/server/buffer/actor.rs b/src/server/buffer/actor.rs index ddbc1fb..f65e5f6 100644 --- a/src/server/buffer/actor.rs +++ b/src/server/buffer/actor.rs @@ -1,6 +1,6 @@ use codemp::proto::{RawOp, OperationRequest}; use tokio::sync::{mpsc, broadcast, watch}; -use tracing::error; +use tracing::{error, warn}; use md5::Digest; use operational_transform::OperationSeq; @@ -69,23 +69,27 @@ impl BufferWorker { async fn work(mut self) { loop { match self.edits.recv().await { - None => break, - Some(v) => { - let op : OperationSeq = serde_json::from_str(&v.opseq).unwrap(); - match op.apply(&self.store) { + None => break warn!("channel closed"), + Some(v) => match serde_json::from_str::(&v.opseq) { + Err(e) => break error!("could not deserialize opseq: {}", e), + Ok(op) => match op.apply(&self.store) { + Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.store, e), Ok(res) => { self.store = res; - self.digest.send(md5::compute(&self.store)).unwrap(); - self.content.send(self.store.clone()).unwrap(); let msg = RawOp { opseq: v.opseq, user: v.user }; + if let Err(e) = self.digest.send(md5::compute(&self.store)) { + error!("could not update digest: {}", e); + } + if let Err(e) = self.content.send(self.store.clone()) { + error!("could not update content: {}", e); + } if let Err(e) = self.events.send(msg) { error!("could not broadcast OpSeq: {}", e); } }, - Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.store, e), } }, } diff --git a/src/server/buffer/service.rs b/src/server/buffer/service.rs index a7f894b..6d26dc0 100644 --- a/src/server/buffer/service.rs +++ b/src/server/buffer/service.rs @@ -77,8 +77,10 @@ impl Buffer for BufferService { None => return Err(Status::not_found("path not found")), }; info!("sending edit to buffer: {}", request.opseq); - tx.send(request).await.unwrap(); - Ok(Response::new(BufferResponse { accepted: true, content: None })) + match tx.send(request).await { + Ok(()) => Ok(Response::new(BufferResponse { accepted: true, content: None })), + Err(e) => Err(Status::internal(format!("error sending edit to buffer actor: {}", e))), + } } async fn create(&self, req:Request) -> Result, Status> {