feat: sync send in controller, docs, better import

This commit is contained in:
əlemi 2023-08-19 21:44:27 +02:00
parent 489ade9127
commit 1e05af6d79
10 changed files with 147 additions and 42 deletions

View file

@ -10,7 +10,7 @@ use super::TextChange;
pub struct BufferController { pub struct BufferController {
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::Sender<OperationSeq>, operations: mpsc::UnboundedSender<OperationSeq>,
stream: Mutex<broadcast::Receiver<OperationSeq>>, stream: Mutex<broadcast::Receiver<OperationSeq>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
} }
@ -18,7 +18,7 @@ pub struct BufferController {
impl BufferController { impl BufferController {
pub(crate) fn new( pub(crate) fn new(
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::Sender<OperationSeq>, operations: mpsc::UnboundedSender<OperationSeq>,
stream: Mutex<broadcast::Receiver<OperationSeq>>, stream: Mutex<broadcast::Receiver<OperationSeq>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
@ -54,7 +54,7 @@ impl Controller<TextChange> for BufferController {
Ok(TextChange { span, content }) Ok(TextChange { span, content })
} }
async fn send(&self, op: OperationSeq) -> Result<(), Error> { fn send(&self, op: OperationSeq) -> Result<(), Error> {
Ok(self.operations.send(op).await?) Ok(self.operations.send(op)?)
} }
} }

View file

@ -4,7 +4,11 @@ pub(crate) mod worker;
pub mod controller; pub mod controller;
pub mod factory; pub mod factory;
pub use factory::OperationFactory;
pub use controller::BufferController as Controller;
/// TODO move in proto
#[derive(Debug)] #[derive(Debug)]
pub struct TextChange { pub struct TextChange {
pub span: Range<usize>, pub span: Range<usize>,

View file

@ -17,11 +17,11 @@ use super::controller::BufferController;
pub(crate) struct BufferControllerWorker { pub(crate) struct BufferControllerWorker {
uid: String, uid: String,
pub(crate) content: watch::Sender<String>, pub(crate) content: watch::Sender<String>,
pub(crate) operations: mpsc::Receiver<OperationSeq>, pub(crate) operations: mpsc::UnboundedReceiver<OperationSeq>,
pub(crate) stream: Arc<broadcast::Sender<OperationSeq>>, pub(crate) stream: Arc<broadcast::Sender<OperationSeq>>,
pub(crate) queue: VecDeque<OperationSeq>, pub(crate) queue: VecDeque<OperationSeq>,
receiver: watch::Receiver<String>, receiver: watch::Receiver<String>,
sender: mpsc::Sender<OperationSeq>, sender: mpsc::UnboundedSender<OperationSeq>,
buffer: String, buffer: String,
path: String, path: String,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
@ -31,7 +31,7 @@ pub(crate) struct BufferControllerWorker {
impl BufferControllerWorker { impl BufferControllerWorker {
pub fn new(uid: String, buffer: &str, path: &str) -> Self { pub fn new(uid: String, buffer: &str, path: &str) -> Self {
let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (txt_tx, txt_rx) = watch::channel(buffer.to_string());
let (op_tx, op_rx) = mpsc::channel(64); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (s_tx, _s_rx) = broadcast::channel(64); let (s_tx, _s_rx) = broadcast::channel(64);
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
BufferControllerWorker { BufferControllerWorker {

View file

@ -5,7 +5,7 @@ use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::Ign
pub struct CursorController { pub struct CursorController {
uid: String, uid: String,
op: mpsc::Sender<CursorEvent>, op: mpsc::UnboundedSender<CursorEvent>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
} }
@ -19,7 +19,7 @@ impl Drop for CursorController {
impl CursorController { impl CursorController {
pub(crate) fn new( pub(crate) fn new(
uid: String, uid: String,
op: mpsc::Sender<CursorEvent>, op: mpsc::UnboundedSender<CursorEvent>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
@ -31,11 +31,11 @@ impl CursorController {
impl Controller<CursorEvent> for CursorController { impl Controller<CursorEvent> for CursorController {
type Input = CursorPosition; type Input = CursorPosition;
async fn send(&self, cursor: CursorPosition) -> Result<(), Error> { fn send(&self, cursor: CursorPosition) -> Result<(), Error> {
Ok(self.op.send(CursorEvent { Ok(self.op.send(CursorEvent {
user: self.uid.clone(), user: self.uid.clone(),
position: Some(cursor), position: Some(cursor),
}).await?) })?)
} }
// TODO is this cancelable? so it can be used in tokio::select! // TODO is this cancelable? so it can be used in tokio::select!

View file

@ -1,6 +1,8 @@
pub(crate) mod worker; pub(crate) mod worker;
pub mod controller; pub mod controller;
pub use controller::CursorController as Controller;
use crate::proto::{RowCol, CursorPosition}; use crate::proto::{RowCol, CursorPosition};
impl From::<RowCol> for (i32, i32) { impl From::<RowCol> for (i32, i32) {

View file

@ -9,8 +9,8 @@ use super::controller::CursorController;
pub(crate) struct CursorControllerWorker { pub(crate) struct CursorControllerWorker {
uid: String, uid: String,
producer: mpsc::Sender<CursorEvent>, producer: mpsc::UnboundedSender<CursorEvent>,
op: mpsc::Receiver<CursorEvent>, op: mpsc::UnboundedReceiver<CursorEvent>,
channel: Arc<broadcast::Sender<CursorEvent>>, channel: Arc<broadcast::Sender<CursorEvent>>,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
stop_control: mpsc::UnboundedSender<()>, stop_control: mpsc::UnboundedSender<()>,
@ -18,7 +18,7 @@ pub(crate) struct CursorControllerWorker {
impl CursorControllerWorker { impl CursorControllerWorker {
pub(crate) fn new(uid: String) -> Self { pub(crate) fn new(uid: String) -> Self {
let (op_tx, op_rx) = mpsc::channel(64); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (cur_tx, _cur_rx) = broadcast::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64);
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
Self { Self {

View file

@ -1,4 +1,4 @@
use std::{error::Error as StdError, fmt::Display}; use std::{result::Result as StdResult, error::Error as StdError, fmt::Display};
use tokio::sync::{mpsc, broadcast}; use tokio::sync::{mpsc, broadcast};
use tonic::{Status, Code}; use tonic::{Status, Code};
@ -8,7 +8,7 @@ pub trait IgnorableError {
fn unwrap_or_warn(self, msg: &str); fn unwrap_or_warn(self, msg: &str);
} }
impl<T, E> IgnorableError for Result<T, E> impl<T, E> IgnorableError for StdResult<T, E>
where E : std::fmt::Display { where E : std::fmt::Display {
fn unwrap_or_warn(self, msg: &str) { fn unwrap_or_warn(self, msg: &str) {
match self { match self {
@ -18,6 +18,8 @@ where E : std::fmt::Display {
} }
} }
pub type Result<T> = StdResult<T, Error>;
// TODO split this into specific errors for various parts of the library // TODO split this into specific errors for various parts of the library
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {

View file

@ -1,17 +1,28 @@
//! ### Instance
//!
//! This module provides convenience managers for the client instance
//!
//! the global instance reference is immutable and lazy-loaded, and requires `global` feature.
/// static global instance, allocated only if feature `global` is active
#[cfg(feature = "global")] #[cfg(feature = "global")]
pub mod global { pub mod global {
#[cfg(not(feature = "sync"))] #[cfg(not(feature = "sync"))]
lazy_static::lazy_static! { lazy_static::lazy_static! {
/// the global instance of codemp session
pub static ref INSTANCE : super::a_sync::Instance = super::a_sync::Instance::default(); pub static ref INSTANCE : super::a_sync::Instance = super::a_sync::Instance::default();
} }
#[cfg(feature = "sync")] #[cfg(feature = "sync")]
lazy_static::lazy_static! { lazy_static::lazy_static! {
/// the global instance of codemp session
pub static ref INSTANCE : super::sync::Instance = super::sync::Instance::default(); pub static ref INSTANCE : super::sync::Instance = super::sync::Instance::default();
} }
} }
#[cfg(not(feature = "sync"))] pub use global::INSTANCE;
/// async implementation of session instance
pub mod a_sync { pub mod a_sync {
use std::sync::Arc; use std::sync::Arc;
@ -22,17 +33,24 @@ pub mod a_sync {
errors::Error, client::Client, cursor::controller::CursorController, errors::Error, client::Client, cursor::controller::CursorController,
}; };
/// persistant session manager for codemp client
///
/// will hold a tokio mutex over an optional client, and drop its reference when disconnecting.
/// all methods are async because will await mutex availability
#[derive(Default)] #[derive(Default)]
pub struct Instance { pub struct Instance {
/// the tokio mutex containing a client, if connected
client: Mutex<Option<Client>>, client: Mutex<Option<Client>>,
} }
impl Instance { impl Instance {
/// connect to remote address instantiating a new client [crate::Client::new]
pub async fn connect(&self, addr: &str) -> Result<(), Error> { pub async fn connect(&self, addr: &str) -> Result<(), Error> {
*self.client.lock().await = Some(Client::new(addr).await?); *self.client.lock().await = Some(Client::new(addr).await?);
Ok(()) Ok(())
} }
/// threadsafe version of [crate::Client::join]
pub async fn join(&self, session: &str) -> Result<Arc<CursorController>, Error> { pub async fn join(&self, session: &str) -> Result<Arc<CursorController>, Error> {
self.client self.client
.lock().await .lock().await
@ -42,6 +60,7 @@ pub mod a_sync {
.await .await
} }
/// threadsafe version of [crate::Client::create]
pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> { pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> {
self.client self.client
.lock().await .lock().await
@ -51,6 +70,7 @@ pub mod a_sync {
.await .await
} }
/// threadsafe version of [crate::Client::attach]
pub async fn attach(&self, path: &str) -> Result<Arc<BufferController>, Error> { pub async fn attach(&self, path: &str) -> Result<Arc<BufferController>, Error> {
self.client self.client
.lock().await .lock().await
@ -60,6 +80,7 @@ pub mod a_sync {
.await .await
} }
/// threadsafe version of [crate::Client::get_cursor]
pub async fn get_cursor(&self) -> Result<Arc<CursorController>, Error> { pub async fn get_cursor(&self) -> Result<Arc<CursorController>, Error> {
self.client self.client
.lock().await .lock().await
@ -69,6 +90,7 @@ pub mod a_sync {
.ok_or(Error::InvalidState { msg: "join workspace first".into() }) .ok_or(Error::InvalidState { msg: "join workspace first".into() })
} }
/// threadsafe version of [crate::Client::get_buffer]
pub async fn get_buffer(&self, path: &str) -> Result<Arc<BufferController>, Error> { pub async fn get_buffer(&self, path: &str) -> Result<Arc<BufferController>, Error> {
self.client self.client
.lock().await .lock().await
@ -78,6 +100,7 @@ pub mod a_sync {
.ok_or(Error::InvalidState { msg: "join workspace first".into() }) .ok_or(Error::InvalidState { msg: "join workspace first".into() })
} }
/// threadsafe version of [crate::Client::leave_workspace]
pub async fn leave_workspace(&self) -> Result<(), Error> { pub async fn leave_workspace(&self) -> Result<(), Error> {
self.client self.client
.lock().await .lock().await
@ -87,6 +110,7 @@ pub mod a_sync {
Ok(()) Ok(())
} }
/// threadsafe version of [crate::Client::disconnect_buffer]
pub async fn disconnect_buffer(&self, path: &str) -> Result<bool, Error> { pub async fn disconnect_buffer(&self, path: &str) -> Result<bool, Error> {
let res = self.client let res = self.client
.lock().await .lock().await
@ -98,7 +122,7 @@ pub mod a_sync {
} }
} }
#[cfg(feature = "sync")] /// sync implementation of session instance
pub mod sync { pub mod sync {
use std::sync::{Mutex, Arc}; use std::sync::{Mutex, Arc};
@ -110,8 +134,15 @@ pub mod sync {
buffer::controller::BufferController buffer::controller::BufferController
}; };
/// persistant session manager for codemp client
///
/// will hold a std mutex over an optional client, and drop its reference when disconnecting.
/// also contains a tokio runtime to execute async futures on
/// all methods are wrapped on a runtime.block_on and thus sync
pub struct Instance { pub struct Instance {
/// the std mutex containing a client, if connected
client: Mutex<Option<Client>>, client: Mutex<Option<Client>>,
/// the tokio runtime
runtime: Runtime, runtime: Runtime,
} }
@ -133,37 +164,46 @@ pub mod sync {
} }
} }
/// return a reference to contained tokio runtime, to spawn tasks on
pub fn rt(&self) -> &Runtime { &self.runtime } pub fn rt(&self) -> &Runtime { &self.runtime }
/// connect and store a client session, threadsafe and sync version of [crate::Client::new]
pub fn connect(&self, addr: &str) -> Result<(), Error> { pub fn connect(&self, addr: &str) -> Result<(), Error> {
*self.client.lock().expect("client mutex poisoned") = Some(self.rt().block_on(Client::new(addr))?); *self.client.lock().expect("client mutex poisoned") = Some(self.rt().block_on(Client::new(addr))?);
Ok(()) Ok(())
} }
/// threadsafe and sync version of [crate::Client::join]
pub fn join(&self, session: &str) -> Result<Arc<CursorController>, Error> { pub fn join(&self, session: &str) -> Result<Arc<CursorController>, Error> {
self.if_client(|c| self.rt().block_on(c.join(session)))? self.if_client(|c| self.rt().block_on(c.join(session)))?
} }
/// threadsafe and sync version of [crate::Client::create]
pub fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> { pub fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> {
self.if_client(|c| self.rt().block_on(c.create(path, content)))? self.if_client(|c| self.rt().block_on(c.create(path, content)))?
} }
/// threadsafe and sync version of [crate::Client::attach]
pub fn attach(&self, path: &str) -> Result<Arc<BufferController>, Error> { pub fn attach(&self, path: &str) -> Result<Arc<BufferController>, Error> {
self.if_client(|c| self.rt().block_on(c.attach(path)))? self.if_client(|c| self.rt().block_on(c.attach(path)))?
} }
/// threadsafe and sync version of [crate::Client::get_cursor]
pub fn get_cursor(&self) -> Result<Arc<CursorController>, Error> { pub fn get_cursor(&self) -> Result<Arc<CursorController>, Error> {
self.if_client(|c| c.get_cursor().ok_or(Error::InvalidState { msg: "join workspace first".into() }))? self.if_client(|c| c.get_cursor().ok_or(Error::InvalidState { msg: "join workspace first".into() }))?
} }
/// threadsafe and sync version of [crate::Client::get_buffer]
pub fn get_buffer(&self, path: &str) -> Result<Arc<BufferController>, Error> { pub fn get_buffer(&self, path: &str) -> Result<Arc<BufferController>, Error> {
self.if_client(|c| c.get_buffer(path).ok_or(Error::InvalidState { msg: "join workspace or create requested buffer first".into() }))? self.if_client(|c| c.get_buffer(path).ok_or(Error::InvalidState { msg: "join workspace or create requested buffer first".into() }))?
} }
/// threadsafe and sync version of [crate::Client::leave_workspace]
pub fn leave_workspace(&self) -> Result<(), Error> { pub fn leave_workspace(&self) -> Result<(), Error> {
self.if_client(|c| c.leave_workspace()) self.if_client(|c| c.leave_workspace())
} }
/// threadsafe and sync version of [crate::Client::disconnect_buffer]
pub fn disconnect_buffer(&self, path: &str) -> Result<bool, Error> { pub fn disconnect_buffer(&self, path: &str) -> Result<bool, Error> {
self.if_client(|c| c.disconnect_buffer(path)) self.if_client(|c| c.disconnect_buffer(path))
} }

View file

@ -1,15 +1,49 @@
//! # MultiPlayer Code Editing
//!
//! This is the core library of the codemp project.
//!
//! ## Structure
//! The main entrypoint is the [client::Client] object, that maintains a connection and can
//! be used to join workspaces or attach to buffers.
//!
//! Some actions will return structs implementing the [Controller] trait. These can be polled
//! for new stream events, which will be returned in order. Blocking and callback variants are also
//! implemented. The [Controller] can also be used to send new events to the server.
//!
//! ## Features
//! * `proto` : include GRCP protocol definitions under [proto] (default)
//! * `global`: provide a lazy_static global INSTANCE in [instance::global]
//! * `sync` : wraps the [instance::a_sync::Instance] holder into a sync variant: [instance::sync::Instance]
//!
/// cursor related types and controller
pub mod cursor; pub mod cursor;
/// buffer operations, factory, controller and types
pub mod buffer; pub mod buffer;
/// crate error types and helpers
pub mod errors; pub mod errors;
/// underlying client session manager
pub mod client; pub mod client;
/// client wrapper to handle memory persistence
pub mod instance; pub mod instance;
/// all-in-one imports : `use codemp::prelude::*;`
pub mod prelude; pub mod prelude;
pub use tonic; /// underlying OperationalTransform library used, re-exported
pub use tokio;
pub use operational_transform as ot; pub use operational_transform as ot;
pub use client::Client;
#[cfg(feature = "sync")] pub use instance::sync::Instance;
#[cfg(not(feature = "sync"))] pub use instance::a_sync::Instance;
/// protocol types and services auto-generated by grpc
#[cfg(feature = "proto")] #[cfg(feature = "proto")]
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub mod proto { pub mod proto {
@ -18,11 +52,13 @@ pub mod proto {
} }
pub use errors::Error; pub use errors::Error;
pub use errors::Result;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Runtime;
#[tonic::async_trait] // TODO move this somewhere? #[tonic::async_trait] // TODO move this somewhere?
pub(crate) trait ControllerWorker<T> { pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
type Controller : Controller<T>; type Controller : Controller<T>;
type Tx; type Tx;
type Rx; type Rx;
@ -31,13 +67,37 @@ pub(crate) trait ControllerWorker<T> {
async fn work(self, tx: Self::Tx, rx: Self::Rx); async fn work(self, tx: Self::Tx, rx: Self::Rx);
} }
/// async and threadsafe handle to a generic bidirectional stream
///
/// this generic trait is implemented by actors managing stream procedures.
/// events can be enqueued for dispatching without blocking ([Controller:send]), and an async blocking
/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking
/// ([Controller::blocking_recv]) and callback-based ([Controller::callback]) are implemented.
#[tonic::async_trait] #[tonic::async_trait]
pub trait Controller<T> : Sized + Send + Sync { pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// type of upstream values, used in [Self::send]
type Input; type Input;
async fn send(&self, x: Self::Input) -> Result<(), Error>; /// enqueue a new value to be sent
async fn recv(&self) -> Result<T, Error>; fn send(&self, x: Self::Input) -> Result<()>;
/// get next value from stream, blocking until one is available
async fn recv(&self) -> Result<T>;
/// sync variant of [Self::recv], blocking invoking thread
fn blocking_recv(&self, rt: &Runtime) -> Result<T> {
rt.block_on(self.recv())
}
/// register a callback to be called for each received stream value
///
/// this will spawn a new task on given runtime invoking [Self::recv] in loop and calling given
/// callback for each received value. a stop channel should be provided, and first value sent
/// into it will stop the worker loop.
///
/// note: creating a callback handler will hold an Arc reference to the given controller,
/// preventing it from being dropped (and likely disconnecting). using the stop channel is
/// important for proper cleanup
fn callback<F>( fn callback<F>(
self: Arc<Self>, self: Arc<Self>,
rt: &tokio::runtime::Runtime, rt: &tokio::runtime::Runtime,

View file

@ -1,25 +1,22 @@
pub use crate::client::Client as CodempClient; pub use crate::{
pub use crate::errors::Error as CodempError; Error as CodempError,
Result as CodempResult,
Client as CodempClient,
Controller as CodempController,
cursor::Controller as CodempCursorController,
buffer::Controller as CodempBufferController,
pub use crate::Controller as CodempController; buffer::OperationFactory as CodempOperationFactory,
pub use crate::cursor::controller::CursorController as CodempCursorController; ot::OperationSeq as CodempOperationSeq,
pub use crate::buffer::controller::BufferController as CodempBufferController; buffer::TextChange as CodempTextChange,
pub use crate::buffer::factory::OperationFactory as CodempOperationFactory; proto::CursorPosition as CodempCursorPosition,
pub use operational_transform::OperationSeq as CodempOperationSeq; proto::CursorEvent as CodempCursorEvent,
pub use crate::buffer::TextChange as CodempTextChange; proto::RowCol as CodempRowCol,
pub use crate::proto::{ Instance as CodempInstance,
CursorPosition as CodempCursorPosition,
CursorEvent as CodempCursorEvent,
RowCol as CodempRowCol,
}; };
#[cfg(feature = "sync")]
pub use crate::instance::sync::Instance as CodempInstance;
#[cfg(not(feature = "sync"))]
pub use crate::instance::a_sync::Instance as CodempInstance;
#[cfg(feature = "global")] #[cfg(feature = "global")]
pub use crate::instance::global::INSTANCE as CODEMP_INSTANCE; pub use crate::instance::global::INSTANCE as CODEMP_INSTANCE;