From 59d8a4640d84d221906d4c358a4f58792f7874a0 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 8 Aug 2024 02:27:06 +0200 Subject: [PATCH] fix: proper lifetime for cursor, renamed methods Co-authored-by: zaaarf --- src/api/controller.rs | 2 +- src/buffer/worker.rs | 8 ++++---- src/cursor/controller.rs | 10 +++++----- src/cursor/worker.rs | 34 ++++++++++++++++------------------ 4 files changed, 26 insertions(+), 28 deletions(-) diff --git a/src/api/controller.rs b/src/api/controller.rs index 3743fb4..10509ff 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -11,7 +11,7 @@ pub(crate) trait ControllerWorker { type Tx; type Rx; - fn subscribe(&self) -> Self::Controller; + fn controller(&self) -> Self::Controller; async fn work(self, tx: Self::Tx, rx: Self::Rx); } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 1b2cf28..8fb0b0c 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -23,7 +23,7 @@ pub(crate) struct BufferWorker { poller: mpsc::UnboundedReceiver>, pollers: Vec>, stop: mpsc::UnboundedReceiver<()>, - controller: Arc, + controller: BufferController, } impl BufferWorker { @@ -50,7 +50,7 @@ impl BufferWorker { poller: poller_rx, pollers: Vec::new(), stop: end_rx, - controller: Arc::new(controller), + controller: BufferController(Arc::new(controller)), } } } @@ -61,8 +61,8 @@ impl ControllerWorker for BufferWorker { type Tx = mpsc::Sender; type Rx = Streaming; - fn subscribe(&self) -> BufferController { - BufferController(self.controller.clone()) + fn controller(&self) -> BufferController { + self.controller.clone() } async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 28bee9e..67ba81c 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -28,29 +28,29 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition}; #[derive(Debug, Clone)] #[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "js", napi_derive::napi)] -pub struct CursorController(Arc); +pub struct CursorController(pub(crate) Arc); #[derive(Debug)] -struct CursorControllerInner { +pub(crate) struct CursorControllerInner { op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, } -impl CursorController { +impl CursorControllerInner { pub(crate) fn new( op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { - Self(Arc::new(CursorControllerInner { + Self { op, last_op, stream, stop, - })) + } } } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 8a893e9..4948aa3 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -6,16 +6,14 @@ use tonic::{Streaming, async_trait}; use crate::{api::{controller::ControllerWorker, Cursor}, errors::IgnorableError}; use codemp_proto::cursor::{CursorPosition, CursorEvent}; -use super::controller::CursorController; +use super::controller::{CursorController, CursorControllerInner}; pub(crate) struct CursorWorker { - producer: mpsc::UnboundedSender, op: mpsc::UnboundedReceiver, changed: watch::Sender, - last_op: watch::Receiver, - channel: Arc>, + channel: broadcast::Sender, stop: mpsc::UnboundedReceiver<()>, - stop_control: mpsc::UnboundedSender<()>, + controller: CursorController, } impl Default for CursorWorker { @@ -24,14 +22,18 @@ impl Default for CursorWorker { let (cur_tx, _cur_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (change_tx, change_rx) = watch::channel(CursorEvent::default()); + let controller = CursorControllerInner::new( + op_tx, + Mutex::new(change_rx), + Mutex::new(cur_tx.subscribe()), + end_tx + ); Self { - producer: op_tx, op: op_rx, changed: change_tx, - last_op: change_rx, - channel: Arc::new(cur_tx), + channel: cur_tx, stop: end_rx, - stop_control: end_tx, + controller: CursorController(Arc::new(controller)), } } } @@ -42,24 +44,20 @@ impl ControllerWorker for CursorWorker { type Tx = mpsc::Sender; type Rx = Streaming; - fn subscribe(&self) -> CursorController { - CursorController::new( - self.producer.clone(), - Mutex::new(self.last_op.clone()), - Mutex::new(self.channel.subscribe()), - self.stop_control.clone(), - ) + fn controller(&self) -> CursorController { + self.controller.clone() } async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { loop { tokio::select!{ + biased; + Some(()) = self.stop.recv() => { break; }, + Some(op) = self.op.recv() => { tx.send(op).await.unwrap_or_warn("could not update cursor"); }, Ok(Some(cur)) = rx.message() => { self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); self.changed.send(cur).unwrap_or_warn("could not update last event"); }, - Some(op) = self.op.recv() => { tx.send(op).await.unwrap_or_warn("could not update cursor"); }, - Some(()) = self.stop.recv() => { break; }, else => break, } }