From 02b2588073c64a892ef38196b2e63fa8bd684bc0 Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 30 Jul 2023 17:48:55 +0200 Subject: [PATCH] feat: major restructure, workspace rework, tweaks all controllers use internal mutability so that they can all be put behind Arcs --- client/vscode/src/lib.rs | 18 +++-- src/{ => buffer}/client.rs | 20 ++--- .../buffer.rs => buffer/controller.rs} | 45 +++++------ src/{ => buffer}/factory.rs | 21 ++++- src/buffer/mod.rs | 3 + src/controller/mod.rs | 30 ------- .../cursor.rs => cursor/controller.rs} | 46 +++++++---- src/{cursor.rs => cursor/mod.rs} | 2 + src/errors.rs | 44 +++++++++++ src/lib.rs | 16 +++- src/proto.rs | 1 - src/workspace.rs | 57 ------------- src/workspace/controller.rs | 79 +++++++++++++++++++ src/workspace/mod.rs | 1 + 14 files changed, 237 insertions(+), 146 deletions(-) rename src/{ => buffer}/client.rs (84%) rename src/{controller/buffer.rs => buffer/controller.rs} (77%) rename src/{ => buffer}/factory.rs (74%) create mode 100644 src/buffer/mod.rs delete mode 100644 src/controller/mod.rs rename src/{controller/cursor.rs => cursor/controller.rs} (59%) rename src/{cursor.rs => cursor/mod.rs} (95%) delete mode 100644 src/proto.rs delete mode 100644 src/workspace.rs create mode 100644 src/workspace/controller.rs create mode 100644 src/workspace/mod.rs diff --git a/client/vscode/src/lib.rs b/client/vscode/src/lib.rs index 7ad69ef..4c2794f 100644 --- a/client/vscode/src/lib.rs +++ b/client/vscode/src/lib.rs @@ -3,9 +3,13 @@ use std::sync::Arc; use neon::prelude::*; use once_cell::sync::OnceCell; use codemp::{ - controller::{cursor::{CursorSubscriber, CursorControllerHandle}, buffer::{OperationControllerHandle, OperationControllerSubscriber}}, - client::CodempClient, - proto::buffer_client::BufferClient, factory::OperationFactory, + cursor::controller::{CursorSubscriber, CursorControllerHandle}, + buffer::{ + controller::{OperationControllerHandle, OperationControllerSubscriber}, + client::CodempClient, + factory::OperationFactory, + }, + proto::buffer_client::BufferClient, }; use codemp::tokio::{runtime::Runtime, sync::Mutex}; @@ -75,7 +79,7 @@ fn create_client(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); runtime(&mut cx)?.spawn(async move { - match rc.lock().await.create(path, content).await { + match rc.lock().await.create(&path, content.as_deref()).await { Ok(accepted) => deferred.settle_with(&channel, move |mut cx| Ok(cx.boolean(accepted))), Err(e) => deferred.settle_with(&channel, move |mut cx| cx.throw_error::>(e.to_string())), } @@ -123,7 +127,7 @@ fn attach_client(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); runtime(&mut cx)?.spawn(async move { - match rc.lock().await.attach(path).await { + match rc.lock().await.attach(&path).await { Ok(controller) => { deferred.settle_with(&channel, move |mut cx| { let obj = cx.empty_object(); @@ -183,7 +187,7 @@ fn callback_operation(mut cx: FunctionContext) -> JsResult { let boxed : Handle> = this.get(&mut cx, "boxed")?; let callback = Arc::new(cx.argument::(0)?.root(&mut cx)); - let mut rc = boxed.0.clone(); + let rc = boxed.0.clone(); let channel = cx.channel(); // TODO when garbage collecting OperationController stop this worker @@ -213,7 +217,7 @@ fn callback_cursor(mut cx: FunctionContext) -> JsResult { let boxed : Handle> = this.get(&mut cx, "boxed")?; let callback = Arc::new(cx.argument::(0)?.root(&mut cx)); - let mut rc = boxed.0.clone(); + let rc = boxed.0.clone(); let channel = cx.channel(); // TODO when garbage collecting OperationController stop this worker diff --git a/src/client.rs b/src/buffer/client.rs similarity index 84% rename from src/client.rs rename to src/buffer/client.rs index 69fae5d..aff3254 100644 --- a/src/client.rs +++ b/src/buffer/client.rs @@ -3,10 +3,9 @@ use tonic::{transport::Channel, Status, Streaming, async_trait}; use uuid::Uuid; use crate::{ - controller::{ControllerWorker, - cursor::{CursorControllerHandle, CursorControllerWorker, CursorEditor}, - buffer::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker} - }, + ControllerWorker, + cursor::controller::{CursorControllerHandle, CursorControllerWorker, CursorEditor}, + buffer::controller::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker}, proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest, Cursor}, }; @@ -27,11 +26,12 @@ impl CodempClient { Ok(BufferClient::connect(dest.to_string()).await?.into()) } - pub fn id(&self) -> &str { &self.id } + pub fn id(&self) -> &str { &self.id } - pub async fn create(&mut self, path: String, content: Option) -> Result { + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result { let req = BufferPayload { - path, content, + path: path.to_string(), + content: content.map(|x| x.to_string()), user: self.id.clone(), }; @@ -61,9 +61,9 @@ impl CodempClient { Ok(handle) } - pub async fn attach(&mut self, path: String) -> Result { + pub async fn attach(&mut self, path: &str) -> Result { let req = BufferPayload { - path: path.clone(), + path: path.to_string(), content: None, user: self.id.clone(), }; @@ -76,7 +76,7 @@ impl CodempClient { let stream = self.client.attach(req).await?.into_inner(); - let controller = OperationControllerWorker::new((self.clone(), stream), content, path); + let controller = OperationControllerWorker::new((self.clone(), stream), &content, path); let factory = controller.subscribe(); tokio::spawn(async move { diff --git a/src/controller/buffer.rs b/src/buffer/controller.rs similarity index 77% rename from src/controller/buffer.rs rename to src/buffer/controller.rs index 11ca642..f7dcd93 100644 --- a/src/controller/buffer.rs +++ b/src/buffer/controller.rs @@ -1,12 +1,12 @@ use std::{sync::Arc, collections::VecDeque, ops::Range}; use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, broadcast}; +use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tonic::async_trait; -use super::{leading_noop, tailing_noop, ControllerWorker}; +use crate::ControllerWorker; use crate::errors::IgnorableError; -use crate::factory::OperationFactory; +use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; pub struct TextChange { pub span: Range, @@ -15,26 +15,16 @@ pub struct TextChange { #[async_trait] pub trait OperationControllerSubscriber { - async fn poll(&mut self) -> Option; + async fn poll(&self) -> Option; async fn apply(&self, op: OperationSeq); } +#[derive(Clone)] pub struct OperationControllerHandle { content: watch::Receiver, operations: mpsc::Sender, original: Arc>, - stream: broadcast::Receiver, -} - -impl Clone for OperationControllerHandle { - fn clone(&self) -> Self { - OperationControllerHandle { - content: self.content.clone(), - operations: self.operations.clone(), - original: self.original.clone(), - stream: self.original.subscribe(), - } - } + stream: Arc>>, } #[async_trait] @@ -46,8 +36,8 @@ impl OperationFactory for OperationControllerHandle { #[async_trait] impl OperationControllerSubscriber for OperationControllerHandle { - async fn poll(&mut self) -> Option { - let op = self.stream.recv().await.ok()?; + async fn poll(&self) -> Option { + let op = self.stream.lock().await.recv().await.ok()?; let after = self.content.borrow().clone(); let skip = leading_noop(op.ops()) as usize; let before_len = op.base_len(); @@ -61,6 +51,15 @@ impl OperationControllerSubscriber for OperationControllerHandle { self.operations.send(op).await .unwrap_or_warn("could not apply+send operation") } + + // fn subscribe(&self) -> Self { + // OperationControllerHandle { + // content: self.content.clone(), + // operations: self.operations.clone(), + // original: self.original.clone(), + // stream: Arc::new(Mutex::new(self.original.subscribe())), + // } + // } } #[async_trait] @@ -88,7 +87,7 @@ impl ControllerWorker ControllerWorker OperationControllerWorker { - pub fn new(client: C, buffer: String, path: String) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.clone()); + pub fn new(client: C, buffer: &str, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (op_tx, op_rx) = mpsc::channel(64); let (s_tx, _s_rx) = broadcast::channel(64); OperationControllerWorker { @@ -133,7 +132,9 @@ impl OperationControllerWorker { receiver: txt_rx, sender: op_tx, queue: VecDeque::new(), - client, buffer, path + buffer: buffer.to_string(), + path: path.to_string(), + client, } } } diff --git a/src/factory.rs b/src/buffer/factory.rs similarity index 74% rename from src/factory.rs rename to src/buffer/factory.rs index b9c47c2..4c175e0 100644 --- a/src/factory.rs +++ b/src/buffer/factory.rs @@ -1,6 +1,25 @@ -use operational_transform::OperationSeq; +use std::ops::Range; + +use operational_transform::{OperationSeq, Operation}; use similar::{TextDiff, ChangeTag}; +pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } +pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } + +const fn count_noop(op: Option<&Operation>) -> u64 { + match op { + None => 0, + Some(Operation::Retain(n)) => *n, + Some(_) => 0, + } +} + +pub fn op_effective_range(op: &OperationSeq) -> Range { + let first = leading_noop(op.ops()); + let last = op.base_len() as u64 - tailing_noop(op.ops()); + first..last +} + pub trait OperationFactory { fn content(&self) -> String; diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs new file mode 100644 index 0000000..ca7ad3c --- /dev/null +++ b/src/buffer/mod.rs @@ -0,0 +1,3 @@ +pub mod factory; +pub mod client; +pub mod controller; diff --git a/src/controller/mod.rs b/src/controller/mod.rs deleted file mode 100644 index 86ca589..0000000 --- a/src/controller/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -pub mod buffer; -pub mod cursor; - -use std::ops::Range; - -use operational_transform::{Operation, OperationSeq}; -use tonic::async_trait; - -#[async_trait] -pub trait ControllerWorker { - fn subscribe(&self) -> T; - async fn work(self); -} - -pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } -pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } - -const fn count_noop(op: Option<&Operation>) -> u64 { - match op { - None => 0, - Some(Operation::Retain(n)) => *n, - Some(_) => 0, - } -} - -pub fn op_effective_range(op: &OperationSeq) -> Range { - let first = leading_noop(op.ops()); - let last = op.base_len() as u64 - tailing_noop(op.ops()); - first..last -} diff --git a/src/controller/cursor.rs b/src/cursor/controller.rs similarity index 59% rename from src/controller/cursor.rs rename to src/cursor/controller.rs index 267e358..54985e4 100644 --- a/src/controller/cursor.rs +++ b/src/cursor/controller.rs @@ -1,30 +1,31 @@ use std::sync::Arc; -use tokio::sync::{mpsc, broadcast}; +use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex}; use tonic::async_trait; -use crate::{proto::{Position, Cursor}, errors::IgnorableError, controller::ControllerWorker}; +use crate::{proto::{Position, Cursor}, errors::IgnorableError, ControllerWorker}; #[async_trait] pub trait CursorSubscriber { async fn send(&self, path: &str, start: Position, end: Position); - async fn poll(&mut self) -> Option; + async fn poll(&self) -> Option; + fn try_poll(&self) -> Option>; // TODO fuck this fuck neovim } pub struct CursorControllerHandle { uid: String, op: mpsc::Sender, - stream: broadcast::Receiver, original: Arc>, + stream: Mutex>, } impl Clone for CursorControllerHandle { fn clone(&self) -> Self { - Self { + CursorControllerHandle { uid: self.uid.clone(), op: self.op.clone(), - stream: self.original.subscribe(), - original: self.original.clone() + original: self.original.clone(), + stream: Mutex::new(self.original.subscribe()), } } } @@ -40,12 +41,30 @@ impl CursorSubscriber for CursorControllerHandle { }).await.unwrap_or_warn("could not send cursor op") } - async fn poll(&mut self) -> Option { - match self.stream.recv().await { + // TODO is this cancelable? so it can be used in tokio::select! + async fn poll(&self) -> Option { + let mut stream = self.stream.lock().await; + match stream.recv().await { Ok(x) => Some(x), - Err(e) => { - tracing::warn!("could not poll for cursor: {}", e); - None + Err(RecvError::Closed) => None, + Err(RecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Some(stream.recv().await.expect("could not receive after lagging")) + } + } + } + + fn try_poll(&self) -> Option> { + match self.stream.try_lock() { + Err(_) => None, + Ok(mut x) => match x.try_recv() { + Ok(x) => Some(Some(x)), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Closed) => Some(None), + Err(TryRecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Some(Some(x.try_recv().expect("could not receive after lagging"))) + } } } } @@ -84,8 +103,8 @@ impl ControllerWorker for Curso CursorControllerHandle { uid: self.uid.clone(), op: self.producer.clone(), - stream: self.channel.subscribe(), original: self.channel.clone(), + stream: Mutex::new(self.channel.subscribe()), } } @@ -98,7 +117,6 @@ impl ControllerWorker for Curso } } } - } diff --git a/src/cursor.rs b/src/cursor/mod.rs similarity index 95% rename from src/cursor.rs rename to src/cursor/mod.rs index 4ee265e..b4cfefc 100644 --- a/src/cursor.rs +++ b/src/cursor/mod.rs @@ -1,3 +1,5 @@ +pub mod controller; + use crate::proto::{Position, Cursor}; impl From:: for (i32, i32) { diff --git a/src/errors.rs b/src/errors.rs index b8dd254..b3f53cc 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,3 +1,6 @@ +use std::{error::Error, fmt::Display}; + +use tonic::{Status, Code}; use tracing::warn; pub trait IgnorableError { @@ -13,3 +16,44 @@ where E : std::fmt::Display { } } } + +// TODO split this into specific errors for various parts of the library +#[derive(Debug)] +pub enum CodempError { + Transport { + status: Code, + message: String, + }, + Channel { }, + + // TODO filler error, remove later + Filler { + message: String, + }, +} + +impl Error for CodempError {} + +impl Display for CodempError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Transport { status, message } => write!(f, "Transport error: ({}) {}", status, message), + Self::Channel { } => write!(f, "Channel error"), + _ => write!(f, "Unknown error"), + } + } +} + +impl From for CodempError { + fn from(status: Status) -> Self { + CodempError::Transport { status: status.code(), message: status.message().to_string() } + } +} + +impl From for CodempError { + fn from(err: tonic::transport::Error) -> Self { + CodempError::Transport { + status: Code::Unknown, message: format!("underlying transport error: {:?}", err) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 2debfb8..46fb964 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,20 @@ -pub mod proto; -pub mod client; pub mod workspace; -pub mod controller; pub mod cursor; pub mod errors; -pub mod factory; +pub mod buffer; pub use tonic; pub use tokio; pub use operational_transform as ot; +use tonic::async_trait; + +#[async_trait] // TODO move this somewhere? +pub trait ControllerWorker { + fn subscribe(&self) -> T; + async fn work(self); +} + +pub mod proto { + tonic::include_proto!("buffer"); +} diff --git a/src/proto.rs b/src/proto.rs deleted file mode 100644 index 4e168ef..0000000 --- a/src/proto.rs +++ /dev/null @@ -1 +0,0 @@ -tonic::include_proto!("buffer"); diff --git a/src/workspace.rs b/src/workspace.rs deleted file mode 100644 index 56609da..0000000 --- a/src/workspace.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::{collections::BTreeMap, ops::Range}; - -use tokio::sync::RwLock; -use tonic::Status; - -use crate::{ - client::CodempClient, factory::OperationFactory, - controller::{buffer::{OperationControllerHandle, OperationControllerSubscriber}, - cursor::{CursorControllerHandle, CursorSubscriber}}, proto::Cursor, -}; - -pub struct Workspace { - client: CodempClient, - buffers: RwLock>, - cursor: RwLock, -} - -impl Workspace { - pub async fn new(mut client: CodempClient) -> Result { - Ok( - Workspace { - cursor: RwLock::new(client.listen().await?), - client, - buffers: RwLock::new(BTreeMap::new()), - } - ) - } - - pub async fn create(&self, path: &str, content: Option) -> Result { - self.client.clone().create(path.into(), content).await - } - - pub async fn attach(&self, path: String) -> Result<(), Status> { - self.buffers.write().await.insert( - path.clone(), - self.client.clone().attach(path).await? - ); - Ok(()) - } - - pub async fn diff(&self, path: &str, span: Range, text: &str) { - if let Some(controller) = self.buffers.read().await.get(path) { - if let Some(op) = controller.delta(span.start, text, span.end) { - controller.apply(op).await - } - } - } - - pub async fn send(&self, path: &str, start: (i32, i32), end: (i32, i32)) { - self.cursor.read().await.send(path, start.into(), end.into()).await - } - - pub async fn recv(&self) -> Option { - self.cursor.write().await.poll().await - - } -} diff --git a/src/workspace/controller.rs b/src/workspace/controller.rs new file mode 100644 index 0000000..3b6cab4 --- /dev/null +++ b/src/workspace/controller.rs @@ -0,0 +1,79 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use tokio::sync::RwLock; +use tonic::async_trait; + +use crate::{ + buffer::{client::CodempClient, controller::OperationControllerSubscriber}, + cursor::controller::CursorSubscriber, errors::CodempError, +}; + +pub struct Workspace { + client: CodempClient, + buffers: RwLock, BufferController>>, + cursor: CursorController, +} + +pub type CursorController = Arc; +pub type BufferController = Arc; + +#[async_trait] +pub trait WorkspaceHandle { + fn cursor(&self) -> CursorController; + async fn buffer(&self, path: &str) -> Option; + async fn attach(&self, path: &str) -> Result<(), CodempError>; + async fn create(&self, path: &str, content: Option<&str>) -> Result; +} + +impl Workspace { + pub async fn new(dest: &str) -> Result { + let mut client = CodempClient::new(dest).await?; + let cursor = Arc::new(client.listen().await?); + Ok( + Workspace { + buffers: RwLock::new(BTreeMap::new()), + cursor, + client, + } + ) + } +} + +#[async_trait] +impl WorkspaceHandle for Workspace { + // Cursor + fn cursor(&self) -> CursorController { + self.cursor.clone() + } + + // Buffer + async fn buffer(&self, path: &str) -> Option { + self.buffers.read().await.get(path).cloned() + } + + async fn create(&self, path: &str, content: Option<&str>) -> Result { + Ok(self.client.clone().create(path, content).await?) + } + + async fn attach(&self, path: &str) -> Result<(), CodempError> { + let controller = self.client.clone().attach(path).await?; + self.buffers.write().await.insert(path.into(), Arc::new(controller)); + Ok(()) + } + + // pub async fn diff(&self, path: &str, span: Range, text: &str) { + // if let Some(controller) = self.inner.read().await.buffers.get(path) { + // if let Some(op) = controller.delta(span.start, text, span.end) { + // controller.apply(op).await + // } + // } + // } + + // async fn send(&self, path: &str, start: (i32, i32), end: (i32, i32)) { + // self.inner.read().await.cursor.send(path, start.into(), end.into()).await + // } + + // pub async fn recv(&self) -> Option { + // self.inner.write().await.cursor.poll().await + // } +} diff --git a/src/workspace/mod.rs b/src/workspace/mod.rs new file mode 100644 index 0000000..cb9e0ac --- /dev/null +++ b/src/workspace/mod.rs @@ -0,0 +1 @@ +pub mod controller;