chore: split controller trait in 2 sides

Co-authored-by: zaaarf <me@zaaarf.foo>
This commit is contained in:
əlemi 2024-09-28 03:33:13 +02:00
parent 0b471b72fa
commit 1b16d4af59
Signed by: alemi
GPG key ID: A4895B84D311642C
4 changed files with 48 additions and 19 deletions

View file

@ -8,16 +8,17 @@ use crate::errors::ControllerResult;
// note that we don't use thiserror's #[from] because we don't want the error structs to contain // note that we don't use thiserror's #[from] because we don't want the error structs to contain
// these foreign types, and also we want these to be easily constructable // these foreign types, and also we want these to be easily constructable
/// Asynchronous and thread-safe handle to a generic bidirectional stream. /// Asynchronous and thread-safe handle to a generic bidirectional stream. Exists as a combination
/// of [`AsyncSender`] and [`AsyncReceiver`].
/// ///
/// This generic trait is implemented by actors managing stream procedures, and will generally /// This generic trait is implemented by actors managing stream procedures, and will generally
/// imply a background worker. /// imply a background worker.
/// ///
/// Events can be enqueued for dispatching without blocking with [`Controller::send`]. /// Events can be enqueued for dispatching without blocking with [`AsyncSender::send`].
/// ///
/// For receiving events from the server, an asynchronous API with [`Controller::recv`] is /// For receiving events from the server, an asynchronous API with [`AsyncReceiver::recv`] is
/// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively, /// provided; if that is not feasible, consider using [`AsyncReceiver::callback`] or, alternatively,
/// [`Controller::poll`] combined with [`Controller::try_recv`]. /// [`AsyncReceiver::poll`] combined with [`AsyncReceiver::try_recv`].
/// ///
/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have /// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have
/// been dropped. /// been dropped.
@ -25,10 +26,26 @@ use crate::errors::ControllerResult;
/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
#[allow(async_fn_in_trait)] #[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync { pub trait Controller<T : Sized + Send + Sync> : AsyncSender<T> + AsyncReceiver<T> {}
/// Asynchronous and thread-safe handle to send data over a stream.
/// See [`Controller`]'s documentation for details.
///
/// Details about the receiving end are left to the implementor.
#[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait AsyncSender<T : Sized + Send + Sync> : Sized + Send + Sync {
/// Enqueue a new value to be sent to all other users. /// Enqueue a new value to be sent to all other users.
async fn send(&self, x: T) -> ControllerResult<()>; async fn send(&self, x: T) -> ControllerResult<()>;
}
/// Asynchronous and thread-safe handle to receive data from a stream.
/// See [`Controller`]'s documentation for details.
///
/// Details about the sender are left to the implementor.
#[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait AsyncReceiver<T : Sized + Send + Sync> : Sized + Send + Sync {
/// Block until a value is available and returns it. /// Block until a value is available and returns it.
async fn recv(&self) -> ControllerResult<T> { async fn recv(&self) -> ControllerResult<T> {
loop { loop {

View file

@ -6,7 +6,7 @@ use std::sync::Arc;
use diamond_types::LocalVersion; use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use crate::api::controller::{Controller, ControllerCallback}; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback};
use crate::api::TextChange; use crate::api::TextChange;
use crate::errors::ControllerResult; use crate::errors::ControllerResult;
use crate::ext::InternallyMutable; use crate::ext::InternallyMutable;
@ -53,7 +53,21 @@ pub(crate) struct BufferControllerInner {
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<TextChange> for BufferController { impl Controller<TextChange> for BufferController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<TextChange> for BufferController {
async fn send(&self, op: TextChange) -> ControllerResult<()> {
// we let the worker do the updating to the last version and send it back.
let (tx, rx) = oneshot::channel();
self.0.ops_in.send((op, tx))?;
self.0.last_update.set(rx.await?);
Ok(())
}
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncReceiver<TextChange> for BufferController {
async fn poll(&self) -> ControllerResult<()> { async fn poll(&self) -> ControllerResult<()> {
if self.0.last_update.get() != *self.0.latest_version.borrow() { if self.0.last_update.get() != *self.0.latest_version.borrow() {
return Ok(()); return Ok(());
@ -80,14 +94,6 @@ impl Controller<TextChange> for BufferController {
Ok(change) Ok(change)
} }
async fn send(&self, op: TextChange) -> ControllerResult<()> {
// we let the worker do the updating to the last version and send it back.
let (tx, rx) = oneshot::channel();
self.0.ops_in.send((op, tx))?;
self.0.last_update.set(rx.await?);
Ok(())
}
fn callback(&self, cb: impl Into<ControllerCallback<BufferController>>) { fn callback(&self, cb: impl Into<ControllerCallback<BufferController>>) {
if self.0.callback.send(Some(cb.into())).is_err() { if self.0.callback.send(Some(cb.into())).is_err() {
// TODO should we panic? we failed what we were supposed to do // TODO should we panic? we failed what we were supposed to do

View file

@ -6,7 +6,7 @@ use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use crate::{ use crate::{
api::{controller::ControllerCallback, Controller, Cursor}, api::{controller::{AsyncReceiver, AsyncSender, ControllerCallback}, Controller, Cursor},
errors::ControllerResult, errors::ControllerResult,
}; };
use codemp_proto::{ use codemp_proto::{
@ -31,7 +31,10 @@ pub(crate) struct CursorControllerInner {
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<Cursor> for CursorController { impl Controller<Cursor> for CursorController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<Cursor> for CursorController {
async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> {
if cursor.start > cursor.end { if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
@ -54,7 +57,10 @@ impl Controller<Cursor> for CursorController {
}) })
.await?) .await?)
} }
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncReceiver<Cursor> for CursorController {
async fn try_recv(&self) -> ControllerResult<Option<Cursor>> { async fn try_recv(&self) -> ControllerResult<Option<Cursor>> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.stream.send(tx).await?; self.0.stream.send(tx).await?;

View file

@ -1,7 +1,7 @@
//! ### Extensions //! ### Extensions
//! Contains a number of utils used internally or that may be of general interest. //! Contains a number of utils used internally or that may be of general interest.
use crate::{api::Controller, errors::ControllerResult}; use crate::{api::controller::AsyncReceiver, errors::ControllerResult};
use tokio::sync::mpsc; use tokio::sync::mpsc;
/// Poll all given buffer controllers and wait, returning the first one ready. /// Poll all given buffer controllers and wait, returning the first one ready.