From 4bed9d7432516aad576701c894b15f763992d3c0 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 14 Aug 2024 15:56:10 +0200 Subject: [PATCH] fix: cursor controller backpressure Co-authored-by: cschen --- src/cursor/controller.rs | 8 ++++---- src/cursor/worker.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index a9cb6b4..50237b3 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -26,7 +26,7 @@ pub struct CursorController(pub(crate) Arc); #[derive(Debug)] pub(crate) struct CursorControllerInner { - op: mpsc::UnboundedSender, + op: mpsc::Sender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, @@ -34,7 +34,7 @@ pub(crate) struct CursorControllerInner { impl CursorControllerInner { pub(crate) fn new( - op: mpsc::UnboundedSender, + op: mpsc::Sender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, @@ -52,11 +52,11 @@ impl CursorControllerInner { impl Controller for CursorController { /// enqueue a cursor event to be broadcast to current workspace /// will automatically invert cursor start/end if they are inverted - fn send(&self, mut cursor: Cursor) -> crate::Result<()> { + async fn send(&self, mut cursor: Cursor) -> crate::Result<()> { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } - Ok(self.0.op.send(cursor.into())?) + Ok(self.0.op.send(cursor.into()).await?) } /// try to receive without blocking, but will still block on stream mutex diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 4948aa3..c35a9fc 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -9,7 +9,7 @@ use codemp_proto::cursor::{CursorPosition, CursorEvent}; use super::controller::{CursorController, CursorControllerInner}; pub(crate) struct CursorWorker { - op: mpsc::UnboundedReceiver, + op: mpsc::Receiver, changed: watch::Sender, channel: broadcast::Sender, stop: mpsc::UnboundedReceiver<()>, @@ -18,7 +18,7 @@ pub(crate) struct CursorWorker { impl Default for CursorWorker { fn default() -> Self { - let (op_tx, op_rx) = mpsc::unbounded_channel(); + let (op_tx, op_rx) = mpsc::channel(8); let (cur_tx, _cur_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (change_tx, change_rx) = watch::channel(CursorEvent::default());