From 96217d1a1a2efa3c9ea681cd660baadbac6c3f5e Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 16 Aug 2023 18:58:42 +0200 Subject: [PATCH] feat: standardized Controller and ControllerWorker --- src/buffer/controller.rs | 42 +++------------ src/buffer/handle.rs | 113 +++++++++++++++++++++++---------------- src/cursor/client.rs | 1 - src/cursor/tracker.rs | 37 ++++++++----- src/errors.rs | 8 ++- src/lib.rs | 13 +++-- 6 files changed, 112 insertions(+), 102 deletions(-) delete mode 100644 src/cursor/client.rs diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 9e03318..45c0d59 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -1,11 +1,10 @@ -use operational_transform::OperationSeq; -use tonic::{transport::Channel, Status, Streaming, async_trait}; +use tonic::{transport::Channel, Status}; use uuid::Uuid; use crate::{ ControllerWorker, - buffer::handle::{BufferHandle, OperationControllerEditor, OperationControllerWorker}, - proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, + buffer::handle::{BufferHandle, OperationControllerWorker}, + proto::{buffer_client::BufferClient, BufferPayload}, }; #[derive(Clone)] @@ -75,45 +74,16 @@ impl BufferController { let stream = self.client.attach(req).await?.into_inner(); - let controller = OperationControllerWorker::new((self.clone(), stream), &content, path); + let controller = OperationControllerWorker::new(self.id().to_string(), &content, path); let factory = controller.subscribe(); + let client = self.client.clone(); tokio::spawn(async move { tracing::debug!("buffer worker started"); - controller.work().await; + controller.work(client, stream).await; tracing::debug!("buffer worker stopped"); }); Ok(factory) } } - -#[async_trait] -impl OperationControllerEditor for (BufferController, Streaming) { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool { - let req = OperationRequest { - hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - path, - user: self.0.id().to_string(), - }; - match self.0.client.edit(req).await { - Ok(_) => true, - Err(e) => { - tracing::error!("error sending edit: {}", e); - false - } - } - } - - async fn recv(&mut self) -> Option { - match self.1.message().await { - Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), - Ok(None) => None, - Err(e) => { - tracing::error!("could not receive edit from server: {}", e); - None - } - } - } -} diff --git a/src/buffer/handle.rs b/src/buffer/handle.rs index ecd1a12..bbebc66 100644 --- a/src/buffer/handle.rs +++ b/src/buffer/handle.rs @@ -2,10 +2,12 @@ use std::{sync::Arc, collections::VecDeque, ops::Range}; use operational_transform::OperationSeq; use tokio::sync::{watch, mpsc, broadcast, Mutex}; -use tonic::async_trait; +use tonic::transport::Channel; +use tonic::{async_trait, Streaming}; -use crate::ControllerWorker; -use crate::errors::IgnorableError; +use crate::proto::{OperationRequest, RawOp}; +use crate::proto::buffer_client::BufferClient; +use crate::{ControllerWorker, Controller, CodempError}; use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; pub struct TextChange { @@ -26,53 +28,63 @@ impl OperationFactory for BufferHandle { } } -impl BufferHandle { - async fn poll(&self) -> Option { - let op = self.stream.lock().await.recv().await.ok()?; +#[async_trait] +impl Controller for BufferHandle { + type Input = OperationSeq; + + async fn recv(&self) -> Result { + let op = self.stream.lock().await.recv().await?; let after = self.content.borrow().clone(); let skip = leading_noop(op.ops()) as usize; let before_len = op.base_len(); let tail = tailing_noop(op.ops()) as usize; let span = skip..before_len-tail; let content = after[skip..after.len()-tail].to_string(); - Some(TextChange { span, content }) + Ok(TextChange { span, content }) } - async fn apply(&self, op: OperationSeq) { - self.operations.send(op).await - .unwrap_or_warn("could not apply+send operation") + async fn send(&self, op: OperationSeq) -> Result<(), CodempError> { + Ok(self.operations.send(op).await?) } - - // fn subscribe(&self) -> Self { - // OperationControllerHandle { - // content: self.content.clone(), - // operations: self.operations.clone(), - // original: self.original.clone(), - // stream: Arc::new(Mutex::new(self.original.subscribe())), - // } - // } } -#[async_trait] -pub(crate) trait OperationControllerEditor { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool; - async fn recv(&mut self) -> Option; -} - -pub(crate) struct OperationControllerWorker { +pub(crate) struct OperationControllerWorker { + uid: String, pub(crate) content: watch::Sender, pub(crate) operations: mpsc::Receiver, pub(crate) stream: Arc>, pub(crate) queue: VecDeque, receiver: watch::Receiver, sender: mpsc::Sender, - client: C, buffer: String, path: String, } +impl OperationControllerWorker { + pub fn new(uid: String, buffer: &str, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); + let (op_tx, op_rx) = mpsc::channel(64); + let (s_tx, _s_rx) = broadcast::channel(64); + OperationControllerWorker { + uid, + content: txt_tx, + operations: op_rx, + stream: Arc::new(s_tx), + receiver: txt_rx, + sender: op_tx, + queue: VecDeque::new(), + buffer: buffer.to_string(), + path: path.to_string(), + } + } +} + #[async_trait] -impl ControllerWorker for OperationControllerWorker { +impl ControllerWorker for OperationControllerWorker { + type Controller = BufferHandle; + type Tx = BufferClient; + type Rx = Streaming; + fn subscribe(&self) -> BufferHandle { BufferHandle { content: self.receiver.clone(), @@ -81,10 +93,10 @@ impl ControllerWorker for Op } } - async fn work(mut self) { + async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { loop { let op = tokio::select! { - Some(operation) = self.client.recv() => { + Some(operation) = recv_opseq(&mut rx) => { let mut out = operation; for op in self.queue.iter_mut() { (*op, out) = op.transform(&out).unwrap(); @@ -102,29 +114,36 @@ impl ControllerWorker for Op self.content.send(self.buffer.clone()).unwrap(); while let Some(op) = self.queue.get(0) { - if !self.client.edit(self.path.clone(), op.clone()).await { break } + if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break } self.queue.pop_front(); } } } - } -impl OperationControllerWorker { - pub fn new(client: C, buffer: &str, path: &str) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); - let (op_tx, op_rx) = mpsc::channel(64); - let (s_tx, _s_rx) = broadcast::channel(64); - OperationControllerWorker { - content: txt_tx, - operations: op_rx, - stream: Arc::new(s_tx), - receiver: txt_rx, - sender: op_tx, - queue: VecDeque::new(), - buffer: buffer.to_string(), - path: path.to_string(), - client, +async fn send_opseq(tx: &mut BufferClient, uid: String, path: String, op: OperationSeq) -> bool { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path, + user: uid, + }; + match tx.edit(req).await { + Ok(_) => true, + Err(e) => { + tracing::error!("error sending edit: {}", e); + false + } + } +} + +async fn recv_opseq(rx: &mut Streaming) -> Option { + match rx.message().await { + Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), + Ok(None) => None, + Err(e) => { + tracing::error!("could not receive edit from server: {}", e); + None } } } diff --git a/src/cursor/client.rs b/src/cursor/client.rs deleted file mode 100644 index 756547a..0000000 --- a/src/cursor/client.rs +++ /dev/null @@ -1 +0,0 @@ -// TODO separate cursor movement from buffer operations in protocol! diff --git a/src/cursor/tracker.rs b/src/cursor/tracker.rs index 7fde457..6983f40 100644 --- a/src/cursor/tracker.rs +++ b/src/cursor/tracker.rs @@ -1,9 +1,9 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; -use tonic::{Streaming, transport::Channel}; +use tonic::{Streaming, transport::Channel, async_trait}; -use crate::{proto::{RowColumn, CursorPosition, buffer_client::BufferClient}, errors::IgnorableError, CodempError}; +use crate::{proto::{CursorPosition, cursor_client::CursorClient, RowColumn}, errors::IgnorableError, CodempError, Controller, ControllerWorker}; pub struct CursorTracker { uid: String, @@ -11,19 +11,22 @@ pub struct CursorTracker { stream: Mutex>, } -impl CursorTracker { - pub async fn moved(&self, path: &str, start: RowColumn, end: RowColumn) -> Result<(), CodempError> { +#[async_trait] +impl Controller for CursorTracker { + type Input = (String, RowColumn, RowColumn); + + async fn send(&self, (buffer, start, end): Self::Input) -> Result<(), CodempError> { Ok(self.op.send(CursorPosition { user: self.uid.clone(), - buffer: path.to_string(), - start: start.into(), - end: end.into(), + start: Some(start), + end: Some(end), + buffer, }).await?) } // TODO is this cancelable? so it can be used in tokio::select! // TODO is the result type overkill? should be an option? - pub async fn recv(&self) -> Result { + async fn recv(&self) -> Result { let mut stream = self.stream.lock().await; match stream.recv().await { Ok(x) => Ok(x), @@ -51,14 +54,14 @@ impl CursorTracker { // } } -pub(crate) struct CursorPositionTrackerWorker { +pub(crate) struct CursorTrackerWorker { uid: String, producer: mpsc::Sender, op: mpsc::Receiver, channel: Arc>, } -impl CursorPositionTrackerWorker { +impl CursorTrackerWorker { pub(crate) fn new(uid: String) -> Self { let (op_tx, op_rx) = mpsc::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64); @@ -69,8 +72,15 @@ impl CursorPositionTrackerWorker { channel: Arc::new(cur_tx), } } +} - pub(crate) fn subscribe(&self) -> CursorTracker { +#[async_trait] +impl ControllerWorker for CursorTrackerWorker { + type Controller = CursorTracker; + type Tx = CursorClient; + type Rx = Streaming; + + fn subscribe(&self) -> CursorTracker { CursorTracker { uid: self.uid.clone(), op: self.producer.clone(), @@ -78,12 +88,11 @@ impl CursorPositionTrackerWorker { } } - // TODO is it possible to avoid passing directly tonic Streaming and proto BufferClient ? - pub(crate) async fn work(mut self, mut rx: Streaming, mut tx: BufferClient) { + async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { loop { tokio::select!{ Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { todo!() } // tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, else => break, } } diff --git a/src/errors.rs b/src/errors.rs index 54caa6b..b9c1f80 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,6 @@ use std::{error::Error, fmt::Display}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, broadcast}; use tonic::{Status, Code}; use tracing::warn; @@ -66,3 +66,9 @@ impl From> for CodempError { CodempError::Channel { send: true } } } + +impl From for CodempError { + fn from(_value: broadcast::error::RecvError) -> Self { + CodempError::Channel { send: false } + } +} diff --git a/src/lib.rs b/src/lib.rs index bdd43d3..e376b81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod errors; pub mod buffer; pub mod state; +pub mod client; pub use tonic; pub use tokio; @@ -19,12 +20,18 @@ pub use errors::CodempError; #[tonic::async_trait] // TODO move this somewhere? pub(crate) trait ControllerWorker { - fn subscribe(&self) -> T; - async fn work(self); + type Controller : Controller; + type Tx; + type Rx; + + fn subscribe(&self) -> Self::Controller; + async fn work(self, tx: Self::Tx, rx: Self::Rx); } #[tonic::async_trait] pub trait Controller { + type Input; + + async fn send(&self, x: Self::Input) -> Result<(), CodempError>; async fn recv(&self) -> Result; - async fn send(&self, x: T) -> Result<(), CodempError>; }