From 94a7786812ae0b929bc5029657131e6c46cf3804 Mon Sep 17 00:00:00 2001 From: zaaarf Date: Thu, 25 Jan 2024 02:13:45 +0100 Subject: [PATCH] feat: workspaces and new library structure Co-authored-by: alemi Co-authored-by: frelodev --- Cargo.toml | 6 +- proto/cursor_service.proto | 6 +- proto/files.proto | 4 +- proto/user.proto | 2 +- proto/workspace.proto | 31 ++---- src/api/change.rs | 2 + src/buffer/worker.rs | 49 ++++----- src/client.rs | 199 +++++++++++++++------------------- src/cursor/controller.rs | 21 ++-- src/cursor/mod.rs | 6 +- src/cursor/worker.rs | 26 ++--- src/errors.rs | 7 ++ src/lib.rs | 6 +- src/prelude.rs | 4 +- src/workspace.rs | 215 +++++++++++++++++++++++++++++++++++++ 15 files changed, 380 insertions(+), 204 deletions(-) create mode 100644 src/workspace.rs diff --git a/Cargo.toml b/Cargo.toml index 6039141..575d071 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ name = "codemp" # core tracing = "0.1" # woot -codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", tag = "v0.1.0", optional = true } +codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", features = ["serde"], tag = "v0.1.0", optional = true } # proto tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true } prost = { version = "0.11.8", optional = true } @@ -26,12 +26,13 @@ tokio-stream = { version = "0.1", optional = true } # global lazy_static = { version = "1.4", optional = true } serde = { version = "1.0.193", features = ["derive"] } +postcard = "1.0.8" [build-dependencies] tonic-build = "0.9" [features] -default = ["transport", "dep:serde_json"] +default = ["client"] api = ["woot", "dep:similar", "dep:tokio", "dep:async-trait"] woot = ["dep:codemp-woot"] transport = ["dep:prost", "dep:tonic"] @@ -39,3 +40,4 @@ client = ["transport", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "d server = ["transport"] global = ["client", "dep:lazy_static"] sync = ["client"] +backport = [] # TODO remove! diff --git a/proto/cursor_service.proto b/proto/cursor_service.proto index 088add5..4044f3e 100644 --- a/proto/cursor_service.proto +++ b/proto/cursor_service.proto @@ -6,8 +6,6 @@ import "user.proto"; // handle cursor events and broadcast to all users service Cursor { - // send cursor movement to server - rpc Moved (cursor.CursorEvent) returns (cursor.MovedResponse); - // attach to a workspace and receive cursor events - rpc Listen (user.UserIdentity) returns (stream cursor.CursorEvent); + // subscribe to a workspace's cursor events + rpc Attach (stream cursor.CursorEvent) returns (stream cursor.CursorEvent); } diff --git a/proto/files.proto b/proto/files.proto index 152a00b..e57cdc2 100644 --- a/proto/files.proto +++ b/proto/files.proto @@ -3,11 +3,11 @@ syntax = "proto2"; package files; -message BufferNode{ +message BufferNode { required string path = 1; } -message BufferTree{ +message BufferTree { repeated BufferNode buffers = 1; } diff --git a/proto/user.proto b/proto/user.proto index f639f8c..322a935 100644 --- a/proto/user.proto +++ b/proto/user.proto @@ -4,7 +4,7 @@ package user; // payload identifying user -message UserIdentity{ +message UserIdentity { // user identifier required string id = 1; } diff --git a/proto/workspace.proto b/proto/workspace.proto index f0f2427..b076cf3 100644 --- a/proto/workspace.proto +++ b/proto/workspace.proto @@ -4,14 +4,12 @@ package workspace; import "user.proto"; import "files.proto"; - message Empty {} - message TreeRequest {} // empty message UserRequest {} -message CursorResponse{} -message UserListRequest{} +message CursorResponse {} +message UserListRequest {} message WorkspaceUserList { repeated user.UserIdentity user = 1; @@ -21,19 +19,16 @@ message WorkspaceMessage { required int32 id = 1; } -message JoinRequest{ +message JoinRequest { required string username=1; required string password=2; } -message AttachRequest{ - required string bufferAttach = 1; +message AttachRequest { + required string id = 1; } - - - -message Token{ +message Token { required string token = 1; } @@ -44,7 +39,7 @@ enum FileEventType { } message FileEvent { - required string buffer = 1; + required string bufferbuffertree = 1; required FileEventType type = 2; } @@ -56,28 +51,18 @@ enum UserEventType { message UserEvent { required user.UserIdentity user = 1; - required UserEventType type = 2; } - - message BufferPayload { // buffer path to operate onto required string path = 1; - - // user id that is requesting the operation - required user.UserIdentity user = 2; - } - message BufferListRequest{ } - - -message UserList{ +message UserList { repeated user.UserIdentity users = 1; } \ No newline at end of file diff --git a/src/api/change.rs b/src/api/change.rs index 1894c18..e9e6792 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -3,6 +3,8 @@ //! an editor-friendly representation of a text change in a buffer //! to easily interface with codemp from various editors +use crate::proto::cursor::RowCol; + /// an editor-friendly representation of a text change in a buffer /// /// this represent a range in the previous state of the string and a new content which should be diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 5804a0d..c2e77bc 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -3,22 +3,20 @@ use std::hash::{Hash, Hasher}; use similar::{TextDiff, ChangeTag}; use tokio::sync::{watch, mpsc, oneshot}; -use tonic::transport::Channel; use tonic::{async_trait, Streaming}; +use uuid::Uuid; use woot::crdt::{Op, CRDT, TextEditor}; use woot::woot::Woot; use crate::errors::IgnorableError; -use crate::proto::{OperationRequest, RawOp}; -use crate::proto::buffer_client::BufferClient; use crate::api::controller::ControllerWorker; use crate::api::TextChange; +use crate::proto::buffer_service::Operation; use super::controller::BufferController; - -pub(crate) struct BufferControllerWorker { - uid: String, +pub(crate) struct BufferWorker { + _user_id: Uuid, name: String, buffer: Woot, content: watch::Sender, @@ -36,17 +34,17 @@ struct ClonableHandlesForController { content: watch::Receiver, } -impl BufferControllerWorker { - pub fn new(uid: String, path: &str) -> Self { +impl BufferWorker { + pub fn new(user_id: Uuid, path: &str) -> Self { let (txt_tx, txt_rx) = watch::channel("".to_string()); let (op_tx, op_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (poller_tx, poller_rx) = mpsc::unbounded_channel(); let mut hasher = DefaultHasher::new(); - uid.hash(&mut hasher); + user_id.hash(&mut hasher); let site_id = hasher.finish() as usize; - BufferControllerWorker { - uid, + BufferWorker { + _user_id: user_id, name: path.to_string(), buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging! content: txt_tx, @@ -62,26 +60,13 @@ impl BufferControllerWorker { stop: end_rx, } } - - async fn send_op(&self, tx: &mut BufferClient, outbound: &Op) -> crate::Result<()> { - let opseq = serde_json::to_string(outbound).expect("could not serialize opseq"); - let req = OperationRequest { - path: self.name.clone(), - hash: format!("{:x}", md5::compute(self.buffer.view())), - op: Some(RawOp { - opseq, user: self.uid.clone(), - }), - }; - let _ = tx.edit(req).await?; - Ok(()) - } } #[async_trait] impl ControllerWorker for BufferControllerWorker { type Controller = BufferController; - type Tx = BufferClient; - type Rx = Streaming; + type Tx = mpsc::Sender; + type Rx = Streaming; fn subscribe(&self) -> BufferController { BufferController::new( @@ -93,7 +78,7 @@ impl ControllerWorker for BufferControllerWorker { ) } - async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { + async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { loop { // block until one of these is ready tokio::select! { @@ -143,7 +128,13 @@ impl ControllerWorker for BufferControllerWorker { } for op in ops { - match self.send_op(&mut tx, &op).await { + let operation = Operation { + data: postcard::to_extend(&op, Vec::new()).unwrap(), + user: None, + path: Some(self.name.clone()) + }; + + match tx.send(operation).await { Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e), Ok(()) => { self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); @@ -160,7 +151,7 @@ impl ControllerWorker for BufferControllerWorker { res = rx.message() => match res { Err(_e) => break, Ok(None) => break, - Ok(Some(change)) => match serde_json::from_str::(&change.opseq) { + Ok(Some(change)) => match postcard::from_bytes::(&change.data) { Ok(op) => { self.buffer.merge(op); self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); diff --git a/src/client.rs b/src/client.rs index 5b7bf4a..fbfaf51 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,20 +1,21 @@ //! ### client -//! +//! //! codemp client manager, containing grpc services -use std::{sync::Arc, collections::BTreeMap}; - -use tonic::transport::Channel; - -use crate::{ - cursor::{worker::CursorControllerWorker, controller::CursorController}, - proto::{ - buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, - }, - Error, api::controller::ControllerWorker, - buffer::{controller::BufferController, worker::BufferControllerWorker}, -}; +use std::sync::Arc; +use tokio::sync::mpsc; +use tonic::service::interceptor::InterceptedService; +use tonic::service::Interceptor; +use tonic::transport::{Channel, Endpoint}; +use uuid::Uuid; +use crate::api::controller::ControllerWorker; +use crate::cursor::worker::CursorWorker; +use crate::proto::buffer_service::buffer_client::BufferClient; +use crate::proto::cursor_service::cursor_client::CursorClient; +use crate::proto::workspace::{JoinRequest, Token}; +use crate::proto::workspace_service::workspace_client::WorkspaceClient; +use crate::workspace::Workspace; /// codemp client manager /// @@ -22,130 +23,102 @@ use crate::{ /// will disconnect when dropped /// can be used to interact with server pub struct Client { - id: String, - client: Services, + user_id: Uuid, + token_tx: Arc>, workspace: Option, + services: Arc } -struct Services { - buffer: BufferClient, - cursor: CursorClient, +#[derive(Clone)] +pub(crate) struct ClientInterceptor { + token: tokio::sync::watch::Receiver } -struct Workspace { - cursor: Arc, - buffers: BTreeMap>, +impl Interceptor for ClientInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { + if let Ok(token) = self.token.borrow().token.parse() { + request.metadata_mut().insert("auth", token); + } + + Ok(request) + } } +#[derive(Debug, Clone)] +pub(crate) struct Services { + pub(crate) workspace: crate::proto::workspace_service::workspace_client::WorkspaceClient>, + pub(crate) buffer: crate::proto::buffer_service::buffer_client::BufferClient>, + pub(crate) cursor: crate::proto::cursor_service::cursor_client::CursorClient>, +} + +// TODO meno losco +fn parse_codemp_connection_string<'a>(string: &'a str) -> (String, String) { + let url = string.replace("codemp://", ""); + let (host, workspace) = url.split_once('/').unwrap(); + (format!("http://{}", host), workspace.to_string()) +} + impl Client { /// instantiate and connect a new client - pub async fn new(dst: &str) -> Result { - let buffer = BufferClient::connect(dst.to_string()).await?; - let cursor = CursorClient::connect(dst.to_string()).await?; - let id = uuid::Uuid::new_v4().to_string(); - - Ok(Client { id, client: Services { buffer, cursor}, workspace: None }) - } + pub async fn new(dest: &str) -> crate::Result { //TODO interceptor + let (_host, _workspace_id) = parse_codemp_connection_string(dest); - /// return a reference to current cursor controller, if currently in a workspace - pub fn get_cursor(&self) -> Option> { - Some(self.workspace.as_ref()?.cursor.clone()) - } + let channel = Endpoint::from_shared(dest.to_string())? + .connect() + .await?; - /// leave current workspace if in one, disconnecting buffer and cursor controllers - pub fn leave_workspace(&mut self) { - // TODO need to stop tasks? - self.workspace = None - } + let (token_tx, token_rx) = tokio::sync::watch::channel( + Token { token: "".to_string() } + ); - /// disconnect from a specific buffer - pub fn disconnect_buffer(&mut self, path: &str) -> bool { - match &mut self.workspace { - Some(w) => w.buffers.remove(path).is_some(), - None => false, - } - } + let inter = ClientInterceptor { token: token_rx }; - /// get a new reference to a buffer controller, if any is active to given path - pub fn get_buffer(&self, path: &str) -> Option> { - self.workspace.as_ref()?.buffers.get(path).cloned() + let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone()); + let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone()); + let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone()); + + let user_id = uuid::Uuid::new_v4(); + + Ok(Client { + user_id, + token_tx: Arc::new(token_tx), + workspace: None, + services: Arc::new(Services { workspace, buffer, cursor }) + }) } /// join a workspace, starting a cursorcontroller and returning a new reference to it - /// + /// /// to interact with such workspace [crate::api::Controller::send] cursor events or /// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller]. - pub async fn join(&mut self, _session: &str) -> crate::Result> { - // TODO there is no real workspace handling in codemp server so it behaves like one big global - // session. I'm still creating this to start laying out the proper use flow - let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner(); + pub async fn join(&mut self, workspace_id: &str) -> crate::Result<()> { + self.token_tx.send(self.services.workspace.clone().join( + tonic::Request::new(JoinRequest { username: "".to_string(), password: "".to_string() }) //TODO + ).await?.into_inner())?; - let controller = CursorControllerWorker::new(self.id.clone()); - let client = self.client.cursor.clone(); - - let handle = Arc::new(controller.subscribe()); + let (tx, rx) = mpsc::channel(10); + let stream = self.services.cursor.clone() + .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await? + .into_inner(); + let worker = CursorWorker::new(self.user_id.clone()); + let controller = Arc::new(worker.subscribe()); tokio::spawn(async move { - tracing::debug!("cursor worker started"); - controller.work(client, stream).await; - tracing::debug!("cursor worker stopped"); + tracing::debug!("controller worker started"); + worker.work(tx, stream).await; + tracing::debug!("controller worker stopped"); }); - self.workspace = Some( - Workspace { - cursor: handle.clone(), - buffers: BTreeMap::new() - } - ); + self.workspace = Some(Workspace::new( + workspace_id.to_string(), + self.user_id, + self.token_tx.clone(), + controller, + self.services.clone() + ).await?); - Ok(handle) - } - - /// create a new buffer in current workspace, with optional given content - pub async fn create(&mut self, path: &str, content: Option<&str>) -> crate::Result<()> { - if let Some(_workspace) = &self.workspace { - self.client.buffer - .create(BufferPayload { - user: self.id.clone(), - path: path.to_string(), - content: content.map(|x| x.to_string()), - }).await?; - - Ok(()) - } else { - Err(Error::InvalidState { msg: "join a workspace first".into() }) - } - } - - /// attach to a buffer, starting a buffer controller and returning a new reference to it - /// - /// to interact with such buffer use [crate::api::Controller::send] or - /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] - pub async fn attach(&mut self, path: &str) -> crate::Result> { - if let Some(workspace) = &mut self.workspace { - let mut client = self.client.buffer.clone(); - let req = BufferPayload { - path: path.to_string(), user: self.id.clone(), content: None - }; - - let stream = client.attach(req).await?.into_inner(); - - let controller = BufferControllerWorker::new(self.id.clone(), path); - let handler = Arc::new(controller.subscribe()); - - let _path = path.to_string(); - tokio::spawn(async move { - tracing::debug!("buffer[{}] worker started", _path); - controller.work(client, stream).await; - tracing::debug!("buffer[{}] worker stopped", _path); - }); - - workspace.buffers.insert(path.to_string(), handler.clone()); - - Ok(handler) - } else { - Err(Error::InvalidState { msg: "join a workspace first".into() }) - } + Ok(()) } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index c77784e..3e793ac 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -4,8 +4,9 @@ use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; use tonic::async_trait; +use uuid::Uuid; -use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors::IgnorableError}; +use crate::{api::Controller, errors::IgnorableError, proto::{cursor::{CursorEvent, CursorPosition}, user::UserIdentity}}; /// the cursor controller implementation /// @@ -20,7 +21,7 @@ use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors /// upon dropping this handle will stop the associated worker #[derive(Debug)] pub struct CursorController { - uid: String, + user_id: Uuid, op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, @@ -35,13 +36,13 @@ impl Drop for CursorController { impl CursorController { pub(crate) fn new( - uid: String, + user_id: Uuid, op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { - CursorController { uid, op, last_op, stream, stop } + CursorController { user_id, op, last_op, stream, stop } } } @@ -51,13 +52,13 @@ impl Controller for CursorController { /// enqueue a cursor event to be broadcast to current workspace /// will automatically invert cursor start/end if they are inverted - fn send(&self, mut cursor: CursorPosition) -> Result<(), Error> { + fn send(&self, mut cursor: CursorPosition) -> crate::Result<()> { if cursor.start() > cursor.end() { std::mem::swap(&mut cursor.start, &mut cursor.end); } Ok(self.op.send(CursorEvent { - user: self.uid.clone(), - position: Some(cursor), + user: UserIdentity { id: self.user_id.to_string() }, + position: cursor, })?) } @@ -67,7 +68,7 @@ impl Controller for CursorController { match stream.try_recv() { Ok(x) => Ok(Some(x)), Err(TryRecvError::Empty) => Ok(None), - Err(TryRecvError::Closed) => Err(Error::Channel { send: false }), + Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(TryRecvError::Lagged(n)) => { tracing::warn!("cursor channel lagged, skipping {} events", n); Ok(stream.try_recv().ok()) @@ -78,11 +79,11 @@ impl Controller for CursorController { // TODO is this cancelable? so it can be used in tokio::select! // TODO is the result type overkill? should be an option? /// get next cursor event from current workspace, or block until one is available - async fn recv(&self) -> Result { + async fn recv(&self) -> crate::Result { let mut stream = self.stream.lock().await; match stream.recv().await { Ok(x) => Ok(x), - Err(RecvError::Closed) => Err(Error::Channel { send: false }), + Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(RecvError::Lagged(n)) => { tracing::error!("cursor channel lagged behind, skipping {} events", n); Ok(stream.recv().await.expect("could not receive after lagging")) diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 0e34aa7..5b5f5ff 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -12,7 +12,7 @@ pub mod controller; pub use controller::CursorController as Controller; -use crate::proto::{RowCol, CursorPosition}; +use crate::proto::cursor::{RowCol, CursorPosition}; impl From:: for (i32, i32) { fn from(pos: RowCol) -> (i32, i32) { @@ -36,12 +36,12 @@ impl RowCol { impl CursorPosition { /// extract start position, defaulting to (0,0), to help build protocol packets pub fn start(&self) -> RowCol { - self.start.clone().unwrap_or((0, 0).into()) + self.start.clone() } /// extract end position, defaulting to (0,0), to help build protocol packets pub fn end(&self) -> RowCol { - self.end.clone().unwrap_or((0, 0).into()) + self.end.clone() } } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index e0196bd..08045be 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -1,14 +1,15 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; -use tonic::{Streaming, transport::Channel, async_trait}; +use tonic::{Streaming, async_trait}; +use uuid::Uuid; -use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, api::controller::ControllerWorker}; +use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::CursorEvent}; use super::controller::CursorController; -pub(crate) struct CursorControllerWorker { - uid: String, +pub(crate) struct CursorWorker { + user_id: Uuid, producer: mpsc::UnboundedSender, op: mpsc::UnboundedReceiver, changed: watch::Sender, @@ -18,14 +19,14 @@ pub(crate) struct CursorControllerWorker { stop_control: mpsc::UnboundedSender<()>, } -impl CursorControllerWorker { - pub(crate) fn new(uid: String) -> Self { +impl CursorWorker { + pub(crate) fn new(user_id: Uuid) -> Self { let (op_tx, op_rx) = mpsc::unbounded_channel(); let (cur_tx, _cur_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (change_tx, change_rx) = watch::channel(CursorEvent::default()); Self { - uid, + user_id, producer: op_tx, op: op_rx, changed: change_tx, @@ -40,12 +41,12 @@ impl CursorControllerWorker { #[async_trait] impl ControllerWorker for CursorControllerWorker { type Controller = CursorController; - type Tx = CursorClient; + type Tx = mpsc::Sender; type Rx = Streaming; fn subscribe(&self) -> CursorController { CursorController::new( - self.uid.clone(), + self.user_id.clone(), self.producer.clone(), Mutex::new(self.last_op.clone()), Mutex::new(self.channel.subscribe()), @@ -53,19 +54,18 @@ impl ControllerWorker for CursorControllerWorker { ) } - async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { + async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { loop { tokio::select!{ Ok(Some(cur)) = rx.message() => { - if cur.user == self.uid { continue } + if cur.user.id == self.user_id.to_string() { continue } self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); self.changed.send(cur).unwrap_or_warn("could not update last event"); }, - Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + Some(op) = self.op.recv() => { tx.send(op).await.unwrap_or_warn("could not update cursor"); }, Some(()) = self.stop.recv() => { break; }, else => break, } } } } - diff --git a/src/errors.rs b/src/errors.rs index 5409ca2..ac4d616 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -109,6 +109,13 @@ impl From> for Error { } } +#[cfg(feature = "client")] +impl From> for Error { + fn from(_value: tokio::sync::watch::error::SendError) -> Self { + Error::Channel { send: true } + } +} + #[cfg(feature = "client")] impl From for Error { fn from(_value: tokio::sync::broadcast::error::RecvError) -> Self { diff --git a/src/lib.rs b/src/lib.rs index 1acd87c..74373c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,9 +151,11 @@ pub mod client; pub mod tools; /// client wrapper to handle memory persistence -#[cfg(feature = "client")] +#[cfg(feature = "backport")] pub mod instance; +pub mod workspace; + /// all-in-one imports : `use codemp::prelude::*;` pub mod prelude; @@ -181,6 +183,6 @@ pub use errors::Result; #[cfg(all(feature = "client", feature = "sync"))] pub use instance::sync::Instance; -#[cfg(all(feature = "client", not(feature = "sync")))] +#[cfg(all(feature = "backport", not(feature = "sync")))] pub use instance::a_sync::Instance; diff --git a/src/prelude.rs b/src/prelude.rs index 44170b9..836ace7 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -18,10 +18,10 @@ pub use crate::api::{ #[cfg(feature = "client")] pub use crate::{ - Instance as CodempInstance, + // Instance as CodempInstance, client::Client as CodempClient, cursor::Controller as CodempCursorController, - buffer::Controller as CodempBufferController, + // buffer::Controller as CodempBufferController, }; #[cfg(feature = "proto")] diff --git a/src/workspace.rs b/src/workspace.rs new file mode 100644 index 0000000..6a20afa --- /dev/null +++ b/src/workspace.rs @@ -0,0 +1,215 @@ +use std::{collections::{BTreeMap, BTreeSet}, str::FromStr, sync::Arc}; +use tokio::sync::mpsc; +use uuid::Uuid; +use crate::{ + proto::{user::UserIdentity, workspace::{AttachRequest, BufferListRequest, BufferPayload, Token, UserListRequest}}, + api::controller::ControllerWorker, + buffer::{self, worker::BufferWorker}, + client::Services, + cursor +}; + +//TODO may contain more info in the future +#[derive(Debug, Clone)] +pub struct UserInfo { + pub uuid: Uuid +} + +impl From for UserInfo { + fn from(uuid: Uuid) -> Self { + UserInfo { + uuid + } + } +} + +impl From for Uuid { + fn from(uid: UserIdentity) -> Uuid { + Uuid::from_str(&uid.id).expect("expected an uuid") + } +} + +/// list_users -> A() , B() +/// get_user_info(B) -> B(cacca, pipu@piu) + +pub struct Workspace { + id: String, + user_id: Uuid, + token: Arc>, + cursor: Arc, + buffers: BTreeMap>, + filetree: BTreeSet, + users: BTreeMap, + services: Arc +} + +impl Workspace { + pub(crate) async fn new( + id: String, + user_id: Uuid, + token: Arc>, + cursor: Arc, + services: Arc + ) -> crate::Result { + let mut ws = Workspace { + id, + user_id, + token, + cursor, + buffers: BTreeMap::new(), + filetree: BTreeSet::new(), + users: BTreeMap::new(), + services + }; + + ws.fetch_buffers().await?; + ws.fetch_users().await?; + + Ok(ws) + } + + /// create a new buffer in current workspace, with optional given content + pub async fn create(&mut self, path: &str) -> crate::Result<()> { + let mut workspace_client = self.services.workspace.clone(); + workspace_client.create( + tonic::Request::new(BufferPayload { path: path.to_string() }) + ).await?; + + //add to filetree + self.filetree.insert(path.to_string()); + + Ok(()) + } + + /// attach to a buffer, starting a buffer controller and returning a new reference to it + /// + /// to interact with such buffer use [crate::api::Controller::send] or + /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] + pub async fn attach(&mut self, path: &str) -> crate::Result> { + let mut worskspace_client = self.services.workspace.clone(); + self.token.send(worskspace_client.attach( + tonic::Request::new(AttachRequest { id: path.to_string() }) + ).await?.into_inner())?; + + let (tx, rx) = mpsc::channel(10); + let stream = self.services.buffer.clone() + .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await? + .into_inner(); + + let worker = BufferWorker::new(self.user_id, path); + let controller = Arc::new(worker.subscribe()); + tokio::spawn(async move { + tracing::debug!("controller worker started"); + worker.work(tx, stream).await; + tracing::debug!("controller worker stopped"); + }); + + self.buffers.insert(path.to_string(), controller.clone()); + + Ok(controller) + } + + pub async fn fetch_buffers(&mut self) -> crate::Result<()> { + let mut workspace_client = self.services.workspace.clone(); + let buffers = workspace_client.list_buffers( + tonic::Request::new(BufferListRequest {}) + ).await?.into_inner().buffers; + + self.filetree.clear(); + for b in buffers { + self.filetree.insert(b.path); + } + + Ok(()) + } + + pub async fn fetch_users(&mut self) -> crate::Result<()> { + let mut workspace_client = self.services.workspace.clone(); + let users = BTreeSet::from_iter(workspace_client.list_users( + tonic::Request::new(UserListRequest {}) + ).await?.into_inner().users.into_iter().map(Uuid::from)); + + // only keep userinfo for users that still exist + self.users.retain(|k, _v| users.contains(k)); + + let _users = self.users.clone(); // damnnn rust + users.iter() + .filter(|u| _users.contains_key(u)) + .for_each(|u| { self.users.insert(*u, UserInfo::from(*u)); }); + + Ok(()) + } + + pub async fn list_buffer_users() { + todo!(); //TODO what is this + } + + pub async fn delete(&mut self, path: &str) -> crate::Result<()> { + let mut workspace_client = self.services.workspace.clone(); + workspace_client.delete( + tonic::Request::new(BufferPayload { path: path.to_string() }) + ).await?; + + self.filetree.remove(path); + + Ok(()) + } + + /// leave current workspace if in one, disconnecting buffer and cursor controllers + pub fn leave_workspace(&self) { + todo!(); //TODO need proto + } + + /// disconnect from a specific buffer + pub fn disconnect_buffer(&mut self, path: &str) -> bool { + match &mut self.buffers.remove(path) { + None => false, + Some(_) => true + } + } + + pub fn id(&self) -> String { self.id.clone() } + + /// get a new reference to a buffer controller, if any is active to given path + pub fn buffer_by_name(&self, path: &str) -> Option> { + self.buffers.get(path).cloned() + } + + /// return a reference to current cursor controller, if currently in a workspace + pub fn cursor(&self) -> Arc { self.cursor.clone() } + +} + +/* +impl Interceptor for Workspace { //TODO + fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { + request.metadata_mut().insert("auth", self.token.token.parse().unwrap()); + Ok(request) + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum FSNode { + File(String), + Directory(String, Vec), +} + fn file_tree_rec(path: &str, root: &mut Vec) { + if let Some(idx) = path.find("/") { + let dir = path[..idx].to_string(); + let mut dir_node = vec![]; + Self::file_tree_rec(&path[idx..], &mut dir_node); + root.push(FSNode::Directory(dir, dir_node)); + } else { + root.push(FSNode::File(path.to_string())); + } + } + + fn file_tree(&self) -> Vec { + let mut root = vec![]; + for path in &self.filetree { + Self::file_tree_rec(&path, &mut root); + } + root + } +*/ \ No newline at end of file