feat: better auth: multiworkspace, proper flow

Co-authored-by: zaaarf <me@zaaarf.foo>
This commit is contained in:
əlemi 2024-08-08 02:30:34 +02:00
parent 59d8a4640d
commit 893c3d31e0
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 146 additions and 125 deletions

View file

@ -2,29 +2,47 @@
//! //!
//! codemp client manager, containing grpc services //! codemp client manager, containing grpc services
use std::sync::Arc;
use dashmap::DashMap; use dashmap::DashMap;
use tokio::sync::mpsc;
use tonic::service::interceptor::InterceptedService;
use tonic::service::Interceptor;
use tonic::transport::{Channel, Endpoint}; use tonic::transport::{Channel, Endpoint};
use tonic::IntoRequest;
use uuid::Uuid; use uuid::Uuid;
use codemp_proto::auth::auth_client::AuthClient; use codemp_proto::auth::auth_client::AuthClient;
use codemp_proto::{ use codemp_proto::auth::{Token, WorkspaceJoinRequest};
common::Empty, use crate::workspace::Workspace;
buffer::buffer_client::BufferClient,
cursor::cursor_client::CursorClient, #[derive(Clone)]
auth::{Token, WorkspaceJoinRequest}, pub struct AuthWrap {
workspace::workspace_client::WorkspaceClient, username: String,
}; password: String,
use crate::{ service: AuthClient<Channel>,
api::controller::ControllerWorker, }
cursor::worker::CursorWorker,
workspace::Workspace impl AuthWrap {
}; async fn try_new(username: &str, password: &str, host: &str) -> crate::Result<Self> {
let channel = Endpoint::from_shared(host.to_string())?
.connect()
.await?;
Ok(AuthWrap {
username: username.to_string(),
password: password.to_string(),
service: AuthClient::new(channel),
})
}
async fn login_workspace(&self, ws: &str) -> crate::Result<Token> {
Ok(
self.service.clone()
.login(WorkspaceJoinRequest {
username: self.username.clone(),
password: self.password.clone(),
workspace_id: Some(ws.to_string())
})
.await?
.into_inner()
)
}
}
/// codemp client manager /// codemp client manager
/// ///
@ -33,113 +51,47 @@ use crate::{
/// can be used to interact with server /// can be used to interact with server
pub struct Client { pub struct Client {
user_id: Uuid, user_id: Uuid,
host: String,
workspaces: DashMap<String, Workspace>, workspaces: DashMap<String, Workspace>,
token_tx: Arc<tokio::sync::watch::Sender<Token>>, auth: AuthWrap,
services: Arc<Services>
}
#[derive(Clone)]
pub(crate) struct ClientInterceptor {
token: tokio::sync::watch::Receiver<Token>
}
impl Interceptor for ClientInterceptor {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.token.borrow().token.parse() {
request.metadata_mut().insert("auth", token);
}
Ok(request)
}
}
#[derive(Debug, Clone)]
pub(crate) struct Services {
pub(crate) workspace: WorkspaceClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) buffer: BufferClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) cursor: CursorClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) auth: AuthClient<Channel>,
}
// TODO meno losco
fn parse_codemp_connection_string(string: &str) -> (String, String) {
let url = string.replace("codemp://", "");
let (host, workspace) = url.split_once('/').unwrap();
(format!("http://{}", host), workspace.to_string())
} }
impl Client { impl Client {
/// instantiate and connect a new client /// instantiate and connect a new client
pub async fn new(dest: &str) -> crate::Result<Self> { pub async fn new(
let (_host, _workspace_id) = parse_codemp_connection_string(dest); host: impl AsRef<str>,
username: impl AsRef<str>,
let channel = Endpoint::from_shared(dest.to_string())? password: impl AsRef<str>
.connect() ) -> crate::Result<Self> {
.await?; let host = if host.as_ref().starts_with("http") {
host.as_ref().to_string()
let (token_tx, token_rx) = tokio::sync::watch::channel( } else {
Token { token: "".to_string() } format!("https://{}", host.as_ref())
); };
let inter = ClientInterceptor { token: token_rx };
let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone());
let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone());
let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone());
let auth = AuthClient::new(channel);
let user_id = uuid::Uuid::new_v4(); let user_id = uuid::Uuid::new_v4();
let auth = AuthWrap::try_new(username.as_ref(), password.as_ref(), &host).await?;
Ok(Client { Ok(Client {
user_id, user_id,
host,
workspaces: DashMap::default(), workspaces: DashMap::default(),
token_tx: Arc::new(token_tx), auth,
services: Arc::new(Services { workspace, buffer, cursor, auth })
}) })
} }
pub async fn login(&self, username: String, password: String, workspace_id: Option<String>) -> crate::Result<()> {
Ok(self.token_tx.send(
self.services.auth.clone()
.login(WorkspaceJoinRequest { username, password, workspace_id})
.await?
.into_inner()
)?)
}
/// join a workspace, returns an [tokio::sync::RwLock] to interact with it /// join a workspace, returns an [tokio::sync::RwLock] to interact with it
pub async fn join_workspace(&self, workspace: &str) -> crate::Result<Workspace> { pub async fn join_workspace(&self, workspace: impl AsRef<str>) -> crate::Result<Workspace> {
let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner(); let token = self.auth.login_workspace(workspace.as_ref()).await?;
let (tx, rx) = mpsc::channel(256); let ws = Workspace::try_new(
let cur_stream = self.services.cursor.clone() workspace.as_ref().to_string(),
.attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?
.into_inner();
let worker = CursorWorker::default();
let controller = worker.subscribe();
tokio::spawn(async move {
tracing::debug!("controller worker started");
worker.work(tx, cur_stream).await;
tracing::debug!("controller worker stopped");
});
let ws = Workspace::new(
workspace.to_string(),
self.user_id, self.user_id,
controller, &self.host,
self.token_tx.clone(), token.clone()
self.services.clone() ).await?;
);
ws.fetch_users().await?; self.workspaces.insert(workspace.as_ref().to_string(), ws.clone());
ws.fetch_buffers().await?;
ws.run_actor(ws_stream);
self.workspaces.insert(workspace.to_string(), ws.clone());
Ok(ws) Ok(ws)
} }

View file

@ -1,12 +1,16 @@
use crate::{ use crate::{
api::{Controller, controller::ControllerWorker}, api::{controller::ControllerWorker, Controller},
buffer::{self, worker::BufferWorker}, buffer::{self, worker::BufferWorker},
client::Services, cursor::{self, worker::CursorWorker},
cursor,
}; };
use codemp_proto::{ use codemp_proto::{
common::Empty,
buffer::buffer_client::BufferClient,
cursor::cursor_client::CursorClient,
auth::Token, auth::Token,
common::{Empty, Identity}, workspace::workspace_client::WorkspaceClient,
common::Identity,
files::BufferNode, files::BufferNode,
workspace::{ workspace::{
workspace_event::{ workspace_event::{
@ -15,10 +19,11 @@ use codemp_proto::{
WorkspaceEvent, WorkspaceEvent,
}, },
}; };
use dashmap::{DashMap, DashSet}; use dashmap::{DashMap, DashSet};
use std::{collections::BTreeSet, sync::Arc}; use std::{collections::BTreeSet, sync::Arc};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tonic::Streaming; use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}, Streaming};
use uuid::Uuid; use uuid::Uuid;
#[cfg(feature = "js")] #[cfg(feature = "js")]
@ -30,6 +35,47 @@ pub struct UserInfo {
pub uuid: Uuid, pub uuid: Uuid,
} }
#[derive(Clone)]
struct WorkspaceInterceptor {
token: tokio::sync::watch::Receiver<Token>
}
impl Interceptor for WorkspaceInterceptor {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.token.borrow().token.parse() {
request.metadata_mut().insert("auth", token);
}
Ok(request)
}
}
type AuthedService = InterceptedService<Channel, WorkspaceInterceptor>;
#[derive(Debug)]
struct Services {
token: tokio::sync::watch::Sender<Token>,
workspace: WorkspaceClient<AuthedService>,
buffer: BufferClient<AuthedService>,
cursor: CursorClient<AuthedService>,
}
impl Services {
async fn try_new(dest: &str, token: Token) -> crate::Result<Self> {
let channel = Endpoint::from_shared(dest.to_string())?
.connect()
.await?;
let (token_tx, token_rx) = tokio::sync::watch::channel(token);
let inter = WorkspaceInterceptor { token: token_rx };
Ok(Self {
token: token_tx,
buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()),
cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()),
workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()),
})
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(feature = "js", napi)] #[cfg_attr(feature = "js", napi)]
@ -43,29 +89,52 @@ struct WorkspaceInner {
buffers: DashMap<String, buffer::Controller>, buffers: DashMap<String, buffer::Controller>,
filetree: DashSet<String>, filetree: DashSet<String>,
users: DashMap<Uuid, UserInfo>, users: DashMap<Uuid, UserInfo>,
token: Arc<tokio::sync::watch::Sender<Token>>, // shared services: Services
services: Arc<Services>, // shared
} }
impl Workspace { impl Workspace {
/// create a new buffer and perform initial fetch operations /// create a new buffer and perform initial fetch operations
pub(crate) fn new( pub(crate) async fn try_new(
id: String, id: String,
user_id: Uuid, user_id: Uuid,
cursor: cursor::Controller, dest: &str,
token: Arc<tokio::sync::watch::Sender<Token>>, token: Token,
services: Arc<Services>, ) -> crate::Result<Self> {
) -> Self { let services = Services::try_new(dest, token).await?;
Self(Arc::new(WorkspaceInner { let ws_stream = services.workspace.clone()
.attach(Empty{})
.await?
.into_inner();
let (tx, rx) = mpsc::channel(256);
let cur_stream = services.cursor.clone()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?
.into_inner();
let worker = CursorWorker::default();
let controller = worker.controller();
tokio::spawn(async move {
tracing::debug!("controller worker started");
worker.work(tx, cur_stream).await;
tracing::debug!("controller worker stopped");
});
let ws = Self(Arc::new(WorkspaceInner {
id, id,
user_id, user_id,
token, cursor: controller,
cursor,
buffers: DashMap::default(), buffers: DashMap::default(),
filetree: DashSet::default(), filetree: DashSet::default(),
users: DashMap::default(), users: DashMap::default(),
services, services,
})) }));
ws.fetch_users().await?;
ws.fetch_buffers().await?;
ws.run_actor(ws_stream);
Ok(ws)
} }
pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>) { pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>) {
@ -135,7 +204,7 @@ impl Workspace {
path: path.to_string(), path: path.to_string(),
}); });
let credentials = worskspace_client.access_buffer(request).await?.into_inner(); let credentials = worskspace_client.access_buffer(request).await?.into_inner();
self.0.token.send(credentials.token)?; self.0.services.token.send(credentials.token)?;
let (tx, rx) = mpsc::channel(256); let (tx, rx) = mpsc::channel(256);
let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx));
@ -154,7 +223,7 @@ impl Workspace {
.into_inner(); .into_inner();
let worker = BufferWorker::new(self.0.user_id, path); let worker = BufferWorker::new(self.0.user_id, path);
let controller = worker.subscribe(); let controller = worker.controller();
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!("controller worker started"); tracing::debug!("controller worker started");
worker.work(tx, stream).await; worker.work(tx, stream).await;