feat: poll doesn't ever lock the RwLock

to make try_recv deadlocks way more rare
This commit is contained in:
əlemi 2023-11-23 15:52:36 +01:00
parent 1e39363815
commit 8df0b8ec41
2 changed files with 29 additions and 13 deletions

View file

@ -5,6 +5,7 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::{watch, mpsc, RwLock}; use tokio::sync::{watch, mpsc, RwLock};
use tonic::async_trait; use tonic::async_trait;
@ -33,6 +34,7 @@ pub struct BufferController {
content: watch::Receiver<String>, content: watch::Receiver<String>,
seen: Arc<RwLock<String>>, seen: Arc<RwLock<String>>,
operations: mpsc::UnboundedSender<TextChange>, operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::Sender<oneshot::Sender<()>>,
_stop: Arc<StopOnDrop>, // just exist _stop: Arc<StopOnDrop>, // just exist
} }
@ -41,15 +43,20 @@ impl BufferController {
name: String, name: String,
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<TextChange>, operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::Sender<oneshot::Sender<()>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
BufferController { BufferController {
name, name,
content, operations, content, operations, poller,
_stop: Arc::new(StopOnDrop(stop)), _stop: Arc::new(StopOnDrop(stop)),
seen: Arc::new(RwLock::new("".into())), seen: Arc::new(RwLock::new("".into())),
} }
} }
pub fn content(&self) -> String {
self.content.borrow().clone()
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -66,15 +73,9 @@ impl Controller<TextChange> for BufferController {
type Input = TextChange; type Input = TextChange;
async fn poll(&self) -> crate::Result<()> { async fn poll(&self) -> crate::Result<()> {
let mut poller = self.content.clone(); let (tx, rx) = oneshot::channel::<()>();
loop { self.poller.send(tx);
poller.changed().await?; Ok(rx.await.map_err(|_| crate::Error::Channel { send: false })?)
let seen = self.seen.read().await.clone();
if *poller.borrow() != seen {
break
}
}
Ok(())
} }
fn try_recv(&self) -> crate::Result<Option<TextChange>> { fn try_recv(&self) -> crate::Result<Option<TextChange>> {

View file

@ -2,7 +2,7 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use similar::{TextDiff, ChangeTag}; use similar::{TextDiff, ChangeTag};
use tokio::sync::{watch, mpsc}; use tokio::sync::{watch, mpsc, oneshot};
use tonic::transport::Channel; use tonic::transport::Channel;
use tonic::{async_trait, Streaming}; use tonic::{async_trait, Streaming};
use woot::crdt::{Op, CRDT, TextEditor}; use woot::crdt::{Op, CRDT, TextEditor};
@ -27,6 +27,9 @@ pub(crate) struct BufferControllerWorker {
name: String, name: String,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
stop_control: mpsc::UnboundedSender<()>, stop_control: mpsc::UnboundedSender<()>,
poller_rx: mpsc::Receiver<oneshot::Sender<()>>,
poller_tx: mpsc::Sender<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>,
} }
impl BufferControllerWorker { impl BufferControllerWorker {
@ -34,11 +37,13 @@ impl BufferControllerWorker {
let (txt_tx, txt_rx) = watch::channel("".to_string()); let (txt_tx, txt_rx) = watch::channel("".to_string());
let (op_tx, op_rx) = mpsc::unbounded_channel(); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (poller_tx, poller_rx) = mpsc::channel(10);
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
uid.hash(&mut hasher); uid.hash(&mut hasher);
let site_id = hasher.finish() as usize; let site_id = hasher.finish() as usize;
BufferControllerWorker { BufferControllerWorker {
uid, uid, poller_rx, poller_tx,
pollers: Vec::new(),
content: txt_tx, content: txt_tx,
operations: op_rx, operations: op_rx,
receiver: txt_rx, receiver: txt_rx,
@ -75,6 +80,7 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
self.name.clone(), self.name.clone(),
self.receiver.clone(), self.receiver.clone(),
self.sender.clone(), self.sender.clone(),
self.poller_tx.clone(),
self.stop_control.clone(), self.stop_control.clone(),
) )
} }
@ -88,6 +94,12 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
// received stop signal // received stop signal
_ = self.stop.recv() => break, _ = self.stop.recv() => break,
// received a new poller, add it to collection
res = self.poller_rx.recv() => match res {
None => break tracing::error!("poller channel closed"),
Some(tx) => self.pollers.push(tx),
},
// received a text change from editor // received a text change from editor
res = self.operations.recv() => match res { res = self.operations.recv() => match res {
None => break, None => break,
@ -133,7 +145,7 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
} }
}, },
// received a stop request (or channel got closed) // received a message from server
res = rx.message() => match res { res = rx.message() => match res {
Err(_e) => break, Err(_e) => break,
Ok(None) => break, Ok(None) => break,
@ -141,6 +153,9 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
Ok(op) => { Ok(op) => {
self.buffer.merge(op); self.buffer.merge(op);
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
for tx in self.pollers.drain(0..self.pollers.len()) {
tx.send(()).unwrap_or_warn("could not wake up poller");
}
}, },
Err(e) => tracing::error!("could not deserialize operation from server: {}", e), Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
}, },