From 948a1b4de5330cbc156ecd65fe506a6586372405 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 7 Feb 2024 01:12:05 +0100 Subject: [PATCH] feat: workspace streaming attach and lists to join a workspace, attach to it. to leave a workspace, close the channel. on such channel you get workspace events (new buffers, user leave, ...). must fetch current buffers and users upon join. if workspace doesn't exist, server should create it on attach also dashmap everywhere to get/put simple --- Cargo.toml | 1 + src/client.rs | 59 ++++++++++++--------------- src/workspace.rs | 101 +++++++++++++++++++++++++---------------------- 3 files changed, 81 insertions(+), 80 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7cca7ef..4948bb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ tokio-stream = { version = "0.1", optional = true } lazy_static = { version = "1.4", optional = true } serde = { version = "1.0.193", features = ["derive"] } postcard = "1.0.8" +dashmap = "5.5.3" [build-dependencies] tonic-build = "0.9" diff --git a/src/client.rs b/src/client.rs index 6b72c00..520ac91 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,13 +2,14 @@ //! //! codemp client manager, containing grpc services -use std::collections::BTreeMap; use std::sync::Arc; -use tokio::sync::{mpsc, RwLock}; +use dashmap::DashMap; +use tokio::sync::mpsc; use tonic::service::interceptor::InterceptedService; use tonic::service::Interceptor; use tonic::transport::{Channel, Endpoint}; +use tonic::IntoRequest; use uuid::Uuid; use crate::proto::auth::auth_client::AuthClient; @@ -33,7 +34,7 @@ use crate::{ pub struct Client { user_id: Uuid, token_tx: Arc>, - pub workspaces: BTreeMap>>, + workspaces: Arc>>, services: Arc } @@ -62,7 +63,7 @@ pub(crate) struct Services { } // TODO meno losco -fn parse_codemp_connection_string<'a>(string: &'a str) -> (String, String) { +fn parse_codemp_connection_string(string: &str) -> (String, String) { let url = string.replace("codemp://", ""); let (host, workspace) = url.split_once('/').unwrap(); (format!("http://{}", host), workspace.to_string()) @@ -86,14 +87,15 @@ impl Client { let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone()); let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone()); let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone()); + let auth = AuthClient::new(channel); let user_id = uuid::Uuid::new_v4(); Ok(Client { user_id, token_tx: Arc::new(token_tx), - workspaces: BTreeMap::new(), - services: Arc::new(Services { workspace, buffer, cursor }) + workspaces: Arc::new(DashMap::default()), + services: Arc::new(Services { workspace, buffer, cursor, auth }) }) } @@ -107,52 +109,43 @@ impl Client { } /// join a workspace, returns an [tokio::sync::RwLock] to interact with it - pub async fn join_workspace(&mut self, workspace_id: &str) -> crate::Result>> { - self.token_tx.send(self.services.workspace.clone().join( - tonic::Request::new(JoinRequest { username: "".to_string(), password: "".to_string() }) //TODO - ).await?.into_inner())?; + pub async fn join_workspace(&mut self, workspace: &str) -> crate::Result> { + let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner(); let (tx, rx) = mpsc::channel(10); - let stream = self.services.cursor.clone() + let cur_stream = self.services.cursor.clone() .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .await? .into_inner(); - let worker = CursorWorker::new(self.user_id.clone()); + let worker = CursorWorker::default(); let controller = Arc::new(worker.subscribe()); tokio::spawn(async move { tracing::debug!("controller worker started"); - worker.work(tx, stream).await; + worker.work(tx, cur_stream).await; tracing::debug!("controller worker stopped"); }); - let lock = Arc::new(RwLock::new( - Workspace::new( - workspace_id.to_string(), - self.user_id, - self.token_tx.clone(), - controller, - self.services.clone() - ).await? + let ws = Arc::new(Workspace::new( + workspace.to_string(), + self.user_id, + self.token_tx.clone(), + controller, + self.services.clone() )); - self.workspaces.insert(workspace_id.to_string(), lock.clone()); + ws.fetch_users().await?; + ws.fetch_buffers().await?; - Ok(lock) - } + ws.run_actor(ws_stream); - /// leave given workspace, disconnecting buffer and cursor controllers - pub async fn leave_workspace(&self, workspace_id: &str) -> crate::Result<()> { - let mut workspace_client = self.services.workspace.clone(); - workspace_client.leave_workspace( - tonic::Request::new(WorkspaceDetails { id: workspace_id.to_string() }) - ).await?; - - Ok(()) + self.workspaces.insert(workspace.to_string(), ws.clone()); + + Ok(ws) } /// accessor for user id pub fn user_id(&self) -> Uuid { - self.user_id.clone() + self.user_id } } diff --git a/src/workspace.rs b/src/workspace.rs index a964eeb..605ef1a 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -1,5 +1,7 @@ -use std::{collections::{BTreeMap, BTreeSet}, str::FromStr, sync::Arc}; +use std::{collections::BTreeSet, sync::Arc}; use tokio::sync::mpsc; +use dashmap::{DashMap, DashSet}; +use tonic::Streaming; use uuid::Uuid; use crate::{ api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor, @@ -14,43 +16,60 @@ pub struct UserInfo { pub struct Workspace { id: String, - user_id: Uuid, + user_id: Uuid, // reference to global user id token: Arc>, cursor: Arc, - buffers: BTreeMap>, - filetree: BTreeSet, - users: BTreeMap, + buffers: Arc>>, + pub(crate) filetree: Arc>, + pub(crate) users: Arc>, services: Arc } impl Workspace { /// create a new buffer and perform initial fetch operations - pub(crate) async fn new( + pub(crate) fn new( id: String, user_id: Uuid, token: Arc>, cursor: Arc, services: Arc - ) -> crate::Result { - let mut ws = Workspace { + ) -> Self { + Workspace { id, user_id, token, cursor, - buffers: BTreeMap::new(), - filetree: BTreeSet::new(), - users: BTreeMap::new(), + buffers: Arc::new(DashMap::default()), + filetree: Arc::new(DashSet::default()), + users: Arc::new(DashMap::default()), services - }; + } + } - ws.fetch_buffers().await?; - ws.fetch_users().await?; - - Ok(ws) + pub(crate) fn run_actor(&self, mut stream: Streaming) { + let users = self.users.clone(); + let filetree = self.filetree.clone(); + let name = self.id(); + tokio::spawn(async move { + loop { + match stream.message().await { + Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), + Ok(None) => break tracing::info!("leaving workspace {}", name), + Ok(Some(WorkspaceEvent { event: None })) => tracing::warn!("workspace {} received empty event", name), + Ok(Some(WorkspaceEvent { event: Some(ev) })) => match ev { + WorkspaceEventInner::Join(UserJoin { user }) => { users.insert(user.clone().into(), UserInfo { uuid: user.into() }); }, + WorkspaceEventInner::Leave(UserLeave { user }) => { users.remove(&user.into()); }, + WorkspaceEventInner::Create(FileCreate { path }) => { filetree.insert(path); }, + WorkspaceEventInner::Rename(FileRename { before, after }) => { filetree.remove(&before); filetree.insert(after); }, + WorkspaceEventInner::Delete(FileDelete { path }) => { filetree.remove(&path); }, + }, + } + } + }); } /// create a new buffer in current workspace - pub async fn create(&mut self, path: &str) -> crate::Result<()> { + pub async fn create(&self, path: &str) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); workspace_client.create_buffer( tonic::Request::new(BufferNode { path: path.to_string() }) @@ -69,17 +88,16 @@ impl Workspace { /// /// to interact with such buffer use [crate::api::Controller::send] or /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] - pub async fn attach(&mut self, path: &str) -> crate::Result> { + pub async fn attach(&self, path: &str) -> crate::Result> { let mut worskspace_client = self.services.workspace.clone(); - let mut request = tonic::Request::new(AttachRequest { path: path.to_string() }); - request.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(path).expect("could not represent path as byte sequence")); - self.token.send(worskspace_client.attach(request).await?.into_inner())?; + let request = tonic::Request::new(BufferNode { path: path.to_string() }); + let credentials = worskspace_client.access_buffer(request).await?.into_inner(); + self.token.send(credentials.token)?; let (tx, rx) = mpsc::channel(10); - let stream = self.services.buffer.clone() - .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) - .await? - .into_inner(); + let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); + req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(path).expect("could not represent path as byte sequence")); + let stream = self.services.buffer.clone().attach(req).await?.into_inner(); let worker = BufferWorker::new(self.user_id, path); let controller = Arc::new(worker.subscribe()); @@ -95,7 +113,7 @@ impl Workspace { } /// fetch a list of all buffers in a workspace - pub async fn fetch_buffers(&mut self) -> crate::Result<()> { + pub async fn fetch_buffers(&self) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); let buffers = workspace_client.list_buffers( tonic::Request::new(Empty {}) @@ -110,19 +128,16 @@ impl Workspace { } /// fetch a list of all users in a workspace - pub async fn fetch_users(&mut self) -> crate::Result<()> { + pub async fn fetch_users(&self) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); let users = BTreeSet::from_iter(workspace_client.list_users( tonic::Request::new(Empty {}) ).await?.into_inner().users.into_iter().map(Uuid::from)); - // only keep userinfo for users that still exist - self.users.retain(|k, _v| users.contains(k)); - - let _users = self.users.clone(); // damnnn rust - users.iter() - .filter(|u| _users.contains_key(u)) - .for_each(|u| { self.users.insert(*u, UserInfo::from(*u)); }); + self.users.clear(); + for u in users { + self.users.insert(u, UserInfo { uuid: u }); + } Ok(()) } @@ -130,7 +145,7 @@ impl Workspace { /// get a list of the users attached to a specific buffer /// /// TODO: discuss implementation details - pub async fn list_buffer_users(&mut self, path: &str) -> crate::Result> { + pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { let mut workspace_client = self.services.workspace.clone(); let buffer_users = workspace_client.list_buffer_users( tonic::Request::new(BufferNode { path: path.to_string() }) @@ -139,16 +154,8 @@ impl Workspace { Ok(buffer_users) } - /// detach from a specific buffer, returns false if there - pub fn detach(&mut self, path: &str) -> bool { - match &mut self.buffers.remove(path) { - None => false, - Some(_) => true - } - } - /// delete a buffer - pub async fn delete(&mut self, path: &str) -> crate::Result<()> { + pub async fn delete(&self, path: &str) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); workspace_client.delete_buffer( tonic::Request::new(BufferNode { path: path.to_string() }) @@ -167,11 +174,11 @@ impl Workspace { /// get a new reference to a buffer controller, if any is active to given path pub fn buffer_by_name(&self, path: &str) -> Option> { - self.buffers.get(path).cloned() + self.buffers.get(path).map(|x| x.clone()) } /// get the currently cached "filetree" pub fn filetree(&self) -> Vec { self.filetree.iter().map(|f| f.clone()).collect() - } -} \ No newline at end of file + } +}