feat: workspace streaming attach and lists

to join a workspace, attach to it. to leave a workspace, close the
channel. on such channel you get workspace events (new buffers, user
leave, ...). must fetch current buffers and users upon join. if
workspace doesn't exist, server should create it on attach
also dashmap everywhere to get/put simple
This commit is contained in:
əlemi 2024-02-07 01:12:05 +01:00
parent f61836e4ca
commit 948a1b4de5
3 changed files with 81 additions and 80 deletions

View file

@ -27,6 +27,7 @@ tokio-stream = { version = "0.1", optional = true }
lazy_static = { version = "1.4", optional = true } lazy_static = { version = "1.4", optional = true }
serde = { version = "1.0.193", features = ["derive"] } serde = { version = "1.0.193", features = ["derive"] }
postcard = "1.0.8" postcard = "1.0.8"
dashmap = "5.5.3"
[build-dependencies] [build-dependencies]
tonic-build = "0.9" tonic-build = "0.9"

View file

@ -2,13 +2,14 @@
//! //!
//! codemp client manager, containing grpc services //! codemp client manager, containing grpc services
use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, RwLock}; use dashmap::DashMap;
use tokio::sync::mpsc;
use tonic::service::interceptor::InterceptedService; use tonic::service::interceptor::InterceptedService;
use tonic::service::Interceptor; use tonic::service::Interceptor;
use tonic::transport::{Channel, Endpoint}; use tonic::transport::{Channel, Endpoint};
use tonic::IntoRequest;
use uuid::Uuid; use uuid::Uuid;
use crate::proto::auth::auth_client::AuthClient; use crate::proto::auth::auth_client::AuthClient;
@ -33,7 +34,7 @@ use crate::{
pub struct Client { pub struct Client {
user_id: Uuid, user_id: Uuid,
token_tx: Arc<tokio::sync::watch::Sender<Token>>, token_tx: Arc<tokio::sync::watch::Sender<Token>>,
pub workspaces: BTreeMap<String, Arc<RwLock<Workspace>>>, workspaces: Arc<DashMap<String, Arc<Workspace>>>,
services: Arc<Services> services: Arc<Services>
} }
@ -62,7 +63,7 @@ pub(crate) struct Services {
} }
// TODO meno losco // TODO meno losco
fn parse_codemp_connection_string<'a>(string: &'a str) -> (String, String) { fn parse_codemp_connection_string(string: &str) -> (String, String) {
let url = string.replace("codemp://", ""); let url = string.replace("codemp://", "");
let (host, workspace) = url.split_once('/').unwrap(); let (host, workspace) = url.split_once('/').unwrap();
(format!("http://{}", host), workspace.to_string()) (format!("http://{}", host), workspace.to_string())
@ -86,14 +87,15 @@ impl Client {
let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone()); let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone());
let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone()); let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone());
let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone()); let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone());
let auth = AuthClient::new(channel);
let user_id = uuid::Uuid::new_v4(); let user_id = uuid::Uuid::new_v4();
Ok(Client { Ok(Client {
user_id, user_id,
token_tx: Arc::new(token_tx), token_tx: Arc::new(token_tx),
workspaces: BTreeMap::new(), workspaces: Arc::new(DashMap::default()),
services: Arc::new(Services { workspace, buffer, cursor }) services: Arc::new(Services { workspace, buffer, cursor, auth })
}) })
} }
@ -107,52 +109,43 @@ impl Client {
} }
/// join a workspace, returns an [tokio::sync::RwLock] to interact with it /// join a workspace, returns an [tokio::sync::RwLock] to interact with it
pub async fn join_workspace(&mut self, workspace_id: &str) -> crate::Result<Arc<RwLock<Workspace>>> { pub async fn join_workspace(&mut self, workspace: &str) -> crate::Result<Arc<Workspace>> {
self.token_tx.send(self.services.workspace.clone().join( let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner();
tonic::Request::new(JoinRequest { username: "".to_string(), password: "".to_string() }) //TODO
).await?.into_inner())?;
let (tx, rx) = mpsc::channel(10); let (tx, rx) = mpsc::channel(10);
let stream = self.services.cursor.clone() let cur_stream = self.services.cursor.clone()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await? .await?
.into_inner(); .into_inner();
let worker = CursorWorker::new(self.user_id.clone()); let worker = CursorWorker::default();
let controller = Arc::new(worker.subscribe()); let controller = Arc::new(worker.subscribe());
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!("controller worker started"); tracing::debug!("controller worker started");
worker.work(tx, stream).await; worker.work(tx, cur_stream).await;
tracing::debug!("controller worker stopped"); tracing::debug!("controller worker stopped");
}); });
let lock = Arc::new(RwLock::new( let ws = Arc::new(Workspace::new(
Workspace::new( workspace.to_string(),
workspace_id.to_string(),
self.user_id, self.user_id,
self.token_tx.clone(), self.token_tx.clone(),
controller, controller,
self.services.clone() self.services.clone()
).await?
)); ));
self.workspaces.insert(workspace_id.to_string(), lock.clone()); ws.fetch_users().await?;
ws.fetch_buffers().await?;
Ok(lock) ws.run_actor(ws_stream);
}
/// leave given workspace, disconnecting buffer and cursor controllers self.workspaces.insert(workspace.to_string(), ws.clone());
pub async fn leave_workspace(&self, workspace_id: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
workspace_client.leave_workspace(
tonic::Request::new(WorkspaceDetails { id: workspace_id.to_string() })
).await?;
Ok(()) Ok(ws)
} }
/// accessor for user id /// accessor for user id
pub fn user_id(&self) -> Uuid { pub fn user_id(&self) -> Uuid {
self.user_id.clone() self.user_id
} }
} }

View file

@ -1,5 +1,7 @@
use std::{collections::{BTreeMap, BTreeSet}, str::FromStr, sync::Arc}; use std::{collections::BTreeSet, sync::Arc};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use dashmap::{DashMap, DashSet};
use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor, api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor,
@ -14,43 +16,60 @@ pub struct UserInfo {
pub struct Workspace { pub struct Workspace {
id: String, id: String,
user_id: Uuid, user_id: Uuid, // reference to global user id
token: Arc<tokio::sync::watch::Sender<Token>>, token: Arc<tokio::sync::watch::Sender<Token>>,
cursor: Arc<cursor::Controller>, cursor: Arc<cursor::Controller>,
buffers: BTreeMap<String, Arc<buffer::Controller>>, buffers: Arc<DashMap<String, Arc<buffer::Controller>>>,
filetree: BTreeSet<String>, pub(crate) filetree: Arc<DashSet<String>>,
users: BTreeMap<Uuid, UserInfo>, pub(crate) users: Arc<DashMap<Uuid, UserInfo>>,
services: Arc<Services> services: Arc<Services>
} }
impl Workspace { impl Workspace {
/// create a new buffer and perform initial fetch operations /// create a new buffer and perform initial fetch operations
pub(crate) async fn new( pub(crate) fn new(
id: String, id: String,
user_id: Uuid, user_id: Uuid,
token: Arc<tokio::sync::watch::Sender<Token>>, token: Arc<tokio::sync::watch::Sender<Token>>,
cursor: Arc<cursor::Controller>, cursor: Arc<cursor::Controller>,
services: Arc<Services> services: Arc<Services>
) -> crate::Result<Self> { ) -> Self {
let mut ws = Workspace { Workspace {
id, id,
user_id, user_id,
token, token,
cursor, cursor,
buffers: BTreeMap::new(), buffers: Arc::new(DashMap::default()),
filetree: BTreeSet::new(), filetree: Arc::new(DashSet::default()),
users: BTreeMap::new(), users: Arc::new(DashMap::default()),
services services
}; }
}
ws.fetch_buffers().await?; pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>) {
ws.fetch_users().await?; let users = self.users.clone();
let filetree = self.filetree.clone();
Ok(ws) let name = self.id();
tokio::spawn(async move {
loop {
match stream.message().await {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
Ok(None) => break tracing::info!("leaving workspace {}", name),
Ok(Some(WorkspaceEvent { event: None })) => tracing::warn!("workspace {} received empty event", name),
Ok(Some(WorkspaceEvent { event: Some(ev) })) => match ev {
WorkspaceEventInner::Join(UserJoin { user }) => { users.insert(user.clone().into(), UserInfo { uuid: user.into() }); },
WorkspaceEventInner::Leave(UserLeave { user }) => { users.remove(&user.into()); },
WorkspaceEventInner::Create(FileCreate { path }) => { filetree.insert(path); },
WorkspaceEventInner::Rename(FileRename { before, after }) => { filetree.remove(&before); filetree.insert(after); },
WorkspaceEventInner::Delete(FileDelete { path }) => { filetree.remove(&path); },
},
}
}
});
} }
/// create a new buffer in current workspace /// create a new buffer in current workspace
pub async fn create(&mut self, path: &str) -> crate::Result<()> { pub async fn create(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.services.workspace.clone();
workspace_client.create_buffer( workspace_client.create_buffer(
tonic::Request::new(BufferNode { path: path.to_string() }) tonic::Request::new(BufferNode { path: path.to_string() })
@ -69,17 +88,16 @@ impl Workspace {
/// ///
/// to interact with such buffer use [crate::api::Controller::send] or /// to interact with such buffer use [crate::api::Controller::send] or
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange] /// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
pub async fn attach(&mut self, path: &str) -> crate::Result<Arc<buffer::Controller>> { pub async fn attach(&self, path: &str) -> crate::Result<Arc<buffer::Controller>> {
let mut worskspace_client = self.services.workspace.clone(); let mut worskspace_client = self.services.workspace.clone();
let mut request = tonic::Request::new(AttachRequest { path: path.to_string() }); let request = tonic::Request::new(BufferNode { path: path.to_string() });
request.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(path).expect("could not represent path as byte sequence")); let credentials = worskspace_client.access_buffer(request).await?.into_inner();
self.token.send(worskspace_client.attach(request).await?.into_inner())?; self.token.send(credentials.token)?;
let (tx, rx) = mpsc::channel(10); let (tx, rx) = mpsc::channel(10);
let stream = self.services.buffer.clone() let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx));
.attach(tokio_stream::wrappers::ReceiverStream::new(rx)) req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(path).expect("could not represent path as byte sequence"));
.await? let stream = self.services.buffer.clone().attach(req).await?.into_inner();
.into_inner();
let worker = BufferWorker::new(self.user_id, path); let worker = BufferWorker::new(self.user_id, path);
let controller = Arc::new(worker.subscribe()); let controller = Arc::new(worker.subscribe());
@ -95,7 +113,7 @@ impl Workspace {
} }
/// fetch a list of all buffers in a workspace /// fetch a list of all buffers in a workspace
pub async fn fetch_buffers(&mut self) -> crate::Result<()> { pub async fn fetch_buffers(&self) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.services.workspace.clone();
let buffers = workspace_client.list_buffers( let buffers = workspace_client.list_buffers(
tonic::Request::new(Empty {}) tonic::Request::new(Empty {})
@ -110,19 +128,16 @@ impl Workspace {
} }
/// fetch a list of all users in a workspace /// fetch a list of all users in a workspace
pub async fn fetch_users(&mut self) -> crate::Result<()> { pub async fn fetch_users(&self) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.services.workspace.clone();
let users = BTreeSet::from_iter(workspace_client.list_users( let users = BTreeSet::from_iter(workspace_client.list_users(
tonic::Request::new(Empty {}) tonic::Request::new(Empty {})
).await?.into_inner().users.into_iter().map(Uuid::from)); ).await?.into_inner().users.into_iter().map(Uuid::from));
// only keep userinfo for users that still exist self.users.clear();
self.users.retain(|k, _v| users.contains(k)); for u in users {
self.users.insert(u, UserInfo { uuid: u });
let _users = self.users.clone(); // damnnn rust }
users.iter()
.filter(|u| _users.contains_key(u))
.for_each(|u| { self.users.insert(*u, UserInfo::from(*u)); });
Ok(()) Ok(())
} }
@ -130,7 +145,7 @@ impl Workspace {
/// get a list of the users attached to a specific buffer /// get a list of the users attached to a specific buffer
/// ///
/// TODO: discuss implementation details /// TODO: discuss implementation details
pub async fn list_buffer_users(&mut self, path: &str) -> crate::Result<Vec<UserIdentity>> { pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.services.workspace.clone();
let buffer_users = workspace_client.list_buffer_users( let buffer_users = workspace_client.list_buffer_users(
tonic::Request::new(BufferNode { path: path.to_string() }) tonic::Request::new(BufferNode { path: path.to_string() })
@ -139,16 +154,8 @@ impl Workspace {
Ok(buffer_users) Ok(buffer_users)
} }
/// detach from a specific buffer, returns false if there
pub fn detach(&mut self, path: &str) -> bool {
match &mut self.buffers.remove(path) {
None => false,
Some(_) => true
}
}
/// delete a buffer /// delete a buffer
pub async fn delete(&mut self, path: &str) -> crate::Result<()> { pub async fn delete(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.services.workspace.clone();
workspace_client.delete_buffer( workspace_client.delete_buffer(
tonic::Request::new(BufferNode { path: path.to_string() }) tonic::Request::new(BufferNode { path: path.to_string() })
@ -167,7 +174,7 @@ impl Workspace {
/// get a new reference to a buffer controller, if any is active to given path /// get a new reference to a buffer controller, if any is active to given path
pub fn buffer_by_name(&self, path: &str) -> Option<Arc<buffer::Controller>> { pub fn buffer_by_name(&self, path: &str) -> Option<Arc<buffer::Controller>> {
self.buffers.get(path).cloned() self.buffers.get(path).map(|x| x.clone())
} }
/// get the currently cached "filetree" /// get the currently cached "filetree"