From 0cce1d1ea0f139451b3b8bafb3d49b413a4a45e6 Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 21 Aug 2023 02:35:56 +0200 Subject: [PATCH] feat: added poll/try_recv to controller, fixes added Default derive to TextChange, added docs for poll and try_recv methods, implemented new functionality in controllers, using a watch channel (or reusing if available). Fixed global being always active and wrongly imported when inactive. --- Cargo.toml | 2 +- src/buffer/controller.rs | 41 ++++++++++++++++++++++++++++++++-------- src/buffer/mod.rs | 2 +- src/cursor/controller.rs | 26 +++++++++++++++++++++++-- src/cursor/worker.rs | 13 +++++++++++-- src/errors.rs | 8 +++++++- src/instance.rs | 1 + src/lib.rs | 14 ++++++++++++-- 8 files changed, 90 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4a72ed7..1a68de5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "codemp" -version = "0.4.2" +version = "0.4.3" edition = "2021" [lib] diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 6042e1d..a4afc80 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -3,6 +3,7 @@ //! a controller implementation for buffer actions use operational_transform::OperationSeq; +use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tonic::async_trait; @@ -31,6 +32,7 @@ use super::TextChange; pub struct BufferController { content: watch::Receiver, operations: mpsc::UnboundedSender, + last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, } @@ -42,7 +44,20 @@ impl BufferController { stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { - BufferController { content, operations, stream, stop } + BufferController { + last_op: Mutex::new(content.clone()), + content, operations, stream, stop, + } + } + + fn op_to_change(&self, op: OperationSeq) -> TextChange { + let after = self.content.borrow().clone(); + let skip = leading_noop(op.ops()) as usize; + let before_len = op.base_len(); + let tail = tailing_noop(op.ops()) as usize; + let span = skip..before_len-tail; + let content = after[skip..after.len()-tail].to_string(); + TextChange { span, content } } } @@ -63,16 +78,26 @@ impl OperationFactory for BufferController { impl Controller for BufferController { type Input = OperationSeq; + async fn poll(&self) -> Result<(), Error> { + Ok(self.last_op.lock().await.changed().await?) + } + + fn try_recv(&self) -> Result, Error> { + match self.stream.blocking_lock().try_recv() { + Ok(op) => Ok(Some(self.op_to_change(op))), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Closed) => Err(Error::Channel { send: false }), + Err(TryRecvError::Lagged(n)) => { + tracing::warn!("buffer channel lagged, skipping {} events", n); + Ok(self.try_recv()?) + }, + } + } + /// receive an operation seq and transform it into a TextChange from buffer content async fn recv(&self) -> Result { let op = self.stream.lock().await.recv().await?; - let after = self.content.borrow().clone(); - let skip = leading_noop(op.ops()) as usize; - let before_len = op.base_len(); - let tail = tailing_noop(op.ops()) as usize; - let span = skip..before_len-tail; - let content = after[skip..after.len()-tail].to_string(); - Ok(TextChange { span, content }) + Ok(self.op_to_change(op)) } /// enqueue an opseq for processing diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 1e97171..243e6ae 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -23,7 +23,7 @@ pub use controller::BufferController as Controller; /// an editor-friendly representation of a text change in a buffer /// /// TODO move in proto -#[derive(Debug)] +#[derive(Debug, Default)] pub struct TextChange { /// range of text change, as byte indexes in buffer pub span: Range, diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 2374161..a6a76c1 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -2,7 +2,7 @@ //! //! a controller implementation for cursor actions -use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; +use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; use tonic::async_trait; use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::IgnorableError}; @@ -21,6 +21,7 @@ use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::Ign pub struct CursorController { uid: String, op: mpsc::UnboundedSender, + last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, } @@ -35,10 +36,11 @@ impl CursorController { pub(crate) fn new( uid: String, op: mpsc::UnboundedSender, + last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { - CursorController { uid, op, stream, stop } + CursorController { uid, op, last_op, stream, stop } } } @@ -54,6 +56,20 @@ impl Controller for CursorController { })?) } + /// try to receive without blocking, but will still block on stream mutex + fn try_recv(&self) -> crate::Result> { + let mut stream = self.stream.blocking_lock(); + match stream.try_recv() { + Ok(x) => Ok(Some(x)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Closed) => Err(Error::Channel { send: false }), + Err(TryRecvError::Lagged(n)) => { + tracing::warn!("cursor channel lagged, skipping {} events", n); + Ok(stream.try_recv().ok()) + }, + } + } + // TODO is this cancelable? so it can be used in tokio::select! // TODO is the result type overkill? should be an option? /// get next cursor event from current workspace, or block until one is available @@ -68,4 +84,10 @@ impl Controller for CursorController { } } } + + /// await for changed mutex and then next op change + async fn poll(&self) -> crate::Result<()> { + Ok(self.last_op.lock().await.changed().await?) + } + } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index fa7bd0e..42bcbc5 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use tokio::sync::{mpsc, broadcast::{self}, Mutex}; +use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tonic::{Streaming, transport::Channel, async_trait}; use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, ControllerWorker}; @@ -11,6 +11,8 @@ pub(crate) struct CursorControllerWorker { uid: String, producer: mpsc::UnboundedSender, op: mpsc::UnboundedReceiver, + changed: watch::Sender, + last_op: watch::Receiver, channel: Arc>, stop: mpsc::UnboundedReceiver<()>, stop_control: mpsc::UnboundedSender<()>, @@ -21,10 +23,13 @@ impl CursorControllerWorker { let (op_tx, op_rx) = mpsc::unbounded_channel(); let (cur_tx, _cur_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); + let (change_tx, change_rx) = watch::channel(CursorEvent::default()); Self { uid, producer: op_tx, op: op_rx, + changed: change_tx, + last_op: change_rx, channel: Arc::new(cur_tx), stop: end_rx, stop_control: end_tx, @@ -42,6 +47,7 @@ impl ControllerWorker for CursorControllerWorker { CursorController::new( self.uid.clone(), self.producer.clone(), + Mutex::new(self.last_op.clone()), Mutex::new(self.channel.subscribe()), self.stop_control.clone(), ) @@ -50,7 +56,10 @@ impl ControllerWorker for CursorControllerWorker { async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { loop { tokio::select!{ - Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), + Ok(Some(cur)) = rx.message() => { + self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); + self.changed.send(cur).unwrap_or_warn("could not update last event"); + }, Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, Some(()) = self.stop.recv() => { break; }, else => break, diff --git a/src/errors.rs b/src/errors.rs index 6c0bbbb..48b6f17 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -4,7 +4,7 @@ use std::{result::Result as StdResult, error::Error as StdError, fmt::Display}; -use tokio::sync::{mpsc, broadcast}; +use tokio::sync::{mpsc, broadcast, watch}; use tonic::{Status, Code}; use tracing::warn; @@ -87,3 +87,9 @@ impl From for Error { Error::Channel { send: false } } } + +impl From for Error { + fn from(_value: watch::error::RecvError) -> Self { + Error::Channel { send: false } + } +} diff --git a/src/instance.rs b/src/instance.rs index af25749..3ffa606 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -20,6 +20,7 @@ pub mod global { } } +#[cfg(feature = "global")] pub use global::INSTANCE; /// async implementation of session instance diff --git a/src/lib.rs b/src/lib.rs index 2782619..868d31f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,8 +9,8 @@ //! be used to join workspaces or attach to buffers. //! //! Some actions will return structs implementing the [Controller] trait. These can be polled -//! for new stream events ([Controller::recv]), which will be returned in order. Blocking and -//! callback variants are also implemented. The [Controller] can also be used to send new +//! for new stream events ([Controller::poll]/[Controller::recv]), which will be returned in order. +//! Blocking and callback variants are also implemented. The [Controller] can also be used to send new //! events to the server ([Controller::send]). //! //! Each operation on a buffer is represented as an [ot::OperationSeq]. @@ -204,6 +204,16 @@ pub trait Controller : Sized + Send + Sync { /// `async fn recv(&self) -> codemp::Result;` async fn recv(&self) -> Result; + /// block until next value is added to the stream without removing any element + /// + /// this is just an async trait function wrapped by `async_trait`: + /// + /// `async fn poll(&self) -> codemp::Result<()>;` + async fn poll(&self) -> Result<()>; + + /// attempt to receive a value without blocking, return None if nothing is available + fn try_recv(&self) -> Result>; + /// sync variant of [Self::recv], blocking invoking thread fn blocking_recv(&self, rt: &Runtime) -> Result { rt.block_on(self.recv())