diff --git a/.gitignore b/.gitignore index 6ab98ef..2339481 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ Cargo.lock /client/vscode/*.vsix /client/vscode/codemp.node +.cargo + +.vscode/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 78b6b23..8f25c02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,30 +10,30 @@ name = "codemp" # core tracing = "0.1" # woot -codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", tag = "v0.1.0", optional = true } +codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", features = ["serde"], tag = "v0.1.2", optional = true } # proto +uuid = { version = "1.3.1", features = ["v4"], optional = true } tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true } prost = { version = "0.11.8", optional = true } # api similar = { version = "2.2", features = ["inline"], optional = true } -tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"], optional = true } +tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync"], optional = true } async-trait = { version = "0.1", optional = true } # client md5 = { version = "0.7.0", optional = true } -uuid = { version = "1.3.1", features = ["v4"], optional = true } serde_json = { version = "1", optional = true } tokio-stream = { version = "0.1", optional = true } -# global -lazy_static = { version = "1.4", optional = true } +serde = { version = "1.0.193", features = ["derive"] } +dashmap = { version = "5.5.3", optional = true } +postcard = { version = "1.0.8", optional = true } [build-dependencies] tonic-build = "0.9" [features] -default = ["client"] -api = ["woot", "dep:similar", "dep:tokio", "dep:async-trait"] -woot = ["dep:codemp-woot"] -proto = ["dep:prost", "dep:tonic"] -client = ["proto", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "dep:md5", "dep:serde_json"] -global = ["client", "dep:lazy_static"] -sync = ["client"] +default = [] +api = ["dep:async-trait"] +woot = ["dep:codemp-woot", "dep:similar"] +proto = ["dep:prost", "dep:tonic", "dep:uuid"] +client = ["proto", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "dep:md5", "dep:serde_json", "dep:dashmap", "dep:postcard"] +server = ["proto", "woot"] diff --git a/build.rs b/build.rs index 8c727c0..d4f38c1 100644 --- a/build.rs +++ b/build.rs @@ -1,5 +1,18 @@ fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/buffer.proto")?; - tonic_build::compile_protos("proto/cursor.proto")?; + tonic_build::configure() + // .build_client(cfg!(feature = "client")) + // .build_server(cfg!(feature = "server")) // FIXME if false, build fails???? + // .build_transport(cfg!(feature = "proto")) + .compile( + &[ + "proto/common.proto", + "proto/cursor.proto", + "proto/files.proto", + "proto/auth.proto", + "proto/workspace.proto", + "proto/buffer.proto", + ], + &["proto"], + )?; Ok(()) -} + } diff --git a/proto/auth.proto b/proto/auth.proto new file mode 100644 index 0000000..39f7a20 --- /dev/null +++ b/proto/auth.proto @@ -0,0 +1,20 @@ +syntax = "proto2"; + +package auth; + +// authenticates users, issuing tokens +service Auth { + // send credentials and join a workspace, returns ready to use token + rpc Login (WorkspaceJoinRequest) returns (Token); +} + +message Token { + required string token = 1; +} + +// TODO one-request-to-do-it-all from login to workspace access +message WorkspaceJoinRequest { + required string username = 1; + required string password = 2; + optional string workspace_id = 3; +} diff --git a/proto/buffer.proto b/proto/buffer.proto index 23f919f..e018440 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -1,60 +1,20 @@ -syntax = "proto3"; +syntax = "proto2"; -package codemp.buffer; +import "common.proto"; + +package buffer; // handle buffer changes, keep in sync users service Buffer { - // attach to a buffer and receive operations - rpc Attach (BufferPayload) returns (stream RawOp); - // send an operation for a specific buffer - rpc Edit (OperationRequest) returns (BufferEditResponse); - // create a new buffer - rpc Create (BufferPayload) returns (BufferCreateResponse); - // get contents of buffer - rpc Sync (BufferPayload) returns (BufferResponse); + // attach to a buffer and receive operations + rpc Attach (stream Operation) returns (stream BufferEvent); } -// empty request -message BufferCreateResponse {} - -// empty request -message BufferEditResponse {} - -// raw wire operation sequence event -message RawOp { - // operation seq serialized to json - string opseq = 1; - - // user id that has executed the operation - string user = 2; +message Operation { + required bytes data = 1; } -// client buffer operation request -message OperationRequest { - // buffer path to operate onto - string path = 1; - - // buffer hash of source state - string hash = 2; - - // raw operation sequence - RawOp op = 3; -} - -// generic buffer operation request -message BufferPayload { - // buffer path to operate onto - string path = 1; - - // user id that is requesting the operation - string user = 2; - - // optional buffer full content for replacing - optional string content = 3; -} - -// response from server with buffer content -message BufferResponse { - // current buffer content - string content = 1; +message BufferEvent { + required Operation op = 1; + required common.Identity user = 2; } diff --git a/proto/common.proto b/proto/common.proto new file mode 100644 index 0000000..6b7ce31 --- /dev/null +++ b/proto/common.proto @@ -0,0 +1,18 @@ +syntax = "proto2"; + +package common; + + +// a wrapper payload representing an uuid +message Identity { + // uuid bytes, as string + required string id = 1; +} + +// a collection of identities +message IdentityList { + repeated Identity users = 1; +} + +//generic Empty message +message Empty { } diff --git a/proto/cursor.proto b/proto/cursor.proto index d48d7c5..0b4861b 100644 --- a/proto/cursor.proto +++ b/proto/cursor.proto @@ -1,44 +1,36 @@ -syntax = "proto3"; +syntax = "proto2"; -package codemp.cursor; +package cursor; +import "common.proto"; +import "files.proto"; // handle cursor events and broadcast to all users service Cursor { - // send cursor movement to server - rpc Moved (CursorEvent) returns (MovedResponse); - // attach to a workspace and receive cursor events - rpc Listen (UserIdentity) returns (stream CursorEvent); + // subscribe to a workspace's cursor events + rpc Attach (stream cursor.CursorPosition) returns (stream cursor.CursorEvent); } -// empty request -message MovedResponse {} // a tuple indicating row and column message RowCol { - int32 row = 1; - int32 col = 2; + required int32 row = 1; + required int32 col = 2; } // cursor position object message CursorPosition { // path of current buffer this cursor is into - string buffer = 1; + required files.BufferNode buffer = 1; // cursor start position - RowCol start = 2; + required RowCol start = 2; // cursor end position - RowCol end = 3; + required RowCol end = 3; } // cursor event, with user id and cursor position message CursorEvent { // user moving the cursor - string user = 1; + required common.Identity user = 1; // new cursor position - CursorPosition position = 2; -} - -// payload identifying user for cursor attaching -message UserIdentity { - // user identifier - string id = 1; + required CursorPosition position = 2; } diff --git a/proto/files.proto b/proto/files.proto new file mode 100644 index 0000000..5df3461 --- /dev/null +++ b/proto/files.proto @@ -0,0 +1,11 @@ +syntax = "proto2"; + +package files; + +message BufferNode { + required string path = 1; +} + +message BufferTree { + repeated BufferNode buffers = 1; +} diff --git a/proto/workspace.proto b/proto/workspace.proto new file mode 100644 index 0000000..26eaa04 --- /dev/null +++ b/proto/workspace.proto @@ -0,0 +1,54 @@ +syntax = "proto2"; + +package workspace; + +import "common.proto"; +import "files.proto"; +import "auth.proto"; + +service Workspace { + rpc Attach (common.Empty) returns (stream WorkspaceEvent); + + rpc CreateBuffer (files.BufferNode) returns (common.Empty); + rpc AccessBuffer (files.BufferNode) returns (BufferCredentials); + rpc DeleteBuffer (files.BufferNode) returns (common.Empty); + + rpc ListBuffers (common.Empty) returns (files.BufferTree); + rpc ListUsers (common.Empty) returns (common.IdentityList); + rpc ListBufferUsers (files.BufferNode) returns (common.IdentityList); +} + +message WorkspaceEvent { + message UserJoin { + required common.Identity user = 1; + } + message UserLeave { + required common.Identity user = 1; + } + message FileCreate { + required string path = 1; + } + message FileRename { + required string before = 1; + required string after = 2; + } + message FileDelete { + required string path = 1; + } + + oneof event { + UserJoin join = 1; + UserLeave leave = 2; + FileCreate create = 3; + FileRename rename = 4; + FileDelete delete = 5; + } +} + +// TODO this is very ugly because we can't just return a new token (which is already smelly but whatev), we also need to tell the underlying id so that +// the client can put it as metadata while attaching, because it can't really know the underlying id that the server is using for each buffer without +// parsing the token itself. meehhhhhh, this bleeds underlying implementation to the upper levels, how can we avoid this?? +message BufferCredentials { + required common.Identity id = 1; + required auth.Token token = 2; +} diff --git a/src/api/change.rs b/src/api/change.rs index 1894c18..c04e18d 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -3,6 +3,9 @@ //! an editor-friendly representation of a text change in a buffer //! to easily interface with codemp from various editors +#[cfg(feature = "woot")] +use crate::woot::{WootResult, woot::Woot, crdt::{TextEditor, CRDT, Op}}; + /// an editor-friendly representation of a text change in a buffer /// /// this represent a range in the previous state of the string and a new content which should be @@ -27,6 +30,7 @@ pub struct TextChange { } impl TextChange { + #[cfg(feature = "woot")] /// create a new TextChange from the difference of given strings pub fn from_diff(before: &str, after: &str) -> TextChange { let diff = similar::TextDiff::from_chars(before, after); @@ -57,6 +61,34 @@ impl TextChange { } } + #[cfg(feature = "woot")] + /// consume the [TextChange], transforming it into a Vec of [woot::crdt::Op] + pub fn transform(self, woot: &Woot) -> WootResult> { + let mut out = Vec::new(); + if self.is_empty() { return Ok(out); } // no-op + let view = woot.view(); + let Some(span) = view.get(self.span.clone()) else { + return Err(crate::woot::WootError::OutOfBounds); + }; + let diff = similar::TextDiff::from_chars(span, &self.content); + for (i, diff) in diff.iter_all_changes().enumerate() { + match diff.tag() { + similar::ChangeTag::Equal => {}, + similar::ChangeTag::Delete => match woot.delete_one(self.span.start + i) { + Err(e) => tracing::error!("could not create deletion: {}", e), + Ok(op) => out.push(op), + }, + similar::ChangeTag::Insert => { + match woot.insert(self.span.start + i, diff.value()) { + Ok(mut op) => out.append(&mut op), + Err(e) => tracing::error!("could not create insertion: {}", e), + } + }, + } + } + Ok(out) + } + /// returns true if this TextChange deletes existing text pub fn is_deletion(&self) -> bool { !self.span.is_empty() @@ -82,11 +114,12 @@ impl TextChange { /// convert from byte index to row and column /// txt must be the whole content of the buffer, in order to count lines - pub fn index_to_rowcol(txt: &str, index: usize) -> crate::proto::RowCol { + #[cfg(feature = "proto")] + pub fn index_to_rowcol(txt: &str, index: usize) -> crate::proto::cursor::RowCol { // FIXME might panic, use .get() let row = txt[..index].matches('\n').count() as i32; let col = txt[..index].split('\n').last().unwrap_or("").len() as i32; - crate::proto::RowCol { row, col } + crate::proto::cursor::RowCol { row, col } } } diff --git a/src/api/controller.rs b/src/api/controller.rs index 465a9f7..18e5136 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -19,8 +19,7 @@ pub(crate) trait ControllerWorker { /// /// this generic trait is implemented by actors managing stream procedures. /// events can be enqueued for dispatching without blocking ([Controller::send]), and an async blocking -/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking -/// ([Controller::blocking_recv]) is implemented if feature `sync` is enabled. +/// api ([Controller::recv]) is provided to wait for server events. /// /// * if possible, prefer a pure [Controller::recv] consumer, awaiting for events /// * if async is not feasible a [Controller::poll]/[Controller::try_recv] approach is possible @@ -58,11 +57,4 @@ pub trait Controller : Sized + Send + Sync { /// attempt to receive a value without blocking, return None if nothing is available fn try_recv(&self) -> Result>; - - /// sync variant of [Self::recv], blocking invoking thread - /// this calls [Controller::recv] inside a [tokio::runtime::Runtime::block_on] - #[cfg(feature = "sync")] - fn blocking_recv(&self, rt: &tokio::runtime::Handle) -> Result { - rt.block_on(self.recv()) - } } diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 1610400..dfaff9f 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -9,6 +9,9 @@ /// buffer controller implementation pub mod controller; +/// assorted helpers to handle buffer controllers +pub mod tools; + pub(crate) mod worker; pub use controller::BufferController as Controller; diff --git a/src/tools.rs b/src/buffer/tools.rs similarity index 100% rename from src/tools.rs rename to src/buffer/tools.rs diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 5804a0d..e98ea7c 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,24 +1,21 @@ use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; -use similar::{TextDiff, ChangeTag}; use tokio::sync::{watch, mpsc, oneshot}; -use tonic::transport::Channel; use tonic::{async_trait, Streaming}; -use woot::crdt::{Op, CRDT, TextEditor}; +use uuid::Uuid; +use woot::crdt::{Op, CRDT}; use woot::woot::Woot; use crate::errors::IgnorableError; -use crate::proto::{OperationRequest, RawOp}; -use crate::proto::buffer_client::BufferClient; use crate::api::controller::ControllerWorker; use crate::api::TextChange; +use crate::proto::buffer::{BufferEvent, Operation}; use super::controller::BufferController; - -pub(crate) struct BufferControllerWorker { - uid: String, +pub(crate) struct BufferWorker { + _user_id: Uuid, name: String, buffer: Woot, content: watch::Sender, @@ -36,17 +33,17 @@ struct ClonableHandlesForController { content: watch::Receiver, } -impl BufferControllerWorker { - pub fn new(uid: String, path: &str) -> Self { +impl BufferWorker { + pub fn new(user_id: Uuid, path: &str) -> Self { let (txt_tx, txt_rx) = watch::channel("".to_string()); let (op_tx, op_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (poller_tx, poller_rx) = mpsc::unbounded_channel(); let mut hasher = DefaultHasher::new(); - uid.hash(&mut hasher); + user_id.hash(&mut hasher); let site_id = hasher.finish() as usize; - BufferControllerWorker { - uid, + BufferWorker { + _user_id: user_id, name: path.to_string(), buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging! content: txt_tx, @@ -62,26 +59,13 @@ impl BufferControllerWorker { stop: end_rx, } } - - async fn send_op(&self, tx: &mut BufferClient, outbound: &Op) -> crate::Result<()> { - let opseq = serde_json::to_string(outbound).expect("could not serialize opseq"); - let req = OperationRequest { - path: self.name.clone(), - hash: format!("{:x}", md5::compute(self.buffer.view())), - op: Some(RawOp { - opseq, user: self.uid.clone(), - }), - }; - let _ = tx.edit(req).await?; - Ok(()) - } } #[async_trait] -impl ControllerWorker for BufferControllerWorker { +impl ControllerWorker for BufferWorker { type Controller = BufferController; - type Tx = BufferClient; - type Rx = Streaming; + type Tx = mpsc::Sender; + type Rx = Streaming; fn subscribe(&self) -> BufferController { BufferController::new( @@ -93,7 +77,7 @@ impl ControllerWorker for BufferControllerWorker { ) } - async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { + async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { loop { // block until one of these is ready tokio::select! { @@ -110,49 +94,22 @@ impl ControllerWorker for BufferControllerWorker { // received a text change from editor res = self.operations.recv() => match res { - None => break, - Some(change) => { - if !change.is_empty() { - let view = self.buffer.view(); - match view.get(change.span.clone()) { - None => tracing::error!("received illegal span from client: {:?} but buffer is of len {}", change.span, view.len()), - Some(span) => { - let diff = TextDiff::from_chars(span, &change.content); - - let mut i = 0; - let mut ops = Vec::new(); - for diff in diff.iter_all_changes() { - match diff.tag() { - ChangeTag::Equal => i += 1, - ChangeTag::Delete => match self.buffer.delete(change.span.start + i) { - Ok(op) => ops.push(op), - Err(e) => tracing::error!("could not apply deletion: {}", e), - }, - ChangeTag::Insert => { - for c in diff.value().chars() { - match self.buffer.insert(change.span.start + i, c) { - Ok(op) => { - ops.push(op); - i += 1; - }, - Err(e) => tracing::error!("could not apply insertion: {}", e), - } - } - }, - } - } - - for op in ops { - match self.send_op(&mut tx, &op).await { - Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e), - Ok(()) => { - self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); - }, - } - } - }, + None => break tracing::debug!("stopping: editor closed channel"), + Some(change) => match change.transform(&self.buffer) { + Err(e) => break tracing::error!("could not apply operation from client: {}", e), + Ok(ops) => { + for op in ops { + self.buffer.merge(op.clone()); + let operation = Operation { + data: postcard::to_extend(&op, Vec::new()).unwrap(), + }; + if let Err(e) = tx.send(operation).await { + tracing::error!("server refused to broadcast {}: {}", op, e); + } } - } + self.content.send(self.buffer.view()) + .unwrap_or_warn("could not send buffer update"); + }, } }, @@ -160,8 +117,8 @@ impl ControllerWorker for BufferControllerWorker { res = rx.message() => match res { Err(_e) => break, Ok(None) => break, - Ok(Some(change)) => match serde_json::from_str::(&change.opseq) { - Ok(op) => { + Ok(Some(change)) => match postcard::from_bytes::(&change.op.data) { + Ok(op) => { // TODO here in change we receive info about the author, maybe propagate? self.buffer.merge(op); self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); for tx in self.pollers.drain(..) { diff --git a/src/client.rs b/src/client.rs index 5b7bf4a..bfd1e84 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,151 +1,155 @@ //! ### client -//! +//! //! codemp client manager, containing grpc services -use std::{sync::Arc, collections::BTreeMap}; +use std::sync::Arc; -use tonic::transport::Channel; +use dashmap::DashMap; +use tokio::sync::mpsc; +use tonic::service::interceptor::InterceptedService; +use tonic::service::Interceptor; +use tonic::transport::{Channel, Endpoint}; +use tonic::IntoRequest; +use uuid::Uuid; +use crate::proto::auth::auth_client::AuthClient; use crate::{ - cursor::{worker::CursorControllerWorker, controller::CursorController}, + api::controller::ControllerWorker, + cursor::worker::CursorWorker, proto::{ - buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, + common::Empty, + buffer::buffer_client::BufferClient, + cursor::cursor_client::CursorClient, + auth::{Token, WorkspaceJoinRequest}, + workspace::workspace_client::WorkspaceClient, }, - Error, api::controller::ControllerWorker, - buffer::{controller::BufferController, worker::BufferControllerWorker}, + workspace::Workspace }; - /// codemp client manager /// /// contains all required grpc services and the unique user id /// will disconnect when dropped /// can be used to interact with server pub struct Client { - id: String, - client: Services, - workspace: Option, + user_id: Uuid, + token_tx: Arc>, + workspaces: Arc>>, + services: Arc } -struct Services { - buffer: BufferClient, - cursor: CursorClient, +#[derive(Clone)] +pub(crate) struct ClientInterceptor { + token: tokio::sync::watch::Receiver } -struct Workspace { - cursor: Arc, - buffers: BTreeMap>, +impl Interceptor for ClientInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { + if let Ok(token) = self.token.borrow().token.parse() { + request.metadata_mut().insert("auth", token); + } + + Ok(request) + } } +#[derive(Debug, Clone)] +pub(crate) struct Services { + pub(crate) workspace: WorkspaceClient>, + pub(crate) buffer: BufferClient>, + pub(crate) cursor: CursorClient>, + pub(crate) auth: AuthClient, +} + +// TODO meno losco +fn parse_codemp_connection_string(string: &str) -> (String, String) { + let url = string.replace("codemp://", ""); + let (host, workspace) = url.split_once('/').unwrap(); + (format!("http://{}", host), workspace.to_string()) +} + impl Client { /// instantiate and connect a new client - pub async fn new(dst: &str) -> Result { - let buffer = BufferClient::connect(dst.to_string()).await?; - let cursor = CursorClient::connect(dst.to_string()).await?; - let id = uuid::Uuid::new_v4().to_string(); - - Ok(Client { id, client: Services { buffer, cursor}, workspace: None }) - } + pub async fn new(dest: &str) -> crate::Result { + let (_host, _workspace_id) = parse_codemp_connection_string(dest); - /// return a reference to current cursor controller, if currently in a workspace - pub fn get_cursor(&self) -> Option> { - Some(self.workspace.as_ref()?.cursor.clone()) - } + let channel = Endpoint::from_shared(dest.to_string())? + .connect() + .await?; - /// leave current workspace if in one, disconnecting buffer and cursor controllers - pub fn leave_workspace(&mut self) { - // TODO need to stop tasks? - self.workspace = None - } - - /// disconnect from a specific buffer - pub fn disconnect_buffer(&mut self, path: &str) -> bool { - match &mut self.workspace { - Some(w) => w.buffers.remove(path).is_some(), - None => false, - } - } - - /// get a new reference to a buffer controller, if any is active to given path - pub fn get_buffer(&self, path: &str) -> Option> { - self.workspace.as_ref()?.buffers.get(path).cloned() - } - - /// join a workspace, starting a cursorcontroller and returning a new reference to it - /// - /// to interact with such workspace [crate::api::Controller::send] cursor events or - /// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller]. - pub async fn join(&mut self, _session: &str) -> crate::Result> { - // TODO there is no real workspace handling in codemp server so it behaves like one big global - // session. I'm still creating this to start laying out the proper use flow - let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner(); - - let controller = CursorControllerWorker::new(self.id.clone()); - let client = self.client.cursor.clone(); - - let handle = Arc::new(controller.subscribe()); - - tokio::spawn(async move { - tracing::debug!("cursor worker started"); - controller.work(client, stream).await; - tracing::debug!("cursor worker stopped"); - }); - - self.workspace = Some( - Workspace { - cursor: handle.clone(), - buffers: BTreeMap::new() - } + let (token_tx, token_rx) = tokio::sync::watch::channel( + Token { token: "".to_string() } ); - Ok(handle) + let inter = ClientInterceptor { token: token_rx }; + + let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone()); + let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone()); + let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone()); + let auth = AuthClient::new(channel); + + let user_id = uuid::Uuid::new_v4(); + + Ok(Client { + user_id, + token_tx: Arc::new(token_tx), + workspaces: Arc::new(DashMap::default()), + services: Arc::new(Services { workspace, buffer, cursor, auth }) + }) } - /// create a new buffer in current workspace, with optional given content - pub async fn create(&mut self, path: &str, content: Option<&str>) -> crate::Result<()> { - if let Some(_workspace) = &self.workspace { - self.client.buffer - .create(BufferPayload { - user: self.id.clone(), - path: path.to_string(), - content: content.map(|x| x.to_string()), - }).await?; - - Ok(()) - } else { - Err(Error::InvalidState { msg: "join a workspace first".into() }) - } + pub async fn login(&self, username: String, password: String, workspace_id: Option) -> crate::Result<()> { + Ok(self.token_tx.send( + self.services.auth.clone() + .login(WorkspaceJoinRequest { username, password, workspace_id}) + .await? + .into_inner() + )?) } - /// attach to a buffer, starting a buffer controller and returning a new reference to it - /// - /// to interact with such buffer use [crate::api::Controller::send] or - /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] - pub async fn attach(&mut self, path: &str) -> crate::Result> { - if let Some(workspace) = &mut self.workspace { - let mut client = self.client.buffer.clone(); - let req = BufferPayload { - path: path.to_string(), user: self.id.clone(), content: None - }; + /// join a workspace, returns an [tokio::sync::RwLock] to interact with it + pub async fn join_workspace(&mut self, workspace: &str) -> crate::Result> { + let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner(); - let stream = client.attach(req).await?.into_inner(); + let (tx, rx) = mpsc::channel(256); + let cur_stream = self.services.cursor.clone() + .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await? + .into_inner(); - let controller = BufferControllerWorker::new(self.id.clone(), path); - let handler = Arc::new(controller.subscribe()); + let worker = CursorWorker::default(); + let controller = Arc::new(worker.subscribe()); + tokio::spawn(async move { + tracing::debug!("controller worker started"); + worker.work(tx, cur_stream).await; + tracing::debug!("controller worker stopped"); + }); - let _path = path.to_string(); - tokio::spawn(async move { - tracing::debug!("buffer[{}] worker started", _path); - controller.work(client, stream).await; - tracing::debug!("buffer[{}] worker stopped", _path); - }); + let ws = Arc::new(Workspace::new( + workspace.to_string(), + self.user_id, + self.token_tx.clone(), + controller, + self.services.clone() + )); - workspace.buffers.insert(path.to_string(), handler.clone()); + ws.fetch_users().await?; + ws.fetch_buffers().await?; - Ok(handler) - } else { - Err(Error::InvalidState { msg: "join a workspace first".into() }) - } + ws.run_actor(ws_stream); + + self.workspaces.insert(workspace.to_string(), ws.clone()); + + Ok(ws) + } + + pub fn get_workspace(&self, id: &str) -> Option> { + self.workspaces.get(id).map(|x| x.clone()) + } + + /// accessor for user id + pub fn user_id(&self) -> Uuid { + self.user_id } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index c77784e..a959b1a 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -5,7 +5,7 @@ use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; use tonic::async_trait; -use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors::IgnorableError}; +use crate::{api::Controller, errors::IgnorableError, proto::cursor::{CursorEvent, CursorPosition}}; /// the cursor controller implementation /// @@ -20,8 +20,7 @@ use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors /// upon dropping this handle will stop the associated worker #[derive(Debug)] pub struct CursorController { - uid: String, - op: mpsc::UnboundedSender, + op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, @@ -35,13 +34,12 @@ impl Drop for CursorController { impl CursorController { pub(crate) fn new( - uid: String, - op: mpsc::UnboundedSender, + op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { - CursorController { uid, op, last_op, stream, stop } + CursorController { op, last_op, stream, stop } } } @@ -51,14 +49,11 @@ impl Controller for CursorController { /// enqueue a cursor event to be broadcast to current workspace /// will automatically invert cursor start/end if they are inverted - fn send(&self, mut cursor: CursorPosition) -> Result<(), Error> { - if cursor.start() > cursor.end() { + fn send(&self, mut cursor: CursorPosition) -> crate::Result<()> { + if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } - Ok(self.op.send(CursorEvent { - user: self.uid.clone(), - position: Some(cursor), - })?) + Ok(self.op.send(cursor)?) } /// try to receive without blocking, but will still block on stream mutex @@ -67,7 +62,7 @@ impl Controller for CursorController { match stream.try_recv() { Ok(x) => Ok(Some(x)), Err(TryRecvError::Empty) => Ok(None), - Err(TryRecvError::Closed) => Err(Error::Channel { send: false }), + Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(TryRecvError::Lagged(n)) => { tracing::warn!("cursor channel lagged, skipping {} events", n); Ok(stream.try_recv().ok()) @@ -78,11 +73,11 @@ impl Controller for CursorController { // TODO is this cancelable? so it can be used in tokio::select! // TODO is the result type overkill? should be an option? /// get next cursor event from current workspace, or block until one is available - async fn recv(&self) -> Result { + async fn recv(&self) -> crate::Result { let mut stream = self.stream.lock().await; match stream.recv().await { Ok(x) => Ok(x), - Err(RecvError::Closed) => Err(Error::Channel { send: false }), + Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(RecvError::Lagged(n)) => { tracing::error!("cursor channel lagged behind, skipping {} events", n); Ok(stream.recv().await.expect("could not receive after lagging")) diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 0e34aa7..59d68f3 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -12,7 +12,7 @@ pub mod controller; pub use controller::CursorController as Controller; -use crate::proto::{RowCol, CursorPosition}; +use crate::proto::cursor::RowCol; impl From:: for (i32, i32) { fn from(pos: RowCol) -> (i32, i32) { @@ -26,25 +26,6 @@ impl From::<(i32, i32)> for RowCol { } } -impl RowCol { - /// create a RowCol and wrap into an Option, to help build protocol packets - pub fn wrap(row: i32, col: i32) -> Option { - Some(RowCol { row, col }) - } -} - -impl CursorPosition { - /// extract start position, defaulting to (0,0), to help build protocol packets - pub fn start(&self) -> RowCol { - self.start.clone().unwrap_or((0, 0).into()) - } - - /// extract end position, defaulting to (0,0), to help build protocol packets - pub fn end(&self) -> RowCol { - self.end.clone().unwrap_or((0, 0).into()) - } -} - impl PartialOrd for RowCol { fn partial_cmp(&self, other: &Self) -> Option { match self.row.partial_cmp(&other.row) { diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index e0196bd..3f45dfc 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -1,16 +1,15 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; -use tonic::{Streaming, transport::Channel, async_trait}; +use tonic::{Streaming, async_trait}; -use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, api::controller::ControllerWorker}; +use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::{CursorPosition, CursorEvent}}; use super::controller::CursorController; -pub(crate) struct CursorControllerWorker { - uid: String, - producer: mpsc::UnboundedSender, - op: mpsc::UnboundedReceiver, +pub(crate) struct CursorWorker { + producer: mpsc::UnboundedSender, + op: mpsc::UnboundedReceiver, changed: watch::Sender, last_op: watch::Receiver, channel: Arc>, @@ -18,14 +17,13 @@ pub(crate) struct CursorControllerWorker { stop_control: mpsc::UnboundedSender<()>, } -impl CursorControllerWorker { - pub(crate) fn new(uid: String) -> Self { +impl Default for CursorWorker { + fn default() -> Self { let (op_tx, op_rx) = mpsc::unbounded_channel(); let (cur_tx, _cur_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (change_tx, change_rx) = watch::channel(CursorEvent::default()); Self { - uid, producer: op_tx, op: op_rx, changed: change_tx, @@ -38,14 +36,13 @@ impl CursorControllerWorker { } #[async_trait] -impl ControllerWorker for CursorControllerWorker { +impl ControllerWorker for CursorWorker { type Controller = CursorController; - type Tx = CursorClient; + type Tx = mpsc::Sender; type Rx = Streaming; fn subscribe(&self) -> CursorController { CursorController::new( - self.uid.clone(), self.producer.clone(), Mutex::new(self.last_op.clone()), Mutex::new(self.channel.subscribe()), @@ -53,19 +50,17 @@ impl ControllerWorker for CursorControllerWorker { ) } - async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { + async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { loop { tokio::select!{ Ok(Some(cur)) = rx.message() => { - if cur.user == self.uid { continue } self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); self.changed.send(cur).unwrap_or_warn("could not update last event"); }, - Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + Some(op) = self.op.recv() => { tx.send(op).await.unwrap_or_warn("could not update cursor"); }, Some(()) = self.stop.recv() => { break; }, else => break, } } } } - diff --git a/src/errors.rs b/src/errors.rs index 5409ca2..ac4d616 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -109,6 +109,13 @@ impl From> for Error { } } +#[cfg(feature = "client")] +impl From> for Error { + fn from(_value: tokio::sync::watch::error::SendError) -> Self { + Error::Channel { send: true } + } +} + #[cfg(feature = "client")] impl From for Error { fn from(_value: tokio::sync::broadcast::error::RecvError) -> Self { diff --git a/src/lib.rs b/src/lib.rs index 3345a0e..5421e05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,8 +5,8 @@ //! > the core library of the codemp project, driving all editor plugins //! //! ## structure -//! The main entrypoint is the [Instance] object, that maintains a connection and can -//! be used to join workspaces or attach to buffers. It contains the underlying [client::Client] and +//! The main entrypoint is the [Client] object, that maintains a connection and can +//! be used to join workspaces or attach to buffers. It contains the underlying [Workspace] and //! stores active controllers. //! //! Some actions will return structs implementing the [api::Controller] trait. These can be polled @@ -21,67 +21,79 @@ //! immediately but instead deferred until compatible. //! //! ## features -//! * `woot` : include the underlying CRDT library and re-exports it (default enabled) -//! * `api` : include traits for core interfaces under [api] (default enabled) -//! * `proto` : include GRCP protocol definitions under [proto] (default enabled) -//! * `client`: include the local [client] implementation (default enabled) -//! * `global`: provide a lazy_static global INSTANCE in [instance::global] -//! * `sync` : wraps the [instance::a_sync::Instance] holder into a sync variant: [instance::sync::Instance] +//! * `api` : include traits for core interfaces under [api] (default enabled) +//! * `woot` : include the underlying CRDT library and re-exports it (default enabled) +//! * `proto` : include GRCP protocol definitions under [proto] (default enabled) +//! * `client` : include the local [client] implementation (default enabled) //! //! ## examples -//! while the [client::Client] itself is the core structure implementing all methods, plugins will mostly -//! interact with [Instance] managers. +//! most methods are split between the [Client] itself and the current [Workspace] //! //! ### async -//! this library is natively async and thus async usage should be preferred if possible with -//! [instance::a_sync::Instance] -//! //! ```rust,no_run //! use codemp::api::{Controller, TextChange}; -//! # use codemp::instance::a_sync::Instance; //! //! # async fn async_example() -> codemp::Result<()> { -//! let session = Instance::default(); // create global session -//! session.connect("http://alemi.dev:50051").await?; // connect to remote server +//! // creating a client session will immediately attempt to connect +//! let mut session = codemp::Client::new("http://alemi.dev:50053").await?; +//! +//! // login first, obtaining a new token granting access to 'some_workspace' +//! session.login( +//! "username".to_string(), +//! "password".to_string(), +//! Some("some_workspace".to_string()) +//! ).await?; //! -//! // join a remote workspace, obtaining a cursor controller -//! let cursor = session.join("some_workspace").await?; -//! cursor.send( // move cursor -//! codemp::proto::CursorPosition { +//! // join a remote workspace, obtaining a workspace handle +//! let workspace = session.join_workspace("some_workspace").await?; +//! +//! workspace.cursor().send( // move cursor +//! codemp::proto::cursor::CursorPosition { //! buffer: "test.txt".into(), -//! start: Some(codemp::proto::RowCol { row: 0, col: 0 }), -//! end: Some(codemp::proto::RowCol { row: 0, col: 1 }), +//! start: codemp::proto::cursor::RowCol { row: 0, col: 0 }, +//! end: codemp::proto::cursor::RowCol { row: 0, col: 1 }, //! } //! )?; -//! let op = cursor.recv().await?; // listen for event +//! let op = workspace.cursor().recv().await?; // receive event from server //! println!("received cursor event: {:?}", op); //! //! // attach to a new buffer and execute operations on it -//! session.create("test.txt", None).await?; // create new buffer -//! let buffer = session.attach("test.txt").await?; // attach to it +//! workspace.create("test.txt").await?; // create new buffer +//! let buffer = workspace.attach("test.txt").await?; // attach to it //! let local_change = TextChange { span: 0..0, content: "hello!".into() }; //! buffer.send(local_change)?; // insert some text -//! let remote_change = buffer.recv().await?; +//! let remote_change = buffer.recv().await?; // await remote change //! # //! # Ok(()) //! # } //! ``` //! +//! it's always possible to get a [Workspace] reference using [Client::get_workspace] +//! //! ### sync -//! if async is not viable, including the feature `sync` will provide a sync-only [instance::sync::Instance] variant +//! if async is not viable, a solution might be keeping a global tokio runtime and blocking on it: //! //! ```rust,no_run -//! # use codemp::instance::sync::Instance; //! # use std::sync::Arc; //! # use codemp::api::Controller; //! # //! # fn sync_example() -> codemp::Result<()> { -//! let session = Instance::default(); // instantiate sync variant -//! session.connect("http://alemi.dev:50051")?; // connect to server +//! let rt = tokio::runtime::Runtime::new().unwrap(); +//! let mut session = rt.block_on( // using block_on allows calling async code +//! codemp::Client::new("http://alemi.dev:50051") +//! )?; +//! +//! rt.block_on(session.login( +//! "username".to_string(), +//! "password".to_string(), +//! Some("some_workspace".to_string()) +//! ))?; +//! +//! let workspace = rt.block_on(session.join_workspace("some_workspace"))?; //! //! // attach to buffer and blockingly receive events -//! let buffer = session.attach("test.txt")?; // attach to buffer, must already exist -//! while let Ok(op) = buffer.blocking_recv(session.rt()) { // must pass runtime +//! let buffer = rt.block_on(workspace.attach("test.txt"))?; // attach to buffer, must already exist +//! while let Ok(op) = rt.block_on(buffer.recv()) { // must pass runtime //! println!("received buffer event: {:?}", op); //! } //! # @@ -89,30 +101,6 @@ //! # } //! ``` //! -//! ### global -//! if instantiating the [Instance] manager is not possible, adding the feature `global` will -//! provide a static lazyly-allocated global reference: [struct@instance::global::INSTANCE]. -//! -//! ```rust,no_run -//! # use codemp::instance::sync::Instance; -//! # use std::sync::Arc; -//! use codemp::prelude::*; // prelude includes everything with "Codemp" in front -//! # fn global_example() -> codemp::Result<()> { -//! CODEMP_INSTANCE.connect("http://alemi.dev:50051")?; // connect to server -//! let cursor = CODEMP_INSTANCE.join("some_workspace")?; // join workspace -//! std::thread::spawn(move || { -//! loop { -//! match cursor.try_recv() { -//! Ok(Some(event)) => println!("received cursor event: {:?}", event), // there might be more -//! Ok(None) => std::thread::sleep(std::time::Duration::from_millis(10)), // wait for more -//! Err(e) => break, // channel closed -//! } -//! } -//! }); -//! # Ok(()) -//! # } -//! ``` -//! //! ## references //! //! ![another cool pic coz why not](https://alemi.dev/img/about-slice-2.png) @@ -148,11 +136,9 @@ pub mod errors; #[cfg(feature = "client")] pub mod client; -pub mod tools; - -/// client wrapper to handle memory persistence +/// workspace operations #[cfg(feature = "client")] -pub mod instance; +pub mod workspace; /// all-in-one imports : `use codemp::prelude::*;` pub mod prelude; @@ -165,17 +151,70 @@ pub use woot; #[cfg(feature = "proto")] #[allow(non_snake_case)] pub mod proto { - tonic::include_proto!("codemp.buffer"); - tonic::include_proto!("codemp.cursor"); -} + pub mod common { + tonic::include_proto!("common"); + impl From for Identity { + fn from(id: uuid::Uuid) -> Self { + Identity { id: id.to_string() } + } + } + + impl From<&uuid::Uuid> for Identity { + fn from(id: &uuid::Uuid) -> Self { + Identity { id: id.to_string() } + } + } + + impl From for uuid::Uuid { + fn from(value: Identity) -> Self { + uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity") + } + } + + impl From<&Identity> for uuid::Uuid { + fn from(value: &Identity) -> Self { + uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity") + } + } + } + + + pub mod files { + tonic::include_proto!("files"); + + impl From for BufferNode { + fn from(value: String) -> Self { + BufferNode { path: value } + } + } + + impl From<&str> for BufferNode { + fn from(value: &str) -> Self { + BufferNode { path: value.to_string() } + } + } + + impl From for String { + fn from(value: BufferNode) -> Self { + value.path + } + } + } + + pub mod buffer { tonic::include_proto!("buffer"); } + pub mod cursor { tonic::include_proto!("cursor"); } + pub mod workspace { tonic::include_proto!("workspace"); } + pub mod auth { tonic::include_proto!("auth"); } +} pub use errors::Error; pub use errors::Result; -#[cfg(all(feature = "client", feature = "sync"))] -pub use instance::sync::Instance; +#[cfg(feature = "client")] +pub use client::Client; + +#[cfg(feature = "client")] +pub use workspace::Workspace; -#[cfg(all(feature = "client", not(feature = "sync")))] -pub use instance::a_sync::Instance; diff --git a/src/prelude.rs b/src/prelude.rs index 44170b9..c01864c 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -18,18 +18,17 @@ pub use crate::api::{ #[cfg(feature = "client")] pub use crate::{ - Instance as CodempInstance, + // Instance as CodempInstance, client::Client as CodempClient, + workspace::Workspace as CodempWorkspace, + workspace::UserInfo as CodempUserInfo, cursor::Controller as CodempCursorController, buffer::Controller as CodempBufferController, }; #[cfg(feature = "proto")] pub use crate::{ - proto::CursorPosition as CodempCursorPosition, - proto::CursorEvent as CodempCursorEvent, - proto::RowCol as CodempRowCol, + proto::cursor::CursorPosition as CodempCursorPosition, + proto::cursor::CursorEvent as CodempCursorEvent, + proto::cursor::RowCol as CodempRowCol, }; - -#[cfg(feature = "global")] -pub use crate::instance::global::INSTANCE as CODEMP_INSTANCE; diff --git a/src/workspace.rs b/src/workspace.rs new file mode 100644 index 0000000..9c42b23 --- /dev/null +++ b/src/workspace.rs @@ -0,0 +1,184 @@ +use std::{collections::BTreeSet, sync::Arc}; +use tokio::sync::mpsc; +use dashmap::{DashMap, DashSet}; +use tonic::Streaming; +use uuid::Uuid; +use crate::{ + api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor, + proto::{auth::Token, common::{Identity, Empty}, files::BufferNode, workspace::{WorkspaceEvent, workspace_event::{Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave}}} +}; + +//TODO may contain more info in the future +#[derive(Debug, Clone)] +pub struct UserInfo { + pub uuid: Uuid +} + +pub struct Workspace { + id: String, + user_id: Uuid, // reference to global user id + token: Arc>, + cursor: Arc, + buffers: Arc>>, + pub(crate) filetree: Arc>, + pub(crate) users: Arc>, + services: Arc +} + +impl Workspace { + /// create a new buffer and perform initial fetch operations + pub(crate) fn new( + id: String, + user_id: Uuid, + token: Arc>, + cursor: Arc, + services: Arc + ) -> Self { + Workspace { + id, + user_id, + token, + cursor, + buffers: Arc::new(DashMap::default()), + filetree: Arc::new(DashSet::default()), + users: Arc::new(DashMap::default()), + services + } + } + + pub(crate) fn run_actor(&self, mut stream: Streaming) { + let users = self.users.clone(); + let filetree = self.filetree.clone(); + let name = self.id(); + tokio::spawn(async move { + loop { + match stream.message().await { + Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), + Ok(None) => break tracing::info!("leaving workspace {}", name), + Ok(Some(WorkspaceEvent { event: None })) => tracing::warn!("workspace {} received empty event", name), + Ok(Some(WorkspaceEvent { event: Some(ev) })) => match ev { + WorkspaceEventInner::Join(UserJoin { user }) => { users.insert(user.clone().into(), UserInfo { uuid: user.into() }); }, + WorkspaceEventInner::Leave(UserLeave { user }) => { users.remove(&user.into()); }, + WorkspaceEventInner::Create(FileCreate { path }) => { filetree.insert(path); }, + WorkspaceEventInner::Rename(FileRename { before, after }) => { filetree.remove(&before); filetree.insert(after); }, + WorkspaceEventInner::Delete(FileDelete { path }) => { filetree.remove(&path); }, + }, + } + } + }); + } + + /// create a new buffer in current workspace + pub async fn create(&self, path: &str) -> crate::Result<()> { + let mut workspace_client = self.services.workspace.clone(); + workspace_client.create_buffer( + tonic::Request::new(BufferNode { path: path.to_string() }) + ).await?; + + // add to filetree + self.filetree.insert(path.to_string()); + + // fetch buffers + self.fetch_buffers().await?; + + Ok(()) + } + + /// attach to a buffer, starting a buffer controller and returning a new reference to it + /// + /// to interact with such buffer use [crate::api::Controller::send] or + /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] + pub async fn attach(&self, path: &str) -> crate::Result> { + let mut worskspace_client = self.services.workspace.clone(); + let request = tonic::Request::new(BufferNode { path: path.to_string() }); + let credentials = worskspace_client.access_buffer(request).await?.into_inner(); + self.token.send(credentials.token)?; + + let (tx, rx) = mpsc::channel(256); + let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); + req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(credentials.id.id).expect("could not represent path as byte sequence")); + let stream = self.services.buffer.clone().attach(req).await?.into_inner(); + + let worker = BufferWorker::new(self.user_id, path); + let controller = Arc::new(worker.subscribe()); + tokio::spawn(async move { + tracing::debug!("controller worker started"); + worker.work(tx, stream).await; + tracing::debug!("controller worker stopped"); + }); + + self.buffers.insert(path.to_string(), controller.clone()); + + Ok(controller) + } + + /// fetch a list of all buffers in a workspace + pub async fn fetch_buffers(&self) -> crate::Result<()> { + let mut workspace_client = self.services.workspace.clone(); + let buffers = workspace_client.list_buffers( + tonic::Request::new(Empty {}) + ).await?.into_inner().buffers; + + self.filetree.clear(); + for b in buffers { + self.filetree.insert(b.path); + } + + Ok(()) + } + + /// fetch a list of all users in a workspace + pub async fn fetch_users(&self) -> crate::Result<()> { + let mut workspace_client = self.services.workspace.clone(); + let users = BTreeSet::from_iter(workspace_client.list_users( + tonic::Request::new(Empty {}) + ).await?.into_inner().users.into_iter().map(Uuid::from)); + + self.users.clear(); + for u in users { + self.users.insert(u, UserInfo { uuid: u }); + } + + Ok(()) + } + + /// get a list of the users attached to a specific buffer + /// + /// TODO: discuss implementation details + pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { + let mut workspace_client = self.services.workspace.clone(); + let buffer_users = workspace_client.list_buffer_users( + tonic::Request::new(BufferNode { path: path.to_string() }) + ).await?.into_inner().users; + + Ok(buffer_users) + } + + /// delete a buffer + pub async fn delete(&self, path: &str) -> crate::Result<()> { + let mut workspace_client = self.services.workspace.clone(); + workspace_client.delete_buffer( + tonic::Request::new(BufferNode { path: path.to_string() }) + ).await?; + + self.filetree.remove(path); + + Ok(()) + } + + /// get the id of the workspace + pub fn id(&self) -> String { self.id.clone() } + + /// return a reference to current cursor controller, if currently in a workspace + pub fn cursor(&self) -> Arc { self.cursor.clone() } + + /// get a new reference to a buffer controller, if any is active to given path + pub fn buffer_by_name(&self, path: &str) -> Option> { + self.buffers.get(path).map(|x| x.clone()) + } + + /// get the currently cached "filetree" + pub fn filetree(&self) -> Vec { + self.filetree.iter().map(|f| f.clone()).collect() + } +}