From b2520898b537b1ba5ce3e7a12c931b5c26d01948 Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 10 Sep 2023 03:00:47 +0200 Subject: [PATCH] chore: moved factory under api, restructured --- src/api.rs | 80 ------------------ src/buffer/factory.rs | 186 ------------------------------------------ src/buffer/mod.rs | 2 - src/buffer/worker.rs | 4 +- src/client.rs | 3 +- src/cursor/worker.rs | 2 +- src/prelude.rs | 2 +- 7 files changed, 6 insertions(+), 273 deletions(-) delete mode 100644 src/api.rs delete mode 100644 src/buffer/factory.rs diff --git a/src/api.rs b/src/api.rs deleted file mode 100644 index 8ed8b7f..0000000 --- a/src/api.rs +++ /dev/null @@ -1,80 +0,0 @@ -use crate::Result; -use std::sync::Arc; -use tokio::runtime::Runtime; - -#[tonic::async_trait] // TODO move this somewhere? -pub(crate) trait ControllerWorker { - type Controller : Controller; - type Tx; - type Rx; - - fn subscribe(&self) -> Self::Controller; - async fn work(self, tx: Self::Tx, rx: Self::Rx); -} - -/// async and threadsafe handle to a generic bidirectional stream -/// -/// this generic trait is implemented by actors managing stream procedures. -/// events can be enqueued for dispatching without blocking ([Controller::send]), and an async blocking -/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking -/// ([Controller::blocking_recv]) and callback-based ([Controller::callback]) are implemented. -#[tonic::async_trait] -pub trait Controller : Sized + Send + Sync { - /// type of upstream values, used in [Self::send] - type Input; - - /// enqueue a new value to be sent - fn send(&self, x: Self::Input) -> Result<()>; - - /// get next value from stream, blocking until one is available - /// - /// this is just an async trait function wrapped by `async_trait`: - /// - /// `async fn recv(&self) -> codemp::Result;` - async fn recv(&self) -> Result; - - /// block until next value is added to the stream without removing any element - /// - /// this is just an async trait function wrapped by `async_trait`: - /// - /// `async fn poll(&self) -> codemp::Result<()>;` - async fn poll(&self) -> Result<()>; - - /// attempt to receive a value without blocking, return None if nothing is available - fn try_recv(&self) -> Result>; - - /// sync variant of [Self::recv], blocking invoking thread - fn blocking_recv(&self, rt: &Runtime) -> Result { - rt.block_on(self.recv()) - } - - /// register a callback to be called for each received stream value - /// - /// this will spawn a new task on given runtime invoking [Self::recv] in loop and calling given - /// callback for each received value. a stop channel should be provided, and first value sent - /// into it will stop the worker loop. - /// - /// note: creating a callback handler will hold an Arc reference to the given controller, - /// preventing it from being dropped (and likely disconnecting). using the stop channel is - /// important for proper cleanup - fn callback( - self: &Arc, - rt: &tokio::runtime::Runtime, - mut stop: tokio::sync::mpsc::UnboundedReceiver<()>, - mut cb: F - ) where - Self : 'static, - F : FnMut(T) + Sync + Send + 'static - { - let _self = self.clone(); - rt.spawn(async move { - loop { - tokio::select! { - Ok(data) = _self.recv() => cb(data), - Some(()) = stop.recv() => break, - else => break, - } - } - }); - } -} diff --git a/src/buffer/factory.rs b/src/buffer/factory.rs deleted file mode 100644 index 05cf36d..0000000 --- a/src/buffer/factory.rs +++ /dev/null @@ -1,186 +0,0 @@ -//! ### factory -//! -//! a helper trait to produce Operation Sequences, knowing the current -//! state of the buffer -//! -//! ```rust -//! use codemp::buffer::factory::{OperationFactory, SimpleOperationFactory}; -//! -//! let mut factory = SimpleOperationFactory::from(""); -//! let op = factory.insert("asd", 0); -//! factory.update(op)?; -//! assert_eq!(factory.content(), "asd"); -//! # Ok::<(), codemp::ot::OTError>(()) -//! ``` -//! -//! use [OperationFactory::insert] to add new characters at a specific index -//! -//! ```rust -//! # use codemp::buffer::factory::{OperationFactory, SimpleOperationFactory}; -//! # let mut factory = SimpleOperationFactory::from("asd"); -//! factory.update(factory.insert(" dsa", 3))?; -//! assert_eq!(factory.content(), "asd dsa"); -//! # Ok::<(), codemp::ot::OTError>(()) -//! ``` -//! -//! use [OperationFactory::delta] to arbitrarily change text at any position -//! -//! ```rust -//! # use codemp::buffer::factory::{OperationFactory, SimpleOperationFactory}; -//! # let mut factory = SimpleOperationFactory::from("asd dsa"); -//! let op = factory.delta(2, " xxx ", 5).expect("replaced region is equal to origin"); -//! factory.update(op)?; -//! assert_eq!(factory.content(), "as xxx sa"); -//! # Ok::<(), codemp::ot::OTError>(()) -//! ``` -//! -//! use [OperationFactory::delete] to remove characters from given index -//! -//! ```rust -//! # use codemp::buffer::factory::{OperationFactory, SimpleOperationFactory}; -//! # let mut factory = SimpleOperationFactory::from("as xxx sa"); -//! factory.update(factory.delete(2, 5))?; -//! assert_eq!(factory.content(), "assa"); -//! # Ok::<(), codemp::ot::OTError>(()) -//! ``` -//! -//! use [OperationFactory::replace] to completely replace buffer content -//! -//! ```rust -//! # use codemp::buffer::factory::{OperationFactory, SimpleOperationFactory}; -//! # let mut factory = SimpleOperationFactory::from("assa"); -//! let op = factory.replace("from scratch").expect("replace is equal to origin"); -//! factory.update(op)?; -//! assert_eq!(factory.content(), "from scratch"); -//! # Ok::<(), codemp::ot::OTError>(()) -//! ``` -//! -//! use [OperationFactory::cancel] to remove characters at index, but backwards -//! -//! ```rust -//! # use codemp::buffer::factory::{OperationFactory, SimpleOperationFactory}; -//! # let mut factory = SimpleOperationFactory::from("from scratch"); -//! factory.update(factory.cancel(12, 8))?; -//! assert_eq!(factory.content(), "from"); -//! # Ok::<(), codemp::ot::OTError>(()) -//! ``` -//! - -use std::ops::Range; - -use operational_transform::{OperationSeq, Operation, OTError}; -use similar::{TextDiff, ChangeTag}; - -/// calculate leading no-ops in given opseq -pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } - -/// calculate tailing no-ops in given opseq -pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } - -const fn count_noop(op: Option<&Operation>) -> u64 { - match op { - None => 0, - Some(Operation::Retain(n)) => *n, - Some(_) => 0, - } -} - -/// return the range on which the operation seq is actually applying its changes -pub fn op_effective_range(op: &OperationSeq) -> Range { - let first = leading_noop(op.ops()); - let last = op.base_len() as u64 - tailing_noop(op.ops()); - first..last -} - -/// a helper trait that any string container can implement, which generates opseqs -pub trait OperationFactory { - /// the current content of the buffer - fn content(&self) -> String; - - /// completely replace the buffer with given text - fn replace(&self, txt: &str) -> Option { - self.delta(0, txt, self.content().len()) - } - - /// transform buffer in range [start..end] with given text - fn delta(&self, start: usize, txt: &str, end: usize) -> Option { - let mut out = OperationSeq::default(); - let content = self.content(); - let tail_skip = content.len() - end; // TODO len is number of bytes, not chars - let content_slice = &content[start..end]; - - if content_slice == txt { - // if slice equals given text, no operation should be taken - return None; - } - - out.retain(start as u64); - - let diff = TextDiff::from_chars(content_slice, txt); - - for change in diff.iter_all_changes() { - match change.tag() { - ChangeTag::Equal => out.retain(1), - ChangeTag::Delete => out.delete(1), - ChangeTag::Insert => out.insert(change.value()), - } - } - - out.retain(tail_skip as u64); - - Some(out) - } - - /// insert given chars at target position - fn insert(&self, txt: &str, pos: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let total = self.content().len() as u64; - out.retain(pos); - out.insert(txt); - out.retain(total - pos); - out - } - - /// delete n characters forward at given position - fn delete(&self, pos: u64, count: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let len = self.content().len() as u64; - out.retain(pos); - out.delete(count); - out.retain(len - (pos+count)); - out - } - - /// delete n characters backwards at given position - fn cancel(&self, pos: u64, count: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let len = self.content().len() as u64; - out.retain(pos - count); - out.delete(count); - out.retain(len - pos); - out - } -} - -/// a simple example operation factory that just wraps a string -pub struct SimpleOperationFactory(String); - -impl SimpleOperationFactory { - /// applies given OperationSeq to underlying string, changing it - pub fn update(&mut self, op: OperationSeq) -> Result<(), OTError> { - self.0 = op.apply(&self.0)?; - Ok(()) - } -} - -impl From::<&str> for SimpleOperationFactory { - fn from(value: &str) -> Self { - SimpleOperationFactory(value.to_string()) - } -} - -impl OperationFactory for SimpleOperationFactory { - fn content(&self) -> String { - self.0.clone() - } -} diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index c8c3fa5..044547e 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -13,8 +13,6 @@ pub(crate) mod worker; /// buffer controller implementation pub mod controller; -/// operation factory, with helper functions to produce opseqs -pub mod factory; pub use factory::OperationFactory; pub use controller::BufferController as Controller; diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 537cc21..49e364f 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -9,11 +9,11 @@ use tonic::{async_trait, Streaming}; use crate::errors::IgnorableError; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; -use crate::api::ControllerWorker; +use crate::api::controller::ControllerWorker; +use crate::api::factory::{leading_noop, tailing_noop}; use super::TextChange; use super::controller::BufferController; -use super::factory::{leading_noop, tailing_noop}; pub(crate) struct BufferControllerWorker { diff --git a/src/client.rs b/src/client.rs index 531a50c..99a7194 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,8 @@ use crate::{ proto::{ buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, }, - Error, api::ControllerWorker, buffer::{controller::BufferController, worker::BufferControllerWorker}, + Error, api::controller::ControllerWorker, + buffer::{controller::BufferController, worker::BufferControllerWorker}, }; diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 5666215..e0196bd 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tonic::{Streaming, transport::Channel, async_trait}; -use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, api::ControllerWorker}; +use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, api::controller::ControllerWorker}; use super::controller::CursorController; diff --git a/src/prelude.rs b/src/prelude.rs index 4c50214..40320ff 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -8,10 +8,10 @@ pub use crate::{ Client as CodempClient, api::Controller as CodempController, + api::OperationFactory as CodempOperationFactory, cursor::Controller as CodempCursorController, buffer::Controller as CodempBufferController, - buffer::OperationFactory as CodempOperationFactory, ot::OperationSeq as CodempOperationSeq, buffer::TextChange as CodempTextChange,