diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 1d33f9d..6785b67 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use tokio::sync::oneshot; -use tokio::sync::{watch, mpsc, RwLock}; +use tokio::sync::{watch, mpsc}; use tonic::async_trait; use crate::errors::IgnorableError; @@ -32,9 +32,9 @@ pub struct BufferController { /// unique identifier of buffer pub name: String, content: watch::Receiver, - seen: Arc>, + seen: StatusCheck, operations: mpsc::UnboundedSender, - poller: mpsc::Sender>, + poller: mpsc::UnboundedSender>, _stop: Arc, // just exist } @@ -43,18 +43,19 @@ impl BufferController { name: String, content: watch::Receiver, operations: mpsc::UnboundedSender, - poller: mpsc::Sender>, + poller: mpsc::UnboundedSender>, stop: mpsc::UnboundedSender<()>, ) -> Self { BufferController { name, content, operations, poller, - seen: Arc::new(RwLock::new("".into())), + seen: StatusCheck::default(), _stop: Arc::new(StopOnDrop(stop)), } } pub fn content(&self) -> String { + self.seen.update(self.content.borrow().clone()); self.content.borrow().clone() } } @@ -74,41 +75,66 @@ impl Controller for BufferController { // block until a new text change is available async fn poll(&self) -> crate::Result<()> { + if self.seen.check() != *self.content.borrow() { + return Ok(()); // short circuit: already available! + } let (tx, rx) = oneshot::channel::<()>(); - self.poller.send(tx).await?; - Ok(rx.await.map_err(|_| crate::Error::Channel { send: false })?) + self.poller.send(tx)?; + tracing::info!("polling"); + rx.await.map_err(|_| crate::Error::Channel { send: false })?; + tracing::info!("polling unblocked"); + Ok(()) } // if a new text change is available, return it immediately fn try_recv(&self) -> crate::Result> { - let seen = match self.seen.try_read() { - Err(_) => return Err(crate::Error::Deadlocked), - Ok(x) => x.clone(), - }; + let seen = self.seen.check(); let actual = self.content.borrow().clone(); if seen == actual { return Ok(None); } let change = TextChange::from_diff(&seen, &actual); - match self.seen.try_write() { - Err(_) => return Err(crate::Error::Deadlocked), - Ok(mut w) => *w = actual, - }; + self.seen.update(actual); Ok(Some(change)) } // block until a new text change is available, and return it async fn recv(&self) -> crate::Result { self.poll().await?; - let cur = self.seen.read().await.clone(); - let change = TextChange::from_diff(&cur, &self.content.borrow()); - let mut seen = self.seen.write().await; - *seen = self.content.borrow().clone(); + let seen = self.seen.check(); + let actual = self.content.borrow().clone(); + let change = TextChange::from_diff(&seen, &actual); + self.seen.update(actual); Ok(change) } /// enqueue an opseq for processing fn send(&self, op: TextChange) -> crate::Result<()> { + let before = self.seen.check(); + self.seen.update(op.apply(&before)); Ok(self.operations.send(op)?) } } + +#[derive(Debug, Clone)] +pub struct StatusCheck { + state: watch::Receiver, + updater: Arc>, +} + +impl Default for StatusCheck { + fn default() -> Self { + let (tx, rx) = watch::channel(T::default()); + StatusCheck { state: rx, updater: Arc::new(tx) } + } +} + +impl StatusCheck { + pub fn update(&self, state: T) -> T { + self.updater.send_replace(state) + } + + pub fn check(&self) -> T { + self.state.borrow().clone() + } +} diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 8bafd61..394b3bb 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -23,7 +23,7 @@ pub(crate) struct BufferControllerWorker { buffer: Woot, content: watch::Sender, operations: mpsc::UnboundedReceiver, - poller: mpsc::Receiver>, + poller: mpsc::UnboundedReceiver>, pollers: Vec>, handles: ClonableHandlesForController, stop: mpsc::UnboundedReceiver<()>, @@ -31,7 +31,7 @@ pub(crate) struct BufferControllerWorker { struct ClonableHandlesForController { operations: mpsc::UnboundedSender, - poller: mpsc::Sender>, + poller: mpsc::UnboundedSender>, stop: mpsc::UnboundedSender<()>, content: watch::Receiver, } @@ -41,7 +41,7 @@ impl BufferControllerWorker { let (txt_tx, txt_rx) = watch::channel("".to_string()); let (op_tx, op_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel(); - let (poller_tx, poller_rx) = mpsc::channel(10); + let (poller_tx, poller_rx) = mpsc::unbounded_channel(); let mut hasher = DefaultHasher::new(); uid.hash(&mut hasher); let site_id = hasher.finish() as usize;