feat: added poll/try_recv to controller, fixes

added Default derive to TextChange, added docs for poll and try_recv
methods, implemented new functionality in controllers, using a watch
channel (or reusing if available). Fixed global being always active and
wrongly imported when inactive.
This commit is contained in:
əlemi 2023-08-21 02:35:56 +02:00
parent 0c5fb282f6
commit 0cce1d1ea0
8 changed files with 90 additions and 17 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "codemp" name = "codemp"
version = "0.4.2" version = "0.4.3"
edition = "2021" edition = "2021"
[lib] [lib]

View file

@ -3,6 +3,7 @@
//! a controller implementation for buffer actions //! a controller implementation for buffer actions
use operational_transform::OperationSeq; use operational_transform::OperationSeq;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tokio::sync::{watch, mpsc, broadcast, Mutex};
use tonic::async_trait; use tonic::async_trait;
@ -31,6 +32,7 @@ use super::TextChange;
pub struct BufferController { pub struct BufferController {
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<OperationSeq>, operations: mpsc::UnboundedSender<OperationSeq>,
last_op: Mutex<watch::Receiver<String>>,
stream: Mutex<broadcast::Receiver<OperationSeq>>, stream: Mutex<broadcast::Receiver<OperationSeq>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
} }
@ -42,7 +44,20 @@ impl BufferController {
stream: Mutex<broadcast::Receiver<OperationSeq>>, stream: Mutex<broadcast::Receiver<OperationSeq>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
BufferController { content, operations, stream, stop } BufferController {
last_op: Mutex::new(content.clone()),
content, operations, stream, stop,
}
}
fn op_to_change(&self, op: OperationSeq) -> TextChange {
let after = self.content.borrow().clone();
let skip = leading_noop(op.ops()) as usize;
let before_len = op.base_len();
let tail = tailing_noop(op.ops()) as usize;
let span = skip..before_len-tail;
let content = after[skip..after.len()-tail].to_string();
TextChange { span, content }
} }
} }
@ -63,16 +78,26 @@ impl OperationFactory for BufferController {
impl Controller<TextChange> for BufferController { impl Controller<TextChange> for BufferController {
type Input = OperationSeq; type Input = OperationSeq;
async fn poll(&self) -> Result<(), Error> {
Ok(self.last_op.lock().await.changed().await?)
}
fn try_recv(&self) -> Result<Option<TextChange>, Error> {
match self.stream.blocking_lock().try_recv() {
Ok(op) => Ok(Some(self.op_to_change(op))),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Closed) => Err(Error::Channel { send: false }),
Err(TryRecvError::Lagged(n)) => {
tracing::warn!("buffer channel lagged, skipping {} events", n);
Ok(self.try_recv()?)
},
}
}
/// receive an operation seq and transform it into a TextChange from buffer content /// receive an operation seq and transform it into a TextChange from buffer content
async fn recv(&self) -> Result<TextChange, Error> { async fn recv(&self) -> Result<TextChange, Error> {
let op = self.stream.lock().await.recv().await?; let op = self.stream.lock().await.recv().await?;
let after = self.content.borrow().clone(); Ok(self.op_to_change(op))
let skip = leading_noop(op.ops()) as usize;
let before_len = op.base_len();
let tail = tailing_noop(op.ops()) as usize;
let span = skip..before_len-tail;
let content = after[skip..after.len()-tail].to_string();
Ok(TextChange { span, content })
} }
/// enqueue an opseq for processing /// enqueue an opseq for processing

View file

@ -23,7 +23,7 @@ pub use controller::BufferController as Controller;
/// an editor-friendly representation of a text change in a buffer /// an editor-friendly representation of a text change in a buffer
/// ///
/// TODO move in proto /// TODO move in proto
#[derive(Debug)] #[derive(Debug, Default)]
pub struct TextChange { pub struct TextChange {
/// range of text change, as byte indexes in buffer /// range of text change, as byte indexes in buffer
pub span: Range<usize>, pub span: Range<usize>,

View file

@ -2,7 +2,7 @@
//! //!
//! a controller implementation for cursor actions //! a controller implementation for cursor actions
use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch};
use tonic::async_trait; use tonic::async_trait;
use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::IgnorableError}; use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::IgnorableError};
@ -21,6 +21,7 @@ use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::Ign
pub struct CursorController { pub struct CursorController {
uid: String, uid: String,
op: mpsc::UnboundedSender<CursorEvent>, op: mpsc::UnboundedSender<CursorEvent>,
last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
} }
@ -35,10 +36,11 @@ impl CursorController {
pub(crate) fn new( pub(crate) fn new(
uid: String, uid: String,
op: mpsc::UnboundedSender<CursorEvent>, op: mpsc::UnboundedSender<CursorEvent>,
last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
CursorController { uid, op, stream, stop } CursorController { uid, op, last_op, stream, stop }
} }
} }
@ -54,6 +56,20 @@ impl Controller<CursorEvent> for CursorController {
})?) })?)
} }
/// try to receive without blocking, but will still block on stream mutex
fn try_recv(&self) -> crate::Result<Option<CursorEvent>> {
let mut stream = self.stream.blocking_lock();
match stream.try_recv() {
Ok(x) => Ok(Some(x)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Closed) => Err(Error::Channel { send: false }),
Err(TryRecvError::Lagged(n)) => {
tracing::warn!("cursor channel lagged, skipping {} events", n);
Ok(stream.try_recv().ok())
},
}
}
// TODO is this cancelable? so it can be used in tokio::select! // TODO is this cancelable? so it can be used in tokio::select!
// TODO is the result type overkill? should be an option? // TODO is the result type overkill? should be an option?
/// get next cursor event from current workspace, or block until one is available /// get next cursor event from current workspace, or block until one is available
@ -68,4 +84,10 @@ impl Controller<CursorEvent> for CursorController {
} }
} }
} }
/// await for changed mutex and then next op change
async fn poll(&self) -> crate::Result<()> {
Ok(self.last_op.lock().await.changed().await?)
}
} }

View file

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, broadcast::{self}, Mutex}; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch};
use tonic::{Streaming, transport::Channel, async_trait}; use tonic::{Streaming, transport::Channel, async_trait};
use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, ControllerWorker}; use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, ControllerWorker};
@ -11,6 +11,8 @@ pub(crate) struct CursorControllerWorker {
uid: String, uid: String,
producer: mpsc::UnboundedSender<CursorEvent>, producer: mpsc::UnboundedSender<CursorEvent>,
op: mpsc::UnboundedReceiver<CursorEvent>, op: mpsc::UnboundedReceiver<CursorEvent>,
changed: watch::Sender<CursorEvent>,
last_op: watch::Receiver<CursorEvent>,
channel: Arc<broadcast::Sender<CursorEvent>>, channel: Arc<broadcast::Sender<CursorEvent>>,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
stop_control: mpsc::UnboundedSender<()>, stop_control: mpsc::UnboundedSender<()>,
@ -21,10 +23,13 @@ impl CursorControllerWorker {
let (op_tx, op_rx) = mpsc::unbounded_channel(); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (cur_tx, _cur_rx) = broadcast::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64);
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (change_tx, change_rx) = watch::channel(CursorEvent::default());
Self { Self {
uid, uid,
producer: op_tx, producer: op_tx,
op: op_rx, op: op_rx,
changed: change_tx,
last_op: change_rx,
channel: Arc::new(cur_tx), channel: Arc::new(cur_tx),
stop: end_rx, stop: end_rx,
stop_control: end_tx, stop_control: end_tx,
@ -42,6 +47,7 @@ impl ControllerWorker<CursorEvent> for CursorControllerWorker {
CursorController::new( CursorController::new(
self.uid.clone(), self.uid.clone(),
self.producer.clone(), self.producer.clone(),
Mutex::new(self.last_op.clone()),
Mutex::new(self.channel.subscribe()), Mutex::new(self.channel.subscribe()),
self.stop_control.clone(), self.stop_control.clone(),
) )
@ -50,7 +56,10 @@ impl ControllerWorker<CursorEvent> for CursorControllerWorker {
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
loop { loop {
tokio::select!{ tokio::select!{
Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), Ok(Some(cur)) = rx.message() => {
self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event");
self.changed.send(cur).unwrap_or_warn("could not update last event");
},
Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); },
Some(()) = self.stop.recv() => { break; }, Some(()) = self.stop.recv() => { break; },
else => break, else => break,

View file

@ -4,7 +4,7 @@
use std::{result::Result as StdResult, error::Error as StdError, fmt::Display}; use std::{result::Result as StdResult, error::Error as StdError, fmt::Display};
use tokio::sync::{mpsc, broadcast}; use tokio::sync::{mpsc, broadcast, watch};
use tonic::{Status, Code}; use tonic::{Status, Code};
use tracing::warn; use tracing::warn;
@ -87,3 +87,9 @@ impl From<broadcast::error::RecvError> for Error {
Error::Channel { send: false } Error::Channel { send: false }
} }
} }
impl From<watch::error::RecvError> for Error {
fn from(_value: watch::error::RecvError) -> Self {
Error::Channel { send: false }
}
}

View file

@ -20,6 +20,7 @@ pub mod global {
} }
} }
#[cfg(feature = "global")]
pub use global::INSTANCE; pub use global::INSTANCE;
/// async implementation of session instance /// async implementation of session instance

View file

@ -9,8 +9,8 @@
//! be used to join workspaces or attach to buffers. //! be used to join workspaces or attach to buffers.
//! //!
//! Some actions will return structs implementing the [Controller] trait. These can be polled //! Some actions will return structs implementing the [Controller] trait. These can be polled
//! for new stream events ([Controller::recv]), which will be returned in order. Blocking and //! for new stream events ([Controller::poll]/[Controller::recv]), which will be returned in order.
//! callback variants are also implemented. The [Controller] can also be used to send new //! Blocking and callback variants are also implemented. The [Controller] can also be used to send new
//! events to the server ([Controller::send]). //! events to the server ([Controller::send]).
//! //!
//! Each operation on a buffer is represented as an [ot::OperationSeq]. //! Each operation on a buffer is represented as an [ot::OperationSeq].
@ -204,6 +204,16 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// `async fn recv(&self) -> codemp::Result<T>;` /// `async fn recv(&self) -> codemp::Result<T>;`
async fn recv(&self) -> Result<T>; async fn recv(&self) -> Result<T>;
/// block until next value is added to the stream without removing any element
///
/// this is just an async trait function wrapped by `async_trait`:
///
/// `async fn poll(&self) -> codemp::Result<()>;`
async fn poll(&self) -> Result<()>;
/// attempt to receive a value without blocking, return None if nothing is available
fn try_recv(&self) -> Result<Option<T>>;
/// sync variant of [Self::recv], blocking invoking thread /// sync variant of [Self::recv], blocking invoking thread
fn blocking_recv(&self, rt: &Runtime) -> Result<T> { fn blocking_recv(&self, rt: &Runtime) -> Result<T> {
rt.block_on(self.recv()) rt.block_on(self.recv())