From cd8f7cd5c59ac58a97298c8247e169a39e2f09f1 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 8 Aug 2024 00:27:24 +0200 Subject: [PATCH] feat: internally mutable Co-authored-by: zaaarf --- src/buffer/controller.rs | 44 ++++++++++------------------------------ src/buffer/tools.rs | 27 ++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 92a71c9..d588ed0 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -9,10 +9,11 @@ use tokio::sync::{mpsc, watch}; use tonic::async_trait; use crate::api::Controller; -use crate::errors::IgnorableError; use crate::api::TextChange; +use super::tools::InternallyMutable; + /// the buffer controller implementation /// /// for each controller a worker exists, managing outgoing and inbound @@ -29,7 +30,7 @@ pub struct BufferController(Arc); struct BufferControllerInner { name: String, content: watch::Receiver, - seen: StatusCheck, // internal buffer previous state + seen: InternallyMutable, // internal buffer previous state operations: mpsc::UnboundedSender, poller: mpsc::UnboundedSender>, _stop: Arc, // just exist @@ -73,6 +74,7 @@ impl Drop for StopOnDrop { self.0 .send(()) .unwrap_or_warn("could not send stop message to worker"); + seen: InternallyMutable::default(), } } @@ -81,7 +83,7 @@ impl Controller for BufferController { /// block until a text change is available /// this returns immediately if one is already available async fn poll(&self) -> crate::Result<()> { - if self.0.seen.check() != *self.0.content.borrow() { + if self.0.seen.get() != *self.0.content.borrow() { return Ok(()); // short circuit: already available! } let (tx, rx) = oneshot::channel::<()>(); @@ -93,57 +95,33 @@ impl Controller for BufferController { /// if a text change is available, return it immediately fn try_recv(&self) -> crate::Result> { - let seen = self.0.seen.check(); + let seen = self.0.seen.get(); let actual = self.0.content.borrow().clone(); if seen == actual { return Ok(None); } let change = TextChange::from_diff(&seen, &actual); - self.0.seen.update(actual); + self.0.seen.set(actual); Ok(Some(change)) } /// block until a new text change is available, and return it async fn recv(&self) -> crate::Result { self.poll().await?; - let seen = self.0.seen.check(); + let seen = self.0.seen.get(); let actual = self.0.content.borrow().clone(); let change = TextChange::from_diff(&seen, &actual); - self.0.seen.update(actual); + self.0.seen.set(actual); Ok(change) } /// enqueue a text change for processing /// this also updates internal buffer previous state fn send(&self, op: TextChange) -> crate::Result<()> { - let before = self.0.seen.check(); - self.0.seen.update(op.apply(&before)); + let before = self.0.seen.get(); + self.0.seen.set(op.apply(&before)); Ok(self.0.operations.send(op)?) } -} -#[derive(Debug, Clone)] -struct StatusCheck { - state: watch::Receiver, - updater: Arc>, -} - -impl Default for StatusCheck { - fn default() -> Self { - let (tx, rx) = watch::channel(T::default()); - StatusCheck { - state: rx, - updater: Arc::new(tx), - } - } -} - -impl StatusCheck { - fn update(&self, state: T) -> T { - self.updater.send_replace(state) - } - - fn check(&self) -> T { - self.state.borrow().clone() } } diff --git a/src/buffer/tools.rs b/src/buffer/tools.rs index c4f8765..f4a8fd8 100644 --- a/src/buffer/tools.rs +++ b/src/buffer/tools.rs @@ -48,3 +48,30 @@ pub async fn select_buffer( } } } + +/// wraps sender and receiver to allow mutable field with immutable ref +#[derive(Debug)] +pub struct InternallyMutable { + getter: tokio::sync::watch::Receiver, + setter: tokio::sync::watch::Sender, +} + +impl Default for InternallyMutable { + fn default() -> Self { + let (tx, rx) = tokio::sync::watch::channel(T::default()); + InternallyMutable { + getter: rx, + setter: tx, + } + } +} + +impl InternallyMutable { + pub fn set(&self, state: T) -> T { + self.setter.send_replace(state) + } + + pub fn get(&self) -> T { + self.getter.borrow().clone() + } +}