From cf1e910dcb1a6858e00b51299f438107ee581eff Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 1 Sep 2024 03:08:43 +0200 Subject: [PATCH] feat: reworked cursor worker/controller now its more similar to buffer controller/worker and it behaves more like an actor/service --- src/buffer/worker.rs | 3 +- src/cursor/controller.rs | 29 ++++++++----------- src/cursor/worker.rs | 60 ++++++++++++++++++++++++++++++---------- src/lib.rs | 3 -- src/session.rs | 0 5 files changed, 58 insertions(+), 37 deletions(-) delete mode 100644 src/session.rs diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 995a131..5c97860 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -7,9 +7,8 @@ use uuid::Uuid; use crate::api::controller::{ControllerCallback, ControllerWorker}; use crate::api::TextChange; +use crate::ext::{IgnorableError, InternallyMutable}; -use crate::errors::IgnorableError; -use crate::ext::InternallyMutable; use codemp_proto::buffer::{BufferEvent, Operation}; use super::controller::{BufferController, BufferControllerInner}; diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index b6c016c..56f1495 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -3,14 +3,11 @@ //! a controller implementation for cursor actions use std::sync::Arc; -use tokio::sync::{ - broadcast::{self, error::TryRecvError}, - mpsc, watch, Mutex, -}; +use tokio::sync::{mpsc, oneshot, watch}; use tonic::async_trait; -use codemp_proto::cursor::{CursorEvent, CursorPosition}; use crate::{api::{controller::ControllerCallback, Controller, Cursor}, errors::ControllerResult}; +use codemp_proto::cursor::CursorPosition; /// the cursor controller implementation /// /// this contains @@ -30,8 +27,8 @@ pub struct CursorController(pub(crate) Arc); #[derive(Debug)] pub(crate) struct CursorControllerInner { pub(crate) op: mpsc::Sender, - pub(crate) last_op: Mutex>, - pub(crate) stream: Mutex>, + pub(crate) stream: mpsc::Sender>>, + pub(crate) poll: mpsc::UnboundedSender>, pub(crate) callback: watch::Sender>>, pub(crate) stop: mpsc::UnboundedSender<()>, } @@ -48,22 +45,18 @@ impl Controller for CursorController { } /// try to receive without blocking, but will still block on stream mutex - let mut stream = self.0.stream.lock().await; - match stream.try_recv() { - Ok(x) => Ok(Some(x.into())), - Err(TryRecvError::Empty) => Ok(None), - Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }), - Err(TryRecvError::Lagged(n)) => { - tracing::warn!("cursor channel lagged, skipping {} events", n); - Ok(stream.try_recv().map(|x| x.into()).ok()) - } - } async fn try_recv(&self) -> ControllerResult> { + let (tx, rx) = oneshot::channel(); + self.0.stream.send(tx).await?; + Ok(rx.await?) } /// await for changed mutex and then next op change - Ok(self.0.last_op.lock().await.changed().await?) async fn poll(&self) -> ControllerResult<()> { + let (tx, rx) = oneshot::channel(); + self.0.poll.send(tx)?; + rx.await?; + Ok(()) } fn callback(&self, cb: impl Into>) { diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 10766bd..e2eff7f 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -1,17 +1,19 @@ use std::sync::Arc; -use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; +use tokio::sync::{mpsc, oneshot, watch}; use tonic::{Streaming, async_trait}; -use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, errors::IgnorableError}; +use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, ext::IgnorableError}; use codemp_proto::cursor::{CursorPosition, CursorEvent}; use super::controller::{CursorController, CursorControllerInner}; pub(crate) struct CursorWorker { op: mpsc::Receiver, - changed: watch::Sender, - channel: broadcast::Sender, + stream: mpsc::Receiver>>, + poll: mpsc::UnboundedReceiver>, + pollers: Vec>, + store: std::collections::VecDeque, stop: mpsc::UnboundedReceiver<()>, controller: CursorController, callback: watch::Receiver>>, @@ -19,25 +21,33 @@ pub(crate) struct CursorWorker { impl Default for CursorWorker { fn default() -> Self { - let (op_tx, op_rx) = mpsc::channel(8); - let (cur_tx, _cur_rx) = broadcast::channel(64); + Self::new(64) + } +} + +impl CursorWorker { + fn new(buffer_size: usize) -> Self { + let (op_tx, op_rx) = mpsc::channel(buffer_size); + let (stream_tx, stream_rx) = mpsc::channel(1); let (end_tx, end_rx) = mpsc::unbounded_channel(); - let (change_tx, change_rx) = watch::channel(CursorEvent::default()); let (cb_tx, cb_rx) = watch::channel(None); + let (poll_tx, poll_rx) = mpsc::unbounded_channel(); let controller = CursorControllerInner { op: op_tx, - last_op: Mutex::new(change_rx), - stream: Mutex::new(cur_tx.subscribe()), + stream: stream_tx, stop: end_tx, callback: cb_tx, + poll: poll_tx, }; Self { op: op_rx, - changed: change_tx, - channel: cur_tx, + stream: stream_rx, + store: std::collections::VecDeque::default(), stop: end_rx, controller: CursorController(Arc::new(controller)), callback: cb_rx, + poll: poll_rx, + pollers: Vec::new(), } } } @@ -54,17 +64,39 @@ impl ControllerWorker for CursorWorker { async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { loop { + tracing::debug!("cursor worker polling"); tokio::select!{ biased; + + // received stop signal Some(()) = self.stop.recv() => { break; }, - Some(op) = self.op.recv() => { tx.send(op).await.unwrap_or_warn("could not update cursor"); }, + + // new poller + Some(poller) = self.poll.recv() => self.pollers.push(poller), + + // client moved their cursor + Some(op) = self.op.recv() => { + tracing::debug!("received cursor from editor"); + tx.send(op).await.unwrap_or_warn("could not update cursor"); + }, + + // server sents us a cursor Ok(Some(cur)) = rx.message() => { - self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); - self.changed.send(cur).unwrap_or_warn("could not update last event"); + tracing::debug!("received cursor from server"); + self.store.push_back(cur.into()); + for tx in self.pollers.drain(..) { + tx.send(()).unwrap_or_warn("poller dropped before unblocking"); + } if let Some(cb) = self.callback.borrow().as_ref() { + tracing::debug!("running cursor callback"); cb.call(self.controller.clone()); // TODO should this run in its own task/thread? } }, + + // client wants to get next cursor event + Some(tx) = self.stream.recv() => tx.send(self.store.pop_front()) + .unwrap_or_warn("client gave up receiving"), + else => break, } } diff --git a/src/lib.rs b/src/lib.rs index e388af1..587a1ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,9 +129,6 @@ pub mod buffer; pub mod workspace; pub use workspace::Workspace; -/// session -pub mod session; - /// codemp client, wrapping all above pub mod client; pub use client::Client; diff --git a/src/session.rs b/src/session.rs deleted file mode 100644 index e69de29..0000000