diff --git a/src/client.rs b/src/client.rs index 5ecb0e3..44de291 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,29 +2,47 @@ //! //! codemp client manager, containing grpc services -use std::sync::Arc; - use dashmap::DashMap; -use tokio::sync::mpsc; -use tonic::service::interceptor::InterceptedService; -use tonic::service::Interceptor; use tonic::transport::{Channel, Endpoint}; -use tonic::IntoRequest; use uuid::Uuid; use codemp_proto::auth::auth_client::AuthClient; -use codemp_proto::{ - common::Empty, - buffer::buffer_client::BufferClient, - cursor::cursor_client::CursorClient, - auth::{Token, WorkspaceJoinRequest}, - workspace::workspace_client::WorkspaceClient, -}; -use crate::{ - api::controller::ControllerWorker, - cursor::worker::CursorWorker, - workspace::Workspace -}; +use codemp_proto::auth::{Token, WorkspaceJoinRequest}; +use crate::workspace::Workspace; + +#[derive(Clone)] +pub struct AuthWrap { + username: String, + password: String, + service: AuthClient, +} + +impl AuthWrap { + async fn try_new(username: &str, password: &str, host: &str) -> crate::Result { + let channel = Endpoint::from_shared(host.to_string())? + .connect() + .await?; + + Ok(AuthWrap { + username: username.to_string(), + password: password.to_string(), + service: AuthClient::new(channel), + }) + } + + async fn login_workspace(&self, ws: &str) -> crate::Result { + Ok( + self.service.clone() + .login(WorkspaceJoinRequest { + username: self.username.clone(), + password: self.password.clone(), + workspace_id: Some(ws.to_string()) + }) + .await? + .into_inner() + ) + } +} /// codemp client manager /// @@ -33,113 +51,47 @@ use crate::{ /// can be used to interact with server pub struct Client { user_id: Uuid, + host: String, workspaces: DashMap, - token_tx: Arc>, - services: Arc -} - -#[derive(Clone)] -pub(crate) struct ClientInterceptor { - token: tokio::sync::watch::Receiver -} - -impl Interceptor for ClientInterceptor { - fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { - if let Ok(token) = self.token.borrow().token.parse() { - request.metadata_mut().insert("auth", token); - } - - Ok(request) - } -} - - -#[derive(Debug, Clone)] -pub(crate) struct Services { - pub(crate) workspace: WorkspaceClient>, - pub(crate) buffer: BufferClient>, - pub(crate) cursor: CursorClient>, - pub(crate) auth: AuthClient, -} - -// TODO meno losco -fn parse_codemp_connection_string(string: &str) -> (String, String) { - let url = string.replace("codemp://", ""); - let (host, workspace) = url.split_once('/').unwrap(); - (format!("http://{}", host), workspace.to_string()) + auth: AuthWrap, } impl Client { /// instantiate and connect a new client - pub async fn new(dest: &str) -> crate::Result { - let (_host, _workspace_id) = parse_codemp_connection_string(dest); - - let channel = Endpoint::from_shared(dest.to_string())? - .connect() - .await?; - - let (token_tx, token_rx) = tokio::sync::watch::channel( - Token { token: "".to_string() } - ); - - let inter = ClientInterceptor { token: token_rx }; - - let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone()); - let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone()); - let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone()); - let auth = AuthClient::new(channel); + pub async fn new( + host: impl AsRef, + username: impl AsRef, + password: impl AsRef + ) -> crate::Result { + let host = if host.as_ref().starts_with("http") { + host.as_ref().to_string() + } else { + format!("https://{}", host.as_ref()) + }; let user_id = uuid::Uuid::new_v4(); + let auth = AuthWrap::try_new(username.as_ref(), password.as_ref(), &host).await?; Ok(Client { user_id, + host, workspaces: DashMap::default(), - token_tx: Arc::new(token_tx), - services: Arc::new(Services { workspace, buffer, cursor, auth }) + auth, }) } - pub async fn login(&self, username: String, password: String, workspace_id: Option) -> crate::Result<()> { - Ok(self.token_tx.send( - self.services.auth.clone() - .login(WorkspaceJoinRequest { username, password, workspace_id}) - .await? - .into_inner() - )?) - } - /// join a workspace, returns an [tokio::sync::RwLock] to interact with it - pub async fn join_workspace(&self, workspace: &str) -> crate::Result { - let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner(); + pub async fn join_workspace(&self, workspace: impl AsRef) -> crate::Result { + let token = self.auth.login_workspace(workspace.as_ref()).await?; - let (tx, rx) = mpsc::channel(256); - let cur_stream = self.services.cursor.clone() - .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) - .await? - .into_inner(); - - let worker = CursorWorker::default(); - let controller = worker.subscribe(); - tokio::spawn(async move { - tracing::debug!("controller worker started"); - worker.work(tx, cur_stream).await; - tracing::debug!("controller worker stopped"); - }); - - let ws = Workspace::new( - workspace.to_string(), + let ws = Workspace::try_new( + workspace.as_ref().to_string(), self.user_id, - controller, - self.token_tx.clone(), - self.services.clone() - ); + &self.host, + token.clone() + ).await?; - ws.fetch_users().await?; - ws.fetch_buffers().await?; - - ws.run_actor(ws_stream); - - self.workspaces.insert(workspace.to_string(), ws.clone()); + self.workspaces.insert(workspace.as_ref().to_string(), ws.clone()); Ok(ws) } diff --git a/src/workspace.rs b/src/workspace.rs index 501c41e..ec9d560 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -1,12 +1,16 @@ use crate::{ - api::{Controller, controller::ControllerWorker}, + api::{controller::ControllerWorker, Controller}, buffer::{self, worker::BufferWorker}, - client::Services, - cursor, + cursor::{self, worker::CursorWorker}, }; + use codemp_proto::{ + common::Empty, + buffer::buffer_client::BufferClient, + cursor::cursor_client::CursorClient, auth::Token, - common::{Empty, Identity}, + workspace::workspace_client::WorkspaceClient, + common::Identity, files::BufferNode, workspace::{ workspace_event::{ @@ -15,10 +19,11 @@ use codemp_proto::{ WorkspaceEvent, }, }; + use dashmap::{DashMap, DashSet}; use std::{collections::BTreeSet, sync::Arc}; use tokio::sync::mpsc; -use tonic::Streaming; +use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}, Streaming}; use uuid::Uuid; #[cfg(feature = "js")] @@ -30,6 +35,47 @@ pub struct UserInfo { pub uuid: Uuid, } +#[derive(Clone)] +struct WorkspaceInterceptor { + token: tokio::sync::watch::Receiver +} + +impl Interceptor for WorkspaceInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { + if let Ok(token) = self.token.borrow().token.parse() { + request.metadata_mut().insert("auth", token); + } + + Ok(request) + } +} + +type AuthedService = InterceptedService; + +#[derive(Debug)] +struct Services { + token: tokio::sync::watch::Sender, + workspace: WorkspaceClient, + buffer: BufferClient, + cursor: CursorClient, +} + +impl Services { + async fn try_new(dest: &str, token: Token) -> crate::Result { + let channel = Endpoint::from_shared(dest.to_string())? + .connect() + .await?; + let (token_tx, token_rx) = tokio::sync::watch::channel(token); + let inter = WorkspaceInterceptor { token: token_rx }; + Ok(Self { + token: token_tx, + buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()), + cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()), + workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()), + }) + } +} + #[derive(Debug, Clone)] #[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "js", napi)] @@ -43,29 +89,52 @@ struct WorkspaceInner { buffers: DashMap, filetree: DashSet, users: DashMap, - token: Arc>, // shared - services: Arc, // shared + services: Services } impl Workspace { /// create a new buffer and perform initial fetch operations - pub(crate) fn new( + pub(crate) async fn try_new( id: String, user_id: Uuid, - cursor: cursor::Controller, - token: Arc>, - services: Arc, - ) -> Self { - Self(Arc::new(WorkspaceInner { + dest: &str, + token: Token, + ) -> crate::Result { + let services = Services::try_new(dest, token).await?; + let ws_stream = services.workspace.clone() + .attach(Empty{}) + .await? + .into_inner(); + + let (tx, rx) = mpsc::channel(256); + let cur_stream = services.cursor.clone() + .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await? + .into_inner(); + + let worker = CursorWorker::default(); + let controller = worker.controller(); + tokio::spawn(async move { + tracing::debug!("controller worker started"); + worker.work(tx, cur_stream).await; + tracing::debug!("controller worker stopped"); + }); + + let ws = Self(Arc::new(WorkspaceInner { id, user_id, - token, - cursor, + cursor: controller, buffers: DashMap::default(), filetree: DashSet::default(), users: DashMap::default(), services, - })) + })); + + ws.fetch_users().await?; + ws.fetch_buffers().await?; + ws.run_actor(ws_stream); + + Ok(ws) } pub(crate) fn run_actor(&self, mut stream: Streaming) { @@ -135,7 +204,7 @@ impl Workspace { path: path.to_string(), }); let credentials = worskspace_client.access_buffer(request).await?.into_inner(); - self.0.token.send(credentials.token)?; + self.0.services.token.send(credentials.token)?; let (tx, rx) = mpsc::channel(256); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); @@ -154,7 +223,7 @@ impl Workspace { .into_inner(); let worker = BufferWorker::new(self.0.user_id, path); - let controller = worker.subscribe(); + let controller = worker.controller(); tokio::spawn(async move { tracing::debug!("controller worker started"); worker.work(tx, stream).await;