From 975a082262c04c4183cd1b1866f96537048dcbc3 Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 2 Jul 2023 03:19:00 +0200 Subject: [PATCH] feat: actually report true accepted status --- server/src/buffer/actor.rs | 29 ++++++++++++++++------------- server/src/buffer/service.rs | 10 +++++++--- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/server/src/buffer/actor.rs b/server/src/buffer/actor.rs index 03ed261..d7eea4e 100644 --- a/server/src/buffer/actor.rs +++ b/server/src/buffer/actor.rs @@ -1,5 +1,5 @@ -use codemp::proto::{RawOp, OperationRequest}; -use tokio::sync::{mpsc, broadcast, watch}; +use codemp::{proto::{RawOp, OperationRequest}, errors::IgnorableError}; +use tokio::sync::{mpsc, broadcast, watch, oneshot}; use tracing::{error, warn}; // use md5::Digest; @@ -17,7 +17,7 @@ pub trait BufferStore { #[derive(Clone)] pub struct BufferHandle { - pub edit: mpsc::Sender, + pub edit: mpsc::Sender<(oneshot::Sender, OperationRequest)>, events: broadcast::Sender, // pub digest: watch::Receiver, pub content: watch::Receiver, @@ -59,7 +59,7 @@ impl BufferHandle { struct BufferWorker { store: String, - edits: mpsc::Receiver, + edits: mpsc::Receiver<(oneshot::Sender, OperationRequest)>, events: broadcast::Sender, // digest: watch::Sender, content: watch::Sender, @@ -70,10 +70,16 @@ impl BufferWorker { loop { match self.edits.recv().await { None => break warn!("channel closed"), - Some(v) => match serde_json::from_str::(&v.opseq) { - Err(e) => break error!("could not deserialize opseq: {}", e), + Some((ack, v)) => match serde_json::from_str::(&v.opseq) { + Err(e) => { + ack.send(false).unwrap_or_warn("could not reject undeserializable opseq"); + error!("could not deserialize opseq: {}", e); + }, Ok(op) => match op.apply(&self.store) { - Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.store, e), // TODO + Err(e) => { + ack.send(false).unwrap_or_warn("could not reject unappliable opseq"); + error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.store, e); // TODO + } Ok(res) => { self.store = res; let msg = RawOp { @@ -83,12 +89,9 @@ impl BufferWorker { // if let Err(e) = self.digest.send(md5::compute(&self.store)) { // error!("could not update digest: {}", e); // } - if let Err(e) = self.content.send(self.store.clone()) { - error!("could not update content: {}", e); - } - if let Err(e) = self.events.send(msg) { - error!("could not broadcast OpSeq: {}", e); - } + ack.send(true).unwrap_or_warn("could not accept opseq"); + self.content.send(self.store.clone()).unwrap_or_warn("could not update content"); + self.events.send(msg).unwrap_or_warn("could not broadcast OpSeq"); }, } }, diff --git a/server/src/buffer/service.rs b/server/src/buffer/service.rs index 76fb3b8..6066d22 100644 --- a/server/src/buffer/service.rs +++ b/server/src/buffer/service.rs @@ -1,6 +1,6 @@ use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap}; -use tokio::sync::{mpsc, broadcast}; +use tokio::sync::{mpsc, broadcast, oneshot}; use tonic::{Request, Response, Status}; use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? @@ -107,9 +107,13 @@ impl Buffer for BufferService { None => return Err(Status::not_found("path not found")), }; info!("sending edit to buffer: {}", request.opseq); - match tx.send(request).await { - Ok(()) => Ok(Response::new(BufferResponse { accepted: true, content: None })), + let (ack, status) = oneshot::channel(); + match tx.send((ack, request)).await { Err(e) => Err(Status::internal(format!("error sending edit to buffer actor: {}", e))), + Ok(()) => Ok(Response::new(BufferResponse { + accepted: status.await.unwrap_or(false), + content: None + })) } }