chore: improved seen mechanism

instead of manually handling the mutex leave it to tokio and use a fancy
generic struct with update() and check()
This commit is contained in:
əlemi 2023-11-30 03:03:09 +01:00
parent 1f50f75eb4
commit 3881bb38ea
2 changed files with 48 additions and 22 deletions

View file

@ -6,7 +6,7 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::sync::{watch, mpsc, RwLock}; use tokio::sync::{watch, mpsc};
use tonic::async_trait; use tonic::async_trait;
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
@ -32,9 +32,9 @@ pub struct BufferController {
/// unique identifier of buffer /// unique identifier of buffer
pub name: String, pub name: String,
content: watch::Receiver<String>, content: watch::Receiver<String>,
seen: Arc<RwLock<String>>, seen: StatusCheck<String>,
operations: mpsc::UnboundedSender<TextChange>, operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::Sender<oneshot::Sender<()>>, poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
_stop: Arc<StopOnDrop>, // just exist _stop: Arc<StopOnDrop>, // just exist
} }
@ -43,18 +43,19 @@ impl BufferController {
name: String, name: String,
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<TextChange>, operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::Sender<oneshot::Sender<()>>, poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
BufferController { BufferController {
name, name,
content, operations, poller, content, operations, poller,
seen: Arc::new(RwLock::new("".into())), seen: StatusCheck::default(),
_stop: Arc::new(StopOnDrop(stop)), _stop: Arc::new(StopOnDrop(stop)),
} }
} }
pub fn content(&self) -> String { pub fn content(&self) -> String {
self.seen.update(self.content.borrow().clone());
self.content.borrow().clone() self.content.borrow().clone()
} }
} }
@ -74,41 +75,66 @@ impl Controller<TextChange> for BufferController {
// block until a new text change is available // block until a new text change is available
async fn poll(&self) -> crate::Result<()> { async fn poll(&self) -> crate::Result<()> {
if self.seen.check() != *self.content.borrow() {
return Ok(()); // short circuit: already available!
}
let (tx, rx) = oneshot::channel::<()>(); let (tx, rx) = oneshot::channel::<()>();
self.poller.send(tx).await?; self.poller.send(tx)?;
Ok(rx.await.map_err(|_| crate::Error::Channel { send: false })?) tracing::info!("polling");
rx.await.map_err(|_| crate::Error::Channel { send: false })?;
tracing::info!("polling unblocked");
Ok(())
} }
// if a new text change is available, return it immediately // if a new text change is available, return it immediately
fn try_recv(&self) -> crate::Result<Option<TextChange>> { fn try_recv(&self) -> crate::Result<Option<TextChange>> {
let seen = match self.seen.try_read() { let seen = self.seen.check();
Err(_) => return Err(crate::Error::Deadlocked),
Ok(x) => x.clone(),
};
let actual = self.content.borrow().clone(); let actual = self.content.borrow().clone();
if seen == actual { if seen == actual {
return Ok(None); return Ok(None);
} }
let change = TextChange::from_diff(&seen, &actual); let change = TextChange::from_diff(&seen, &actual);
match self.seen.try_write() { self.seen.update(actual);
Err(_) => return Err(crate::Error::Deadlocked),
Ok(mut w) => *w = actual,
};
Ok(Some(change)) Ok(Some(change))
} }
// block until a new text change is available, and return it // block until a new text change is available, and return it
async fn recv(&self) -> crate::Result<TextChange> { async fn recv(&self) -> crate::Result<TextChange> {
self.poll().await?; self.poll().await?;
let cur = self.seen.read().await.clone(); let seen = self.seen.check();
let change = TextChange::from_diff(&cur, &self.content.borrow()); let actual = self.content.borrow().clone();
let mut seen = self.seen.write().await; let change = TextChange::from_diff(&seen, &actual);
*seen = self.content.borrow().clone(); self.seen.update(actual);
Ok(change) Ok(change)
} }
/// enqueue an opseq for processing /// enqueue an opseq for processing
fn send(&self, op: TextChange) -> crate::Result<()> { fn send(&self, op: TextChange) -> crate::Result<()> {
let before = self.seen.check();
self.seen.update(op.apply(&before));
Ok(self.operations.send(op)?) Ok(self.operations.send(op)?)
} }
} }
#[derive(Debug, Clone)]
pub struct StatusCheck<T : Clone> {
state: watch::Receiver<T>,
updater: Arc<watch::Sender<T>>,
}
impl<T : Clone + Default> Default for StatusCheck<T> {
fn default() -> Self {
let (tx, rx) = watch::channel(T::default());
StatusCheck { state: rx, updater: Arc::new(tx) }
}
}
impl<T : Clone> StatusCheck<T> {
pub fn update(&self, state: T) -> T {
self.updater.send_replace(state)
}
pub fn check(&self) -> T {
self.state.borrow().clone()
}
}

View file

@ -23,7 +23,7 @@ pub(crate) struct BufferControllerWorker {
buffer: Woot, buffer: Woot,
content: watch::Sender<String>, content: watch::Sender<String>,
operations: mpsc::UnboundedReceiver<TextChange>, operations: mpsc::UnboundedReceiver<TextChange>,
poller: mpsc::Receiver<oneshot::Sender<()>>, poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
handles: ClonableHandlesForController, handles: ClonableHandlesForController,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
@ -31,7 +31,7 @@ pub(crate) struct BufferControllerWorker {
struct ClonableHandlesForController { struct ClonableHandlesForController {
operations: mpsc::UnboundedSender<TextChange>, operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::Sender<oneshot::Sender<()>>, poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
content: watch::Receiver<String>, content: watch::Receiver<String>,
} }
@ -41,7 +41,7 @@ impl BufferControllerWorker {
let (txt_tx, txt_rx) = watch::channel("".to_string()); let (txt_tx, txt_rx) = watch::channel("".to_string());
let (op_tx, op_rx) = mpsc::unbounded_channel(); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (poller_tx, poller_rx) = mpsc::channel(10); let (poller_tx, poller_rx) = mpsc::unbounded_channel();
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
uid.hash(&mut hasher); uid.hash(&mut hasher);
let site_id = hasher.finish() as usize; let site_id = hasher.finish() as usize;