From 0154e5a03239aa07079411ffd5d1baa4123b0180 Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 17 Aug 2024 00:17:01 +0200 Subject: [PATCH] fix: try_recv() channel error : delta may be None basically if there was no change to report, the oneshot would not get updated which is bad. so we put back the version we got and send a None (the channel now has nullable TextChange). so basically its always a try_recv, but its fine since recv is implemented with try_recv + poll anyway --- src/buffer/controller.rs | 6 ++++-- src/buffer/worker.rs | 9 +++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index acbf201..a678d34 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -15,6 +15,8 @@ use crate::api::TextChange; use crate::ext::InternallyMutable; +use super::worker::DeltaRequest; + /// the buffer controller implementation /// /// for each controller a worker exists, managing outgoing and inbound @@ -52,7 +54,7 @@ pub(crate) struct BufferControllerInner { pub(crate) poller: mpsc::UnboundedSender>, pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) content_request: mpsc::Sender>, - pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, + pub(crate) delta_request: mpsc::Sender, pub(crate) callback: watch::Sender>>, } @@ -85,7 +87,7 @@ impl Controller for BufferController { self.0.delta_request.send((last_update, tx)).await?; let (v, change) = rx.await?; self.0.last_update.set(v); - Ok(Some(change)) + Ok(change) } /// enqueue a text change for processing diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 140d930..11f5c5d 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -14,6 +14,9 @@ use codemp_proto::buffer::{BufferEvent, Operation}; use super::controller::{BufferController, BufferControllerInner}; +pub(crate) type DeltaOp = (LocalVersion, Option); +pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); + pub(crate) struct BufferWorker { user_id: Uuid, latest_version: watch::Sender, @@ -21,7 +24,7 @@ pub(crate) struct BufferWorker { poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, - delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, + delta_req: mpsc::Receiver, stop: mpsc::UnboundedReceiver<()>, controller: BufferController, callback: watch::Receiver>>, @@ -181,7 +184,9 @@ impl ControllerWorker for BufferWorker { } } }; - tx.send((new_local_v, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); + } else { + tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); } }, },