fix: cursor controller backpressure

Co-authored-by: cschen <cschen@codemp.dev>
This commit is contained in:
əlemi 2024-08-14 15:56:10 +02:00
parent 413247f9b4
commit 4bed9d7432
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 6 additions and 6 deletions

View file

@ -26,7 +26,7 @@ pub struct CursorController(pub(crate) Arc<CursorControllerInner>);
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct CursorControllerInner { pub(crate) struct CursorControllerInner {
op: mpsc::UnboundedSender<CursorPosition>, op: mpsc::Sender<CursorPosition>,
last_op: Mutex<watch::Receiver<CursorEvent>>, last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
@ -34,7 +34,7 @@ pub(crate) struct CursorControllerInner {
impl CursorControllerInner { impl CursorControllerInner {
pub(crate) fn new( pub(crate) fn new(
op: mpsc::UnboundedSender<CursorPosition>, op: mpsc::Sender<CursorPosition>,
last_op: Mutex<watch::Receiver<CursorEvent>>, last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
@ -52,11 +52,11 @@ impl CursorControllerInner {
impl Controller<Cursor> for CursorController { impl Controller<Cursor> for CursorController {
/// enqueue a cursor event to be broadcast to current workspace /// enqueue a cursor event to be broadcast to current workspace
/// will automatically invert cursor start/end if they are inverted /// will automatically invert cursor start/end if they are inverted
fn send(&self, mut cursor: Cursor) -> crate::Result<()> { async fn send(&self, mut cursor: Cursor) -> crate::Result<()> {
if cursor.start > cursor.end { if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
} }
Ok(self.0.op.send(cursor.into())?) Ok(self.0.op.send(cursor.into()).await?)
} }
/// try to receive without blocking, but will still block on stream mutex /// try to receive without blocking, but will still block on stream mutex

View file

@ -9,7 +9,7 @@ use codemp_proto::cursor::{CursorPosition, CursorEvent};
use super::controller::{CursorController, CursorControllerInner}; use super::controller::{CursorController, CursorControllerInner};
pub(crate) struct CursorWorker { pub(crate) struct CursorWorker {
op: mpsc::UnboundedReceiver<CursorPosition>, op: mpsc::Receiver<CursorPosition>,
changed: watch::Sender<CursorEvent>, changed: watch::Sender<CursorEvent>,
channel: broadcast::Sender<CursorEvent>, channel: broadcast::Sender<CursorEvent>,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
@ -18,7 +18,7 @@ pub(crate) struct CursorWorker {
impl Default for CursorWorker { impl Default for CursorWorker {
fn default() -> Self { fn default() -> Self {
let (op_tx, op_rx) = mpsc::unbounded_channel(); let (op_tx, op_rx) = mpsc::channel(8);
let (cur_tx, _cur_rx) = broadcast::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64);
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (change_tx, change_rx) = watch::channel(CursorEvent::default()); let (change_tx, change_rx) = watch::channel(CursorEvent::default());