chore: move stuff around

im not sure about getters on Services, nor names, but ehh
This commit is contained in:
əlemi 2024-08-08 21:58:20 +02:00
parent f6edc2cbb0
commit f2be80427a
Signed by: alemi
GPG key ID: A4895B84D311642C
3 changed files with 86 additions and 69 deletions

4
src/workspace/mod.rs Normal file
View file

@ -0,0 +1,4 @@
pub mod service;
pub mod worker;
pub use worker::Workspace;

65
src/workspace/service.rs Normal file
View file

@ -0,0 +1,65 @@
use codemp_proto::{auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, workspace::workspace_client::WorkspaceClient};
use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}};
#[derive(Clone)]
pub struct WorkspaceInterceptor {
token: tokio::sync::watch::Receiver<Token>
}
impl Interceptor for WorkspaceInterceptor {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.token.borrow().token.parse() {
request.metadata_mut().insert("auth", token);
}
Ok(request)
}
}
type AuthedService = InterceptedService<Channel, WorkspaceInterceptor>;
#[derive(Debug)]
pub struct Services {
token: tokio::sync::watch::Sender<Token>,
workspace: WorkspaceClient<AuthedService>,
buffer: BufferClient<AuthedService>,
cursor: CursorClient<AuthedService>,
}
impl Services {
pub async fn try_new(dest: &str, token: Token) -> crate::Result<Self> {
let channel = Endpoint::from_shared(dest.to_string())?
.connect()
.await?;
let (token_tx, token_rx) = tokio::sync::watch::channel(token);
let inter = WorkspaceInterceptor { token: token_rx };
Ok(Self {
token: token_tx,
buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()),
cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()),
workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()),
})
}
pub fn set_token(&self, token: Token) {
if self.token.send(token).is_err() {
tracing::warn!("could not update token: no more auth interceptors active");
}
}
// TODO just make fields pub(crate) ?? idk
pub fn ws(&self) -> WorkspaceClient<AuthedService> {
self.workspace.clone()
}
pub fn buf(&self) -> BufferClient<AuthedService> {
self.buffer.clone()
}
pub fn cur(&self) -> CursorClient<AuthedService> {
self.cursor.clone()
}
}

View file

@ -1,15 +1,13 @@
use crate::{ use crate::{
api::{controller::ControllerWorker, Controller}, api::{controller::ControllerWorker, Controller, User},
buffer::{self, worker::BufferWorker}, buffer::{self, tools::InternallyMutable, worker::BufferWorker},
cursor::{self, worker::CursorWorker}, cursor::{self, worker::CursorWorker},
workspace::service::Services,
}; };
use codemp_proto::{ use codemp_proto::{
common::Empty, common::Empty,
buffer::buffer_client::BufferClient,
cursor::cursor_client::CursorClient,
auth::Token, auth::Token,
workspace::workspace_client::WorkspaceClient,
common::Identity, common::Identity,
files::BufferNode, files::BufferNode,
workspace::{ workspace::{
@ -23,58 +21,12 @@ use codemp_proto::{
use dashmap::{DashMap, DashSet}; use dashmap::{DashMap, DashSet};
use std::{collections::BTreeSet, sync::Arc}; use std::{collections::BTreeSet, sync::Arc};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}, Streaming}; use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
#[cfg(feature = "js")] #[cfg(feature = "js")]
use napi_derive::napi; use napi_derive::napi;
//TODO may contain more info in the future
#[derive(Debug, Clone)]
pub struct UserInfo {
pub uuid: Uuid,
}
#[derive(Clone)]
struct WorkspaceInterceptor {
token: tokio::sync::watch::Receiver<Token>
}
impl Interceptor for WorkspaceInterceptor {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.token.borrow().token.parse() {
request.metadata_mut().insert("auth", token);
}
Ok(request)
}
}
type AuthedService = InterceptedService<Channel, WorkspaceInterceptor>;
#[derive(Debug)]
struct Services {
token: tokio::sync::watch::Sender<Token>,
workspace: WorkspaceClient<AuthedService>,
buffer: BufferClient<AuthedService>,
cursor: CursorClient<AuthedService>,
}
impl Services {
async fn try_new(dest: &str, token: Token) -> crate::Result<Self> {
let channel = Endpoint::from_shared(dest.to_string())?
.connect()
.await?;
let (token_tx, token_rx) = tokio::sync::watch::channel(token);
let inter = WorkspaceInterceptor { token: token_rx };
Ok(Self {
token: token_tx,
buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()),
cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()),
workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()),
})
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "python", pyo3::pyclass)]
@ -88,7 +40,7 @@ struct WorkspaceInner {
cursor: cursor::Controller, cursor: cursor::Controller,
buffers: DashMap<String, buffer::Controller>, buffers: DashMap<String, buffer::Controller>,
filetree: DashSet<String>, filetree: DashSet<String>,
users: DashMap<Uuid, UserInfo>, users: DashMap<Uuid, User>,
services: Services services: Services
} }
@ -101,13 +53,13 @@ impl Workspace {
token: Token, token: Token,
) -> crate::Result<Self> { ) -> crate::Result<Self> {
let services = Services::try_new(dest, token).await?; let services = Services::try_new(dest, token).await?;
let ws_stream = services.workspace.clone() let ws_stream = services.ws()
.attach(Empty{}) .attach(Empty{})
.await? .await?
.into_inner(); .into_inner();
let (tx, rx) = mpsc::channel(256); let (tx, rx) = mpsc::channel(256);
let cur_stream = services.cursor.clone() let cur_stream = services.cur()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await? .await?
.into_inner(); .into_inner();
@ -151,7 +103,7 @@ impl Workspace {
Ok(Some(WorkspaceEvent { event: Some(ev) })) => { Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
match ev { match ev {
WorkspaceEventInner::Join(UserJoin { user }) => { WorkspaceEventInner::Join(UserJoin { user }) => {
inner.users.insert(user.clone().into(), UserInfo { uuid: user.into() }); inner.users.insert(user.clone().into(), User { id: user.into() });
} }
WorkspaceEventInner::Leave(UserLeave { user }) => { WorkspaceEventInner::Leave(UserLeave { user }) => {
inner.users.remove(&user.into()); inner.users.remove(&user.into());
@ -178,7 +130,7 @@ impl Workspace {
/// create a new buffer in current workspace /// create a new buffer in current workspace
pub async fn create(&self, path: &str) -> crate::Result<()> { pub async fn create(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.0.services.workspace.clone(); let mut workspace_client = self.0.services.ws();
workspace_client workspace_client
.create_buffer(tonic::Request::new(BufferNode { .create_buffer(tonic::Request::new(BufferNode {
path: path.to_string(), path: path.to_string(),
@ -199,12 +151,12 @@ impl Workspace {
/// to interact with such buffer use [crate::api::Controller::send] or /// to interact with such buffer use [crate::api::Controller::send] or
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange] /// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
pub async fn attach(&self, path: &str) -> crate::Result<buffer::Controller> { pub async fn attach(&self, path: &str) -> crate::Result<buffer::Controller> {
let mut worskspace_client = self.0.services.workspace.clone(); let mut worskspace_client = self.0.services.ws();
let request = tonic::Request::new(BufferNode { let request = tonic::Request::new(BufferNode {
path: path.to_string(), path: path.to_string(),
}); });
let credentials = worskspace_client.access_buffer(request).await?.into_inner(); let credentials = worskspace_client.access_buffer(request).await?.into_inner();
self.0.services.token.send(credentials.token)?; self.0.services.set_token(credentials.token);
let (tx, rx) = mpsc::channel(256); let (tx, rx) = mpsc::channel(256);
let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx));
@ -213,11 +165,7 @@ impl Workspace {
tonic::metadata::MetadataValue::try_from(credentials.id.id) tonic::metadata::MetadataValue::try_from(credentials.id.id)
.expect("could not represent path as byte sequence"), .expect("could not represent path as byte sequence"),
); );
let stream = self let stream = self.0.services.buf()
.0
.services
.buffer
.clone()
.attach(req) .attach(req)
.await? .await?
.into_inner(); .into_inner();
@ -258,7 +206,7 @@ impl Workspace {
/// fetch a list of all buffers in a workspace /// fetch a list of all buffers in a workspace
pub async fn fetch_buffers(&self) -> crate::Result<()> { pub async fn fetch_buffers(&self) -> crate::Result<()> {
let mut workspace_client = self.0.services.workspace.clone(); let mut workspace_client = self.0.services.ws();
let buffers = workspace_client let buffers = workspace_client
.list_buffers(tonic::Request::new(Empty {})) .list_buffers(tonic::Request::new(Empty {}))
.await? .await?
@ -275,7 +223,7 @@ impl Workspace {
/// fetch a list of all users in a workspace /// fetch a list of all users in a workspace
pub async fn fetch_users(&self) -> crate::Result<()> { pub async fn fetch_users(&self) -> crate::Result<()> {
let mut workspace_client = self.0.services.workspace.clone(); let mut workspace_client = self.0.services.ws();
let users = BTreeSet::from_iter( let users = BTreeSet::from_iter(
workspace_client workspace_client
.list_users(tonic::Request::new(Empty {})) .list_users(tonic::Request::new(Empty {}))
@ -288,7 +236,7 @@ impl Workspace {
self.0.users.clear(); self.0.users.clear();
for u in users { for u in users {
self.0.users.insert(u, UserInfo { uuid: u }); self.0.users.insert(u, User { id: u });
} }
Ok(()) Ok(())
@ -298,7 +246,7 @@ impl Workspace {
/// ///
/// TODO: discuss implementation details /// TODO: discuss implementation details
pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> { pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> {
let mut workspace_client = self.0.services.workspace.clone(); let mut workspace_client = self.0.services.ws();
let buffer_users = workspace_client let buffer_users = workspace_client
.list_buffer_users(tonic::Request::new(BufferNode { .list_buffer_users(tonic::Request::new(BufferNode {
path: path.to_string(), path: path.to_string(),
@ -312,7 +260,7 @@ impl Workspace {
/// delete a buffer /// delete a buffer
pub async fn delete(&self, path: &str) -> crate::Result<()> { pub async fn delete(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.0.services.workspace.clone(); let mut workspace_client = self.0.services.ws();
workspace_client workspace_client
.delete_buffer(tonic::Request::new(BufferNode { .delete_buffer(tonic::Request::new(BufferNode {
path: path.to_string(), path: path.to_string(),