chore: moved controller under api module

This commit is contained in:
əlemi 2023-09-03 23:04:08 +02:00
parent 62f7eef835
commit 1034f0482c
8 changed files with 115 additions and 104 deletions

80
src/api.rs Normal file
View file

@ -0,0 +1,80 @@
use crate::Result;
use std::sync::Arc;
use tokio::runtime::Runtime;
#[tonic::async_trait] // TODO move this somewhere?
pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
type Controller : Controller<T>;
type Tx;
type Rx;
fn subscribe(&self) -> Self::Controller;
async fn work(self, tx: Self::Tx, rx: Self::Rx);
}
/// async and threadsafe handle to a generic bidirectional stream
///
/// this generic trait is implemented by actors managing stream procedures.
/// events can be enqueued for dispatching without blocking ([Controller::send]), and an async blocking
/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking
/// ([Controller::blocking_recv]) and callback-based ([Controller::callback]) are implemented.
#[tonic::async_trait]
pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// type of upstream values, used in [Self::send]
type Input;
/// enqueue a new value to be sent
fn send(&self, x: Self::Input) -> Result<()>;
/// get next value from stream, blocking until one is available
///
/// this is just an async trait function wrapped by `async_trait`:
///
/// `async fn recv(&self) -> codemp::Result<T>;`
async fn recv(&self) -> Result<T>;
/// block until next value is added to the stream without removing any element
///
/// this is just an async trait function wrapped by `async_trait`:
///
/// `async fn poll(&self) -> codemp::Result<()>;`
async fn poll(&self) -> Result<()>;
/// attempt to receive a value without blocking, return None if nothing is available
fn try_recv(&self) -> Result<Option<T>>;
/// sync variant of [Self::recv], blocking invoking thread
fn blocking_recv(&self, rt: &Runtime) -> Result<T> {
rt.block_on(self.recv())
}
/// register a callback to be called for each received stream value
///
/// this will spawn a new task on given runtime invoking [Self::recv] in loop and calling given
/// callback for each received value. a stop channel should be provided, and first value sent
/// into it will stop the worker loop.
///
/// note: creating a callback handler will hold an Arc reference to the given controller,
/// preventing it from being dropped (and likely disconnecting). using the stop channel is
/// important for proper cleanup
fn callback<F>(
self: &Arc<Self>,
rt: &tokio::runtime::Runtime,
mut stop: tokio::sync::mpsc::UnboundedReceiver<()>,
mut cb: F
) where
Self : 'static,
F : FnMut(T) + Sync + Send + 'static
{
let _self = self.clone();
rt.spawn(async move {
loop {
tokio::select! {
Ok(data) = _self.recv() => cb(data),
Some(()) = stop.recv() => break,
else => break,
}
}
});
}
}

View file

@ -8,7 +8,7 @@ use tokio::sync::{watch, mpsc, broadcast, Mutex};
use tonic::async_trait; use tonic::async_trait;
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
use crate::{Controller, Error}; use crate::{api::Controller, Error};
use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory};
use super::TextChange; use super::TextChange;

View file

@ -8,7 +8,7 @@ use tonic::{async_trait, Streaming};
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
use crate::proto::{OperationRequest, RawOp}; use crate::proto::{OperationRequest, RawOp};
use crate::proto::buffer_client::BufferClient; use crate::proto::buffer_client::BufferClient;
use crate::ControllerWorker; use crate::api::ControllerWorker;
use super::TextChange; use super::TextChange;
use super::controller::BufferController; use super::controller::BufferController;

View file

@ -11,7 +11,7 @@ use crate::{
proto::{ proto::{
buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload,
}, },
Error, ControllerWorker, buffer::{controller::BufferController, worker::BufferControllerWorker}, Error, api::ControllerWorker, buffer::{controller::BufferController, worker::BufferControllerWorker},
}; };
@ -73,8 +73,8 @@ impl Client {
/// join a workspace, starting a cursorcontroller and returning a new reference to it /// join a workspace, starting a cursorcontroller and returning a new reference to it
/// ///
/// to interact with such workspace [crate::Controller::send] cursor events or /// to interact with such workspace [crate::api::Controller::send] cursor events or
/// [crate::Controller::recv] for events on the associated [crate::cursor::Controller]. /// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller].
pub async fn join(&mut self, _session: &str) -> Result<Arc<CursorController>, Error> { pub async fn join(&mut self, _session: &str) -> Result<Arc<CursorController>, Error> {
// TODO there is no real workspace handling in codemp server so it behaves like one big global // TODO there is no real workspace handling in codemp server so it behaves like one big global
// session. I'm still creating this to start laying out the proper use flow // session. I'm still creating this to start laying out the proper use flow
@ -119,8 +119,8 @@ impl Client {
/// attach to a buffer, starting a buffer controller and returning a new reference to it /// attach to a buffer, starting a buffer controller and returning a new reference to it
/// ///
/// to interact with such buffer [crate::Controller::send] operation sequences /// to interact with such buffer [crate::api::Controller::send] operation sequences
/// or [crate::Controller::recv] for text events using its [crate::buffer::Controller]. /// or [crate::api::Controller::recv] for text events using its [crate::buffer::Controller].
/// to generate operation sequences use the [crate::buffer::OperationFactory] /// to generate operation sequences use the [crate::buffer::OperationFactory]
/// methods, which are implemented on [crate::buffer::Controller], such as /// methods, which are implemented on [crate::buffer::Controller], such as
/// [crate::buffer::OperationFactory::delta]. /// [crate::buffer::OperationFactory::delta].

View file

@ -5,7 +5,7 @@
use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch};
use tonic::async_trait; use tonic::async_trait;
use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::IgnorableError}; use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors::IgnorableError};
/// the cursor controller implementation /// the cursor controller implementation
/// ///

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch};
use tonic::{Streaming, transport::Channel, async_trait}; use tonic::{Streaming, transport::Channel, async_trait};
use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, ControllerWorker}; use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, api::ControllerWorker};
use super::controller::CursorController; use super::controller::CursorController;

View file

@ -36,11 +36,12 @@
//! [instance::a_sync::Instance] //! [instance::a_sync::Instance]
//! //!
//! ```rust,no_run //! ```rust,no_run
//! use codemp::Controller; //! use codemp::api::Controller;
//! use codemp::buffer::OperationFactory; //! use codemp::buffer::OperationFactory;
//! # use codemp::instance::a_sync::Instance;
//! //!
//! # async fn async_example() -> codemp::Result<()> { //! # async fn async_example() -> codemp::Result<()> {
//! let session = codemp::Instance::default(); // create global session //! let session = Instance::default(); // create global session
//! session.connect("http://alemi.dev:50051").await?; // connect to remote server //! session.connect("http://alemi.dev:50051").await?; // connect to remote server
//! //!
//! // join a remote workspace, obtaining a cursor controller //! // join a remote workspace, obtaining a cursor controller
@ -74,7 +75,7 @@
//! ```rust,no_run //! ```rust,no_run
//! # use codemp::instance::sync::Instance; //! # use codemp::instance::sync::Instance;
//! # use std::sync::Arc; //! # use std::sync::Arc;
//! # use codemp::Controller; //! # use codemp::api::Controller;
//! # //! #
//! # fn sync_example() -> codemp::Result<()> { //! # fn sync_example() -> codemp::Result<()> {
//! let session = Instance::default(); // instantiate sync variant //! let session = Instance::default(); // instantiate sync variant
@ -108,13 +109,18 @@
//! # use codemp::instance::sync::Instance; //! # use codemp::instance::sync::Instance;
//! # use std::sync::Arc; //! # use std::sync::Arc;
//! use codemp::prelude::*; // prelude includes everything with "Codemp" in front //! use codemp::prelude::*; // prelude includes everything with "Codemp" in front
//! # async fn global_example() -> codemp::Result<()> { //! # fn global_example() -> codemp::Result<()> {
//! CODEMP_INSTANCE.connect("http://alemi.dev:50051").await?; // connect to server //! CODEMP_INSTANCE.connect("http://alemi.dev:50051")?; // connect to server
//! let cursor = CODEMP_INSTANCE.join("some_workspace").await?; // join workspace //! let cursor = CODEMP_INSTANCE.join("some_workspace")?; // join workspace
//! while let Ok(event) = cursor.recv().await { // receive cursor events //! std::thread::spawn(move || {
//! println!("received cursor event: {:?}", event); //! loop {
//! match cursor.try_recv() {
//! Ok(Some(event)) => println!("received cursor event: {:?}", event), // there might be more
//! Ok(None) => std::thread::sleep(std::time::Duration::from_millis(10)), // wait for more
//! Err(e) => break, // channel closed
//! } //! }
//! # //! }
//! });
//! # Ok(()) //! # Ok(())
//! # } //! # }
//! ``` //! ```
@ -133,6 +139,9 @@
//! //!
/// public traits exposed to clients
pub mod api;
/// cursor related types and controller /// cursor related types and controller
pub mod cursor; pub mod cursor;
@ -154,11 +163,6 @@ pub mod prelude;
/// underlying OperationalTransform library used, re-exported /// underlying OperationalTransform library used, re-exported
pub use operational_transform as ot; pub use operational_transform as ot;
pub use client::Client;
#[cfg(feature = "sync")] pub use instance::sync::Instance;
#[cfg(not(feature = "sync"))] pub use instance::a_sync::Instance;
/// protocol types and services auto-generated by grpc /// protocol types and services auto-generated by grpc
#[cfg(feature = "proto")] #[cfg(feature = "proto")]
#[allow(non_snake_case)] #[allow(non_snake_case)]
@ -167,85 +171,12 @@ pub mod proto {
tonic::include_proto!("codemp.cursor"); tonic::include_proto!("codemp.cursor");
} }
pub use api::Controller;
pub use client::Client;
pub use errors::Error; pub use errors::Error;
pub use errors::Result; pub use errors::Result;
#[cfg(feature = "sync")] pub use instance::sync::Instance;
#[cfg(not(feature = "sync"))] pub use instance::a_sync::Instance;
use std::sync::Arc;
use tokio::runtime::Runtime;
#[tonic::async_trait] // TODO move this somewhere?
pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
type Controller : Controller<T>;
type Tx;
type Rx;
fn subscribe(&self) -> Self::Controller;
async fn work(self, tx: Self::Tx, rx: Self::Rx);
}
/// async and threadsafe handle to a generic bidirectional stream
///
/// this generic trait is implemented by actors managing stream procedures.
/// events can be enqueued for dispatching without blocking ([Controller::send]), and an async blocking
/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking
/// ([Controller::blocking_recv]) and callback-based ([Controller::callback]) are implemented.
#[tonic::async_trait]
pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// type of upstream values, used in [Self::send]
type Input;
/// enqueue a new value to be sent
fn send(&self, x: Self::Input) -> Result<()>;
/// get next value from stream, blocking until one is available
///
/// this is just an async trait function wrapped by `async_trait`:
///
/// `async fn recv(&self) -> codemp::Result<T>;`
async fn recv(&self) -> Result<T>;
/// block until next value is added to the stream without removing any element
///
/// this is just an async trait function wrapped by `async_trait`:
///
/// `async fn poll(&self) -> codemp::Result<()>;`
async fn poll(&self) -> Result<()>;
/// attempt to receive a value without blocking, return None if nothing is available
fn try_recv(&self) -> Result<Option<T>>;
/// sync variant of [Self::recv], blocking invoking thread
fn blocking_recv(&self, rt: &Runtime) -> Result<T> {
rt.block_on(self.recv())
}
/// register a callback to be called for each received stream value
///
/// this will spawn a new task on given runtime invoking [Self::recv] in loop and calling given
/// callback for each received value. a stop channel should be provided, and first value sent
/// into it will stop the worker loop.
///
/// note: creating a callback handler will hold an Arc reference to the given controller,
/// preventing it from being dropped (and likely disconnecting). using the stop channel is
/// important for proper cleanup
fn callback<F>(
self: &Arc<Self>,
rt: &tokio::runtime::Runtime,
mut stop: tokio::sync::mpsc::UnboundedReceiver<()>,
mut cb: F
) where
Self : 'static,
F : FnMut(T) + Sync + Send + 'static
{
let _self = self.clone();
rt.spawn(async move {
loop {
tokio::select! {
Ok(data) = _self.recv() => cb(data),
Some(()) = stop.recv() => break,
else => break,
}
}
});
}
}

View file

@ -7,7 +7,7 @@ pub use crate::{
Result as CodempResult, Result as CodempResult,
Client as CodempClient, Client as CodempClient,
Controller as CodempController, api::Controller as CodempController,
cursor::Controller as CodempCursorController, cursor::Controller as CodempCursorController,
buffer::Controller as CodempBufferController, buffer::Controller as CodempBufferController,