diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index acbf201..a678d34 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -15,6 +15,8 @@ use crate::api::TextChange; use crate::ext::InternallyMutable; +use super::worker::DeltaRequest; + /// the buffer controller implementation /// /// for each controller a worker exists, managing outgoing and inbound @@ -52,7 +54,7 @@ pub(crate) struct BufferControllerInner { pub(crate) poller: mpsc::UnboundedSender>, pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) content_request: mpsc::Sender>, - pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, + pub(crate) delta_request: mpsc::Sender, pub(crate) callback: watch::Sender>>, } @@ -85,7 +87,7 @@ impl Controller for BufferController { self.0.delta_request.send((last_update, tx)).await?; let (v, change) = rx.await?; self.0.last_update.set(v); - Ok(Some(change)) + Ok(change) } /// enqueue a text change for processing diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 140d930..11f5c5d 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -14,6 +14,9 @@ use codemp_proto::buffer::{BufferEvent, Operation}; use super::controller::{BufferController, BufferControllerInner}; +pub(crate) type DeltaOp = (LocalVersion, Option); +pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); + pub(crate) struct BufferWorker { user_id: Uuid, latest_version: watch::Sender, @@ -21,7 +24,7 @@ pub(crate) struct BufferWorker { poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, - delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, + delta_req: mpsc::Receiver, stop: mpsc::UnboundedReceiver<()>, controller: BufferController, callback: watch::Receiver>>, @@ -181,7 +184,9 @@ impl ControllerWorker for BufferWorker { } } }; - tx.send((new_local_v, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); + } else { + tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); } }, },