diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 5c97860..1743c20 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -162,7 +162,7 @@ impl ControllerWorker for BufferWorker { let new_local_v = branch.local_version(); let hash = if timer.step() { - Some(crate::hash(branch.content().to_string())) + Some(crate::ext::hash(branch.content().to_string())) } else { None }; let tc = match dtop.kind { diff --git a/src/client.rs b/src/client.rs index 762a7a2..da368d9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,13 +1,12 @@ -//! ### client -//! -//! codemp client manager, containing grpc services +//! ### Client +//! Main `codemp` client, containing and managing all underlying services. use std::sync::Arc; use dashmap::DashMap; use tonic::{service::interceptor::InterceptedService, transport::{Channel, Endpoint}}; -use crate::{api::User, errors::{ConnectionResult, RemoteResult}, ext::InternallyMutable, workspace::Workspace}; +use crate::{api::User, errors::{ConnectionResult, RemoteResult}, ext::InternallyMutable, network, workspace::Workspace}; use codemp_proto::{ auth::{auth_client::AuthClient, LoginRequest}, common::{Empty, Token}, session::{session_client::SessionClient, InviteRequest, WorkspaceRequest}, @@ -16,11 +15,11 @@ use codemp_proto::{ #[cfg(feature = "python")] use pyo3::prelude::*; -/// codemp client manager +/// A `codemp` client handle. /// -/// contains all required grpc services and the unique user id -/// will disconnect when dropped -/// can be used to interact with server +/// It generates a new UUID and stores user credentials upon connecting. +/// +/// A new [Client] can be obtained with [Client::connect]. #[derive(Debug, Clone)] #[cfg_attr(feature = "js", napi_derive::napi)] #[cfg_attr(feature = "python", pyclass)] @@ -32,12 +31,12 @@ struct ClientInner { host: String, workspaces: DashMap, auth: AuthClient, - session: SessionClient>, + session: SessionClient>, claims: InternallyMutable, } impl Client { - /// instantiate and connect a new client + /// Connect to the server, authenticate and instantiate a new [Client]. pub async fn connect( host: impl AsRef, username: impl AsRef, @@ -62,7 +61,7 @@ impl Client { let claims = InternallyMutable::new(resp.token); let session = SessionClient::with_interceptor( - channel, SessionInterceptor(claims.channel()) + channel, network::SessionInterceptor(claims.channel()) ); Ok(Client(Arc::new(ClientInner { @@ -74,7 +73,7 @@ impl Client { }))) } - /// refresh session token + /// Refresh session token. pub async fn refresh(&self) -> tonic::Result<()> { let new_token = self.0.auth.clone().refresh(self.0.claims.get()) .await? @@ -83,7 +82,7 @@ impl Client { Ok(()) } - /// attempts to create a new workspace with given name + /// Attempt to create a new workspace with given name. pub async fn create_workspace(&self, name: impl AsRef) -> RemoteResult<()> { self.0.session .clone() @@ -92,7 +91,7 @@ impl Client { Ok(()) } - /// delete an existing workspace if possible + /// Delete an existing workspace if possible. pub async fn delete_workspace(&self, name: impl AsRef) -> RemoteResult<()> { self.0.session .clone() @@ -101,7 +100,7 @@ impl Client { Ok(()) } - /// invite user associated with username to workspace, if possible + /// Invite user with given username to the given workspace, if possible. pub async fn invite_to_workspace(&self, workspace_name: impl AsRef, user_name: impl AsRef) -> RemoteResult<()> { self.0.session .clone() @@ -113,7 +112,7 @@ impl Client { Ok(()) } - /// list all available workspaces, filtering between those owned and those invited to + /// List all available workspaces, also filtering between those owned and those invited to. pub async fn list_workspaces(&self, owned: bool, invited: bool) -> RemoteResult> { let mut workspaces = self.0.session .clone() @@ -129,9 +128,8 @@ impl Client { Ok(out) } - /// join a workspace, returns [Workspace] + /// Join and return a [Workspace]. pub async fn join_workspace(&self, workspace: impl AsRef) -> ConnectionResult { - // STATUS let token = self.0.session .clone() .access_workspace(WorkspaceRequest { workspace: workspace.as_ref().to_string() }) @@ -154,17 +152,17 @@ impl Client { Ok(ws) } - /// leaves a [Workspace] by name + /// Leave the [Workspace] with the given name. pub fn leave_workspace(&self, id: &str) -> bool { self.0.workspaces.remove(id).is_some() } - /// gets a [Workspace] by name + /// Gets a [Workspace] handle by name. pub fn get_workspace(&self, id: &str) -> Option { self.0.workspaces.get(id).map(|x| x.clone()) } - /// get name of all active [Workspace]s + /// Get the names of all active [Workspace]s. pub fn active_workspaces(&self) -> Vec { self.0 .workspaces @@ -173,23 +171,8 @@ impl Client { .collect() } - /// accessor for user id + /// Get the currently logged in user. pub fn user(&self) -> &User { &self.0.user } } - -#[derive(Debug, Clone)] -struct SessionInterceptor(tokio::sync::watch::Receiver); -impl tonic::service::Interceptor for SessionInterceptor { - fn call( - &mut self, - mut request: tonic::Request<()>, - ) -> tonic::Result> { - if let Ok(token) = self.0.borrow().token.parse() { - request.metadata_mut().insert("session", token); - } - - Ok(request) - } -} diff --git a/src/errors.rs b/src/errors.rs index 5e5b11c..e817631 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,18 +1,25 @@ -pub type RemoteResult = std::result::Result; +//! ### Errors +//! Contains the crate's error types. +/// An error returned by the server as response to a request. +/// +/// This currently wraps an [http code](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status), +/// returned as procedure status. #[derive(Debug, thiserror::Error)] #[error("server rejected procedure with error code: {0}")] pub struct RemoteError(#[from] tonic::Status); +/// Wraps [std::result::Result] with a [RemoteError]. +pub type RemoteResult = std::result::Result; - -pub type ConnectionResult = std::result::Result; - +/// An error that may occur when processing requests that require new connections. #[derive(Debug, thiserror::Error)] pub enum ConnectionError { - #[error("network error: {0}")] + /// Underlying [`tonic::transport::Error`]. + #[error("transport error: {0}")] Transport(#[from] tonic::transport::Error), + /// Error from the remote server, see [`RemoteError`]. #[error("server rejected connection attempt: {0}")] Remote(#[from] RemoteError), } @@ -23,15 +30,19 @@ impl From for ConnectionError { } } +/// Wraps [std::result::Result] with a [ConnectionError]. +pub type ConnectionResult = std::result::Result; - -pub type ControllerResult = std::result::Result; - +/// An error that may occur when an [`crate::api::Controller`] attempts to +/// perform an illegal operation. #[derive(Debug, thiserror::Error)] pub enum ControllerError { + /// Error occurred because the underlying controller worker is already stopped. #[error("worker is already stopped")] Stopped, + /// Error occurred because the underlying controller worker stopped before + /// fulfilling the request, without rejecting it first. #[error("worker stopped before completing requested operation")] Unfulfilled, } @@ -47,3 +58,7 @@ impl From for ControllerError { Self::Unfulfilled } } + +/// Wraps [std::result::Result] with a [ControllerError]. +pub type ControllerResult = std::result::Result; + diff --git a/src/ext.rs b/src/ext.rs index 16dc8b3..4920558 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -1,16 +1,18 @@ +//! ### Extensions +//! Contains a number of utils used internally or that may be of general interest. + use crate::{api::Controller, errors::ControllerResult}; use tokio::sync::mpsc; -/// invoke .poll() on all given buffer controllers and wait, returning the first one ready +/// Polls all given buffer controllers and waits, returning the first one ready. /// -/// this will spawn tasks blocked on .poll() for each buffer controller. as soon as -/// one finishes, all other tasks will be canceled and the ready controller will be -/// returned +/// It will spawn tasks blocked on [`Controller::poll`] for each buffer controller. +/// As soon as one finishes, its controller is returned and all other tasks are canceled. /// -/// if timeout is None, result will never be None, otherwise returns None if no buffer -/// is ready before timeout expires +/// If a timeout is provided, the result may be `None` if it expires before any task is +/// complete. /// -/// returns an error if all buffers returned errors while polling. +/// It may return an error if all buffers returned errors while polling. pub async fn select_buffer( buffers: &[crate::buffer::Controller], timeout: Option, @@ -49,12 +51,17 @@ pub async fn select_buffer( } } +/// Hashes a given byte array with the internally used algorithm. +/// +/// Currently, it uses [`xxhash_rust::xxh3::xxh3_64`]. pub fn hash(data: impl AsRef<[u8]>) -> i64 { let hash = xxhash_rust::xxh3::xxh3_64(data.as_ref()); i64::from_ne_bytes(hash.to_ne_bytes()) } -/// wraps sender and receiver to allow mutable field with immutable ref +/// A field that can be *internally mutated* regardless of its external mutability. +/// +/// Currently, it wraps the [`tokio::sync::watch`] channel couple to achieve this. #[derive(Debug)] pub struct InternallyMutable { getter: tokio::sync::watch::Receiver, @@ -70,7 +77,7 @@ impl Default for InternallyMutable { impl InternallyMutable { pub fn new(init: T) -> Self { let (tx, rx) = tokio::sync::watch::channel(init); - InternallyMutable { + Self { getter: rx, setter: tx, } @@ -91,22 +98,14 @@ impl InternallyMutable { } } -/* -pub(crate) struct CallbackHandleWatch(pub(crate) tokio::sync::watch::Sender>); - -impl crate::api::controller::CallbackHandle for CallbackHandleWatch { - fn unregister(self) { - self.0.send_replace(None); - } -}*/ - -/// an error which can be ignored with just a warning entry +/// An error that can be ignored with just a warning. pub trait IgnorableError { fn unwrap_or_warn(self, msg: &str); } impl IgnorableError for std::result::Result where E : std::fmt::Debug { + /// Logs the error as a warning and returns a unit. fn unwrap_or_warn(self, msg: &str) { match self { Ok(_) => {}, @@ -114,22 +113,3 @@ where E : std::fmt::Debug { } } } - - -/// an error which can be ignored with just a warning entry and returning the default value -pub trait IgnorableDefaultableError { - fn unwrap_or_warn_default(self, msg: &str) -> T; -} - -impl IgnorableDefaultableError for std::result::Result -where E : std::fmt::Display, T: Default { - fn unwrap_or_warn_default(self, msg: &str) -> T { - match self { - Ok(x) => x, - Err(e) => { - tracing::warn!("{}: {}", msg, e); - T::default() - }, - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 585eb86..f1d5c81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,139 +1,108 @@ -//! # MultiPlayer Code Editing lib +//! # Code MultiPlexer - cooperative development //! -//! ![just a nice pic](https://alemi.dev/img/about-slice-3.jpg) +//! `codemp` is an async client library to create cooperation tools for any text editor. //! -//! > the core library of the codemp project, driving all editor plugins +//! It is built as a batteries-included client library managing an authenticated user, multiple +//! workspaces each containing any number of buffers. //! -//! ## structure -//! The main entrypoint is the [Client] object, that maintains a connection and can -//! be used to join workspaces or attach to buffers. It contains the underlying [Workspace] and -//! stores active controllers. -//! -//! Some actions will return structs implementing the [api::Controller] trait. These can be polled -//! for new stream events ([api::Controller::poll]/[api::Controller::recv]), which will be returned in order. -//! Blocking and callback variants are also implemented. The [api::Controller] can also be used to send new -//! events to the server ([api::Controller::send]). +//! The [`Client`] is completely managed by the library itself, making its use simple across async +//! contexts and FFI boundaries. Asynchronous actions are abstracted away by the [`api::Controller`], +//! providing an unopinionated approach with both callback-based and blocking-based APIs. //! -//! Each operation on a buffer is represented as an [woot::crdt::Op]. The underlying buffer is a -//! [WOOT CRDT](https://inria.hal.science/file/index/docid/71240/filename/RR-5580.pdf), -//! but to use this library it's only sufficient to know that all WOOT buffers that have received -//! the same operations converge to the same state, and that operations might not get integrated -//! immediately but instead deferred until compatible. +//! The library also provides ready-to-use bindings in a growing number of other programming languages, +//! to support a potentially infinite number of editors. //! -//! ## features -//! * `api` : include traits for core interfaces under [api] (default enabled) -//! * `woot` : include the underlying CRDT library and re-exports it (default enabled) -//! * `proto` : include GRCP protocol definitions under [proto] (default enabled) -//! * `client` : include the local [client] implementation (default enabled) -//! -//! ## examples -//! most methods are split between the [Client] itself and the current [Workspace] +//! # Overview +//! The main entrypoint is [`Client::connect`], which establishes an authenticated connection with +//! a supported remote server and returns a [`Client`] handle to interact with it. //! -//! ### async -//! ```rust,no_run -//! use codemp::api::{Controller, TextChange}; -//! -//! # async fn async_example() -> codemp::Result<()> { -//! // creating a client session will immediately attempt to connect -//! let mut session = codemp::Client::new("http://alemi.dev:50053").await?; -//! -//! // login first, obtaining a new token granting access to 'some_workspace' -//! session.login( -//! "username".to_string(), -//! "password".to_string(), -//! Some("some_workspace".to_string()) -//! ).await?; -//! -//! // join a remote workspace, obtaining a workspace handle -//! let workspace = session.join_workspace("some_workspace").await?; -//! -//! workspace.cursor().send( // move cursor -//! codemp::proto::cursor::CursorPosition { -//! buffer: "test.txt".into(), -//! start: codemp::proto::cursor::RowCol { row: 0, col: 0 }, -//! end: codemp::proto::cursor::RowCol { row: 0, col: 1 }, -//! } -//! )?; -//! let op = workspace.cursor().recv().await?; // receive event from server -//! println!("received cursor event: {:?}", op); -//! -//! // attach to a new buffer and execute operations on it -//! workspace.create("test.txt").await?; // create new buffer -//! let buffer = workspace.attach("test.txt").await?; // attach to it -//! let local_change = TextChange { span: 0..0, content: "hello!".into() }; -//! buffer.send(local_change)?; // insert some text -//! let remote_change = buffer.recv().await?; // await remote change -//! # -//! # Ok(()) +//! ```rust +//! # async fn main() { +//! let client = codemp::Client::connect( +//! "https://api.codemp.dev", // default server, by hexed.technology +//! "mail@example.net", // your username, on hexed.technology it's the email +//! "dont-use-this-password" // your password +//! ) +//! .await +//! .expect("failed to connect!"); //! # } //! ``` //! -//! it's always possible to get a [Workspace] reference using [Client::get_workspace] -//! -//! ### sync -//! if async is not viable, a solution might be keeping a global tokio runtime and blocking on it: +//! A [`Client`] can acquire a [`Workspace`] handle by joining an existing one it can access with +//! [`Client::join_workspace`] or create a new one with [`Client::create_workspace`]. //! //! ```rust,no_run -//! # use std::sync::Arc; +//! # async fn main() { +//! # let client = codemp::Client::connect("", "", "").await.unwrap(); +//! client.create_workspace("my-workspace").await.expect("failed to create workspace!"); +//! let workspace = client.attach_workspace("my-workspace").await.expect("failed to attach!"); +//! # } +//! ``` +//! +//! A [`Workspace`] handle can be used to acquire a [`cursor::Controller`] to track remote [`api::Cursor`]s +//! and one or more [`buffer::Controller`] to send and receive [`api::TextChange`]s. +//! +//! ```rust,no_run +//! # async fn main() { +//! # let client = codemp::Client::connect("", "", "").await.unwrap(); +//! # client.create_workspace("").await.unwrap(); +//! # let workspace = client.attach_workspace("").await.unwrap(); +//! use codemp::api::Controller; // needed to access trait methods +//! let cursor = workspace.cursor(); +//! let event = cursor.recv().await.expect("disconnected while waiting for event!"); +//! println!("user {event.user} moved on buffer {event.buffer}"); +//! # } +//! ``` +//! +//! Internally, [`buffer::Controller`]s store the buffer state as a [diamond_types] CRDT, guaranteeing +//! eventual consistency. Each [`api::TextChange`] is translated in a network counterpart that is +//! guaranteed to converge. +//! +//! ```rust,no_run +//! # async fn main() { +//! # let client = codemp::Client::connect("", "", "").await.unwrap(); +//! # client.create_workspace("").await.unwrap(); +//! # let workspace = client.attach_workspace("").await.unwrap(); //! # use codemp::api::Controller; -//! # -//! # fn sync_example() -> codemp::Result<()> { -//! let rt = tokio::runtime::Runtime::new().unwrap(); -//! let mut session = rt.block_on( // using block_on allows calling async code -//! codemp::Client::new("http://alemi.dev:50051") -//! )?; -//! -//! rt.block_on(session.login( -//! "username".to_string(), -//! "password".to_string(), -//! Some("some_workspace".to_string()) -//! ))?; -//! -//! let workspace = rt.block_on(session.join_workspace("some_workspace"))?; -//! -//! // attach to buffer and blockingly receive events -//! let buffer = rt.block_on(workspace.attach("test.txt"))?; // attach to buffer, must already exist -//! while let Ok(op) = rt.block_on(buffer.recv()) { // must pass runtime -//! println!("received buffer event: {:?}", op); -//! } -//! # -//! # Ok(()) +//! let buffer = workspace.attach_buffer("/some/file.txt").await.expect("failed to attach"); +//! buffer.content(); // force-sync +//! if let Some(change) = buffer.try_recv().await.unwrap() { +//! println!("content: {change.content}, span: {change.span.start}-{change.span.end}"); +//! } // if None, no changes are currently available //! # } //! ``` //! -//! ## references +//! ## FFI +//! As mentioned, we provide bindings in various programming languages. To obtain them, you can +//! compile with the appropriate feature flag. Currently, the following are supported: +//! * `lua` +//! * `javascript` +//! * `java` (requires additional build steps to be usable) +//! * `python` //! -//! ![another cool pic coz why not](https://alemi.dev/img/about-slice-2.png) -//! -//! check [codemp-vscode](https://github.com/codewithotherpeopleandchangenamelater/codemp-vscode) -//! or [codemp-nvim](https://github.com/codewithotherpeopleandchangenamelater/codemp-nvim) -//! or [codemp-server](https://github.com/codewithotherpeopleandchangenamelater/codemp-server) for -//! reference implementations. -//! -//! keep track of feature completedness with the -//! [feature comparison matrix](https://github.com/orgs/codewithotherpeopleandchangenamelater/projects/3) +//! For some of these, ready-to-use packages are available in various registries: +//! * [pypi (python)](https://pypi.org/project/codemp) +//! * [npm (javascript)](https://www.npmjs.com/package/codemp) //! -#![doc(html_no_source)] - -/// public traits exposed to clients +/// core structs and traits pub mod api; /// cursor related types and controller pub mod cursor; -/// buffer operations, factory, controller and types +/// buffer related types and controller pub mod buffer; -/// workspace operations +/// workspace handle and operations pub mod workspace; pub use workspace::Workspace; -/// codemp client, wrapping all above +/// client handle, containing all of the above pub mod client; pub use client::Client; -/// crate error types and helpers +/// crate error types pub mod errors; /// all-in-one imports : `use codemp::prelude::*;` @@ -141,7 +110,9 @@ pub mod prelude; /// common utils used in this library and re-exposed pub mod ext; -pub use ext::hash; /// language-specific ffi "glue" pub mod ffi; + +/// internal network services and interceptors +pub(crate) mod network; diff --git a/src/network.rs b/src/network.rs new file mode 100644 index 0000000..8f8489b --- /dev/null +++ b/src/network.rs @@ -0,0 +1,89 @@ +use codemp_proto::{ + common::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, + workspace::workspace_client::WorkspaceClient, +}; +use tonic::{ + service::{interceptor::InterceptedService, Interceptor}, + transport::{Channel, Endpoint}, +}; + +use crate::errors::ConnectionResult; + +type AuthedService = InterceptedService; + +#[derive(Debug, Clone)] +pub struct SessionInterceptor(pub tokio::sync::watch::Receiver); +impl tonic::service::Interceptor for SessionInterceptor { + fn call( + &mut self, + mut request: tonic::Request<()>, + ) -> tonic::Result> { + if let Ok(token) = self.0.borrow().token.parse() { + request.metadata_mut().insert("session", token); + } + + Ok(request) + } +} + +#[derive(Debug)] +pub struct Services { + workspace: WorkspaceClient, + buffer: BufferClient, + cursor: CursorClient, +} + +impl Services { + pub async fn try_new( + dest: &str, + session: tokio::sync::watch::Receiver, + workspace: tokio::sync::watch::Receiver, + ) -> ConnectionResult { + // TRANSPORT ERROR + let channel = Endpoint::from_shared(dest.to_string())?.connect().await?; + let inter = WorkspaceInterceptor { session, workspace }; + Ok(Self { + cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()), + workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()), + // TODO technically we could keep buffers on separate servers, and thus manage buffer + // connections separately, but for now it's more convenient to bundle them with workspace + buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()), + }) + } + + // TODO just make fields pub(crate) ?? idk + pub fn ws(&self) -> WorkspaceClient { + self.workspace.clone() + } + + pub fn buf(&self) -> BufferClient { + self.buffer.clone() + } + + pub fn cur(&self) -> CursorClient { + self.cursor.clone() + } +} + +#[derive(Clone)] +pub struct WorkspaceInterceptor { + session: tokio::sync::watch::Receiver, + workspace: tokio::sync::watch::Receiver, +} + +impl Interceptor for WorkspaceInterceptor { + fn call( + &mut self, + mut request: tonic::Request<()>, + ) -> Result, tonic::Status> { + if let Ok(token) = self.session.borrow().token.parse() { + request.metadata_mut().insert("session", token); + } + + if let Ok(token) = self.workspace.borrow().token.parse() { + request.metadata_mut().insert("workspace", token); + } + + Ok(request) + } +} diff --git a/src/prelude.rs b/src/prelude.rs index 23e12e3..834bc36 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,6 +1,5 @@ //! ### Prelude -//! -//! all-in-one renamed imports with `use codemp::prelude::*` +//! All-in-one renamed imports with `use codemp::prelude::*`. pub use crate::api::{ Controller as CodempController, @@ -16,6 +15,3 @@ pub use crate::{ cursor::Controller as CodempCursorController, buffer::Controller as CodempBufferController, }; - -#[deprecated = "use CodempUser instead"] -pub use crate::api::user::User as CodempUserInfo; diff --git a/src/workspace.rs b/src/workspace.rs new file mode 100644 index 0000000..7783536 --- /dev/null +++ b/src/workspace.rs @@ -0,0 +1,376 @@ +//! ### Workspace +//! A workspace represents a development environment. It contains any number of buffers and +//! tracks cursor movements across them. +//! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. + +use crate::{ + api::{controller::ControllerWorker, Controller, Event, User}, + buffer::{self, worker::BufferWorker}, + cursor::{self, worker::CursorWorker}, + errors::{ConnectionResult, ControllerResult, RemoteResult}, + ext::InternallyMutable, + network::Services +}; + +use codemp_proto::{ + common::{Empty, Token}, + files::BufferNode, + workspace::{ + workspace_event::{ + Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave, + }, + WorkspaceEvent, + }, +}; + +use dashmap::{DashMap, DashSet}; +use std::{collections::BTreeSet, sync::Arc}; +use tokio::sync::mpsc; +use tonic::Streaming; +use uuid::Uuid; + +#[cfg(feature = "js")] +use napi_derive::napi; + +#[derive(Debug, Clone)] +#[cfg_attr(feature = "python", pyo3::pyclass)] +#[cfg_attr(feature = "js", napi)] +pub struct Workspace(Arc); + +#[derive(Debug)] +struct WorkspaceInner { + name: String, + user: User, // TODO back-reference to global user id... needed for buffer controllers + cursor: cursor::Controller, + buffers: DashMap, + filetree: DashSet, + users: DashMap, + services: Services, + // TODO can we drop the mutex? + events: tokio::sync::Mutex>, +} + +impl Workspace { + pub(crate) async fn try_new( + name: String, + user: User, + dest: &str, + token: Token, + claims: tokio::sync::watch::Receiver, // TODO ughh receiving this + ) -> ConnectionResult { + let workspace_claim = InternallyMutable::new(token); + let services = Services::try_new(dest, claims, workspace_claim.channel()).await?; + let ws_stream = services.ws().attach(Empty {}).await?.into_inner(); + + let (tx, rx) = mpsc::channel(128); + let (ev_tx, ev_rx) = mpsc::unbounded_channel(); + let cur_stream = services + .cur() + .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await? + .into_inner(); + + let worker = CursorWorker::default(); + let controller = worker.controller(); + tokio::spawn(async move { + tracing::debug!("controller worker started"); + worker.work(tx, cur_stream).await; + tracing::debug!("controller worker stopped"); + }); + + let ws = Self(Arc::new(WorkspaceInner { + name, + user, + cursor: controller, + buffers: DashMap::default(), + filetree: DashSet::default(), + users: DashMap::default(), + events: tokio::sync::Mutex::new(ev_rx), + services, + })); + + ws.fetch_users().await?; + ws.fetch_buffers().await?; + ws.run_actor(ws_stream, ev_tx); + + Ok(ws) + } + + pub(crate) fn run_actor( + &self, + mut stream: Streaming, + tx: mpsc::UnboundedSender, + ) { + // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? + let inner = self.0.clone(); + let name = self.id(); + tokio::spawn(async move { + loop { + match stream.message().await { + Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), + Ok(None) => break tracing::info!("leaving workspace {}", name), + Ok(Some(WorkspaceEvent { event: None })) => { + tracing::warn!("workspace {} received empty event", name) + } + Ok(Some(WorkspaceEvent { event: Some(ev) })) => { + let update = crate::api::Event::from(&ev); + match ev { + // user + WorkspaceEventInner::Join(UserJoin { user }) => { + inner + .users + .insert(user.id.uuid(), user.into()); + } + WorkspaceEventInner::Leave(UserLeave { user }) => { + inner.users.remove(&user.id.uuid()); + } + // buffer + WorkspaceEventInner::Create(FileCreate { path }) => { + inner.filetree.insert(path); + } + WorkspaceEventInner::Rename(FileRename { before, after }) => { + inner.filetree.remove(&before); + inner.filetree.insert(after); + } + WorkspaceEventInner::Delete(FileDelete { path }) => { + inner.filetree.remove(&path); + if let Some((_name, controller)) = inner.buffers.remove(&path) { + controller.stop(); + } + } + } + if tx.send(update).is_err() { + tracing::warn!("no active controller to receive workspace event"); + } + } + } + } + }); + } +} + +impl Workspace { + /// create a new buffer in current workspace + pub async fn create(&self, path: &str) -> RemoteResult<()> { + let mut workspace_client = self.0.services.ws(); + workspace_client + .create_buffer(tonic::Request::new(BufferNode { + path: path.to_string(), + })) + .await?; + + // add to filetree + self.0.filetree.insert(path.to_string()); + + // fetch buffers + self.fetch_buffers().await?; + + Ok(()) + } + + /// attach to a buffer, starting a buffer controller and returning a new reference to it + /// + /// to interact with such buffer use [crate::api::Controller::send] or + /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] + pub async fn attach(&self, path: &str) -> ConnectionResult { + let mut worskspace_client = self.0.services.ws(); + let request = tonic::Request::new(BufferNode { + path: path.to_string(), + }); + let credentials = worskspace_client.access_buffer(request).await?.into_inner(); + + let (tx, rx) = mpsc::channel(256); + let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); + req.metadata_mut() + .insert( + "buffer", + tonic::metadata::MetadataValue::try_from(credentials.token) + .map_err(|e| tonic::Status::internal(format!("failed representing token to string: {e}")))?, + ); + let stream = self.0.services.buf().attach(req).await?.into_inner(); + + let worker = BufferWorker::new(self.0.user.id, path); + let controller = worker.controller(); + tokio::spawn(async move { + tracing::debug!("controller worker started"); + worker.work(tx, stream).await; + tracing::debug!("controller worker stopped"); + }); + + self.0.buffers.insert(path.to_string(), controller.clone()); + + Ok(controller) + } + + /// detach from an active buffer + /// + /// this option will be carried in background: [buffer::worker::BufferWorker] will be stopped and dropped. there + /// may still be some events enqueued in buffers to poll, but the [buffer::Controller] itself won't be + /// accessible anymore from [Workspace]. + /// + /// ### returns + /// [DetachResult::NotAttached] if buffer wasn't attached in the first place + /// [DetachResult::Detaching] if detach was correctly requested + /// [DetachResult::AlreadyDetached] if worker is already stopped + pub fn detach(&self, path: &str) -> DetachResult { + match self.0.buffers.remove(path) { + None => DetachResult::NotAttached, + Some((_name, controller)) => { + if controller.stop() { + DetachResult::Detaching + } else { + DetachResult::AlreadyDetached + } + } + } + } + + /// await next workspace [crate::api::Event] and return it + // TODO this method is weird and ugly, can we make it more standard? + pub async fn event(&self) -> ControllerResult { + self.0 + .events + .lock() + .await + .recv() + .await + .ok_or(crate::errors::ControllerError::Unfulfilled) + } + + /// fetch a list of all buffers in a workspace + pub async fn fetch_buffers(&self) -> RemoteResult<()> { + let mut workspace_client = self.0.services.ws(); + let buffers = workspace_client + .list_buffers(tonic::Request::new(Empty {})) + .await? + .into_inner() + .buffers; + + self.0.filetree.clear(); + for b in buffers { + self.0.filetree.insert(b.path); + } + + Ok(()) + } + + /// fetch a list of all users in a workspace + pub async fn fetch_users(&self) -> RemoteResult<()> { + let mut workspace_client = self.0.services.ws(); + let users = BTreeSet::from_iter( + workspace_client + .list_users(tonic::Request::new(Empty {})) + .await? + .into_inner() + .users + .into_iter() + .map(User::from), + ); + + self.0.users.clear(); + for u in users { + self.0.users.insert(u.id, u); + } + + Ok(()) + } + + /// get a list of the users attached to a specific buffer + /// + /// TODO: discuss implementation details + pub async fn list_buffer_users(&self, path: &str) -> RemoteResult> { + let mut workspace_client = self.0.services.ws(); + let buffer_users = workspace_client + .list_buffer_users(tonic::Request::new(BufferNode { + path: path.to_string(), + })) + .await? + .into_inner() + .users + .into_iter() + .map(|id| id.into()) + .collect(); + + Ok(buffer_users) + } + + /// delete a buffer + pub async fn delete(&self, path: &str) -> RemoteResult<()> { + let mut workspace_client = self.0.services.ws(); + workspace_client + .delete_buffer(tonic::Request::new(BufferNode { + path: path.to_string(), + })) + .await?; + + if let Some((_name, controller)) = self.0.buffers.remove(path) { + controller.stop(); + } + + self.0.filetree.remove(path); + + Ok(()) + } + + /// get the id of the workspace + // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 + pub fn id(&self) -> String { + self.0.name.clone() + } + + /// return a reference to current cursor controller, if currently in a workspace + // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 + pub fn cursor(&self) -> cursor::Controller { + self.0.cursor.clone() + } + + /// get a new reference to a buffer controller, if any is active to given path + // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 + pub fn buffer_by_name(&self, path: &str) -> Option { + self.0.buffers.get(path).map(|x| x.clone()) + } + + /// get a list of all the currently attached to buffers + // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 + pub fn buffer_list(&self) -> Vec { + self.0 + .buffers + .iter() + .map(|elem| elem.key().clone()) + .collect() + } + + /// get the currently cached "filetree" + // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 + pub fn filetree(&self, filter: Option<&str>) -> Vec { + self.0.filetree.iter() + .filter(|f| filter.map_or(true, |flt| f.starts_with(flt))) + .map(|f| f.clone()) + .collect() + } +} + +impl Drop for WorkspaceInner { + fn drop(&mut self) { + for entry in self.buffers.iter() { + if !entry.value().stop() { + tracing::warn!( + "could not stop buffer worker {} for workspace {}", + entry.value().name(), + self.name + ); + } + } + if !self.cursor.stop() { + tracing::warn!("could not stop cursor worker for workspace {}", self.name); + } + } +} + +#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int))] +#[cfg_attr(feature = "python", derive(PartialEq))] +pub enum DetachResult { + NotAttached, + Detaching, + AlreadyDetached, +}