From 1e05af6d794507d57001acb7b2d4c4f8be3f5c8f Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 19 Aug 2023 21:44:27 +0200 Subject: [PATCH] feat: sync send in controller, docs, better import --- src/buffer/controller.rs | 8 ++--- src/buffer/mod.rs | 4 +++ src/buffer/worker.rs | 6 ++-- src/cursor/controller.rs | 8 ++--- src/cursor/mod.rs | 2 ++ src/cursor/worker.rs | 6 ++-- src/errors.rs | 6 ++-- src/instance.rs | 44 ++++++++++++++++++++++-- src/lib.rs | 72 ++++++++++++++++++++++++++++++++++++---- src/prelude.rs | 33 +++++++++--------- 10 files changed, 147 insertions(+), 42 deletions(-) diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 39efbe7..c42477a 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -10,7 +10,7 @@ use super::TextChange; pub struct BufferController { content: watch::Receiver, - operations: mpsc::Sender, + operations: mpsc::UnboundedSender, stream: Mutex>, stop: mpsc::UnboundedSender<()>, } @@ -18,7 +18,7 @@ pub struct BufferController { impl BufferController { pub(crate) fn new( content: watch::Receiver, - operations: mpsc::Sender, + operations: mpsc::UnboundedSender, stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { @@ -54,7 +54,7 @@ impl Controller for BufferController { Ok(TextChange { span, content }) } - async fn send(&self, op: OperationSeq) -> Result<(), Error> { - Ok(self.operations.send(op).await?) + fn send(&self, op: OperationSeq) -> Result<(), Error> { + Ok(self.operations.send(op)?) } } diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index adc53d0..13beed4 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -4,7 +4,11 @@ pub(crate) mod worker; pub mod controller; pub mod factory; +pub use factory::OperationFactory; +pub use controller::BufferController as Controller; + +/// TODO move in proto #[derive(Debug)] pub struct TextChange { pub span: Range, diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 5f19863..c4336a0 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -17,11 +17,11 @@ use super::controller::BufferController; pub(crate) struct BufferControllerWorker { uid: String, pub(crate) content: watch::Sender, - pub(crate) operations: mpsc::Receiver, + pub(crate) operations: mpsc::UnboundedReceiver, pub(crate) stream: Arc>, pub(crate) queue: VecDeque, receiver: watch::Receiver, - sender: mpsc::Sender, + sender: mpsc::UnboundedSender, buffer: String, path: String, stop: mpsc::UnboundedReceiver<()>, @@ -31,7 +31,7 @@ pub(crate) struct BufferControllerWorker { impl BufferControllerWorker { pub fn new(uid: String, buffer: &str, path: &str) -> Self { let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); - let (op_tx, op_rx) = mpsc::channel(64); + let (op_tx, op_rx) = mpsc::unbounded_channel(); let (s_tx, _s_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); BufferControllerWorker { diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index fe6bd85..ae5239a 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -5,7 +5,7 @@ use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::Ign pub struct CursorController { uid: String, - op: mpsc::Sender, + op: mpsc::UnboundedSender, stream: Mutex>, stop: mpsc::UnboundedSender<()>, } @@ -19,7 +19,7 @@ impl Drop for CursorController { impl CursorController { pub(crate) fn new( uid: String, - op: mpsc::Sender, + op: mpsc::UnboundedSender, stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { @@ -31,11 +31,11 @@ impl CursorController { impl Controller for CursorController { type Input = CursorPosition; - async fn send(&self, cursor: CursorPosition) -> Result<(), Error> { + fn send(&self, cursor: CursorPosition) -> Result<(), Error> { Ok(self.op.send(CursorEvent { user: self.uid.clone(), position: Some(cursor), - }).await?) + })?) } // TODO is this cancelable? so it can be used in tokio::select! diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index c6fbdd9..b989418 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,6 +1,8 @@ pub(crate) mod worker; pub mod controller; +pub use controller::CursorController as Controller; + use crate::proto::{RowCol, CursorPosition}; impl From:: for (i32, i32) { diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index c71f232..fa7bd0e 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -9,8 +9,8 @@ use super::controller::CursorController; pub(crate) struct CursorControllerWorker { uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, + producer: mpsc::UnboundedSender, + op: mpsc::UnboundedReceiver, channel: Arc>, stop: mpsc::UnboundedReceiver<()>, stop_control: mpsc::UnboundedSender<()>, @@ -18,7 +18,7 @@ pub(crate) struct CursorControllerWorker { impl CursorControllerWorker { pub(crate) fn new(uid: String) -> Self { - let (op_tx, op_rx) = mpsc::channel(64); + let (op_tx, op_rx) = mpsc::unbounded_channel(); let (cur_tx, _cur_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); Self { diff --git a/src/errors.rs b/src/errors.rs index 5982ace..523c406 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,4 +1,4 @@ -use std::{error::Error as StdError, fmt::Display}; +use std::{result::Result as StdResult, error::Error as StdError, fmt::Display}; use tokio::sync::{mpsc, broadcast}; use tonic::{Status, Code}; @@ -8,7 +8,7 @@ pub trait IgnorableError { fn unwrap_or_warn(self, msg: &str); } -impl IgnorableError for Result +impl IgnorableError for StdResult where E : std::fmt::Display { fn unwrap_or_warn(self, msg: &str) { match self { @@ -18,6 +18,8 @@ where E : std::fmt::Display { } } +pub type Result = StdResult; + // TODO split this into specific errors for various parts of the library #[derive(Debug)] pub enum Error { diff --git a/src/instance.rs b/src/instance.rs index fd5b481..af25749 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -1,17 +1,28 @@ +//! ### Instance +//! +//! This module provides convenience managers for the client instance +//! +//! the global instance reference is immutable and lazy-loaded, and requires `global` feature. + +/// static global instance, allocated only if feature `global` is active #[cfg(feature = "global")] pub mod global { #[cfg(not(feature = "sync"))] lazy_static::lazy_static! { + /// the global instance of codemp session pub static ref INSTANCE : super::a_sync::Instance = super::a_sync::Instance::default(); } #[cfg(feature = "sync")] lazy_static::lazy_static! { + /// the global instance of codemp session pub static ref INSTANCE : super::sync::Instance = super::sync::Instance::default(); } } -#[cfg(not(feature = "sync"))] +pub use global::INSTANCE; + +/// async implementation of session instance pub mod a_sync { use std::sync::Arc; @@ -22,17 +33,24 @@ pub mod a_sync { errors::Error, client::Client, cursor::controller::CursorController, }; + /// persistant session manager for codemp client + /// + /// will hold a tokio mutex over an optional client, and drop its reference when disconnecting. + /// all methods are async because will await mutex availability #[derive(Default)] pub struct Instance { + /// the tokio mutex containing a client, if connected client: Mutex>, } impl Instance { + /// connect to remote address instantiating a new client [crate::Client::new] pub async fn connect(&self, addr: &str) -> Result<(), Error> { *self.client.lock().await = Some(Client::new(addr).await?); Ok(()) } + /// threadsafe version of [crate::Client::join] pub async fn join(&self, session: &str) -> Result, Error> { self.client .lock().await @@ -42,6 +60,7 @@ pub mod a_sync { .await } + /// threadsafe version of [crate::Client::create] pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> { self.client .lock().await @@ -51,6 +70,7 @@ pub mod a_sync { .await } + /// threadsafe version of [crate::Client::attach] pub async fn attach(&self, path: &str) -> Result, Error> { self.client .lock().await @@ -60,6 +80,7 @@ pub mod a_sync { .await } + /// threadsafe version of [crate::Client::get_cursor] pub async fn get_cursor(&self) -> Result, Error> { self.client .lock().await @@ -69,6 +90,7 @@ pub mod a_sync { .ok_or(Error::InvalidState { msg: "join workspace first".into() }) } + /// threadsafe version of [crate::Client::get_buffer] pub async fn get_buffer(&self, path: &str) -> Result, Error> { self.client .lock().await @@ -78,6 +100,7 @@ pub mod a_sync { .ok_or(Error::InvalidState { msg: "join workspace first".into() }) } + /// threadsafe version of [crate::Client::leave_workspace] pub async fn leave_workspace(&self) -> Result<(), Error> { self.client .lock().await @@ -87,6 +110,7 @@ pub mod a_sync { Ok(()) } + /// threadsafe version of [crate::Client::disconnect_buffer] pub async fn disconnect_buffer(&self, path: &str) -> Result { let res = self.client .lock().await @@ -98,7 +122,7 @@ pub mod a_sync { } } -#[cfg(feature = "sync")] +/// sync implementation of session instance pub mod sync { use std::sync::{Mutex, Arc}; @@ -110,8 +134,15 @@ pub mod sync { buffer::controller::BufferController }; + /// persistant session manager for codemp client + /// + /// will hold a std mutex over an optional client, and drop its reference when disconnecting. + /// also contains a tokio runtime to execute async futures on + /// all methods are wrapped on a runtime.block_on and thus sync pub struct Instance { + /// the std mutex containing a client, if connected client: Mutex>, + /// the tokio runtime runtime: Runtime, } @@ -133,37 +164,46 @@ pub mod sync { } } + /// return a reference to contained tokio runtime, to spawn tasks on pub fn rt(&self) -> &Runtime { &self.runtime } + /// connect and store a client session, threadsafe and sync version of [crate::Client::new] pub fn connect(&self, addr: &str) -> Result<(), Error> { *self.client.lock().expect("client mutex poisoned") = Some(self.rt().block_on(Client::new(addr))?); Ok(()) } + /// threadsafe and sync version of [crate::Client::join] pub fn join(&self, session: &str) -> Result, Error> { self.if_client(|c| self.rt().block_on(c.join(session)))? } + /// threadsafe and sync version of [crate::Client::create] pub fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> { self.if_client(|c| self.rt().block_on(c.create(path, content)))? } + /// threadsafe and sync version of [crate::Client::attach] pub fn attach(&self, path: &str) -> Result, Error> { self.if_client(|c| self.rt().block_on(c.attach(path)))? } + /// threadsafe and sync version of [crate::Client::get_cursor] pub fn get_cursor(&self) -> Result, Error> { self.if_client(|c| c.get_cursor().ok_or(Error::InvalidState { msg: "join workspace first".into() }))? } + /// threadsafe and sync version of [crate::Client::get_buffer] pub fn get_buffer(&self, path: &str) -> Result, Error> { self.if_client(|c| c.get_buffer(path).ok_or(Error::InvalidState { msg: "join workspace or create requested buffer first".into() }))? } + /// threadsafe and sync version of [crate::Client::leave_workspace] pub fn leave_workspace(&self) -> Result<(), Error> { self.if_client(|c| c.leave_workspace()) } + /// threadsafe and sync version of [crate::Client::disconnect_buffer] pub fn disconnect_buffer(&self, path: &str) -> Result { self.if_client(|c| c.disconnect_buffer(path)) } diff --git a/src/lib.rs b/src/lib.rs index 718527b..7876eff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,49 @@ +//! # MultiPlayer Code Editing +//! +//! This is the core library of the codemp project. +//! +//! ## Structure +//! The main entrypoint is the [client::Client] object, that maintains a connection and can +//! be used to join workspaces or attach to buffers. +//! +//! Some actions will return structs implementing the [Controller] trait. These can be polled +//! for new stream events, which will be returned in order. Blocking and callback variants are also +//! implemented. The [Controller] can also be used to send new events to the server. +//! +//! ## Features +//! * `proto` : include GRCP protocol definitions under [proto] (default) +//! * `global`: provide a lazy_static global INSTANCE in [instance::global] +//! * `sync` : wraps the [instance::a_sync::Instance] holder into a sync variant: [instance::sync::Instance] +//! + + +/// cursor related types and controller pub mod cursor; + +/// buffer operations, factory, controller and types pub mod buffer; + +/// crate error types and helpers pub mod errors; + +/// underlying client session manager pub mod client; + +/// client wrapper to handle memory persistence pub mod instance; +/// all-in-one imports : `use codemp::prelude::*;` pub mod prelude; -pub use tonic; -pub use tokio; +/// underlying OperationalTransform library used, re-exported pub use operational_transform as ot; +pub use client::Client; + +#[cfg(feature = "sync")] pub use instance::sync::Instance; +#[cfg(not(feature = "sync"))] pub use instance::a_sync::Instance; + +/// protocol types and services auto-generated by grpc #[cfg(feature = "proto")] #[allow(non_snake_case)] pub mod proto { @@ -18,11 +52,13 @@ pub mod proto { } pub use errors::Error; +pub use errors::Result; use std::sync::Arc; +use tokio::runtime::Runtime; #[tonic::async_trait] // TODO move this somewhere? -pub(crate) trait ControllerWorker { +pub(crate) trait ControllerWorker { type Controller : Controller; type Tx; type Rx; @@ -31,13 +67,37 @@ pub(crate) trait ControllerWorker { async fn work(self, tx: Self::Tx, rx: Self::Rx); } +/// async and threadsafe handle to a generic bidirectional stream +/// +/// this generic trait is implemented by actors managing stream procedures. +/// events can be enqueued for dispatching without blocking ([Controller:send]), and an async blocking +/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking +/// ([Controller::blocking_recv]) and callback-based ([Controller::callback]) are implemented. #[tonic::async_trait] -pub trait Controller : Sized + Send + Sync { +pub trait Controller : Sized + Send + Sync { + /// type of upstream values, used in [Self::send] type Input; - async fn send(&self, x: Self::Input) -> Result<(), Error>; - async fn recv(&self) -> Result; + /// enqueue a new value to be sent + fn send(&self, x: Self::Input) -> Result<()>; + /// get next value from stream, blocking until one is available + async fn recv(&self) -> Result; + + /// sync variant of [Self::recv], blocking invoking thread + fn blocking_recv(&self, rt: &Runtime) -> Result { + rt.block_on(self.recv()) + } + + /// register a callback to be called for each received stream value + /// + /// this will spawn a new task on given runtime invoking [Self::recv] in loop and calling given + /// callback for each received value. a stop channel should be provided, and first value sent + /// into it will stop the worker loop. + /// + /// note: creating a callback handler will hold an Arc reference to the given controller, + /// preventing it from being dropped (and likely disconnecting). using the stop channel is + /// important for proper cleanup fn callback( self: Arc, rt: &tokio::runtime::Runtime, diff --git a/src/prelude.rs b/src/prelude.rs index 73f4e49..dc9ef11 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,25 +1,22 @@ -pub use crate::client::Client as CodempClient; -pub use crate::errors::Error as CodempError; +pub use crate::{ + Error as CodempError, + Result as CodempResult, + + Client as CodempClient, + Controller as CodempController, + cursor::Controller as CodempCursorController, + buffer::Controller as CodempBufferController, -pub use crate::Controller as CodempController; -pub use crate::cursor::controller::CursorController as CodempCursorController; -pub use crate::buffer::controller::BufferController as CodempBufferController; + buffer::OperationFactory as CodempOperationFactory, + ot::OperationSeq as CodempOperationSeq, + buffer::TextChange as CodempTextChange, -pub use crate::buffer::factory::OperationFactory as CodempOperationFactory; -pub use operational_transform::OperationSeq as CodempOperationSeq; -pub use crate::buffer::TextChange as CodempTextChange; + proto::CursorPosition as CodempCursorPosition, + proto::CursorEvent as CodempCursorEvent, + proto::RowCol as CodempRowCol, -pub use crate::proto::{ - CursorPosition as CodempCursorPosition, - CursorEvent as CodempCursorEvent, - RowCol as CodempRowCol, + Instance as CodempInstance, }; -#[cfg(feature = "sync")] -pub use crate::instance::sync::Instance as CodempInstance; - -#[cfg(not(feature = "sync"))] -pub use crate::instance::a_sync::Instance as CodempInstance; - #[cfg(feature = "global")] pub use crate::instance::global::INSTANCE as CODEMP_INSTANCE;