From bfe84c45e08cc907c7362f7e41d5171c57a3ec0e Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 4 Sep 2024 21:37:35 +0200 Subject: [PATCH] docs: re-documented remaining members --- src/api/cursor.rs | 13 +- src/api/event.rs | 7 + src/api/mod.rs | 11 +- src/api/user.rs | 10 +- src/buffer/controller.rs | 25 +-- src/buffer/mod.rs | 11 +- src/client.rs | 14 +- src/cursor/controller.rs | 23 +-- src/cursor/mod.rs | 9 +- src/ffi/java/mod.rs | 8 + src/ffi/js/mod.rs | 4 + src/ffi/lua.rs | 21 ++- src/ffi/mod.rs | 10 ++ src/ffi/python/mod.rs | 4 + src/workspace.rs | 133 +++++++------- src/workspace/mod.rs | 4 - src/workspace/service.rs | 73 -------- src/workspace/worker.rs | 371 --------------------------------------- 18 files changed, 156 insertions(+), 595 deletions(-) delete mode 100644 src/workspace/mod.rs delete mode 100644 src/workspace/service.rs delete mode 100644 src/workspace/worker.rs diff --git a/src/api/cursor.rs b/src/api/cursor.rs index e238a14..eb253fe 100644 --- a/src/api/cursor.rs +++ b/src/api/cursor.rs @@ -1,7 +1,5 @@ -//! # Cursor -//! -//! represents the position of an user's cursor, with -//! information about their identity +//! ### Cursor +//! Represents the position of a remote user's cursor, with their display name use codemp_proto as proto; use uuid::Uuid; @@ -9,15 +7,18 @@ use uuid::Uuid; #[cfg(feature = "python")] use pyo3::prelude::*; -/// user cursor position in a buffer +/// User cursor position in a buffer #[derive(Clone, Debug, Default)] #[cfg_attr(feature = "python", pyclass)] // #[cfg_attr(feature = "python", pyo3(crate = "reexported::pyo3"))] pub struct Cursor { - /// range of text change, as char indexes in buffer previous state + /// Cursor start position in buffer, as 0-indexed row-column tuple pub start: (i32, i32), + /// Cursor end position in buffer, as 0-indexed row-column tuple pub end: (i32, i32), + /// Path of buffer this cursor is on pub buffer: String, + /// User display name, if provided pub user: Option, } diff --git a/src/api/event.rs b/src/api/event.rs index 1d881ec..6c8665f 100644 --- a/src/api/event.rs +++ b/src/api/event.rs @@ -1,10 +1,17 @@ +//! # Event +//! Real time notification of changes in a workspace, to either users or buffers use codemp_proto::workspace::workspace_event::Event as WorkspaceEventInner; +/// Event in a [crate::Workspace] #[derive(Debug, Clone)] #[cfg_attr(feature = "python", pyo3::pyclass)] pub enum Event { + /// Fired when the file tree changes + /// containes the modified buffer path (deleted or created or renamed) FileTreeUpdated(String), + /// Fired when an user joins the current workspace UserJoin(String), + /// Fired when an user leaves the current workspace UserLeave(String), } diff --git a/src/api/mod.rs b/src/api/mod.rs index a52da0a..8097b14 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,8 +1,5 @@ -//! # api -//! -//! these traits represent the internal api for the codemp library. -//! more methods and structs are provided but these are the core interfaces to -//! interact with the client. +//! # API +//! These traits and structs represent the main `codemp` library API. /// a generic async provider for bidirectional communication pub mod controller; @@ -13,10 +10,10 @@ pub mod change; /// representation for an user's cursor pub mod cursor; -/// workspace events +/// live events in workspaces pub mod event; -/// data structure for service users +/// data structure for remote users pub mod user; pub use controller::Controller; diff --git a/src/api/user.rs b/src/api/user.rs index ce940b1..36a9017 100644 --- a/src/api/user.rs +++ b/src/api/user.rs @@ -1,15 +1,15 @@ //! # User -//! -//! data structures for our service users +//! An user is identified by an UUID, which should never change. +//! Each user has an username, which can change but should be unique. use uuid::Uuid; -/// represents a service user -/// -/// all users are identified uniquely with UUIDs +/// Represents a service user #[derive(Debug, Clone)] pub struct User { + /// User unique identifier, should never change pub id: Uuid, + /// User name, can change but should be unique pub name: String, } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 4a3cf82..cc6c7e4 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -1,6 +1,5 @@ -//! ### controller -//! -//! a controller implementation for buffer actions +//! ### Buffer Controller +//! A [Controller] implementation for buffer actions use std::sync::Arc; @@ -15,25 +14,22 @@ use crate::ext::InternallyMutable; use super::worker::DeltaRequest; -/// the buffer controller implementation +/// A [Controller] to asyncrhonously interact with remote buffers /// -/// for each controller a worker exists, managing outgoing and inbound -/// queues, transforming outbound delayed ops and applying remote changes -/// to the local buffer -/// -/// upon dropping this handle will stop the associated worker +/// Each buffer controller internally tracks the last acknowledged state, remaining always in sync +/// with the server while allowing to procedurally receiving changes while still sending new ones. #[derive(Debug, Clone)] #[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "js", napi_derive::napi)] pub struct BufferController(pub(crate) Arc); impl BufferController { - /// unique identifier of buffer - pub fn name(&self) -> &str { + /// Get the buffer path + pub fn path(&self) -> &str { &self.0.name } - /// return buffer whole content, updating internal buffer previous state + /// Return buffer whole content, updating internal acknowledgement tracker pub async fn content(&self) -> ControllerResult { let (tx, rx) = oneshot::channel(); self.0.content_request.send(tx).await?; @@ -60,8 +56,6 @@ pub(crate) struct BufferControllerInner { #[async_trait] impl Controller for BufferController { - /// block until a text change is available - /// this returns immediately if one is already available async fn poll(&self) -> ControllerResult<()> { if self.0.last_update.get() != *self.0.latest_version.borrow() { return Ok(()); @@ -73,7 +67,6 @@ impl Controller for BufferController { Ok(()) } - /// if a text change is available, return it immediately async fn try_recv(&self) -> ControllerResult> { let last_update = self.0.last_update.get(); let latest_version = self.0.latest_version.borrow().clone(); @@ -89,8 +82,6 @@ impl Controller for BufferController { Ok(change) } - /// enqueue a text change for processing - /// this also updates internal buffer previous state async fn send(&self, op: TextChange) -> ControllerResult<()> { // we let the worker do the updating to the last version and send it back. let (tx, rx) = oneshot::channel(); diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 1610400..47a1221 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -1,10 +1,7 @@ -//! ### buffer -//! -//! ![demo gif of early buffer sync in action](https://cdn.alemi.dev/codemp/demo-vscode.gif) -//! -//! a buffer is a container fo text edited by users. -//! this module contains buffer-related operations and helpers to create Operation Sequences -//! (the underlying chunks of changes sent over the wire) +//! ### Buffer +//! A buffer is a container of text, modifiable in sync by users. +//! It is built on top of [diamond_types] CRDT, guaranteeing that all peers which have received the +//! same set of operations will converge to the same content. /// buffer controller implementation pub mod controller; diff --git a/src/client.rs b/src/client.rs index da368d9..2bdc60b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,7 +19,7 @@ use pyo3::prelude::*; /// /// It generates a new UUID and stores user credentials upon connecting. /// -/// A new [Client] can be obtained with [Client::connect]. +/// A new [`Client`] can be obtained with [`Client::connect`]. #[derive(Debug, Clone)] #[cfg_attr(feature = "js", napi_derive::napi)] #[cfg_attr(feature = "python", pyclass)] @@ -36,7 +36,7 @@ struct ClientInner { } impl Client { - /// Connect to the server, authenticate and instantiate a new [Client]. + /// Connect to the server, authenticate and instantiate a new [`Client`]. pub async fn connect( host: impl AsRef, username: impl AsRef, @@ -74,7 +74,7 @@ impl Client { } /// Refresh session token. - pub async fn refresh(&self) -> tonic::Result<()> { + pub async fn refresh(&self) -> RemoteResult<()> { let new_token = self.0.auth.clone().refresh(self.0.claims.get()) .await? .into_inner(); @@ -128,7 +128,7 @@ impl Client { Ok(out) } - /// Join and return a [Workspace]. + /// Join and return a [`Workspace`]. pub async fn join_workspace(&self, workspace: impl AsRef) -> ConnectionResult { let token = self.0.session .clone() @@ -152,17 +152,17 @@ impl Client { Ok(ws) } - /// Leave the [Workspace] with the given name. + /// Leave the [`Workspace`] with the given name. pub fn leave_workspace(&self, id: &str) -> bool { self.0.workspaces.remove(id).is_some() } - /// Gets a [Workspace] handle by name. + /// Gets a [`Workspace`] handle by name. pub fn get_workspace(&self, id: &str) -> Option { self.0.workspaces.get(id).map(|x| x.clone()) } - /// Get the names of all active [Workspace]s. + /// Get the names of all active [`Workspace`]s. pub fn active_workspaces(&self) -> Vec { self.0 .workspaces diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 56f1495..6c57592 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -1,6 +1,6 @@ -//! ### controller -//! -//! a controller implementation for cursor actions +//! ### Cursor Controller +//! A [Controller] implementation for [Cursor] actions in a [crate::Workspace] + use std::sync::Arc; use tokio::sync::{mpsc, oneshot, watch}; @@ -8,17 +8,10 @@ use tonic::async_trait; use crate::{api::{controller::ControllerCallback, Controller, Cursor}, errors::ControllerResult}; use codemp_proto::cursor::CursorPosition; -/// the cursor controller implementation + +/// A [Controller] for asynchronously sending and receiving [Cursor] events /// -/// this contains -/// * the unique identifier of current user -/// * a sink to send movements into -/// * a mutex over a stream of inbound cursor events -/// * a channel to stop the associated worker -/// -/// for each controller a worker exists , managing outgoing and inbound event queues -/// -/// upon dropping this handle will stop the associated worker +/// An unique [CursorController] exists for each active [crate::Workspace]. #[derive(Debug, Clone)] #[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "js", napi_derive::napi)] @@ -35,8 +28,6 @@ pub(crate) struct CursorControllerInner { #[async_trait] impl Controller for CursorController { - /// enqueue a cursor event to be broadcast to current workspace - /// will automatically invert cursor start/end if they are inverted async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); @@ -44,14 +35,12 @@ impl Controller for CursorController { Ok(self.0.op.send(cursor.into()).await?) } - /// try to receive without blocking, but will still block on stream mutex async fn try_recv(&self) -> ControllerResult> { let (tx, rx) = oneshot::channel(); self.0.stream.send(tx).await?; Ok(rx.await?) } - /// await for changed mutex and then next op change async fn poll(&self) -> ControllerResult<()> { let (tx, rx) = oneshot::channel(); self.0.poll.send(tx)?; diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index ed318dc..ce2ede7 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,9 +1,6 @@ -//! ### cursor -//! -//! ![demo gif of early cursor sync in action](https://cdn.alemi.dev/codemp/demo-nvim.gif) -//! -//! each user holds a cursor, which consists of multiple highlighted region -//! on a specific buffer +//! ### Cursor +//! Each user in a [crate::Workspace] holds a cursor and can move it across multiple buffers. +//! A cursor spans zero or more characters across one or more lines. pub(crate) mod worker; diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index 368c4fa..aa1719c 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -1,3 +1,11 @@ +//! ### java +//! Since for java it is necessary to deal with the JNI and no complete FFI library is available, +//! java glue directly writes JNI functions leveraging [jni] rust bindings. +//! +//! To have a runnable `jar`, some extra Java code must be compiled (available under `dist/java`) +//! and bundled together with the shared object. Such extra wrapper provides classes and methods +//! loading the native extension and invoking the underlying native functions. + pub mod client; pub mod workspace; pub mod cursor; diff --git a/src/ffi/js/mod.rs b/src/ffi/js/mod.rs index 713b60f..2d06620 100644 --- a/src/ffi/js/mod.rs +++ b/src/ffi/js/mod.rs @@ -1,3 +1,7 @@ +//! ### javascript +//! Using [napi] it's possible to map perfectly the entirety of `codemp` API. +//! Async operations run on a dedicated [tokio] runtime and the result is sent back to main thread + pub mod client; pub mod workspace; pub mod cursor; diff --git a/src/ffi/lua.rs b/src/ffi/lua.rs index 4bc9a31..4b4c7d1 100644 --- a/src/ffi/lua.rs +++ b/src/ffi/lua.rs @@ -1,10 +1,23 @@ +//! ### lua +//! Using [mlua] it's possible to map almost perfectly the entirety of `codemp` API. +//! Notable outliers are functions that receive `codemp` objects: these instead receive arguments +//! to build the object instead (such as [`crate::api::Controller::send`]) +//! +//! Note that async operations are carried out on a [tokio] current_thread runtime, so it is +//! necessary to drive it. A separate driver thread can be spawned with `spawn_runtime_driver` +//! function. +//! +//! To make callbacks work, the main lua thread must periodically stop and poll for callbacks via +//! `poll_callback`, otherwise those will never run. This is necessary to allow safe concurrent +//! access to the global Lua state, so minimize runtime inside callbacks as much as possile. + use std::io::Write; use std::sync::Mutex; use crate::api::Cursor; use crate::ext::IgnorableError; use crate::prelude::*; -use crate::workspace::worker::DetachResult; +use crate::workspace::DetachResult; use mlua::prelude::*; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; @@ -15,8 +28,8 @@ impl From:: for LuaError { } } -impl From:: for LuaError { - fn from(value: crate::errors::ProcedureError) -> Self { +impl From:: for LuaError { + fn from(value: crate::errors::RemoteError) -> Self { LuaError::runtime(value.to_string()) } } @@ -520,7 +533,7 @@ fn codemp_native(lua: &Lua) -> LuaResult { // utils exports.set("hash", lua.create_function(|_, (txt,):(String,)| - Ok(crate::hash(txt)) + Ok(crate::ext::hash(txt)) )?)?; // runtime diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index e384afb..c3965ac 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -1,11 +1,21 @@ +//! ### FFI +//! Foreign-Function-Interface glue code, each gated behind feature flags +//! +//! For all except java, the resulting shared object is ready to use, but external packages are +//! available to simplify the dependancy and provide type hints in editor. + +/// java bindings, built with [jni] #[cfg(feature = "java")] pub mod java; +/// lua bindings, built with [mlua] #[cfg(feature = "lua")] pub mod lua; +/// javascript bindings, built with [napi] #[cfg(feature = "js")] pub mod js; +/// python bindings, built with [pyo3] #[cfg(feature = "python")] pub mod python; diff --git a/src/ffi/python/mod.rs b/src/ffi/python/mod.rs index 334b79f..92be8f4 100644 --- a/src/ffi/python/mod.rs +++ b/src/ffi/python/mod.rs @@ -1,3 +1,7 @@ +//! ### python +//! Using [pyo3] it's possible to map perfectly the entirety of `codemp` API. +//! Async operations run on a dedicated [tokio] runtime + pub mod client; pub mod controllers; pub mod workspace; diff --git a/src/workspace.rs b/src/workspace.rs index 7783536..8254e36 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -96,60 +96,6 @@ impl Workspace { Ok(ws) } - pub(crate) fn run_actor( - &self, - mut stream: Streaming, - tx: mpsc::UnboundedSender, - ) { - // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? - let inner = self.0.clone(); - let name = self.id(); - tokio::spawn(async move { - loop { - match stream.message().await { - Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), - Ok(None) => break tracing::info!("leaving workspace {}", name), - Ok(Some(WorkspaceEvent { event: None })) => { - tracing::warn!("workspace {} received empty event", name) - } - Ok(Some(WorkspaceEvent { event: Some(ev) })) => { - let update = crate::api::Event::from(&ev); - match ev { - // user - WorkspaceEventInner::Join(UserJoin { user }) => { - inner - .users - .insert(user.id.uuid(), user.into()); - } - WorkspaceEventInner::Leave(UserLeave { user }) => { - inner.users.remove(&user.id.uuid()); - } - // buffer - WorkspaceEventInner::Create(FileCreate { path }) => { - inner.filetree.insert(path); - } - WorkspaceEventInner::Rename(FileRename { before, after }) => { - inner.filetree.remove(&before); - inner.filetree.insert(after); - } - WorkspaceEventInner::Delete(FileDelete { path }) => { - inner.filetree.remove(&path); - if let Some((_name, controller)) = inner.buffers.remove(&path) { - controller.stop(); - } - } - } - if tx.send(update).is_err() { - tracing::warn!("no active controller to receive workspace event"); - } - } - } - } - }); - } -} - -impl Workspace { /// create a new buffer in current workspace pub async fn create(&self, path: &str) -> RemoteResult<()> { let mut workspace_client = self.0.services.ws(); @@ -207,11 +153,6 @@ impl Workspace { /// this option will be carried in background: [buffer::worker::BufferWorker] will be stopped and dropped. there /// may still be some events enqueued in buffers to poll, but the [buffer::Controller] itself won't be /// accessible anymore from [Workspace]. - /// - /// ### returns - /// [DetachResult::NotAttached] if buffer wasn't attached in the first place - /// [DetachResult::Detaching] if detach was correctly requested - /// [DetachResult::AlreadyDetached] if worker is already stopped pub fn detach(&self, path: &str) -> DetachResult { match self.0.buffers.remove(path) { None => DetachResult::NotAttached, @@ -225,7 +166,7 @@ impl Workspace { } } - /// await next workspace [crate::api::Event] and return it + /// Await next workspace [Event] and return it // TODO this method is weird and ugly, can we make it more standard? pub async fn event(&self) -> ControllerResult { self.0 @@ -237,7 +178,7 @@ impl Workspace { .ok_or(crate::errors::ControllerError::Unfulfilled) } - /// fetch a list of all buffers in a workspace + /// Re-fetch list of all buffers in a workspace pub async fn fetch_buffers(&self) -> RemoteResult<()> { let mut workspace_client = self.0.services.ws(); let buffers = workspace_client @@ -254,7 +195,7 @@ impl Workspace { Ok(()) } - /// fetch a list of all users in a workspace + /// Re-fetch list of all users in a workspace pub async fn fetch_users(&self) -> RemoteResult<()> { let mut workspace_client = self.0.services.ws(); let users = BTreeSet::from_iter( @@ -275,9 +216,7 @@ impl Workspace { Ok(()) } - /// get a list of the users attached to a specific buffer - /// - /// TODO: discuss implementation details + /// Get a list of the [User]s attached to a specific buffer pub async fn list_buffer_users(&self, path: &str) -> RemoteResult> { let mut workspace_client = self.0.services.ws(); let buffer_users = workspace_client @@ -294,7 +233,7 @@ impl Workspace { Ok(buffer_users) } - /// delete a buffer + /// Delete a buffer pub async fn delete(&self, path: &str) -> RemoteResult<()> { let mut workspace_client = self.0.services.ws(); workspace_client @@ -312,25 +251,25 @@ impl Workspace { Ok(()) } - /// get the id of the workspace + /// Get the workspace unique id // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 pub fn id(&self) -> String { self.0.name.clone() } - /// return a reference to current cursor controller, if currently in a workspace + /// Return a handle to workspace cursor controller // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 pub fn cursor(&self) -> cursor::Controller { self.0.cursor.clone() } - /// get a new reference to a buffer controller, if any is active to given path + /// Get a [buffer::Controller] by path, if any is active on given path // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 pub fn buffer_by_name(&self, path: &str) -> Option { self.0.buffers.get(path).map(|x| x.clone()) } - /// get a list of all the currently attached to buffers + /// Get a list of all the currently attached buffers // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 pub fn buffer_list(&self) -> Vec { self.0 @@ -348,6 +287,58 @@ impl Workspace { .map(|f| f.clone()) .collect() } + + pub(crate) fn run_actor( + &self, + mut stream: Streaming, + tx: mpsc::UnboundedSender, + ) { + // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? + let inner = self.0.clone(); + let name = self.id(); + tokio::spawn(async move { + loop { + match stream.message().await { + Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), + Ok(None) => break tracing::info!("leaving workspace {}", name), + Ok(Some(WorkspaceEvent { event: None })) => { + tracing::warn!("workspace {} received empty event", name) + } + Ok(Some(WorkspaceEvent { event: Some(ev) })) => { + let update = crate::api::Event::from(&ev); + match ev { + // user + WorkspaceEventInner::Join(UserJoin { user }) => { + inner + .users + .insert(user.id.uuid(), user.into()); + } + WorkspaceEventInner::Leave(UserLeave { user }) => { + inner.users.remove(&user.id.uuid()); + } + // buffer + WorkspaceEventInner::Create(FileCreate { path }) => { + inner.filetree.insert(path); + } + WorkspaceEventInner::Rename(FileRename { before, after }) => { + inner.filetree.remove(&before); + inner.filetree.insert(after); + } + WorkspaceEventInner::Delete(FileDelete { path }) => { + inner.filetree.remove(&path); + if let Some((_name, controller)) = inner.buffers.remove(&path) { + controller.stop(); + } + } + } + if tx.send(update).is_err() { + tracing::warn!("no active controller to receive workspace event"); + } + } + } + } + }); + } } impl Drop for WorkspaceInner { @@ -356,7 +347,7 @@ impl Drop for WorkspaceInner { if !entry.value().stop() { tracing::warn!( "could not stop buffer worker {} for workspace {}", - entry.value().name(), + entry.value().path(), self.name ); } diff --git a/src/workspace/mod.rs b/src/workspace/mod.rs deleted file mode 100644 index 3a57e75..0000000 --- a/src/workspace/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod service; -pub mod worker; - -pub use worker::Workspace; diff --git a/src/workspace/service.rs b/src/workspace/service.rs deleted file mode 100644 index 9de73ee..0000000 --- a/src/workspace/service.rs +++ /dev/null @@ -1,73 +0,0 @@ -use codemp_proto::{ - common::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, - workspace::workspace_client::WorkspaceClient, -}; -use tonic::{ - service::{interceptor::InterceptedService, Interceptor}, - transport::{Channel, Endpoint}, -}; - -use crate::errors::ConnectionResult; - -type AuthedService = InterceptedService; - -#[derive(Debug)] -pub struct Services { - workspace: WorkspaceClient, - buffer: BufferClient, - cursor: CursorClient, -} - -impl Services { - pub async fn try_new( - dest: &str, - session: tokio::sync::watch::Receiver, - workspace: tokio::sync::watch::Receiver, - ) -> ConnectionResult { - let channel = Endpoint::from_shared(dest.to_string())?.connect().await?; - let inter = WorkspaceInterceptor { session, workspace }; - Ok(Self { - cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()), - workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()), - // TODO technically we could keep buffers on separate servers, and thus manage buffer - // connections separately, but for now it's more convenient to bundle them with workspace - buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()), - }) - } - - // TODO just make fields pub(crate) ?? idk - pub fn ws(&self) -> WorkspaceClient { - self.workspace.clone() - } - - pub fn buf(&self) -> BufferClient { - self.buffer.clone() - } - - pub fn cur(&self) -> CursorClient { - self.cursor.clone() - } -} - -#[derive(Clone)] -pub struct WorkspaceInterceptor { - session: tokio::sync::watch::Receiver, - workspace: tokio::sync::watch::Receiver, -} - -impl Interceptor for WorkspaceInterceptor { - fn call( - &mut self, - mut request: tonic::Request<()>, - ) -> Result, tonic::Status> { - if let Ok(token) = self.session.borrow().token.parse() { - request.metadata_mut().insert("session", token); - } - - if let Ok(token) = self.workspace.borrow().token.parse() { - request.metadata_mut().insert("workspace", token); - } - - Ok(request) - } -} diff --git a/src/workspace/worker.rs b/src/workspace/worker.rs deleted file mode 100644 index 8906100..0000000 --- a/src/workspace/worker.rs +++ /dev/null @@ -1,371 +0,0 @@ -use crate::{ - api::{controller::ControllerWorker, Controller, Event, User}, - buffer::{self, worker::BufferWorker}, - cursor::{self, worker::CursorWorker}, - errors::{ConnectionResult, ControllerResult, RemoteResult}, - ext::InternallyMutable, - workspace::service::Services -}; - -use codemp_proto::{ - common::{Empty, Token}, - files::BufferNode, - workspace::{ - workspace_event::{ - Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave, - }, - WorkspaceEvent, - }, -}; - -use dashmap::{DashMap, DashSet}; -use std::{collections::BTreeSet, sync::Arc}; -use tokio::sync::mpsc; -use tonic::Streaming; -use uuid::Uuid; - -#[cfg(feature = "js")] -use napi_derive::napi; - -#[derive(Debug, Clone)] -#[cfg_attr(feature = "python", pyo3::pyclass)] -#[cfg_attr(feature = "js", napi)] -pub struct Workspace(Arc); - -#[derive(Debug)] -struct WorkspaceInner { - name: String, - user: User, // TODO back-reference to global user id... needed for buffer controllers - cursor: cursor::Controller, - buffers: DashMap, - filetree: DashSet, - users: DashMap, - services: Services, - // TODO can we drop the mutex? - events: tokio::sync::Mutex>, -} - -impl Workspace { - pub(crate) async fn try_new( - name: String, - user: User, - dest: &str, - token: Token, - claims: tokio::sync::watch::Receiver, // TODO ughh receiving this - ) -> ConnectionResult { - let workspace_claim = InternallyMutable::new(token); - let services = Services::try_new(dest, claims, workspace_claim.channel()).await?; - let ws_stream = services.ws().attach(Empty {}).await?.into_inner(); - - let (tx, rx) = mpsc::channel(128); - let (ev_tx, ev_rx) = mpsc::unbounded_channel(); - let cur_stream = services - .cur() - .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) - .await? - .into_inner(); - - let worker = CursorWorker::default(); - let controller = worker.controller(); - tokio::spawn(async move { - tracing::debug!("controller worker started"); - worker.work(tx, cur_stream).await; - tracing::debug!("controller worker stopped"); - }); - - let ws = Self(Arc::new(WorkspaceInner { - name, - user, - cursor: controller, - buffers: DashMap::default(), - filetree: DashSet::default(), - users: DashMap::default(), - events: tokio::sync::Mutex::new(ev_rx), - services, - })); - - ws.fetch_users().await?; - ws.fetch_buffers().await?; - ws.run_actor(ws_stream, ev_tx); - - Ok(ws) - } - - pub(crate) fn run_actor( - &self, - mut stream: Streaming, - tx: mpsc::UnboundedSender, - ) { - // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? - let inner = self.0.clone(); - let name = self.id(); - tokio::spawn(async move { - loop { - match stream.message().await { - Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), - Ok(None) => break tracing::info!("leaving workspace {}", name), - Ok(Some(WorkspaceEvent { event: None })) => { - tracing::warn!("workspace {} received empty event", name) - } - Ok(Some(WorkspaceEvent { event: Some(ev) })) => { - let update = crate::api::Event::from(&ev); - match ev { - // user - WorkspaceEventInner::Join(UserJoin { user }) => { - inner - .users - .insert(user.id.uuid(), user.into()); - } - WorkspaceEventInner::Leave(UserLeave { user }) => { - inner.users.remove(&user.id.uuid()); - } - // buffer - WorkspaceEventInner::Create(FileCreate { path }) => { - inner.filetree.insert(path); - } - WorkspaceEventInner::Rename(FileRename { before, after }) => { - inner.filetree.remove(&before); - inner.filetree.insert(after); - } - WorkspaceEventInner::Delete(FileDelete { path }) => { - inner.filetree.remove(&path); - if let Some((_name, controller)) = inner.buffers.remove(&path) { - controller.stop(); - } - } - } - if tx.send(update).is_err() { - tracing::warn!("no active controller to receive workspace event"); - } - } - } - } - }); - } -} - -impl Workspace { - /// create a new buffer in current workspace - pub async fn create(&self, path: &str) -> RemoteResult<()> { - let mut workspace_client = self.0.services.ws(); - workspace_client - .create_buffer(tonic::Request::new(BufferNode { - path: path.to_string(), - })) - .await?; - - // add to filetree - self.0.filetree.insert(path.to_string()); - - // fetch buffers - self.fetch_buffers().await?; - - Ok(()) - } - - /// attach to a buffer, starting a buffer controller and returning a new reference to it - /// - /// to interact with such buffer use [crate::api::Controller::send] or - /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] - pub async fn attach(&self, path: &str) -> ConnectionResult { - let mut worskspace_client = self.0.services.ws(); - let request = tonic::Request::new(BufferNode { - path: path.to_string(), - }); - let credentials = worskspace_client.access_buffer(request).await?.into_inner(); - - let (tx, rx) = mpsc::channel(256); - let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); - req.metadata_mut() - .insert( - "buffer", - tonic::metadata::MetadataValue::try_from(credentials.token) - .map_err(|e| tonic::Status::internal(format!("failed representing token to string: {e}")))?, - ); - let stream = self.0.services.buf().attach(req).await?.into_inner(); - - let worker = BufferWorker::new(self.0.user.id, path); - let controller = worker.controller(); - tokio::spawn(async move { - tracing::debug!("controller worker started"); - worker.work(tx, stream).await; - tracing::debug!("controller worker stopped"); - }); - - self.0.buffers.insert(path.to_string(), controller.clone()); - - Ok(controller) - } - - /// detach from an active buffer - /// - /// this option will be carried in background: [buffer::worker::BufferWorker] will be stopped and dropped. there - /// may still be some events enqueued in buffers to poll, but the [buffer::Controller] itself won't be - /// accessible anymore from [Workspace]. - /// - /// ### returns - /// [DetachResult::NotAttached] if buffer wasn't attached in the first place - /// [DetachResult::Detaching] if detach was correctly requested - /// [DetachResult::AlreadyDetached] if worker is already stopped - pub fn detach(&self, path: &str) -> DetachResult { - match self.0.buffers.remove(path) { - None => DetachResult::NotAttached, - Some((_name, controller)) => { - if controller.stop() { - DetachResult::Detaching - } else { - DetachResult::AlreadyDetached - } - } - } - } - - /// await next workspace [crate::api::Event] and return it - // TODO this method is weird and ugly, can we make it more standard? - pub async fn event(&self) -> ControllerResult { - self.0 - .events - .lock() - .await - .recv() - .await - .ok_or(crate::errors::ControllerError::Unfulfilled) - } - - /// fetch a list of all buffers in a workspace - pub async fn fetch_buffers(&self) -> RemoteResult<()> { - let mut workspace_client = self.0.services.ws(); - let buffers = workspace_client - .list_buffers(tonic::Request::new(Empty {})) - .await? - .into_inner() - .buffers; - - self.0.filetree.clear(); - for b in buffers { - self.0.filetree.insert(b.path); - } - - Ok(()) - } - - /// fetch a list of all users in a workspace - pub async fn fetch_users(&self) -> RemoteResult<()> { - let mut workspace_client = self.0.services.ws(); - let users = BTreeSet::from_iter( - workspace_client - .list_users(tonic::Request::new(Empty {})) - .await? - .into_inner() - .users - .into_iter() - .map(User::from), - ); - - self.0.users.clear(); - for u in users { - self.0.users.insert(u.id, u); - } - - Ok(()) - } - - /// get a list of the users attached to a specific buffer - /// - /// TODO: discuss implementation details - pub async fn list_buffer_users(&self, path: &str) -> RemoteResult> { - let mut workspace_client = self.0.services.ws(); - let buffer_users = workspace_client - .list_buffer_users(tonic::Request::new(BufferNode { - path: path.to_string(), - })) - .await? - .into_inner() - .users - .into_iter() - .map(|id| id.into()) - .collect(); - - Ok(buffer_users) - } - - /// delete a buffer - pub async fn delete(&self, path: &str) -> RemoteResult<()> { - let mut workspace_client = self.0.services.ws(); - workspace_client - .delete_buffer(tonic::Request::new(BufferNode { - path: path.to_string(), - })) - .await?; - - if let Some((_name, controller)) = self.0.buffers.remove(path) { - controller.stop(); - } - - self.0.filetree.remove(path); - - Ok(()) - } - - /// get the id of the workspace - // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 - pub fn id(&self) -> String { - self.0.name.clone() - } - - /// return a reference to current cursor controller, if currently in a workspace - // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 - pub fn cursor(&self) -> cursor::Controller { - self.0.cursor.clone() - } - - /// get a new reference to a buffer controller, if any is active to given path - // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 - pub fn buffer_by_name(&self, path: &str) -> Option { - self.0.buffers.get(path).map(|x| x.clone()) - } - - /// get a list of all the currently attached to buffers - // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 - pub fn buffer_list(&self) -> Vec { - self.0 - .buffers - .iter() - .map(|elem| elem.key().clone()) - .collect() - } - - /// get the currently cached "filetree" - // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 - pub fn filetree(&self, filter: Option<&str>) -> Vec { - self.0.filetree.iter() - .filter(|f| filter.map_or(true, |flt| f.starts_with(flt))) - .map(|f| f.clone()) - .collect() - } -} - -impl Drop for WorkspaceInner { - fn drop(&mut self) { - for entry in self.buffers.iter() { - if !entry.value().stop() { - tracing::warn!( - "could not stop buffer worker {} for workspace {}", - entry.value().name(), - self.name - ); - } - } - if !self.cursor.stop() { - tracing::warn!("could not stop cursor worker for workspace {}", self.name); - } - } -} - -#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int))] -#[cfg_attr(feature = "python", derive(PartialEq))] -pub enum DetachResult { - NotAttached, - Detaching, - AlreadyDetached, -}