diff --git a/src/workspace/mod.rs b/src/workspace/mod.rs new file mode 100644 index 0000000..3a57e75 --- /dev/null +++ b/src/workspace/mod.rs @@ -0,0 +1,4 @@ +pub mod service; +pub mod worker; + +pub use worker::Workspace; diff --git a/src/workspace/service.rs b/src/workspace/service.rs new file mode 100644 index 0000000..04f7ffa --- /dev/null +++ b/src/workspace/service.rs @@ -0,0 +1,65 @@ +use codemp_proto::{auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, workspace::workspace_client::WorkspaceClient}; +use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}}; + + +#[derive(Clone)] +pub struct WorkspaceInterceptor { + token: tokio::sync::watch::Receiver +} + +impl Interceptor for WorkspaceInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { + if let Ok(token) = self.token.borrow().token.parse() { + request.metadata_mut().insert("auth", token); + } + + Ok(request) + } +} + +type AuthedService = InterceptedService; + +#[derive(Debug)] +pub struct Services { + token: tokio::sync::watch::Sender, + workspace: WorkspaceClient, + buffer: BufferClient, + cursor: CursorClient, +} + +impl Services { + pub async fn try_new(dest: &str, token: Token) -> crate::Result { + let channel = Endpoint::from_shared(dest.to_string())? + .connect() + .await?; + let (token_tx, token_rx) = tokio::sync::watch::channel(token); + let inter = WorkspaceInterceptor { token: token_rx }; + Ok(Self { + token: token_tx, + buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()), + cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()), + workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()), + }) + } + + pub fn set_token(&self, token: Token) { + if self.token.send(token).is_err() { + tracing::warn!("could not update token: no more auth interceptors active"); + } + } + + // TODO just make fields pub(crate) ?? idk + + pub fn ws(&self) -> WorkspaceClient { + self.workspace.clone() + } + + pub fn buf(&self) -> BufferClient { + self.buffer.clone() + } + + pub fn cur(&self) -> CursorClient { + self.cursor.clone() + } + +} diff --git a/src/workspace.rs b/src/workspace/worker.rs similarity index 77% rename from src/workspace.rs rename to src/workspace/worker.rs index ec9d560..35d768d 100644 --- a/src/workspace.rs +++ b/src/workspace/worker.rs @@ -1,15 +1,13 @@ use crate::{ - api::{controller::ControllerWorker, Controller}, - buffer::{self, worker::BufferWorker}, + api::{controller::ControllerWorker, Controller, User}, + buffer::{self, tools::InternallyMutable, worker::BufferWorker}, cursor::{self, worker::CursorWorker}, + workspace::service::Services, }; use codemp_proto::{ common::Empty, - buffer::buffer_client::BufferClient, - cursor::cursor_client::CursorClient, auth::Token, - workspace::workspace_client::WorkspaceClient, common::Identity, files::BufferNode, workspace::{ @@ -23,58 +21,12 @@ use codemp_proto::{ use dashmap::{DashMap, DashSet}; use std::{collections::BTreeSet, sync::Arc}; use tokio::sync::mpsc; -use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}, Streaming}; +use tonic::Streaming; use uuid::Uuid; #[cfg(feature = "js")] use napi_derive::napi; -//TODO may contain more info in the future -#[derive(Debug, Clone)] -pub struct UserInfo { - pub uuid: Uuid, -} - -#[derive(Clone)] -struct WorkspaceInterceptor { - token: tokio::sync::watch::Receiver -} - -impl Interceptor for WorkspaceInterceptor { - fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { - if let Ok(token) = self.token.borrow().token.parse() { - request.metadata_mut().insert("auth", token); - } - - Ok(request) - } -} - -type AuthedService = InterceptedService; - -#[derive(Debug)] -struct Services { - token: tokio::sync::watch::Sender, - workspace: WorkspaceClient, - buffer: BufferClient, - cursor: CursorClient, -} - -impl Services { - async fn try_new(dest: &str, token: Token) -> crate::Result { - let channel = Endpoint::from_shared(dest.to_string())? - .connect() - .await?; - let (token_tx, token_rx) = tokio::sync::watch::channel(token); - let inter = WorkspaceInterceptor { token: token_rx }; - Ok(Self { - token: token_tx, - buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()), - cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()), - workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()), - }) - } -} #[derive(Debug, Clone)] #[cfg_attr(feature = "python", pyo3::pyclass)] @@ -88,7 +40,7 @@ struct WorkspaceInner { cursor: cursor::Controller, buffers: DashMap, filetree: DashSet, - users: DashMap, + users: DashMap, services: Services } @@ -101,13 +53,13 @@ impl Workspace { token: Token, ) -> crate::Result { let services = Services::try_new(dest, token).await?; - let ws_stream = services.workspace.clone() + let ws_stream = services.ws() .attach(Empty{}) .await? .into_inner(); let (tx, rx) = mpsc::channel(256); - let cur_stream = services.cursor.clone() + let cur_stream = services.cur() .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .await? .into_inner(); @@ -151,7 +103,7 @@ impl Workspace { Ok(Some(WorkspaceEvent { event: Some(ev) })) => { match ev { WorkspaceEventInner::Join(UserJoin { user }) => { - inner.users.insert(user.clone().into(), UserInfo { uuid: user.into() }); + inner.users.insert(user.clone().into(), User { id: user.into() }); } WorkspaceEventInner::Leave(UserLeave { user }) => { inner.users.remove(&user.into()); @@ -178,7 +130,7 @@ impl Workspace { /// create a new buffer in current workspace pub async fn create(&self, path: &str) -> crate::Result<()> { - let mut workspace_client = self.0.services.workspace.clone(); + let mut workspace_client = self.0.services.ws(); workspace_client .create_buffer(tonic::Request::new(BufferNode { path: path.to_string(), @@ -199,12 +151,12 @@ impl Workspace { /// to interact with such buffer use [crate::api::Controller::send] or /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] pub async fn attach(&self, path: &str) -> crate::Result { - let mut worskspace_client = self.0.services.workspace.clone(); + let mut worskspace_client = self.0.services.ws(); let request = tonic::Request::new(BufferNode { path: path.to_string(), }); let credentials = worskspace_client.access_buffer(request).await?.into_inner(); - self.0.services.token.send(credentials.token)?; + self.0.services.set_token(credentials.token); let (tx, rx) = mpsc::channel(256); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); @@ -213,11 +165,7 @@ impl Workspace { tonic::metadata::MetadataValue::try_from(credentials.id.id) .expect("could not represent path as byte sequence"), ); - let stream = self - .0 - .services - .buffer - .clone() + let stream = self.0.services.buf() .attach(req) .await? .into_inner(); @@ -258,7 +206,7 @@ impl Workspace { /// fetch a list of all buffers in a workspace pub async fn fetch_buffers(&self) -> crate::Result<()> { - let mut workspace_client = self.0.services.workspace.clone(); + let mut workspace_client = self.0.services.ws(); let buffers = workspace_client .list_buffers(tonic::Request::new(Empty {})) .await? @@ -275,7 +223,7 @@ impl Workspace { /// fetch a list of all users in a workspace pub async fn fetch_users(&self) -> crate::Result<()> { - let mut workspace_client = self.0.services.workspace.clone(); + let mut workspace_client = self.0.services.ws(); let users = BTreeSet::from_iter( workspace_client .list_users(tonic::Request::new(Empty {})) @@ -288,7 +236,7 @@ impl Workspace { self.0.users.clear(); for u in users { - self.0.users.insert(u, UserInfo { uuid: u }); + self.0.users.insert(u, User { id: u }); } Ok(()) @@ -298,7 +246,7 @@ impl Workspace { /// /// TODO: discuss implementation details pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { - let mut workspace_client = self.0.services.workspace.clone(); + let mut workspace_client = self.0.services.ws(); let buffer_users = workspace_client .list_buffer_users(tonic::Request::new(BufferNode { path: path.to_string(), @@ -312,7 +260,7 @@ impl Workspace { /// delete a buffer pub async fn delete(&self, path: &str) -> crate::Result<()> { - let mut workspace_client = self.0.services.workspace.clone(); + let mut workspace_client = self.0.services.ws(); workspace_client .delete_buffer(tonic::Request::new(BufferNode { path: path.to_string(),