From 02b2588073c64a892ef38196b2e63fa8bd684bc0 Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 30 Jul 2023 17:48:55 +0200 Subject: [PATCH 1/9] feat: major restructure, workspace rework, tweaks all controllers use internal mutability so that they can all be put behind Arcs --- client/vscode/src/lib.rs | 18 +++-- src/{ => buffer}/client.rs | 20 ++--- .../buffer.rs => buffer/controller.rs} | 45 +++++------ src/{ => buffer}/factory.rs | 21 ++++- src/buffer/mod.rs | 3 + src/controller/mod.rs | 30 ------- .../cursor.rs => cursor/controller.rs} | 46 +++++++---- src/{cursor.rs => cursor/mod.rs} | 2 + src/errors.rs | 44 +++++++++++ src/lib.rs | 16 +++- src/proto.rs | 1 - src/workspace.rs | 57 ------------- src/workspace/controller.rs | 79 +++++++++++++++++++ src/workspace/mod.rs | 1 + 14 files changed, 237 insertions(+), 146 deletions(-) rename src/{ => buffer}/client.rs (84%) rename src/{controller/buffer.rs => buffer/controller.rs} (77%) rename src/{ => buffer}/factory.rs (74%) create mode 100644 src/buffer/mod.rs delete mode 100644 src/controller/mod.rs rename src/{controller/cursor.rs => cursor/controller.rs} (59%) rename src/{cursor.rs => cursor/mod.rs} (95%) delete mode 100644 src/proto.rs delete mode 100644 src/workspace.rs create mode 100644 src/workspace/controller.rs create mode 100644 src/workspace/mod.rs diff --git a/client/vscode/src/lib.rs b/client/vscode/src/lib.rs index 7ad69ef..4c2794f 100644 --- a/client/vscode/src/lib.rs +++ b/client/vscode/src/lib.rs @@ -3,9 +3,13 @@ use std::sync::Arc; use neon::prelude::*; use once_cell::sync::OnceCell; use codemp::{ - controller::{cursor::{CursorSubscriber, CursorControllerHandle}, buffer::{OperationControllerHandle, OperationControllerSubscriber}}, - client::CodempClient, - proto::buffer_client::BufferClient, factory::OperationFactory, + cursor::controller::{CursorSubscriber, CursorControllerHandle}, + buffer::{ + controller::{OperationControllerHandle, OperationControllerSubscriber}, + client::CodempClient, + factory::OperationFactory, + }, + proto::buffer_client::BufferClient, }; use codemp::tokio::{runtime::Runtime, sync::Mutex}; @@ -75,7 +79,7 @@ fn create_client(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); runtime(&mut cx)?.spawn(async move { - match rc.lock().await.create(path, content).await { + match rc.lock().await.create(&path, content.as_deref()).await { Ok(accepted) => deferred.settle_with(&channel, move |mut cx| Ok(cx.boolean(accepted))), Err(e) => deferred.settle_with(&channel, move |mut cx| cx.throw_error::>(e.to_string())), } @@ -123,7 +127,7 @@ fn attach_client(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); runtime(&mut cx)?.spawn(async move { - match rc.lock().await.attach(path).await { + match rc.lock().await.attach(&path).await { Ok(controller) => { deferred.settle_with(&channel, move |mut cx| { let obj = cx.empty_object(); @@ -183,7 +187,7 @@ fn callback_operation(mut cx: FunctionContext) -> JsResult { let boxed : Handle> = this.get(&mut cx, "boxed")?; let callback = Arc::new(cx.argument::(0)?.root(&mut cx)); - let mut rc = boxed.0.clone(); + let rc = boxed.0.clone(); let channel = cx.channel(); // TODO when garbage collecting OperationController stop this worker @@ -213,7 +217,7 @@ fn callback_cursor(mut cx: FunctionContext) -> JsResult { let boxed : Handle> = this.get(&mut cx, "boxed")?; let callback = Arc::new(cx.argument::(0)?.root(&mut cx)); - let mut rc = boxed.0.clone(); + let rc = boxed.0.clone(); let channel = cx.channel(); // TODO when garbage collecting OperationController stop this worker diff --git a/src/client.rs b/src/buffer/client.rs similarity index 84% rename from src/client.rs rename to src/buffer/client.rs index 69fae5d..aff3254 100644 --- a/src/client.rs +++ b/src/buffer/client.rs @@ -3,10 +3,9 @@ use tonic::{transport::Channel, Status, Streaming, async_trait}; use uuid::Uuid; use crate::{ - controller::{ControllerWorker, - cursor::{CursorControllerHandle, CursorControllerWorker, CursorEditor}, - buffer::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker} - }, + ControllerWorker, + cursor::controller::{CursorControllerHandle, CursorControllerWorker, CursorEditor}, + buffer::controller::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker}, proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest, Cursor}, }; @@ -27,11 +26,12 @@ impl CodempClient { Ok(BufferClient::connect(dest.to_string()).await?.into()) } - pub fn id(&self) -> &str { &self.id } + pub fn id(&self) -> &str { &self.id } - pub async fn create(&mut self, path: String, content: Option) -> Result { + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result { let req = BufferPayload { - path, content, + path: path.to_string(), + content: content.map(|x| x.to_string()), user: self.id.clone(), }; @@ -61,9 +61,9 @@ impl CodempClient { Ok(handle) } - pub async fn attach(&mut self, path: String) -> Result { + pub async fn attach(&mut self, path: &str) -> Result { let req = BufferPayload { - path: path.clone(), + path: path.to_string(), content: None, user: self.id.clone(), }; @@ -76,7 +76,7 @@ impl CodempClient { let stream = self.client.attach(req).await?.into_inner(); - let controller = OperationControllerWorker::new((self.clone(), stream), content, path); + let controller = OperationControllerWorker::new((self.clone(), stream), &content, path); let factory = controller.subscribe(); tokio::spawn(async move { diff --git a/src/controller/buffer.rs b/src/buffer/controller.rs similarity index 77% rename from src/controller/buffer.rs rename to src/buffer/controller.rs index 11ca642..f7dcd93 100644 --- a/src/controller/buffer.rs +++ b/src/buffer/controller.rs @@ -1,12 +1,12 @@ use std::{sync::Arc, collections::VecDeque, ops::Range}; use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, broadcast}; +use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tonic::async_trait; -use super::{leading_noop, tailing_noop, ControllerWorker}; +use crate::ControllerWorker; use crate::errors::IgnorableError; -use crate::factory::OperationFactory; +use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; pub struct TextChange { pub span: Range, @@ -15,26 +15,16 @@ pub struct TextChange { #[async_trait] pub trait OperationControllerSubscriber { - async fn poll(&mut self) -> Option; + async fn poll(&self) -> Option; async fn apply(&self, op: OperationSeq); } +#[derive(Clone)] pub struct OperationControllerHandle { content: watch::Receiver, operations: mpsc::Sender, original: Arc>, - stream: broadcast::Receiver, -} - -impl Clone for OperationControllerHandle { - fn clone(&self) -> Self { - OperationControllerHandle { - content: self.content.clone(), - operations: self.operations.clone(), - original: self.original.clone(), - stream: self.original.subscribe(), - } - } + stream: Arc>>, } #[async_trait] @@ -46,8 +36,8 @@ impl OperationFactory for OperationControllerHandle { #[async_trait] impl OperationControllerSubscriber for OperationControllerHandle { - async fn poll(&mut self) -> Option { - let op = self.stream.recv().await.ok()?; + async fn poll(&self) -> Option { + let op = self.stream.lock().await.recv().await.ok()?; let after = self.content.borrow().clone(); let skip = leading_noop(op.ops()) as usize; let before_len = op.base_len(); @@ -61,6 +51,15 @@ impl OperationControllerSubscriber for OperationControllerHandle { self.operations.send(op).await .unwrap_or_warn("could not apply+send operation") } + + // fn subscribe(&self) -> Self { + // OperationControllerHandle { + // content: self.content.clone(), + // operations: self.operations.clone(), + // original: self.original.clone(), + // stream: Arc::new(Mutex::new(self.original.subscribe())), + // } + // } } #[async_trait] @@ -88,7 +87,7 @@ impl ControllerWorker ControllerWorker OperationControllerWorker { - pub fn new(client: C, buffer: String, path: String) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.clone()); + pub fn new(client: C, buffer: &str, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (op_tx, op_rx) = mpsc::channel(64); let (s_tx, _s_rx) = broadcast::channel(64); OperationControllerWorker { @@ -133,7 +132,9 @@ impl OperationControllerWorker { receiver: txt_rx, sender: op_tx, queue: VecDeque::new(), - client, buffer, path + buffer: buffer.to_string(), + path: path.to_string(), + client, } } } diff --git a/src/factory.rs b/src/buffer/factory.rs similarity index 74% rename from src/factory.rs rename to src/buffer/factory.rs index b9c47c2..4c175e0 100644 --- a/src/factory.rs +++ b/src/buffer/factory.rs @@ -1,6 +1,25 @@ -use operational_transform::OperationSeq; +use std::ops::Range; + +use operational_transform::{OperationSeq, Operation}; use similar::{TextDiff, ChangeTag}; +pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } +pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } + +const fn count_noop(op: Option<&Operation>) -> u64 { + match op { + None => 0, + Some(Operation::Retain(n)) => *n, + Some(_) => 0, + } +} + +pub fn op_effective_range(op: &OperationSeq) -> Range { + let first = leading_noop(op.ops()); + let last = op.base_len() as u64 - tailing_noop(op.ops()); + first..last +} + pub trait OperationFactory { fn content(&self) -> String; diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs new file mode 100644 index 0000000..ca7ad3c --- /dev/null +++ b/src/buffer/mod.rs @@ -0,0 +1,3 @@ +pub mod factory; +pub mod client; +pub mod controller; diff --git a/src/controller/mod.rs b/src/controller/mod.rs deleted file mode 100644 index 86ca589..0000000 --- a/src/controller/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -pub mod buffer; -pub mod cursor; - -use std::ops::Range; - -use operational_transform::{Operation, OperationSeq}; -use tonic::async_trait; - -#[async_trait] -pub trait ControllerWorker { - fn subscribe(&self) -> T; - async fn work(self); -} - -pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } -pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } - -const fn count_noop(op: Option<&Operation>) -> u64 { - match op { - None => 0, - Some(Operation::Retain(n)) => *n, - Some(_) => 0, - } -} - -pub fn op_effective_range(op: &OperationSeq) -> Range { - let first = leading_noop(op.ops()); - let last = op.base_len() as u64 - tailing_noop(op.ops()); - first..last -} diff --git a/src/controller/cursor.rs b/src/cursor/controller.rs similarity index 59% rename from src/controller/cursor.rs rename to src/cursor/controller.rs index 267e358..54985e4 100644 --- a/src/controller/cursor.rs +++ b/src/cursor/controller.rs @@ -1,30 +1,31 @@ use std::sync::Arc; -use tokio::sync::{mpsc, broadcast}; +use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex}; use tonic::async_trait; -use crate::{proto::{Position, Cursor}, errors::IgnorableError, controller::ControllerWorker}; +use crate::{proto::{Position, Cursor}, errors::IgnorableError, ControllerWorker}; #[async_trait] pub trait CursorSubscriber { async fn send(&self, path: &str, start: Position, end: Position); - async fn poll(&mut self) -> Option; + async fn poll(&self) -> Option; + fn try_poll(&self) -> Option>; // TODO fuck this fuck neovim } pub struct CursorControllerHandle { uid: String, op: mpsc::Sender, - stream: broadcast::Receiver, original: Arc>, + stream: Mutex>, } impl Clone for CursorControllerHandle { fn clone(&self) -> Self { - Self { + CursorControllerHandle { uid: self.uid.clone(), op: self.op.clone(), - stream: self.original.subscribe(), - original: self.original.clone() + original: self.original.clone(), + stream: Mutex::new(self.original.subscribe()), } } } @@ -40,12 +41,30 @@ impl CursorSubscriber for CursorControllerHandle { }).await.unwrap_or_warn("could not send cursor op") } - async fn poll(&mut self) -> Option { - match self.stream.recv().await { + // TODO is this cancelable? so it can be used in tokio::select! + async fn poll(&self) -> Option { + let mut stream = self.stream.lock().await; + match stream.recv().await { Ok(x) => Some(x), - Err(e) => { - tracing::warn!("could not poll for cursor: {}", e); - None + Err(RecvError::Closed) => None, + Err(RecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Some(stream.recv().await.expect("could not receive after lagging")) + } + } + } + + fn try_poll(&self) -> Option> { + match self.stream.try_lock() { + Err(_) => None, + Ok(mut x) => match x.try_recv() { + Ok(x) => Some(Some(x)), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Closed) => Some(None), + Err(TryRecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Some(Some(x.try_recv().expect("could not receive after lagging"))) + } } } } @@ -84,8 +103,8 @@ impl ControllerWorker for Curso CursorControllerHandle { uid: self.uid.clone(), op: self.producer.clone(), - stream: self.channel.subscribe(), original: self.channel.clone(), + stream: Mutex::new(self.channel.subscribe()), } } @@ -98,7 +117,6 @@ impl ControllerWorker for Curso } } } - } diff --git a/src/cursor.rs b/src/cursor/mod.rs similarity index 95% rename from src/cursor.rs rename to src/cursor/mod.rs index 4ee265e..b4cfefc 100644 --- a/src/cursor.rs +++ b/src/cursor/mod.rs @@ -1,3 +1,5 @@ +pub mod controller; + use crate::proto::{Position, Cursor}; impl From:: for (i32, i32) { diff --git a/src/errors.rs b/src/errors.rs index b8dd254..b3f53cc 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,3 +1,6 @@ +use std::{error::Error, fmt::Display}; + +use tonic::{Status, Code}; use tracing::warn; pub trait IgnorableError { @@ -13,3 +16,44 @@ where E : std::fmt::Display { } } } + +// TODO split this into specific errors for various parts of the library +#[derive(Debug)] +pub enum CodempError { + Transport { + status: Code, + message: String, + }, + Channel { }, + + // TODO filler error, remove later + Filler { + message: String, + }, +} + +impl Error for CodempError {} + +impl Display for CodempError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Transport { status, message } => write!(f, "Transport error: ({}) {}", status, message), + Self::Channel { } => write!(f, "Channel error"), + _ => write!(f, "Unknown error"), + } + } +} + +impl From for CodempError { + fn from(status: Status) -> Self { + CodempError::Transport { status: status.code(), message: status.message().to_string() } + } +} + +impl From for CodempError { + fn from(err: tonic::transport::Error) -> Self { + CodempError::Transport { + status: Code::Unknown, message: format!("underlying transport error: {:?}", err) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 2debfb8..46fb964 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,20 @@ -pub mod proto; -pub mod client; pub mod workspace; -pub mod controller; pub mod cursor; pub mod errors; -pub mod factory; +pub mod buffer; pub use tonic; pub use tokio; pub use operational_transform as ot; +use tonic::async_trait; + +#[async_trait] // TODO move this somewhere? +pub trait ControllerWorker { + fn subscribe(&self) -> T; + async fn work(self); +} + +pub mod proto { + tonic::include_proto!("buffer"); +} diff --git a/src/proto.rs b/src/proto.rs deleted file mode 100644 index 4e168ef..0000000 --- a/src/proto.rs +++ /dev/null @@ -1 +0,0 @@ -tonic::include_proto!("buffer"); diff --git a/src/workspace.rs b/src/workspace.rs deleted file mode 100644 index 56609da..0000000 --- a/src/workspace.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::{collections::BTreeMap, ops::Range}; - -use tokio::sync::RwLock; -use tonic::Status; - -use crate::{ - client::CodempClient, factory::OperationFactory, - controller::{buffer::{OperationControllerHandle, OperationControllerSubscriber}, - cursor::{CursorControllerHandle, CursorSubscriber}}, proto::Cursor, -}; - -pub struct Workspace { - client: CodempClient, - buffers: RwLock>, - cursor: RwLock, -} - -impl Workspace { - pub async fn new(mut client: CodempClient) -> Result { - Ok( - Workspace { - cursor: RwLock::new(client.listen().await?), - client, - buffers: RwLock::new(BTreeMap::new()), - } - ) - } - - pub async fn create(&self, path: &str, content: Option) -> Result { - self.client.clone().create(path.into(), content).await - } - - pub async fn attach(&self, path: String) -> Result<(), Status> { - self.buffers.write().await.insert( - path.clone(), - self.client.clone().attach(path).await? - ); - Ok(()) - } - - pub async fn diff(&self, path: &str, span: Range, text: &str) { - if let Some(controller) = self.buffers.read().await.get(path) { - if let Some(op) = controller.delta(span.start, text, span.end) { - controller.apply(op).await - } - } - } - - pub async fn send(&self, path: &str, start: (i32, i32), end: (i32, i32)) { - self.cursor.read().await.send(path, start.into(), end.into()).await - } - - pub async fn recv(&self) -> Option { - self.cursor.write().await.poll().await - - } -} diff --git a/src/workspace/controller.rs b/src/workspace/controller.rs new file mode 100644 index 0000000..3b6cab4 --- /dev/null +++ b/src/workspace/controller.rs @@ -0,0 +1,79 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use tokio::sync::RwLock; +use tonic::async_trait; + +use crate::{ + buffer::{client::CodempClient, controller::OperationControllerSubscriber}, + cursor::controller::CursorSubscriber, errors::CodempError, +}; + +pub struct Workspace { + client: CodempClient, + buffers: RwLock, BufferController>>, + cursor: CursorController, +} + +pub type CursorController = Arc; +pub type BufferController = Arc; + +#[async_trait] +pub trait WorkspaceHandle { + fn cursor(&self) -> CursorController; + async fn buffer(&self, path: &str) -> Option; + async fn attach(&self, path: &str) -> Result<(), CodempError>; + async fn create(&self, path: &str, content: Option<&str>) -> Result; +} + +impl Workspace { + pub async fn new(dest: &str) -> Result { + let mut client = CodempClient::new(dest).await?; + let cursor = Arc::new(client.listen().await?); + Ok( + Workspace { + buffers: RwLock::new(BTreeMap::new()), + cursor, + client, + } + ) + } +} + +#[async_trait] +impl WorkspaceHandle for Workspace { + // Cursor + fn cursor(&self) -> CursorController { + self.cursor.clone() + } + + // Buffer + async fn buffer(&self, path: &str) -> Option { + self.buffers.read().await.get(path).cloned() + } + + async fn create(&self, path: &str, content: Option<&str>) -> Result { + Ok(self.client.clone().create(path, content).await?) + } + + async fn attach(&self, path: &str) -> Result<(), CodempError> { + let controller = self.client.clone().attach(path).await?; + self.buffers.write().await.insert(path.into(), Arc::new(controller)); + Ok(()) + } + + // pub async fn diff(&self, path: &str, span: Range, text: &str) { + // if let Some(controller) = self.inner.read().await.buffers.get(path) { + // if let Some(op) = controller.delta(span.start, text, span.end) { + // controller.apply(op).await + // } + // } + // } + + // async fn send(&self, path: &str, start: (i32, i32), end: (i32, i32)) { + // self.inner.read().await.cursor.send(path, start.into(), end.into()).await + // } + + // pub async fn recv(&self) -> Option { + // self.inner.write().await.cursor.poll().await + // } +} diff --git a/src/workspace/mod.rs b/src/workspace/mod.rs new file mode 100644 index 0000000..cb9e0ac --- /dev/null +++ b/src/workspace/mod.rs @@ -0,0 +1 @@ +pub mod controller; From 5cddb27b98fac7be1e992ee8c46bade2049fff3f Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 30 Jul 2023 22:58:24 +0200 Subject: [PATCH 2/9] feat: initial features splitting, added singleton --- Cargo.toml | 8 +++++++- src/buffer/controller.rs | 19 +++++++++++++++---- src/cursor/client.rs | 1 + src/lib.rs | 20 ++++++++++++++------ src/{workspace/controller.rs => state.rs} | 19 +++++++++++++++++-- src/workspace/mod.rs | 1 - 6 files changed, 54 insertions(+), 14 deletions(-) create mode 100644 src/cursor/client.rs rename src/{workspace/controller.rs => state.rs} (79%) delete mode 100644 src/workspace/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 00370e3..0e6edc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ name = "codemp" # core tracing = "0.1" tonic = { version = "0.9", features = ["tls", "tls-roots"] } -prost = "0.11.8" +prost = { version = "0.11.8", optional = true } md5 = "0.7.0" uuid = { version = "1.3.1", features = ["v4"] } operational-transform = { version = "0.6", features = ["serde"] } @@ -23,6 +23,12 @@ serde = { version = "1", optional = false } serde_json = { version = "1", optional = false } tracing-subscriber = { version = "0.3", optional = true } similar = { version = "2.2", features = ["inline"] } +lazy_static = { version = "1.4", optional = true } [build-dependencies] tonic-build = "0.9" + +[features] +default = ["proto", "static"] +proto = ["dep:prost"] +static = ["dep:lazy_static"] diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index f7dcd93..39cecb1 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -14,17 +14,28 @@ pub struct TextChange { } #[async_trait] -pub trait OperationControllerSubscriber { +pub trait OperationControllerSubscriber : OperationFactory { async fn poll(&self) -> Option; async fn apply(&self, op: OperationSeq); } -#[derive(Clone)] + pub struct OperationControllerHandle { content: watch::Receiver, operations: mpsc::Sender, original: Arc>, - stream: Arc>>, + stream: Mutex>, +} + +impl Clone for OperationControllerHandle { + fn clone(&self) -> Self { + OperationControllerHandle { + content: self.content.clone(), + operations: self.operations.clone(), + original: self.original.clone(), + stream: Mutex::new(self.original.subscribe()), + } + } } #[async_trait] @@ -87,7 +98,7 @@ impl ControllerWorker { +pub use errors::CodempError; + +#[tonic::async_trait] // TODO move this somewhere? +pub(crate) trait ControllerWorker { fn subscribe(&self) -> T; async fn work(self); } -pub mod proto { - tonic::include_proto!("buffer"); +#[tonic::async_trait] +pub trait Controller { + async fn recv(&self) -> Result; + async fn send(&self, x: T) -> Result<(), CodempError>; } diff --git a/src/workspace/controller.rs b/src/state.rs similarity index 79% rename from src/workspace/controller.rs rename to src/state.rs index 3b6cab4..2431ddb 100644 --- a/src/workspace/controller.rs +++ b/src/state.rs @@ -8,6 +8,21 @@ use crate::{ cursor::controller::CursorSubscriber, errors::CodempError, }; +#[cfg(feature = "static")] +pub mod instance { + use tokio::runtime::Runtime; + use super::Workspace; + + const CODEMP_DEFAULT_HOST : &str = "http://fantabos.co:50051"; + + lazy_static::lazy_static! { + static ref RUNTIME : Runtime = Runtime::new().expect("could not create tokio runtime"); + static ref WORKSPACE : Workspace = RUNTIME.block_on( + Workspace::new(&std::env::var("CODEMP_HOST").unwrap_or(CODEMP_DEFAULT_HOST.into())) + ).expect("could not create codemp workspace"); + } +} + pub struct Workspace { client: CodempClient, buffers: RwLock, BufferController>>, @@ -19,7 +34,7 @@ pub type BufferController = Arc #[async_trait] pub trait WorkspaceHandle { - fn cursor(&self) -> CursorController; + async fn cursor(&self) -> CursorController; async fn buffer(&self, path: &str) -> Option; async fn attach(&self, path: &str) -> Result<(), CodempError>; async fn create(&self, path: &str, content: Option<&str>) -> Result; @@ -42,7 +57,7 @@ impl Workspace { #[async_trait] impl WorkspaceHandle for Workspace { // Cursor - fn cursor(&self) -> CursorController { + async fn cursor(&self) -> CursorController { self.cursor.clone() } diff --git a/src/workspace/mod.rs b/src/workspace/mod.rs deleted file mode 100644 index cb9e0ac..0000000 --- a/src/workspace/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod controller; From 37e6268f0c8d66d5399bceb0c4c1d47df052814b Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 11 Aug 2023 15:33:40 +0200 Subject: [PATCH 3/9] chore: buffer(controller->handle, client->controller) --- src/buffer/client.rs | 143 ------------------------ src/buffer/controller.rs | 232 +++++++++++++++++---------------------- src/buffer/handle.rs | 130 ++++++++++++++++++++++ src/buffer/mod.rs | 2 +- src/cursor/controller.rs | 122 -------------------- src/cursor/mod.rs | 2 +- src/cursor/tracker.rs | 92 ++++++++++++++++ 7 files changed, 325 insertions(+), 398 deletions(-) delete mode 100644 src/buffer/client.rs create mode 100644 src/buffer/handle.rs delete mode 100644 src/cursor/controller.rs create mode 100644 src/cursor/tracker.rs diff --git a/src/buffer/client.rs b/src/buffer/client.rs deleted file mode 100644 index aff3254..0000000 --- a/src/buffer/client.rs +++ /dev/null @@ -1,143 +0,0 @@ -use operational_transform::OperationSeq; -use tonic::{transport::Channel, Status, Streaming, async_trait}; -use uuid::Uuid; - -use crate::{ - ControllerWorker, - cursor::controller::{CursorControllerHandle, CursorControllerWorker, CursorEditor}, - buffer::controller::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker}, - proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest, Cursor}, -}; - -#[derive(Clone)] -pub struct CodempClient { - id: String, - client: BufferClient, -} - -impl From::> for CodempClient { - fn from(value: BufferClient) -> Self { - CodempClient { id: Uuid::new_v4().to_string(), client: value } - } -} - -impl CodempClient { - pub async fn new(dest: &str) -> Result { - Ok(BufferClient::connect(dest.to_string()).await?.into()) - } - - pub fn id(&self) -> &str { &self.id } - - pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result { - let req = BufferPayload { - path: path.to_string(), - content: content.map(|x| x.to_string()), - user: self.id.clone(), - }; - - let res = self.client.create(req).await?; - - Ok(res.into_inner().accepted) - } - - pub async fn listen(&mut self) -> Result { - let req = BufferPayload { - path: "".into(), - content: None, - user: self.id.clone(), - }; - - let stream = self.client.listen(req).await?.into_inner(); - - let controller = CursorControllerWorker::new(self.id().to_string(), (self.clone(), stream)); - let handle = controller.subscribe(); - - tokio::spawn(async move { - tracing::debug!("cursor worker started"); - controller.work().await; - tracing::debug!("cursor worker stopped"); - }); - - Ok(handle) - } - - pub async fn attach(&mut self, path: &str) -> Result { - let req = BufferPayload { - path: path.to_string(), - content: None, - user: self.id.clone(), - }; - - let content = self.client.sync(req.clone()) - .await? - .into_inner() - .content - .unwrap_or("".into()); - - let stream = self.client.attach(req).await?.into_inner(); - - let controller = OperationControllerWorker::new((self.clone(), stream), &content, path); - let factory = controller.subscribe(); - - tokio::spawn(async move { - tracing::debug!("buffer worker started"); - controller.work().await; - tracing::debug!("buffer worker stopped"); - }); - - Ok(factory) - } -} - -#[async_trait] -impl OperationControllerEditor for (CodempClient, Streaming) { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool { - let req = OperationRequest { - hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - path, - user: self.0.id().to_string(), - }; - match self.0.client.edit(req).await { - Ok(res) => res.into_inner().accepted, - Err(e) => { - tracing::error!("error sending edit: {}", e); - false - } - } - } - - async fn recv(&mut self) -> Option { - match self.1.message().await { - Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), - Ok(None) => None, - Err(e) => { - tracing::error!("could not receive edit from server: {}", e); - None - } - } - } -} - -#[async_trait] -impl CursorEditor for (CodempClient, Streaming) { - async fn moved(&mut self, cursor: Cursor) -> bool { - match self.0.client.moved(cursor).await { - Ok(res) => res.into_inner().accepted, - Err(e) => { - tracing::error!("could not send cursor movement: {}", e); - false - } - } - } - - async fn recv(&mut self) -> Option { - match self.1.message().await { - Ok(cursor) => cursor, - Err(e) => { - tracing::error!("could not receive cursor update: {}", e); - None - } - } - } -} diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 39cecb1..0e1de5f 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -1,151 +1,121 @@ -use std::{sync::Arc, collections::VecDeque, ops::Range}; - use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, broadcast, Mutex}; -use tonic::async_trait; +use tonic::{transport::Channel, Status, Streaming, async_trait}; +use uuid::Uuid; -use crate::ControllerWorker; -use crate::errors::IgnorableError; -use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; +use crate::{ + ControllerWorker, + cursor::tracker::{CursorTracker, CursorTrackerWorker}, + buffer::controller::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker}, + proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, +}; -pub struct TextChange { - pub span: Range, - pub content: String, +#[derive(Clone)] +pub struct BufferController { + id: String, + client: BufferClient, } -#[async_trait] -pub trait OperationControllerSubscriber : OperationFactory { - async fn poll(&self) -> Option; - async fn apply(&self, op: OperationSeq); +impl From::> for BufferController { + fn from(value: BufferClient) -> Self { + BufferController { id: Uuid::new_v4().to_string(), client: value } + } } +impl BufferController { + pub async fn new(dest: &str) -> Result { + Ok(BufferClient::connect(dest.to_string()).await?.into()) + } -pub struct OperationControllerHandle { - content: watch::Receiver, - operations: mpsc::Sender, - original: Arc>, - stream: Mutex>, -} + pub fn id(&self) -> &str { &self.id } -impl Clone for OperationControllerHandle { - fn clone(&self) -> Self { - OperationControllerHandle { - content: self.content.clone(), - operations: self.operations.clone(), - original: self.original.clone(), - stream: Mutex::new(self.original.subscribe()), - } + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result { + let req = BufferPayload { + path: path.to_string(), + content: content.map(|x| x.to_string()), + user: self.id.clone(), + }; + + let res = self.client.create(req).await?; + + Ok(res.into_inner().accepted) + } + + pub async fn listen(&mut self) -> Result { + let req = BufferPayload { + path: "".into(), + content: None, + user: self.id.clone(), + }; + + let stream = self.client.listen(req).await?.into_inner(); + + let controller = CursorTrackerWorker::new(self.id().to_string()); + let handle = controller.subscribe(); + let client = self.client.clone(); + + tokio::spawn(async move { + tracing::debug!("cursor worker started"); + controller.work(stream, client).await; + tracing::debug!("cursor worker stopped"); + }); + + Ok(handle) + } + + pub async fn attach(&mut self, path: &str) -> Result { + let req = BufferPayload { + path: path.to_string(), + content: None, + user: self.id.clone(), + }; + + let content = self.client.sync(req.clone()) + .await? + .into_inner() + .content + .unwrap_or("".into()); + + let stream = self.client.attach(req).await?.into_inner(); + + let controller = OperationControllerWorker::new((self.clone(), stream), &content, path); + let factory = controller.subscribe(); + + tokio::spawn(async move { + tracing::debug!("buffer worker started"); + controller.work().await; + tracing::debug!("buffer worker stopped"); + }); + + Ok(factory) } } #[async_trait] -impl OperationFactory for OperationControllerHandle { - fn content(&self) -> String { - self.content.borrow().clone() - } -} - -#[async_trait] -impl OperationControllerSubscriber for OperationControllerHandle { - async fn poll(&self) -> Option { - let op = self.stream.lock().await.recv().await.ok()?; - let after = self.content.borrow().clone(); - let skip = leading_noop(op.ops()) as usize; - let before_len = op.base_len(); - let tail = tailing_noop(op.ops()) as usize; - let span = skip..before_len-tail; - let content = after[skip..after.len()-tail].to_string(); - Some(TextChange { span, content }) - } - - async fn apply(&self, op: OperationSeq) { - self.operations.send(op).await - .unwrap_or_warn("could not apply+send operation") - } - - // fn subscribe(&self) -> Self { - // OperationControllerHandle { - // content: self.content.clone(), - // operations: self.operations.clone(), - // original: self.original.clone(), - // stream: Arc::new(Mutex::new(self.original.subscribe())), - // } - // } -} - -#[async_trait] -pub(crate) trait OperationControllerEditor { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool; - async fn recv(&mut self) -> Option; -} - -pub(crate) struct OperationControllerWorker { - pub(crate) content: watch::Sender, - pub(crate) operations: mpsc::Receiver, - pub(crate) stream: Arc>, - pub(crate) queue: VecDeque, - receiver: watch::Receiver, - sender: mpsc::Sender, - client: C, - buffer: String, - path: String, -} - -#[async_trait] -impl ControllerWorker for OperationControllerWorker { - fn subscribe(&self) -> OperationControllerHandle { - OperationControllerHandle { - content: self.receiver.clone(), - operations: self.sender.clone(), - original: self.stream.clone(), - stream: Mutex::new(self.stream.subscribe()), - } - } - - async fn work(mut self) { - loop { - let op = tokio::select! { - Some(operation) = self.client.recv() => { - let mut out = operation; - for op in self.queue.iter_mut() { - (*op, out) = op.transform(&out).unwrap(); - } - self.stream.send(out.clone()).unwrap(); - out - }, - Some(op) = self.operations.recv() => { - self.queue.push_back(op.clone()); - op - }, - else => break - }; - self.buffer = op.apply(&self.buffer).unwrap(); - self.content.send(self.buffer.clone()).unwrap(); - - while let Some(op) = self.queue.get(0) { - if !self.client.edit(self.path.clone(), op.clone()).await { break } - self.queue.pop_front(); +impl OperationControllerEditor for (BufferController, Streaming) { + async fn edit(&mut self, path: String, op: OperationSeq) -> bool { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path, + user: self.0.id().to_string(), + }; + match self.0.client.edit(req).await { + Ok(res) => res.into_inner().accepted, + Err(e) => { + tracing::error!("error sending edit: {}", e); + false } } } -} - -impl OperationControllerWorker { - pub fn new(client: C, buffer: &str, path: &str) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); - let (op_tx, op_rx) = mpsc::channel(64); - let (s_tx, _s_rx) = broadcast::channel(64); - OperationControllerWorker { - content: txt_tx, - operations: op_rx, - stream: Arc::new(s_tx), - receiver: txt_rx, - sender: op_tx, - queue: VecDeque::new(), - buffer: buffer.to_string(), - path: path.to_string(), - client, + async fn recv(&mut self) -> Option { + match self.1.message().await { + Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), + Ok(None) => None, + Err(e) => { + tracing::error!("could not receive edit from server: {}", e); + None + } } } } diff --git a/src/buffer/handle.rs b/src/buffer/handle.rs new file mode 100644 index 0000000..ecd1a12 --- /dev/null +++ b/src/buffer/handle.rs @@ -0,0 +1,130 @@ +use std::{sync::Arc, collections::VecDeque, ops::Range}; + +use operational_transform::OperationSeq; +use tokio::sync::{watch, mpsc, broadcast, Mutex}; +use tonic::async_trait; + +use crate::ControllerWorker; +use crate::errors::IgnorableError; +use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; + +pub struct TextChange { + pub span: Range, + pub content: String, +} + +pub struct BufferHandle { + content: watch::Receiver, + operations: mpsc::Sender, + stream: Mutex>, +} + +#[async_trait] +impl OperationFactory for BufferHandle { + fn content(&self) -> String { + self.content.borrow().clone() + } +} + +impl BufferHandle { + async fn poll(&self) -> Option { + let op = self.stream.lock().await.recv().await.ok()?; + let after = self.content.borrow().clone(); + let skip = leading_noop(op.ops()) as usize; + let before_len = op.base_len(); + let tail = tailing_noop(op.ops()) as usize; + let span = skip..before_len-tail; + let content = after[skip..after.len()-tail].to_string(); + Some(TextChange { span, content }) + } + + async fn apply(&self, op: OperationSeq) { + self.operations.send(op).await + .unwrap_or_warn("could not apply+send operation") + } + + // fn subscribe(&self) -> Self { + // OperationControllerHandle { + // content: self.content.clone(), + // operations: self.operations.clone(), + // original: self.original.clone(), + // stream: Arc::new(Mutex::new(self.original.subscribe())), + // } + // } +} + +#[async_trait] +pub(crate) trait OperationControllerEditor { + async fn edit(&mut self, path: String, op: OperationSeq) -> bool; + async fn recv(&mut self) -> Option; +} + +pub(crate) struct OperationControllerWorker { + pub(crate) content: watch::Sender, + pub(crate) operations: mpsc::Receiver, + pub(crate) stream: Arc>, + pub(crate) queue: VecDeque, + receiver: watch::Receiver, + sender: mpsc::Sender, + client: C, + buffer: String, + path: String, +} + +#[async_trait] +impl ControllerWorker for OperationControllerWorker { + fn subscribe(&self) -> BufferHandle { + BufferHandle { + content: self.receiver.clone(), + operations: self.sender.clone(), + stream: Mutex::new(self.stream.subscribe()), + } + } + + async fn work(mut self) { + loop { + let op = tokio::select! { + Some(operation) = self.client.recv() => { + let mut out = operation; + for op in self.queue.iter_mut() { + (*op, out) = op.transform(&out).unwrap(); + } + self.stream.send(out.clone()).unwrap(); + out + }, + Some(op) = self.operations.recv() => { + self.queue.push_back(op.clone()); + op + }, + else => break + }; + self.buffer = op.apply(&self.buffer).unwrap(); + self.content.send(self.buffer.clone()).unwrap(); + + while let Some(op) = self.queue.get(0) { + if !self.client.edit(self.path.clone(), op.clone()).await { break } + self.queue.pop_front(); + } + } + } + +} + +impl OperationControllerWorker { + pub fn new(client: C, buffer: &str, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); + let (op_tx, op_rx) = mpsc::channel(64); + let (s_tx, _s_rx) = broadcast::channel(64); + OperationControllerWorker { + content: txt_tx, + operations: op_rx, + stream: Arc::new(s_tx), + receiver: txt_rx, + sender: op_tx, + queue: VecDeque::new(), + buffer: buffer.to_string(), + path: path.to_string(), + client, + } + } +} diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index ca7ad3c..7fe80e1 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -1,3 +1,3 @@ pub mod factory; -pub mod client; pub mod controller; +pub mod handle; diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs deleted file mode 100644 index 54985e4..0000000 --- a/src/cursor/controller.rs +++ /dev/null @@ -1,122 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex}; -use tonic::async_trait; - -use crate::{proto::{Position, Cursor}, errors::IgnorableError, ControllerWorker}; - -#[async_trait] -pub trait CursorSubscriber { - async fn send(&self, path: &str, start: Position, end: Position); - async fn poll(&self) -> Option; - fn try_poll(&self) -> Option>; // TODO fuck this fuck neovim -} - -pub struct CursorControllerHandle { - uid: String, - op: mpsc::Sender, - original: Arc>, - stream: Mutex>, -} - -impl Clone for CursorControllerHandle { - fn clone(&self) -> Self { - CursorControllerHandle { - uid: self.uid.clone(), - op: self.op.clone(), - original: self.original.clone(), - stream: Mutex::new(self.original.subscribe()), - } - } -} - -#[async_trait] -impl CursorSubscriber for CursorControllerHandle { - async fn send(&self, path: &str, start: Position, end: Position) { - self.op.send(Cursor { - user: self.uid.clone(), - buffer: path.to_string(), - start: Some(start), - end: Some(end), - }).await.unwrap_or_warn("could not send cursor op") - } - - // TODO is this cancelable? so it can be used in tokio::select! - async fn poll(&self) -> Option { - let mut stream = self.stream.lock().await; - match stream.recv().await { - Ok(x) => Some(x), - Err(RecvError::Closed) => None, - Err(RecvError::Lagged(n)) => { - tracing::error!("cursor channel lagged behind, skipping {} events", n); - Some(stream.recv().await.expect("could not receive after lagging")) - } - } - } - - fn try_poll(&self) -> Option> { - match self.stream.try_lock() { - Err(_) => None, - Ok(mut x) => match x.try_recv() { - Ok(x) => Some(Some(x)), - Err(TryRecvError::Empty) => None, - Err(TryRecvError::Closed) => Some(None), - Err(TryRecvError::Lagged(n)) => { - tracing::error!("cursor channel lagged behind, skipping {} events", n); - Some(Some(x.try_recv().expect("could not receive after lagging"))) - } - } - } - } -} - -#[async_trait] -pub(crate) trait CursorEditor { - async fn moved(&mut self, cursor: Cursor) -> bool; - async fn recv(&mut self) -> Option; -} - -pub(crate) struct CursorControllerWorker { - uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, - channel: Arc>, - client: C, -} - -impl CursorControllerWorker { - pub(crate) fn new(uid: String, client: C) -> Self { - let (op_tx, op_rx) = mpsc::channel(64); - let (cur_tx, _cur_rx) = broadcast::channel(64); - CursorControllerWorker { - uid, client, - producer: op_tx, - op: op_rx, - channel: Arc::new(cur_tx), - } - } -} - -#[async_trait] -impl ControllerWorker for CursorControllerWorker { - fn subscribe(&self) -> CursorControllerHandle { - CursorControllerHandle { - uid: self.uid.clone(), - op: self.producer.clone(), - original: self.channel.clone(), - stream: Mutex::new(self.channel.subscribe()), - } - } - - async fn work(mut self) { - loop { - tokio::select!{ - Some(cur) = self.client.recv() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { self.client.moved(op).await; }, - else => break, - } - } - } -} - - diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index b4cfefc..134f2a7 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,4 +1,4 @@ -pub mod controller; +pub mod tracker; use crate::proto::{Position, Cursor}; diff --git a/src/cursor/tracker.rs b/src/cursor/tracker.rs new file mode 100644 index 0000000..67149fb --- /dev/null +++ b/src/cursor/tracker.rs @@ -0,0 +1,92 @@ +use std::sync::Arc; + +use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; +use tonic::{Streaming, transport::Channel}; + +use crate::{proto::{Position, Cursor, buffer_client::BufferClient}, errors::IgnorableError, CodempError}; + +pub struct CursorTracker { + uid: String, + op: mpsc::Sender, + stream: Mutex>, +} + +impl CursorTracker { + pub async fn moved(&self, path: &str, start: Position, end: Position) -> Result<(), CodempError> { + Ok(self.op.send(Cursor { + user: self.uid.clone(), + buffer: path.to_string(), + start: start.into(), + end: end.into(), + }).await?) + } + + // TODO is this cancelable? so it can be used in tokio::select! + // TODO is the result type overkill? should be an option? + pub async fn recv(&self) -> Result { + let mut stream = self.stream.lock().await; + match stream.recv().await { + Ok(x) => Ok(x), + Err(RecvError::Closed) => Err(CodempError::Channel { send: false }), + Err(RecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Ok(stream.recv().await.expect("could not receive after lagging")) + } + } + } + + // fn try_poll(&self) -> Option> { + // match self.stream.try_lock() { + // Err(_) => None, + // Ok(mut x) => match x.try_recv() { + // Ok(x) => Some(Some(x)), + // Err(TryRecvError::Empty) => None, + // Err(TryRecvError::Closed) => Some(None), + // Err(TryRecvError::Lagged(n)) => { + // tracing::error!("cursor channel lagged behind, skipping {} events", n); + // Some(Some(x.try_recv().expect("could not receive after lagging"))) + // } + // } + // } + // } +} + +pub(crate) struct CursorTrackerWorker { + uid: String, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, +} + +impl CursorTrackerWorker { + pub(crate) fn new(uid: String) -> Self { + let (op_tx, op_rx) = mpsc::channel(64); + let (cur_tx, _cur_rx) = broadcast::channel(64); + Self { + uid, + producer: op_tx, + op: op_rx, + channel: Arc::new(cur_tx), + } + } + + pub(crate) fn subscribe(&self) -> CursorTracker { + CursorTracker { + uid: self.uid.clone(), + op: self.producer.clone(), + stream: Mutex::new(self.channel.subscribe()), + } + } + + // TODO is it possible to avoid passing directly tonic Streaming and proto BufferClient ? + pub(crate) async fn work(mut self, mut rx: Streaming, mut tx: BufferClient) { + loop { + tokio::select!{ + Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), + Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + else => break, + } + } + } +} + From 98cae0969d0287fbfc70d1a0c65121d099bc3ab9 Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 11 Aug 2023 15:34:04 +0200 Subject: [PATCH 4/9] fix: remove workspace trait, add channel error --- src/errors.rs | 13 +++++++++++-- src/state.rs | 31 +++++++++---------------------- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index b3f53cc..54caa6b 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,5 +1,6 @@ use std::{error::Error, fmt::Display}; +use tokio::sync::mpsc; use tonic::{Status, Code}; use tracing::warn; @@ -24,7 +25,9 @@ pub enum CodempError { status: Code, message: String, }, - Channel { }, + Channel { + send: bool + }, // TODO filler error, remove later Filler { @@ -38,7 +41,7 @@ impl Display for CodempError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Transport { status, message } => write!(f, "Transport error: ({}) {}", status, message), - Self::Channel { } => write!(f, "Channel error"), + Self::Channel { send } => write!(f, "Channel error (send:{})", send), _ => write!(f, "Unknown error"), } } @@ -57,3 +60,9 @@ impl From for CodempError { } } } + +impl From> for CodempError { + fn from(_value: mpsc::error::SendError) -> Self { + CodempError::Channel { send: true } + } +} diff --git a/src/state.rs b/src/state.rs index 2431ddb..39d1f54 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,13 +1,14 @@ use std::{collections::BTreeMap, sync::Arc}; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, Mutex}; use tonic::async_trait; use crate::{ - buffer::{client::CodempClient, controller::OperationControllerSubscriber}, - cursor::controller::CursorSubscriber, errors::CodempError, + buffer::{controller::BufferController, handle::OperationControllerEditor}, + errors::CodempError, Controller, proto::Cursor, cursor::tracker::CursorTracker, }; + #[cfg(feature = "static")] pub mod instance { use tokio::runtime::Runtime; @@ -26,18 +27,7 @@ pub mod instance { pub struct Workspace { client: CodempClient, buffers: RwLock, BufferController>>, - cursor: CursorController, -} - -pub type CursorController = Arc; -pub type BufferController = Arc; - -#[async_trait] -pub trait WorkspaceHandle { - async fn cursor(&self) -> CursorController; - async fn buffer(&self, path: &str) -> Option; - async fn attach(&self, path: &str) -> Result<(), CodempError>; - async fn create(&self, path: &str, content: Option<&str>) -> Result; + cursor: Arc, } impl Workspace { @@ -52,25 +42,22 @@ impl Workspace { } ) } -} -#[async_trait] -impl WorkspaceHandle for Workspace { // Cursor - async fn cursor(&self) -> CursorController { + pub async fn cursor(&self) -> Arc { self.cursor.clone() } // Buffer - async fn buffer(&self, path: &str) -> Option { + pub async fn buffer(&self, path: &str) -> Option { self.buffers.read().await.get(path).cloned() } - async fn create(&self, path: &str, content: Option<&str>) -> Result { + pub async fn create(&self, path: &str, content: Option<&str>) -> Result { Ok(self.client.clone().create(path, content).await?) } - async fn attach(&self, path: &str) -> Result<(), CodempError> { + pub async fn attach(&self, path: &str) -> Result<(), CodempError> { let controller = self.client.clone().attach(path).await?; self.buffers.write().await.insert(path.into(), Arc::new(controller)); Ok(()) From 8595d0c9270464747f5a172989a54a499b5c80f9 Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 11 Aug 2023 15:50:17 +0200 Subject: [PATCH 5/9] fix: imports, Arc --- src/buffer/controller.rs | 4 ++-- src/state.rs | 32 ++++++++------------------------ 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 0e1de5f..9be39f4 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -5,7 +5,7 @@ use uuid::Uuid; use crate::{ ControllerWorker, cursor::tracker::{CursorTracker, CursorTrackerWorker}, - buffer::controller::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker}, + buffer::handle::{BufferHandle, OperationControllerEditor, OperationControllerWorker}, proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, }; @@ -62,7 +62,7 @@ impl BufferController { Ok(handle) } - pub async fn attach(&mut self, path: &str) -> Result { + pub async fn attach(&mut self, path: &str) -> Result { let req = BufferPayload { path: path.to_string(), content: None, diff --git a/src/state.rs b/src/state.rs index 39d1f54..abb78d5 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,11 +1,11 @@ use std::{collections::BTreeMap, sync::Arc}; -use tokio::sync::{RwLock, Mutex}; -use tonic::async_trait; +use tokio::sync::RwLock; use crate::{ - buffer::{controller::BufferController, handle::OperationControllerEditor}, - errors::CodempError, Controller, proto::Cursor, cursor::tracker::CursorTracker, + buffer::{controller::BufferController, handle::BufferHandle}, + cursor::tracker::CursorTracker, + errors::CodempError, }; @@ -25,14 +25,14 @@ pub mod instance { } pub struct Workspace { - client: CodempClient, - buffers: RwLock, BufferController>>, + client: BufferController, + buffers: RwLock, Arc>>, cursor: Arc, } impl Workspace { pub async fn new(dest: &str) -> Result { - let mut client = CodempClient::new(dest).await?; + let mut client = BufferController::new(dest).await?; let cursor = Arc::new(client.listen().await?); Ok( Workspace { @@ -49,7 +49,7 @@ impl Workspace { } // Buffer - pub async fn buffer(&self, path: &str) -> Option { + pub async fn buffer(&self, path: &str) -> Option> { self.buffers.read().await.get(path).cloned() } @@ -62,20 +62,4 @@ impl Workspace { self.buffers.write().await.insert(path.into(), Arc::new(controller)); Ok(()) } - - // pub async fn diff(&self, path: &str, span: Range, text: &str) { - // if let Some(controller) = self.inner.read().await.buffers.get(path) { - // if let Some(op) = controller.delta(span.start, text, span.end) { - // controller.apply(op).await - // } - // } - // } - - // async fn send(&self, path: &str, start: (i32, i32), end: (i32, i32)) { - // self.inner.read().await.cursor.send(path, start.into(), end.into()).await - // } - - // pub async fn recv(&self) -> Option { - // self.inner.write().await.cursor.poll().await - // } } From 9a1d84bc64f4d622cdbce728719b9287df2e8e5a Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 16 Aug 2023 17:08:31 +0200 Subject: [PATCH 6/9] chore: split cursor and buffer protos --- build.rs | 1 + proto/buffer.proto | 25 +++++++------------------ proto/cursor.proto | 26 ++++++++++++++++++++++++++ src/lib.rs | 4 +++- 4 files changed, 37 insertions(+), 19 deletions(-) create mode 100644 proto/cursor.proto diff --git a/build.rs b/build.rs index 838368e..8c727c0 100644 --- a/build.rs +++ b/build.rs @@ -1,4 +1,5 @@ fn main() -> Result<(), Box> { tonic_build::compile_protos("proto/buffer.proto")?; + tonic_build::compile_protos("proto/cursor.proto")?; Ok(()) } diff --git a/proto/buffer.proto b/proto/buffer.proto index 9b33b2e..b668a44 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -1,26 +1,16 @@ syntax = "proto3"; -package buffer; + +package codemp.buffer; service Buffer { rpc Attach (BufferPayload) returns (stream RawOp); - rpc Edit (OperationRequest) returns (BufferResponse); - rpc Create (BufferPayload) returns (BufferResponse); + rpc Edit (OperationRequest) returns (BufferEditResponse); + rpc Create (BufferPayload) returns (BufferCreateResponse); rpc Sync (BufferPayload) returns (BufferResponse); - rpc Moved (Cursor) returns (BufferResponse); - rpc Listen (BufferPayload) returns (stream Cursor); } -message Position { - int32 row = 1; - int32 col = 2; -} - -message Cursor { - string user = 1; - string buffer = 2; - Position start = 3; - Position end = 4; -} +message BufferCreateResponse {} +message BufferEditResponse {} message RawOp { string opseq = 1; @@ -41,6 +31,5 @@ message BufferPayload { } message BufferResponse { - bool accepted = 1; - optional string content = 2; + string content = 2; } diff --git a/proto/cursor.proto b/proto/cursor.proto new file mode 100644 index 0000000..39d40df --- /dev/null +++ b/proto/cursor.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +package codemp.cursor; + +service Cursor { + rpc Moved (CursorPosition) returns (MovedResponse); + rpc Listen (UserIdentity) returns (stream CursorPosition); +} + +message MovedResponse {} + +message RowColumn { + int32 row = 1; + int32 col = 2; +} + +message CursorPosition { + string user = 1; + string buffer = 2; + RowColumn start = 3; + RowColumn end = 4; +} + +message UserIdentity { + string id = 1; +} diff --git a/src/lib.rs b/src/lib.rs index 132154c..bdd43d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,8 +9,10 @@ pub use tokio; pub use operational_transform as ot; #[cfg(feature = "proto")] +#[allow(non_snake_case)] pub mod proto { - tonic::include_proto!("buffer"); + tonic::include_proto!("codemp.buffer"); + tonic::include_proto!("codemp.cursor"); } pub use errors::CodempError; From 74faca0f25871fa2d2c4883c3211245223bc0476 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 16 Aug 2023 17:09:21 +0200 Subject: [PATCH 7/9] chore: cleaned up server and lib after split --- server/src/buffer/service.rs | 72 ++++++++---------------------------- server/src/cursor/mod.rs | 1 + server/src/cursor/service.rs | 52 ++++++++++++++++++++++++++ server/src/main.rs | 7 +++- src/buffer/controller.rs | 46 +++++++++++------------ src/cursor/mod.rs | 16 ++++---- src/cursor/tracker.rs | 28 +++++++------- src/state.rs | 19 +++++----- 8 files changed, 128 insertions(+), 113 deletions(-) create mode 100644 server/src/cursor/mod.rs create mode 100644 server/src/cursor/service.rs diff --git a/server/src/buffer/service.rs b/server/src/buffer/service.rs index 5785eda..5745981 100644 --- a/server/src/buffer/service.rs +++ b/server/src/buffer/service.rs @@ -1,17 +1,16 @@ use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap}; -use tokio::sync::{mpsc, broadcast, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tonic::{Request, Response, Status}; use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? -use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest, Cursor}; +use codemp::proto::{buffer_server::Buffer, RawOp, BufferPayload, BufferResponse, OperationRequest, BufferEditResponse, BufferCreateResponse}; use tracing::info; use super::actor::{BufferHandle, BufferStore}; type OperationStream = Pin> + Send>>; -type CursorStream = Pin> + Send>>; struct BufferMap { store: HashMap, @@ -34,15 +33,12 @@ impl BufferStore for BufferMap { pub struct BufferService { map: Arc>, - cursor: broadcast::Sender, } -impl BufferService { - #[allow(unused)] - fn get_buffer(&self, path: &String) -> Result { - match self.map.read().unwrap().get(path) { - Some(buf) => Ok(buf.clone()), - None => Err(Status::not_found("no buffer for given path")), +impl Default for BufferService { + fn default() -> BufferService { + BufferService { + map: Arc::new(RwLock::new(HashMap::new().into())), } } } @@ -50,7 +46,6 @@ impl BufferService { #[tonic::async_trait] impl Buffer for BufferService { type AttachStream = OperationStream; - type ListenStream = CursorStream; async fn attach(&self, req: Request) -> Result, Status> { let request = req.into_inner(); @@ -73,29 +68,7 @@ impl Buffer for BufferService { } } - async fn listen(&self, req: Request) -> Result, Status> { - let mut sub = self.cursor.subscribe(); - let myself = req.into_inner().user; - let (tx, rx) = mpsc::channel(128); - tokio::spawn(async move { - while let Ok(v) = sub.recv().await { - if v.user == myself { continue } - tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel? - } - }); - let output_stream = ReceiverStream::new(rx); - info!("registered new subscriber to cursor updates"); - Ok(Response::new(Box::pin(output_stream))) - } - - async fn moved(&self, req:Request) -> Result, Status> { - match self.cursor.send(req.into_inner()) { - Ok(_) => Ok(Response::new(BufferResponse { accepted: true, content: None})), - Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))), - } - } - - async fn edit(&self, req:Request) -> Result, Status> { + async fn edit(&self, req:Request) -> Result, Status> { let request = req.into_inner(); let tx = match self.map.read().unwrap().get(&request.path) { Some(handle) => { @@ -110,19 +83,20 @@ impl Buffer for BufferService { let (ack, status) = oneshot::channel(); match tx.send((ack, request)).await { Err(e) => Err(Status::internal(format!("error sending edit to buffer actor: {}", e))), - Ok(()) => Ok(Response::new(BufferResponse { - accepted: status.await.unwrap_or(false), - content: None - })) + Ok(()) => { + match status.await { + Ok(_accepted) => Ok(Response::new(BufferEditResponse { })), + Err(e) => Err(Status::internal(format!("error receiving edit result: {}", e))), + } + } } } - async fn create(&self, req:Request) -> Result, Status> { + async fn create(&self, req:Request) -> Result, Status> { let request = req.into_inner(); let _handle = self.map.write().unwrap().handle(request.path, request.content); info!("created new buffer"); - let answ = BufferResponse { accepted: true, content: None }; - Ok(Response::new(answ)) + Ok(Response::new(BufferCreateResponse { })) } async fn sync(&self, req: Request) -> Result, Status> { @@ -131,23 +105,9 @@ impl Buffer for BufferService { None => Err(Status::not_found("requested buffer does not exist")), Some(buf) => { info!("synching buffer"); - let answ = BufferResponse { accepted: true, content: Some(buf.content.borrow().clone()) }; + let answ = BufferResponse { content: buf.content.borrow().clone() }; Ok(Response::new(answ)) } } } } - -impl BufferService { - pub fn new() -> BufferService { - let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity - BufferService { - map: Arc::new(RwLock::new(HashMap::new().into())), - cursor: cur_tx, - } - } - - pub fn server(self) -> BufferServer { - BufferServer::new(self) - } -} diff --git a/server/src/cursor/mod.rs b/server/src/cursor/mod.rs new file mode 100644 index 0000000..1f278a4 --- /dev/null +++ b/server/src/cursor/mod.rs @@ -0,0 +1 @@ +pub mod service; diff --git a/server/src/cursor/service.rs b/server/src/cursor/service.rs new file mode 100644 index 0000000..b641c95 --- /dev/null +++ b/server/src/cursor/service.rs @@ -0,0 +1,52 @@ +use std::pin::Pin; + +use tokio::sync::{mpsc, broadcast}; +use tonic::{Request, Response, Status}; + +use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? + +use codemp::proto::{cursor_server::Cursor, UserIdentity, CursorPosition, MovedResponse}; +use tracing::info; + +type CursorStream = Pin> + Send>>; + +pub struct CursorService { + cursor: broadcast::Sender, +} + +#[tonic::async_trait] +impl Cursor for CursorService { + type ListenStream = CursorStream; + + async fn listen(&self, req: Request) -> Result, Status> { + let mut sub = self.cursor.subscribe(); + let myself = req.into_inner().id; + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + while let Ok(v) = sub.recv().await { + if v.user == myself { continue } + tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel? + } + }); + let output_stream = ReceiverStream::new(rx); + info!("registered new subscriber to cursor updates"); + Ok(Response::new(Box::pin(output_stream))) + } + + async fn moved(&self, req:Request) -> Result, Status> { + match self.cursor.send(req.into_inner()) { + Ok(_) => Ok(Response::new(MovedResponse { })), + Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))), + } + } +} + +impl Default for CursorService { + fn default() -> Self { + let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity + // TODO don't drop receiver because sending event when there are no receivers throws an error + CursorService { + cursor: cur_tx, + } + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 13fe906..ec951e1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -5,12 +5,16 @@ //! use clap::Parser; +use codemp::proto::buffer_server::BufferServer; +use codemp::proto::cursor_server::CursorServer; use tracing::info; use tonic::transport::Server; mod buffer; +mod cursor; use crate::buffer::service::BufferService; +use crate::cursor::service::CursorService; #[derive(Parser, Debug)] struct CliArgs { @@ -37,7 +41,8 @@ async fn main() -> Result<(), Box> { info!("binding on {}", args.host); Server::builder() - .add_service(BufferService::new().server()) + .add_service(BufferServer::new(BufferService::default())) + .add_service(CursorServer::new(CursorService::default())) .serve(args.host.parse()?) .await?; diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 9be39f4..9e03318 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -4,7 +4,6 @@ use uuid::Uuid; use crate::{ ControllerWorker, - cursor::tracker::{CursorTracker, CursorTrackerWorker}, buffer::handle::{BufferHandle, OperationControllerEditor, OperationControllerWorker}, proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, }; @@ -28,39 +27,39 @@ impl BufferController { pub fn id(&self) -> &str { &self.id } - pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result { + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), Status> { let req = BufferPayload { path: path.to_string(), content: content.map(|x| x.to_string()), user: self.id.clone(), }; - let res = self.client.create(req).await?; + self.client.create(req).await?; - Ok(res.into_inner().accepted) + Ok(()) } - pub async fn listen(&mut self) -> Result { - let req = BufferPayload { - path: "".into(), - content: None, - user: self.id.clone(), - }; + // pub async fn listen(&mut self) -> Result { + // let req = BufferPayload { + // path: "".into(), + // content: None, + // user: self.id.clone(), + // }; - let stream = self.client.listen(req).await?.into_inner(); + // let stream = self.client.listen(req).await?.into_inner(); - let controller = CursorTrackerWorker::new(self.id().to_string()); - let handle = controller.subscribe(); - let client = self.client.clone(); + // let controller = CursorTrackerWorker::new(self.id().to_string()); + // let handle = controller.subscribe(); + // let client = self.client.clone(); - tokio::spawn(async move { - tracing::debug!("cursor worker started"); - controller.work(stream, client).await; - tracing::debug!("cursor worker stopped"); - }); + // tokio::spawn(async move { + // tracing::debug!("cursor worker started"); + // controller.work(stream, client).await; + // tracing::debug!("cursor worker stopped"); + // }); - Ok(handle) - } + // Ok(handle) + // } pub async fn attach(&mut self, path: &str) -> Result { let req = BufferPayload { @@ -72,8 +71,7 @@ impl BufferController { let content = self.client.sync(req.clone()) .await? .into_inner() - .content - .unwrap_or("".into()); + .content; let stream = self.client.attach(req).await?.into_inner(); @@ -100,7 +98,7 @@ impl OperationControllerEditor for (BufferController, Streaming) { user: self.0.id().to_string(), }; match self.0.client.edit(req).await { - Ok(res) => res.into_inner().accepted, + Ok(_) => true, Err(e) => { tracing::error!("error sending edit: {}", e); false diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 134f2a7..58a0abe 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,25 +1,25 @@ pub mod tracker; -use crate::proto::{Position, Cursor}; +use crate::proto::{RowColumn, CursorPosition}; -impl From:: for (i32, i32) { - fn from(pos: Position) -> (i32, i32) { +impl From:: for (i32, i32) { + fn from(pos: RowColumn) -> (i32, i32) { (pos.row, pos.col) } } -impl From::<(i32, i32)> for Position { +impl From::<(i32, i32)> for RowColumn { fn from((row, col): (i32, i32)) -> Self { - Position { row, col } + RowColumn { row, col } } } -impl Cursor { - pub fn start(&self) -> Position { +impl CursorPosition { + pub fn start(&self) -> RowColumn { self.start.clone().unwrap_or((0, 0).into()) } - pub fn end(&self) -> Position { + pub fn end(&self) -> RowColumn { self.end.clone().unwrap_or((0, 0).into()) } } diff --git a/src/cursor/tracker.rs b/src/cursor/tracker.rs index 67149fb..7fde457 100644 --- a/src/cursor/tracker.rs +++ b/src/cursor/tracker.rs @@ -3,17 +3,17 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; use tonic::{Streaming, transport::Channel}; -use crate::{proto::{Position, Cursor, buffer_client::BufferClient}, errors::IgnorableError, CodempError}; +use crate::{proto::{RowColumn, CursorPosition, buffer_client::BufferClient}, errors::IgnorableError, CodempError}; pub struct CursorTracker { uid: String, - op: mpsc::Sender, - stream: Mutex>, + op: mpsc::Sender, + stream: Mutex>, } impl CursorTracker { - pub async fn moved(&self, path: &str, start: Position, end: Position) -> Result<(), CodempError> { - Ok(self.op.send(Cursor { + pub async fn moved(&self, path: &str, start: RowColumn, end: RowColumn) -> Result<(), CodempError> { + Ok(self.op.send(CursorPosition { user: self.uid.clone(), buffer: path.to_string(), start: start.into(), @@ -23,7 +23,7 @@ impl CursorTracker { // TODO is this cancelable? so it can be used in tokio::select! // TODO is the result type overkill? should be an option? - pub async fn recv(&self) -> Result { + pub async fn recv(&self) -> Result { let mut stream = self.stream.lock().await; match stream.recv().await { Ok(x) => Ok(x), @@ -35,7 +35,7 @@ impl CursorTracker { } } - // fn try_poll(&self) -> Option> { + // fn try_poll(&self) -> Option> { // match self.stream.try_lock() { // Err(_) => None, // Ok(mut x) => match x.try_recv() { @@ -51,14 +51,14 @@ impl CursorTracker { // } } -pub(crate) struct CursorTrackerWorker { +pub(crate) struct CursorPositionTrackerWorker { uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, - channel: Arc>, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, } -impl CursorTrackerWorker { +impl CursorPositionTrackerWorker { pub(crate) fn new(uid: String) -> Self { let (op_tx, op_rx) = mpsc::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64); @@ -79,11 +79,11 @@ impl CursorTrackerWorker { } // TODO is it possible to avoid passing directly tonic Streaming and proto BufferClient ? - pub(crate) async fn work(mut self, mut rx: Streaming, mut tx: BufferClient) { + pub(crate) async fn work(mut self, mut rx: Streaming, mut tx: BufferClient) { loop { tokio::select!{ Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + Some(op) = self.op.recv() => { todo!() } // tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, else => break, } } diff --git a/src/state.rs b/src/state.rs index abb78d5..5755896 100644 --- a/src/state.rs +++ b/src/state.rs @@ -4,7 +4,6 @@ use tokio::sync::RwLock; use crate::{ buffer::{controller::BufferController, handle::BufferHandle}, - cursor::tracker::CursorTracker, errors::CodempError, }; @@ -25,35 +24,35 @@ pub mod instance { } pub struct Workspace { - client: BufferController, buffers: RwLock, Arc>>, - cursor: Arc, + // cursor: Arc, + client: BufferController, } impl Workspace { pub async fn new(dest: &str) -> Result { - let mut client = BufferController::new(dest).await?; - let cursor = Arc::new(client.listen().await?); + let client = BufferController::new(dest).await?; + // let cursor = Arc::new(client.listen().await?); Ok( Workspace { buffers: RwLock::new(BTreeMap::new()), - cursor, + // cursor, client, } ) } // Cursor - pub async fn cursor(&self) -> Arc { - self.cursor.clone() - } + // pub async fn cursor(&self) -> Arc { + // self.cursor.clone() + // } // Buffer pub async fn buffer(&self, path: &str) -> Option> { self.buffers.read().await.get(path).cloned() } - pub async fn create(&self, path: &str, content: Option<&str>) -> Result { + pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), CodempError> { Ok(self.client.clone().create(path, content).await?) } From 96217d1a1a2efa3c9ea681cd660baadbac6c3f5e Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 16 Aug 2023 18:58:42 +0200 Subject: [PATCH 8/9] feat: standardized Controller and ControllerWorker --- src/buffer/controller.rs | 42 +++------------ src/buffer/handle.rs | 113 +++++++++++++++++++++++---------------- src/cursor/client.rs | 1 - src/cursor/tracker.rs | 37 ++++++++----- src/errors.rs | 8 ++- src/lib.rs | 13 +++-- 6 files changed, 112 insertions(+), 102 deletions(-) delete mode 100644 src/cursor/client.rs diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 9e03318..45c0d59 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -1,11 +1,10 @@ -use operational_transform::OperationSeq; -use tonic::{transport::Channel, Status, Streaming, async_trait}; +use tonic::{transport::Channel, Status}; use uuid::Uuid; use crate::{ ControllerWorker, - buffer::handle::{BufferHandle, OperationControllerEditor, OperationControllerWorker}, - proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, + buffer::handle::{BufferHandle, OperationControllerWorker}, + proto::{buffer_client::BufferClient, BufferPayload}, }; #[derive(Clone)] @@ -75,45 +74,16 @@ impl BufferController { let stream = self.client.attach(req).await?.into_inner(); - let controller = OperationControllerWorker::new((self.clone(), stream), &content, path); + let controller = OperationControllerWorker::new(self.id().to_string(), &content, path); let factory = controller.subscribe(); + let client = self.client.clone(); tokio::spawn(async move { tracing::debug!("buffer worker started"); - controller.work().await; + controller.work(client, stream).await; tracing::debug!("buffer worker stopped"); }); Ok(factory) } } - -#[async_trait] -impl OperationControllerEditor for (BufferController, Streaming) { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool { - let req = OperationRequest { - hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - path, - user: self.0.id().to_string(), - }; - match self.0.client.edit(req).await { - Ok(_) => true, - Err(e) => { - tracing::error!("error sending edit: {}", e); - false - } - } - } - - async fn recv(&mut self) -> Option { - match self.1.message().await { - Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), - Ok(None) => None, - Err(e) => { - tracing::error!("could not receive edit from server: {}", e); - None - } - } - } -} diff --git a/src/buffer/handle.rs b/src/buffer/handle.rs index ecd1a12..bbebc66 100644 --- a/src/buffer/handle.rs +++ b/src/buffer/handle.rs @@ -2,10 +2,12 @@ use std::{sync::Arc, collections::VecDeque, ops::Range}; use operational_transform::OperationSeq; use tokio::sync::{watch, mpsc, broadcast, Mutex}; -use tonic::async_trait; +use tonic::transport::Channel; +use tonic::{async_trait, Streaming}; -use crate::ControllerWorker; -use crate::errors::IgnorableError; +use crate::proto::{OperationRequest, RawOp}; +use crate::proto::buffer_client::BufferClient; +use crate::{ControllerWorker, Controller, CodempError}; use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; pub struct TextChange { @@ -26,53 +28,63 @@ impl OperationFactory for BufferHandle { } } -impl BufferHandle { - async fn poll(&self) -> Option { - let op = self.stream.lock().await.recv().await.ok()?; +#[async_trait] +impl Controller for BufferHandle { + type Input = OperationSeq; + + async fn recv(&self) -> Result { + let op = self.stream.lock().await.recv().await?; let after = self.content.borrow().clone(); let skip = leading_noop(op.ops()) as usize; let before_len = op.base_len(); let tail = tailing_noop(op.ops()) as usize; let span = skip..before_len-tail; let content = after[skip..after.len()-tail].to_string(); - Some(TextChange { span, content }) + Ok(TextChange { span, content }) } - async fn apply(&self, op: OperationSeq) { - self.operations.send(op).await - .unwrap_or_warn("could not apply+send operation") + async fn send(&self, op: OperationSeq) -> Result<(), CodempError> { + Ok(self.operations.send(op).await?) } - - // fn subscribe(&self) -> Self { - // OperationControllerHandle { - // content: self.content.clone(), - // operations: self.operations.clone(), - // original: self.original.clone(), - // stream: Arc::new(Mutex::new(self.original.subscribe())), - // } - // } } -#[async_trait] -pub(crate) trait OperationControllerEditor { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool; - async fn recv(&mut self) -> Option; -} - -pub(crate) struct OperationControllerWorker { +pub(crate) struct OperationControllerWorker { + uid: String, pub(crate) content: watch::Sender, pub(crate) operations: mpsc::Receiver, pub(crate) stream: Arc>, pub(crate) queue: VecDeque, receiver: watch::Receiver, sender: mpsc::Sender, - client: C, buffer: String, path: String, } +impl OperationControllerWorker { + pub fn new(uid: String, buffer: &str, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); + let (op_tx, op_rx) = mpsc::channel(64); + let (s_tx, _s_rx) = broadcast::channel(64); + OperationControllerWorker { + uid, + content: txt_tx, + operations: op_rx, + stream: Arc::new(s_tx), + receiver: txt_rx, + sender: op_tx, + queue: VecDeque::new(), + buffer: buffer.to_string(), + path: path.to_string(), + } + } +} + #[async_trait] -impl ControllerWorker for OperationControllerWorker { +impl ControllerWorker for OperationControllerWorker { + type Controller = BufferHandle; + type Tx = BufferClient; + type Rx = Streaming; + fn subscribe(&self) -> BufferHandle { BufferHandle { content: self.receiver.clone(), @@ -81,10 +93,10 @@ impl ControllerWorker for Op } } - async fn work(mut self) { + async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { loop { let op = tokio::select! { - Some(operation) = self.client.recv() => { + Some(operation) = recv_opseq(&mut rx) => { let mut out = operation; for op in self.queue.iter_mut() { (*op, out) = op.transform(&out).unwrap(); @@ -102,29 +114,36 @@ impl ControllerWorker for Op self.content.send(self.buffer.clone()).unwrap(); while let Some(op) = self.queue.get(0) { - if !self.client.edit(self.path.clone(), op.clone()).await { break } + if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break } self.queue.pop_front(); } } } - } -impl OperationControllerWorker { - pub fn new(client: C, buffer: &str, path: &str) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); - let (op_tx, op_rx) = mpsc::channel(64); - let (s_tx, _s_rx) = broadcast::channel(64); - OperationControllerWorker { - content: txt_tx, - operations: op_rx, - stream: Arc::new(s_tx), - receiver: txt_rx, - sender: op_tx, - queue: VecDeque::new(), - buffer: buffer.to_string(), - path: path.to_string(), - client, +async fn send_opseq(tx: &mut BufferClient, uid: String, path: String, op: OperationSeq) -> bool { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path, + user: uid, + }; + match tx.edit(req).await { + Ok(_) => true, + Err(e) => { + tracing::error!("error sending edit: {}", e); + false + } + } +} + +async fn recv_opseq(rx: &mut Streaming) -> Option { + match rx.message().await { + Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), + Ok(None) => None, + Err(e) => { + tracing::error!("could not receive edit from server: {}", e); + None } } } diff --git a/src/cursor/client.rs b/src/cursor/client.rs deleted file mode 100644 index 756547a..0000000 --- a/src/cursor/client.rs +++ /dev/null @@ -1 +0,0 @@ -// TODO separate cursor movement from buffer operations in protocol! diff --git a/src/cursor/tracker.rs b/src/cursor/tracker.rs index 7fde457..6983f40 100644 --- a/src/cursor/tracker.rs +++ b/src/cursor/tracker.rs @@ -1,9 +1,9 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; -use tonic::{Streaming, transport::Channel}; +use tonic::{Streaming, transport::Channel, async_trait}; -use crate::{proto::{RowColumn, CursorPosition, buffer_client::BufferClient}, errors::IgnorableError, CodempError}; +use crate::{proto::{CursorPosition, cursor_client::CursorClient, RowColumn}, errors::IgnorableError, CodempError, Controller, ControllerWorker}; pub struct CursorTracker { uid: String, @@ -11,19 +11,22 @@ pub struct CursorTracker { stream: Mutex>, } -impl CursorTracker { - pub async fn moved(&self, path: &str, start: RowColumn, end: RowColumn) -> Result<(), CodempError> { +#[async_trait] +impl Controller for CursorTracker { + type Input = (String, RowColumn, RowColumn); + + async fn send(&self, (buffer, start, end): Self::Input) -> Result<(), CodempError> { Ok(self.op.send(CursorPosition { user: self.uid.clone(), - buffer: path.to_string(), - start: start.into(), - end: end.into(), + start: Some(start), + end: Some(end), + buffer, }).await?) } // TODO is this cancelable? so it can be used in tokio::select! // TODO is the result type overkill? should be an option? - pub async fn recv(&self) -> Result { + async fn recv(&self) -> Result { let mut stream = self.stream.lock().await; match stream.recv().await { Ok(x) => Ok(x), @@ -51,14 +54,14 @@ impl CursorTracker { // } } -pub(crate) struct CursorPositionTrackerWorker { +pub(crate) struct CursorTrackerWorker { uid: String, producer: mpsc::Sender, op: mpsc::Receiver, channel: Arc>, } -impl CursorPositionTrackerWorker { +impl CursorTrackerWorker { pub(crate) fn new(uid: String) -> Self { let (op_tx, op_rx) = mpsc::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64); @@ -69,8 +72,15 @@ impl CursorPositionTrackerWorker { channel: Arc::new(cur_tx), } } +} - pub(crate) fn subscribe(&self) -> CursorTracker { +#[async_trait] +impl ControllerWorker for CursorTrackerWorker { + type Controller = CursorTracker; + type Tx = CursorClient; + type Rx = Streaming; + + fn subscribe(&self) -> CursorTracker { CursorTracker { uid: self.uid.clone(), op: self.producer.clone(), @@ -78,12 +88,11 @@ impl CursorPositionTrackerWorker { } } - // TODO is it possible to avoid passing directly tonic Streaming and proto BufferClient ? - pub(crate) async fn work(mut self, mut rx: Streaming, mut tx: BufferClient) { + async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { loop { tokio::select!{ Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { todo!() } // tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, else => break, } } diff --git a/src/errors.rs b/src/errors.rs index 54caa6b..b9c1f80 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,6 @@ use std::{error::Error, fmt::Display}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, broadcast}; use tonic::{Status, Code}; use tracing::warn; @@ -66,3 +66,9 @@ impl From> for CodempError { CodempError::Channel { send: true } } } + +impl From for CodempError { + fn from(_value: broadcast::error::RecvError) -> Self { + CodempError::Channel { send: false } + } +} diff --git a/src/lib.rs b/src/lib.rs index bdd43d3..e376b81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod errors; pub mod buffer; pub mod state; +pub mod client; pub use tonic; pub use tokio; @@ -19,12 +20,18 @@ pub use errors::CodempError; #[tonic::async_trait] // TODO move this somewhere? pub(crate) trait ControllerWorker { - fn subscribe(&self) -> T; - async fn work(self); + type Controller : Controller; + type Tx; + type Rx; + + fn subscribe(&self) -> Self::Controller; + async fn work(self, tx: Self::Tx, rx: Self::Rx); } #[tonic::async_trait] pub trait Controller { + type Input; + + async fn send(&self, x: Self::Input) -> Result<(), CodempError>; async fn recv(&self) -> Result; - async fn send(&self, x: T) -> Result<(), CodempError>; } From f8e77f08279e7bc89176ab1e051716a091f10a17 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 16 Aug 2023 23:09:47 +0200 Subject: [PATCH 9/9] feat: reworked client, added static instance --- proto/cursor.proto | 16 ++-- src/buffer/controller.rs | 106 +++++++------------------ src/buffer/mod.rs | 12 ++- src/buffer/{handle.rs => worker.rs} | 57 +++----------- src/client.rs | 117 ++++++++++++++++++++++++++++ src/cursor/controller.rs | 51 ++++++++++++ src/cursor/mod.rs | 17 ++-- src/cursor/tracker.rs | 101 ------------------------ src/cursor/worker.rs | 54 +++++++++++++ src/errors.rs | 3 + src/instance.rs | 81 +++++++++++++++++++ src/lib.rs | 4 +- src/state.rs | 64 --------------- 13 files changed, 378 insertions(+), 305 deletions(-) rename src/buffer/{handle.rs => worker.rs} (64%) create mode 100644 src/client.rs create mode 100644 src/cursor/controller.rs delete mode 100644 src/cursor/tracker.rs create mode 100644 src/cursor/worker.rs create mode 100644 src/instance.rs delete mode 100644 src/state.rs diff --git a/proto/cursor.proto b/proto/cursor.proto index 39d40df..5af4fa2 100644 --- a/proto/cursor.proto +++ b/proto/cursor.proto @@ -3,22 +3,26 @@ syntax = "proto3"; package codemp.cursor; service Cursor { - rpc Moved (CursorPosition) returns (MovedResponse); - rpc Listen (UserIdentity) returns (stream CursorPosition); + rpc Moved (CursorEvent) returns (MovedResponse); + rpc Listen (UserIdentity) returns (stream CursorEvent); } message MovedResponse {} -message RowColumn { +message RowCol { int32 row = 1; int32 col = 2; } message CursorPosition { + string buffer = 1; + RowCol start = 2; + RowCol end = 3; +} + +message CursorEvent { string user = 1; - string buffer = 2; - RowColumn start = 3; - RowColumn end = 4; + CursorPosition position = 2; } message UserIdentity { diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 45c0d59..73e967e 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -1,89 +1,41 @@ -use tonic::{transport::Channel, Status}; -use uuid::Uuid; +use operational_transform::OperationSeq; +use tokio::sync::{watch, mpsc, broadcast, Mutex}; +use tonic::async_trait; -use crate::{ - ControllerWorker, - buffer::handle::{BufferHandle, OperationControllerWorker}, - proto::{buffer_client::BufferClient, BufferPayload}, -}; +use crate::{Controller, CodempError}; +use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; + +use super::TextChange; -#[derive(Clone)] pub struct BufferController { - id: String, - client: BufferClient, + content: watch::Receiver, + operations: mpsc::Sender, + stream: Mutex>, } -impl From::> for BufferController { - fn from(value: BufferClient) -> Self { - BufferController { id: Uuid::new_v4().to_string(), client: value } +#[async_trait] +impl OperationFactory for BufferController { + fn content(&self) -> String { + self.content.borrow().clone() } } -impl BufferController { - pub async fn new(dest: &str) -> Result { - Ok(BufferClient::connect(dest.to_string()).await?.into()) +#[async_trait] +impl Controller for BufferController { + type Input = OperationSeq; + + async fn recv(&self) -> Result { + let op = self.stream.lock().await.recv().await?; + let after = self.content.borrow().clone(); + let skip = leading_noop(op.ops()) as usize; + let before_len = op.base_len(); + let tail = tailing_noop(op.ops()) as usize; + let span = skip..before_len-tail; + let content = after[skip..after.len()-tail].to_string(); + Ok(TextChange { span, content }) } - pub fn id(&self) -> &str { &self.id } - - pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), Status> { - let req = BufferPayload { - path: path.to_string(), - content: content.map(|x| x.to_string()), - user: self.id.clone(), - }; - - self.client.create(req).await?; - - Ok(()) - } - - // pub async fn listen(&mut self) -> Result { - // let req = BufferPayload { - // path: "".into(), - // content: None, - // user: self.id.clone(), - // }; - - // let stream = self.client.listen(req).await?.into_inner(); - - // let controller = CursorTrackerWorker::new(self.id().to_string()); - // let handle = controller.subscribe(); - // let client = self.client.clone(); - - // tokio::spawn(async move { - // tracing::debug!("cursor worker started"); - // controller.work(stream, client).await; - // tracing::debug!("cursor worker stopped"); - // }); - - // Ok(handle) - // } - - pub async fn attach(&mut self, path: &str) -> Result { - let req = BufferPayload { - path: path.to_string(), - content: None, - user: self.id.clone(), - }; - - let content = self.client.sync(req.clone()) - .await? - .into_inner() - .content; - - let stream = self.client.attach(req).await?.into_inner(); - - let controller = OperationControllerWorker::new(self.id().to_string(), &content, path); - let factory = controller.subscribe(); - let client = self.client.clone(); - - tokio::spawn(async move { - tracing::debug!("buffer worker started"); - controller.work(client, stream).await; - tracing::debug!("buffer worker stopped"); - }); - - Ok(factory) + async fn send(&self, op: OperationSeq) -> Result<(), CodempError> { + Ok(self.operations.send(op).await?) } } diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 7fe80e1..922e217 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -1,3 +1,11 @@ -pub mod factory; +use std::ops::Range; + +pub(crate) mod worker; pub mod controller; -pub mod handle; +pub mod factory; + + +pub struct TextChange { + pub span: Range, + pub content: String, +} diff --git a/src/buffer/handle.rs b/src/buffer/worker.rs similarity index 64% rename from src/buffer/handle.rs rename to src/buffer/worker.rs index bbebc66..43bb008 100644 --- a/src/buffer/handle.rs +++ b/src/buffer/worker.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, collections::VecDeque, ops::Range}; +use std::{sync::Arc, collections::VecDeque}; use operational_transform::OperationSeq; use tokio::sync::{watch, mpsc, broadcast, Mutex}; @@ -7,48 +7,13 @@ use tonic::{async_trait, Streaming}; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; -use crate::{ControllerWorker, Controller, CodempError}; -use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; +use crate::ControllerWorker; -pub struct TextChange { - pub span: Range, - pub content: String, -} +use super::TextChange; +use super::controller::BufferController; -pub struct BufferHandle { - content: watch::Receiver, - operations: mpsc::Sender, - stream: Mutex>, -} -#[async_trait] -impl OperationFactory for BufferHandle { - fn content(&self) -> String { - self.content.borrow().clone() - } -} - -#[async_trait] -impl Controller for BufferHandle { - type Input = OperationSeq; - - async fn recv(&self) -> Result { - let op = self.stream.lock().await.recv().await?; - let after = self.content.borrow().clone(); - let skip = leading_noop(op.ops()) as usize; - let before_len = op.base_len(); - let tail = tailing_noop(op.ops()) as usize; - let span = skip..before_len-tail; - let content = after[skip..after.len()-tail].to_string(); - Ok(TextChange { span, content }) - } - - async fn send(&self, op: OperationSeq) -> Result<(), CodempError> { - Ok(self.operations.send(op).await?) - } -} - -pub(crate) struct OperationControllerWorker { +pub(crate) struct BufferControllerWorker { uid: String, pub(crate) content: watch::Sender, pub(crate) operations: mpsc::Receiver, @@ -60,12 +25,12 @@ pub(crate) struct OperationControllerWorker { path: String, } -impl OperationControllerWorker { +impl BufferControllerWorker { pub fn new(uid: String, buffer: &str, path: &str) -> Self { let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (op_tx, op_rx) = mpsc::channel(64); let (s_tx, _s_rx) = broadcast::channel(64); - OperationControllerWorker { + BufferControllerWorker { uid, content: txt_tx, operations: op_rx, @@ -80,13 +45,13 @@ impl OperationControllerWorker { } #[async_trait] -impl ControllerWorker for OperationControllerWorker { - type Controller = BufferHandle; +impl ControllerWorker for BufferControllerWorker { + type Controller = BufferController; type Tx = BufferClient; type Rx = Streaming; - fn subscribe(&self) -> BufferHandle { - BufferHandle { + fn subscribe(&self) -> BufferController { + BufferController { content: self.receiver.clone(), operations: self.sender.clone(), stream: Mutex::new(self.stream.subscribe()), diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..9691ee2 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,117 @@ +use std::{sync::Arc, collections::BTreeMap}; + +use tonic::transport::Channel; + +use crate::{ + cursor::{worker::CursorControllerWorker, controller::CursorController}, + proto::{ + buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, + }, + CodempError, ControllerWorker, buffer::{controller::BufferController, worker::BufferControllerWorker}, +}; + + +pub struct CodempClient { + id: String, + client: ServiceClients, + workspace: Option, +} + +struct ServiceClients { + buffer: BufferClient, + cursor: CursorClient, +} + +struct Workspace { + cursor: Arc, + buffers: BTreeMap>, +} + + +impl CodempClient { + pub async fn new(dst: &str) -> Result { + let buffer = BufferClient::connect(dst.to_string()).await?; + let cursor = CursorClient::connect(dst.to_string()).await?; + let id = uuid::Uuid::new_v4().to_string(); + + Ok(CodempClient { id, client: ServiceClients { buffer, cursor}, workspace: None }) + } + + pub fn get_cursor(&self) -> Option> { + Some(self.workspace?.cursor.clone()) + } + + pub fn get_buffer(&self, path: &str) -> Option> { + self.workspace?.buffers.get(path).cloned() + } + + pub async fn join(&mut self, _session: &str) -> Result, CodempError> { + // TODO there is no real workspace handling in codemp server so it behaves like one big global + // session. I'm still creating this to start laying out the proper use flow + let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner(); + + let controller = CursorControllerWorker::new(self.id.clone()); + let client = self.client.cursor.clone(); + + let handle = Arc::new(controller.subscribe()); + + tokio::spawn(async move { + tracing::debug!("cursor worker started"); + controller.work(client, stream).await; + tracing::debug!("cursor worker stopped"); + }); + + self.workspace = Some( + Workspace { + cursor: handle.clone(), + buffers: BTreeMap::new() + } + ); + + Ok(handle) + } + + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), CodempError> { + if let Some(workspace) = &self.workspace { + self.client.buffer + .create(BufferPayload { + user: self.id.clone(), + path: path.to_string(), + content: content.map(|x| x.to_string()), + }).await?; + + Ok(()) + } else { + Err(CodempError::InvalidState { msg: "join a workspace first".into() }) + } + } + + pub async fn attach(&mut self, path: &str, content: Option<&str>) -> Result, CodempError> { + if let Some(workspace) = &mut self.workspace { + let mut client = self.client.buffer.clone(); + let req = BufferPayload { + path: path.to_string(), user: self.id.clone(), content: None + }; + + let content = client.sync(req.clone()).await?.into_inner().content; + + let stream = client.attach(req).await?.into_inner(); + + let controller = BufferControllerWorker::new(self.id.clone(), &content, path); + let handler = Arc::new(controller.subscribe()); + + let _path = path.to_string(); + tokio::spawn(async move { + tracing::debug!("buffer[{}] worker started", _path); + controller.work(client, stream).await; + tracing::debug!("buffer[{}] worker stopped", _path); + }); + + workspace.buffers.insert(path.to_string(), handler.clone()); + + Ok(handler) + } else { + Err(CodempError::InvalidState { msg: "join a workspace first".into() }) + } + } +} diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs new file mode 100644 index 0000000..7e879a7 --- /dev/null +++ b/src/cursor/controller.rs @@ -0,0 +1,51 @@ +use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; +use tonic::async_trait; + +use crate::{proto::{CursorPosition, CursorEvent}, CodempError, Controller}; + +pub struct CursorController { + uid: String, + op: mpsc::Sender, + stream: Mutex>, +} + +#[async_trait] +impl Controller for CursorController { + type Input = CursorPosition; + + async fn send(&self, cursor: CursorPosition) -> Result<(), CodempError> { + Ok(self.op.send(CursorEvent { + user: self.uid.clone(), + position: Some(cursor), + }).await?) + } + + // TODO is this cancelable? so it can be used in tokio::select! + // TODO is the result type overkill? should be an option? + async fn recv(&self) -> Result { + let mut stream = self.stream.lock().await; + match stream.recv().await { + Ok(x) => Ok(x), + Err(RecvError::Closed) => Err(CodempError::Channel { send: false }), + Err(RecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Ok(stream.recv().await.expect("could not receive after lagging")) + } + } + } + + // fn try_poll(&self) -> Option> { + // match self.stream.try_lock() { + // Err(_) => None, + // Ok(mut x) => match x.try_recv() { + // Ok(x) => Some(Some(x)), + // Err(TryRecvError::Empty) => None, + // Err(TryRecvError::Closed) => Some(None), + // Err(TryRecvError::Lagged(n)) => { + // tracing::error!("cursor channel lagged behind, skipping {} events", n); + // Some(Some(x.try_recv().expect("could not receive after lagging"))) + // } + // } + // } + // } +} diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 58a0abe..c6fbdd9 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,25 +1,26 @@ -pub mod tracker; +pub(crate) mod worker; +pub mod controller; -use crate::proto::{RowColumn, CursorPosition}; +use crate::proto::{RowCol, CursorPosition}; -impl From:: for (i32, i32) { - fn from(pos: RowColumn) -> (i32, i32) { +impl From:: for (i32, i32) { + fn from(pos: RowCol) -> (i32, i32) { (pos.row, pos.col) } } -impl From::<(i32, i32)> for RowColumn { +impl From::<(i32, i32)> for RowCol { fn from((row, col): (i32, i32)) -> Self { - RowColumn { row, col } + RowCol { row, col } } } impl CursorPosition { - pub fn start(&self) -> RowColumn { + pub fn start(&self) -> RowCol { self.start.clone().unwrap_or((0, 0).into()) } - pub fn end(&self) -> RowColumn { + pub fn end(&self) -> RowCol { self.end.clone().unwrap_or((0, 0).into()) } } diff --git a/src/cursor/tracker.rs b/src/cursor/tracker.rs deleted file mode 100644 index 6983f40..0000000 --- a/src/cursor/tracker.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; -use tonic::{Streaming, transport::Channel, async_trait}; - -use crate::{proto::{CursorPosition, cursor_client::CursorClient, RowColumn}, errors::IgnorableError, CodempError, Controller, ControllerWorker}; - -pub struct CursorTracker { - uid: String, - op: mpsc::Sender, - stream: Mutex>, -} - -#[async_trait] -impl Controller for CursorTracker { - type Input = (String, RowColumn, RowColumn); - - async fn send(&self, (buffer, start, end): Self::Input) -> Result<(), CodempError> { - Ok(self.op.send(CursorPosition { - user: self.uid.clone(), - start: Some(start), - end: Some(end), - buffer, - }).await?) - } - - // TODO is this cancelable? so it can be used in tokio::select! - // TODO is the result type overkill? should be an option? - async fn recv(&self) -> Result { - let mut stream = self.stream.lock().await; - match stream.recv().await { - Ok(x) => Ok(x), - Err(RecvError::Closed) => Err(CodempError::Channel { send: false }), - Err(RecvError::Lagged(n)) => { - tracing::error!("cursor channel lagged behind, skipping {} events", n); - Ok(stream.recv().await.expect("could not receive after lagging")) - } - } - } - - // fn try_poll(&self) -> Option> { - // match self.stream.try_lock() { - // Err(_) => None, - // Ok(mut x) => match x.try_recv() { - // Ok(x) => Some(Some(x)), - // Err(TryRecvError::Empty) => None, - // Err(TryRecvError::Closed) => Some(None), - // Err(TryRecvError::Lagged(n)) => { - // tracing::error!("cursor channel lagged behind, skipping {} events", n); - // Some(Some(x.try_recv().expect("could not receive after lagging"))) - // } - // } - // } - // } -} - -pub(crate) struct CursorTrackerWorker { - uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, - channel: Arc>, -} - -impl CursorTrackerWorker { - pub(crate) fn new(uid: String) -> Self { - let (op_tx, op_rx) = mpsc::channel(64); - let (cur_tx, _cur_rx) = broadcast::channel(64); - Self { - uid, - producer: op_tx, - op: op_rx, - channel: Arc::new(cur_tx), - } - } -} - -#[async_trait] -impl ControllerWorker for CursorTrackerWorker { - type Controller = CursorTracker; - type Tx = CursorClient; - type Rx = Streaming; - - fn subscribe(&self) -> CursorTracker { - CursorTracker { - uid: self.uid.clone(), - op: self.producer.clone(), - stream: Mutex::new(self.channel.subscribe()), - } - } - - async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { - loop { - tokio::select!{ - Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, - else => break, - } - } - } -} - diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs new file mode 100644 index 0000000..42fa694 --- /dev/null +++ b/src/cursor/worker.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use tokio::sync::{mpsc, broadcast::{self}, Mutex}; +use tonic::{Streaming, transport::Channel, async_trait}; + +use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, ControllerWorker}; + +use super::controller::CursorController; + +pub(crate) struct CursorControllerWorker { + uid: String, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, +} + +impl CursorControllerWorker { + pub(crate) fn new(uid: String) -> Self { + let (op_tx, op_rx) = mpsc::channel(64); + let (cur_tx, _cur_rx) = broadcast::channel(64); + Self { + uid, + producer: op_tx, + op: op_rx, + channel: Arc::new(cur_tx), + } + } +} + +#[async_trait] +impl ControllerWorker for CursorControllerWorker { + type Controller = CursorController; + type Tx = CursorClient; + type Rx = Streaming; + + fn subscribe(&self) -> CursorController { + CursorController { + uid: self.uid.clone(), + op: self.producer.clone(), + stream: Mutex::new(self.channel.subscribe()), + } + } + + async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { + loop { + tokio::select!{ + Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), + Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + else => break, + } + } + } +} + diff --git a/src/errors.rs b/src/errors.rs index b9c1f80..6b923a7 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -28,6 +28,9 @@ pub enum CodempError { Channel { send: bool }, + InvalidState { + msg: String, + }, // TODO filler error, remove later Filler { diff --git a/src/instance.rs b/src/instance.rs new file mode 100644 index 0000000..1421d29 --- /dev/null +++ b/src/instance.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::{ + buffer::controller::BufferController, + errors::CodempError, client::CodempClient, cursor::controller::CursorController, +}; + + +use tokio::runtime::Runtime; + +const CODEMP_DEFAULT_HOST : &str = "http://alemi.dev:50051"; + +lazy_static::lazy_static! { + static ref RUNTIME : Runtime = Runtime::new().expect("could not create tokio runtime"); + static ref INSTANCE : Instance = Instance::default(); +} + +pub struct Instance { + client: Mutex>, +} + +impl Default for Instance { + fn default() -> Self { + Instance { client: Mutex::new(None) } + } +} + +// TODO these methods repeat a lot of code but Mutex makes it hard to simplify + +impl Instance { + pub async fn connect(&self, addr: &str) -> Result<(), CodempError> { + *self.client.lock().await = Some(CodempClient::new(addr).await?); + Ok(()) + } + + pub async fn join(&self, session: &str) -> Result<(), CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .join(session) + .await?; + + Ok(()) + } + + pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .create(path, content) + .await?; + + Ok(()) + } + + pub async fn get_cursor(&self) -> Result, CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .get_cursor() + .ok_or(CodempError::InvalidState { msg: "join a workspace first".into() }) + } + + pub async fn get_buffer(&self, path: &str) -> Result, CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .get_buffer(path) + .ok_or(CodempError::InvalidState { msg: "join a workspace or create requested buffer first".into() }) + } +} diff --git a/src/lib.rs b/src/lib.rs index e376b81..0975600 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,11 @@ pub mod cursor; pub mod errors; pub mod buffer; -pub mod state; pub mod client; +#[cfg(feature = "static")] +pub mod instance; + pub use tonic; pub use tokio; pub use operational_transform as ot; diff --git a/src/state.rs b/src/state.rs deleted file mode 100644 index 5755896..0000000 --- a/src/state.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{collections::BTreeMap, sync::Arc}; - -use tokio::sync::RwLock; - -use crate::{ - buffer::{controller::BufferController, handle::BufferHandle}, - errors::CodempError, -}; - - -#[cfg(feature = "static")] -pub mod instance { - use tokio::runtime::Runtime; - use super::Workspace; - - const CODEMP_DEFAULT_HOST : &str = "http://fantabos.co:50051"; - - lazy_static::lazy_static! { - static ref RUNTIME : Runtime = Runtime::new().expect("could not create tokio runtime"); - static ref WORKSPACE : Workspace = RUNTIME.block_on( - Workspace::new(&std::env::var("CODEMP_HOST").unwrap_or(CODEMP_DEFAULT_HOST.into())) - ).expect("could not create codemp workspace"); - } -} - -pub struct Workspace { - buffers: RwLock, Arc>>, - // cursor: Arc, - client: BufferController, -} - -impl Workspace { - pub async fn new(dest: &str) -> Result { - let client = BufferController::new(dest).await?; - // let cursor = Arc::new(client.listen().await?); - Ok( - Workspace { - buffers: RwLock::new(BTreeMap::new()), - // cursor, - client, - } - ) - } - - // Cursor - // pub async fn cursor(&self) -> Arc { - // self.cursor.clone() - // } - - // Buffer - pub async fn buffer(&self, path: &str) -> Option> { - self.buffers.read().await.get(path).cloned() - } - - pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), CodempError> { - Ok(self.client.clone().create(path, content).await?) - } - - pub async fn attach(&self, path: &str) -> Result<(), CodempError> { - let controller = self.client.clone().attach(path).await?; - self.buffers.write().await.insert(path.into(), Arc::new(controller)); - Ok(()) - } -}