From 1034f0482cfca1b1dbe19aae073419d0ad3a1fa5 Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 3 Sep 2023 23:04:08 +0200 Subject: [PATCH] chore: moved controller under api module --- src/api.rs | 80 ++++++++++++++++++++++++++ src/buffer/controller.rs | 2 +- src/buffer/worker.rs | 2 +- src/client.rs | 10 ++-- src/cursor/controller.rs | 2 +- src/cursor/worker.rs | 2 +- src/lib.rs | 119 ++++++++------------------------------- src/prelude.rs | 2 +- 8 files changed, 115 insertions(+), 104 deletions(-) create mode 100644 src/api.rs diff --git a/src/api.rs b/src/api.rs new file mode 100644 index 0000000..8ed8b7f --- /dev/null +++ b/src/api.rs @@ -0,0 +1,80 @@ +use crate::Result; +use std::sync::Arc; +use tokio::runtime::Runtime; + +#[tonic::async_trait] // TODO move this somewhere? +pub(crate) trait ControllerWorker { + type Controller : Controller; + type Tx; + type Rx; + + fn subscribe(&self) -> Self::Controller; + async fn work(self, tx: Self::Tx, rx: Self::Rx); +} + +/// async and threadsafe handle to a generic bidirectional stream +/// +/// this generic trait is implemented by actors managing stream procedures. +/// events can be enqueued for dispatching without blocking ([Controller::send]), and an async blocking +/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking +/// ([Controller::blocking_recv]) and callback-based ([Controller::callback]) are implemented. +#[tonic::async_trait] +pub trait Controller : Sized + Send + Sync { + /// type of upstream values, used in [Self::send] + type Input; + + /// enqueue a new value to be sent + fn send(&self, x: Self::Input) -> Result<()>; + + /// get next value from stream, blocking until one is available + /// + /// this is just an async trait function wrapped by `async_trait`: + /// + /// `async fn recv(&self) -> codemp::Result;` + async fn recv(&self) -> Result; + + /// block until next value is added to the stream without removing any element + /// + /// this is just an async trait function wrapped by `async_trait`: + /// + /// `async fn poll(&self) -> codemp::Result<()>;` + async fn poll(&self) -> Result<()>; + + /// attempt to receive a value without blocking, return None if nothing is available + fn try_recv(&self) -> Result>; + + /// sync variant of [Self::recv], blocking invoking thread + fn blocking_recv(&self, rt: &Runtime) -> Result { + rt.block_on(self.recv()) + } + + /// register a callback to be called for each received stream value + /// + /// this will spawn a new task on given runtime invoking [Self::recv] in loop and calling given + /// callback for each received value. a stop channel should be provided, and first value sent + /// into it will stop the worker loop. + /// + /// note: creating a callback handler will hold an Arc reference to the given controller, + /// preventing it from being dropped (and likely disconnecting). using the stop channel is + /// important for proper cleanup + fn callback( + self: &Arc, + rt: &tokio::runtime::Runtime, + mut stop: tokio::sync::mpsc::UnboundedReceiver<()>, + mut cb: F + ) where + Self : 'static, + F : FnMut(T) + Sync + Send + 'static + { + let _self = self.clone(); + rt.spawn(async move { + loop { + tokio::select! { + Ok(data) = _self.recv() => cb(data), + Some(()) = stop.recv() => break, + else => break, + } + } + }); + } +} diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index b0eee32..26f540c 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -8,7 +8,7 @@ use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tonic::async_trait; use crate::errors::IgnorableError; -use crate::{Controller, Error}; +use crate::{api::Controller, Error}; use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; use super::TextChange; diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 509b364..ef3b92a 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -8,7 +8,7 @@ use tonic::{async_trait, Streaming}; use crate::errors::IgnorableError; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; -use crate::ControllerWorker; +use crate::api::ControllerWorker; use super::TextChange; use super::controller::BufferController; diff --git a/src/client.rs b/src/client.rs index 2f0341b..531a50c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,7 @@ use crate::{ proto::{ buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, }, - Error, ControllerWorker, buffer::{controller::BufferController, worker::BufferControllerWorker}, + Error, api::ControllerWorker, buffer::{controller::BufferController, worker::BufferControllerWorker}, }; @@ -73,8 +73,8 @@ impl Client { /// join a workspace, starting a cursorcontroller and returning a new reference to it /// - /// to interact with such workspace [crate::Controller::send] cursor events or - /// [crate::Controller::recv] for events on the associated [crate::cursor::Controller]. + /// to interact with such workspace [crate::api::Controller::send] cursor events or + /// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller]. pub async fn join(&mut self, _session: &str) -> Result, Error> { // TODO there is no real workspace handling in codemp server so it behaves like one big global // session. I'm still creating this to start laying out the proper use flow @@ -119,8 +119,8 @@ impl Client { /// attach to a buffer, starting a buffer controller and returning a new reference to it /// - /// to interact with such buffer [crate::Controller::send] operation sequences - /// or [crate::Controller::recv] for text events using its [crate::buffer::Controller]. + /// to interact with such buffer [crate::api::Controller::send] operation sequences + /// or [crate::api::Controller::recv] for text events using its [crate::buffer::Controller]. /// to generate operation sequences use the [crate::buffer::OperationFactory] /// methods, which are implemented on [crate::buffer::Controller], such as /// [crate::buffer::OperationFactory::delta]. diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 7d45379..53f8ffe 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -5,7 +5,7 @@ use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; use tonic::async_trait; -use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::IgnorableError}; +use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors::IgnorableError}; /// the cursor controller implementation /// diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 616d44d..5666215 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tonic::{Streaming, transport::Channel, async_trait}; -use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, ControllerWorker}; +use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, api::ControllerWorker}; use super::controller::CursorController; diff --git a/src/lib.rs b/src/lib.rs index 868d31f..404c67d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,11 +36,12 @@ //! [instance::a_sync::Instance] //! //! ```rust,no_run -//! use codemp::Controller; +//! use codemp::api::Controller; //! use codemp::buffer::OperationFactory; +//! # use codemp::instance::a_sync::Instance; //! //! # async fn async_example() -> codemp::Result<()> { -//! let session = codemp::Instance::default(); // create global session +//! let session = Instance::default(); // create global session //! session.connect("http://alemi.dev:50051").await?; // connect to remote server //! //! // join a remote workspace, obtaining a cursor controller @@ -74,7 +75,7 @@ //! ```rust,no_run //! # use codemp::instance::sync::Instance; //! # use std::sync::Arc; -//! # use codemp::Controller; +//! # use codemp::api::Controller; //! # //! # fn sync_example() -> codemp::Result<()> { //! let session = Instance::default(); // instantiate sync variant @@ -108,13 +109,18 @@ //! # use codemp::instance::sync::Instance; //! # use std::sync::Arc; //! use codemp::prelude::*; // prelude includes everything with "Codemp" in front -//! # async fn global_example() -> codemp::Result<()> { -//! CODEMP_INSTANCE.connect("http://alemi.dev:50051").await?; // connect to server -//! let cursor = CODEMP_INSTANCE.join("some_workspace").await?; // join workspace -//! while let Ok(event) = cursor.recv().await { // receive cursor events -//! println!("received cursor event: {:?}", event); -//! } -//! # +//! # fn global_example() -> codemp::Result<()> { +//! CODEMP_INSTANCE.connect("http://alemi.dev:50051")?; // connect to server +//! let cursor = CODEMP_INSTANCE.join("some_workspace")?; // join workspace +//! std::thread::spawn(move || { +//! loop { +//! match cursor.try_recv() { +//! Ok(Some(event)) => println!("received cursor event: {:?}", event), // there might be more +//! Ok(None) => std::thread::sleep(std::time::Duration::from_millis(10)), // wait for more +//! Err(e) => break, // channel closed +//! } +//! } +//! }); //! # Ok(()) //! # } //! ``` @@ -133,6 +139,9 @@ //! +/// public traits exposed to clients +pub mod api; + /// cursor related types and controller pub mod cursor; @@ -154,11 +163,6 @@ pub mod prelude; /// underlying OperationalTransform library used, re-exported pub use operational_transform as ot; -pub use client::Client; - -#[cfg(feature = "sync")] pub use instance::sync::Instance; -#[cfg(not(feature = "sync"))] pub use instance::a_sync::Instance; - /// protocol types and services auto-generated by grpc #[cfg(feature = "proto")] #[allow(non_snake_case)] @@ -167,85 +171,12 @@ pub mod proto { tonic::include_proto!("codemp.cursor"); } + + +pub use api::Controller; +pub use client::Client; pub use errors::Error; pub use errors::Result; +#[cfg(feature = "sync")] pub use instance::sync::Instance; +#[cfg(not(feature = "sync"))] pub use instance::a_sync::Instance; -use std::sync::Arc; -use tokio::runtime::Runtime; - -#[tonic::async_trait] // TODO move this somewhere? -pub(crate) trait ControllerWorker { - type Controller : Controller; - type Tx; - type Rx; - - fn subscribe(&self) -> Self::Controller; - async fn work(self, tx: Self::Tx, rx: Self::Rx); -} - -/// async and threadsafe handle to a generic bidirectional stream -/// -/// this generic trait is implemented by actors managing stream procedures. -/// events can be enqueued for dispatching without blocking ([Controller::send]), and an async blocking -/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking -/// ([Controller::blocking_recv]) and callback-based ([Controller::callback]) are implemented. -#[tonic::async_trait] -pub trait Controller : Sized + Send + Sync { - /// type of upstream values, used in [Self::send] - type Input; - - /// enqueue a new value to be sent - fn send(&self, x: Self::Input) -> Result<()>; - - /// get next value from stream, blocking until one is available - /// - /// this is just an async trait function wrapped by `async_trait`: - /// - /// `async fn recv(&self) -> codemp::Result;` - async fn recv(&self) -> Result; - - /// block until next value is added to the stream without removing any element - /// - /// this is just an async trait function wrapped by `async_trait`: - /// - /// `async fn poll(&self) -> codemp::Result<()>;` - async fn poll(&self) -> Result<()>; - - /// attempt to receive a value without blocking, return None if nothing is available - fn try_recv(&self) -> Result>; - - /// sync variant of [Self::recv], blocking invoking thread - fn blocking_recv(&self, rt: &Runtime) -> Result { - rt.block_on(self.recv()) - } - - /// register a callback to be called for each received stream value - /// - /// this will spawn a new task on given runtime invoking [Self::recv] in loop and calling given - /// callback for each received value. a stop channel should be provided, and first value sent - /// into it will stop the worker loop. - /// - /// note: creating a callback handler will hold an Arc reference to the given controller, - /// preventing it from being dropped (and likely disconnecting). using the stop channel is - /// important for proper cleanup - fn callback( - self: &Arc, - rt: &tokio::runtime::Runtime, - mut stop: tokio::sync::mpsc::UnboundedReceiver<()>, - mut cb: F - ) where - Self : 'static, - F : FnMut(T) + Sync + Send + 'static - { - let _self = self.clone(); - rt.spawn(async move { - loop { - tokio::select! { - Ok(data) = _self.recv() => cb(data), - Some(()) = stop.recv() => break, - else => break, - } - } - }); - } -} diff --git a/src/prelude.rs b/src/prelude.rs index 93faaca..4c50214 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -7,7 +7,7 @@ pub use crate::{ Result as CodempResult, Client as CodempClient, - Controller as CodempController, + api::Controller as CodempController, cursor::Controller as CodempCursorController, buffer::Controller as CodempBufferController,