feat: added session, reworked auth

This commit is contained in:
əlemi 2024-08-22 00:57:24 +02:00
parent d7e9003c26
commit fea7cfcbe1
Signed by: alemi
GPG key ID: A4895B84D311642C
9 changed files with 232 additions and 144 deletions

View file

@ -54,7 +54,7 @@ impl From<proto::cursor::CursorEvent> for Cursor {
start: (value.position.start.row, value.position.start.col),
end: (value.position.end.row, value.position.end.col),
buffer: value.position.buffer.path,
user: Uuid::parse_str(&value.user.id).ok(),
user: Some(value.user.uuid()),
}
}
}
@ -62,9 +62,7 @@ impl From<proto::cursor::CursorEvent> for Cursor {
impl From<Cursor> for proto::cursor::CursorEvent {
fn from(value: Cursor) -> Self {
Self {
user: proto::common::Identity {
id: value.user.unwrap_or_default().to_string(),
},
user: value.user.unwrap_or_default().into(),
position: proto::cursor::CursorPosition {
buffer: proto::files::BufferNode { path: value.buffer },
start: proto::cursor::RowCol {

View file

@ -8,14 +8,20 @@ pub enum Event {
UserLeave(String),
}
impl From<WorkspaceEventInner> for Event {
fn from(event: WorkspaceEventInner) -> Self {
match event {
WorkspaceEventInner::Join(e) => Self::UserJoin(e.user.name),
WorkspaceEventInner::Leave(e) => Self::UserLeave(e.user.name),
WorkspaceEventInner::Create(e) => Self::FileTreeUpdated(e.path),
WorkspaceEventInner::Delete(e) => Self::FileTreeUpdated(e.path),
WorkspaceEventInner::Rename(e) => Self::FileTreeUpdated(e.after),
}
}
}
impl From<&WorkspaceEventInner> for Event {
fn from(event: &WorkspaceEventInner) -> Self {
match event {
WorkspaceEventInner::Join(e) => Self::UserJoin(e.user.id.clone()),
WorkspaceEventInner::Leave(e) => Self::UserLeave(e.user.id.clone()),
WorkspaceEventInner::Create(e) => Self::FileTreeUpdated(e.path.clone()),
WorkspaceEventInner::Delete(e) => Self::FileTreeUpdated(e.path.clone()),
WorkspaceEventInner::Rename(e) => Self::FileTreeUpdated(e.after.clone()),
}
Self::from(event.clone())
}
}

View file

@ -10,20 +10,43 @@ use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct User {
pub id: Uuid,
pub name: String,
}
impl From<codemp_proto::common::Identity> for User {
fn from(value: codemp_proto::common::Identity) -> Self {
impl From<codemp_proto::common::User> for User {
fn from(value: codemp_proto::common::User) -> Self {
Self {
id: uuid::Uuid::parse_str(&value.id).expect("invalid uuid"),
id: value.id.uuid(),
name: value.name,
}
}
}
impl From<User> for codemp_proto::common::Identity {
impl From<User> for codemp_proto::common::User {
fn from(value: User) -> Self {
Self {
id: value.id.to_string(),
id: value.id.into(),
name: value.name,
}
}
}
impl PartialEq for User {
fn eq(&self, other: &Self) -> bool {
self.id.eq(&other.id)
}
}
impl Eq for User {}
impl PartialOrd for User {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.id.cmp(&other.id))
}
}
impl Ord for User {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.id.cmp(&other.id)
}
}

View file

@ -5,48 +5,17 @@
use std::sync::Arc;
use dashmap::DashMap;
use tonic::transport::{Channel, Endpoint};
use uuid::Uuid;
use tonic::{service::interceptor::InterceptedService, transport::{Channel, Endpoint}};
use crate::workspace::Workspace;
use codemp_proto::auth::auth_client::AuthClient;
use codemp_proto::auth::{Token, WorkspaceJoinRequest};
use crate::{api::User, ext::InternallyMutable, workspace::Workspace};
use codemp_proto::{
auth::{auth_client::AuthClient, LoginRequest},
common::{Empty, Token}, session::{session_client::SessionClient, InviteRequest, WorkspaceRequest},
};
#[cfg(feature = "python")]
use pyo3::prelude::*;
#[derive(Debug)]
pub struct AuthWrap {
username: String,
password: String,
service: AuthClient<Channel>,
}
impl AuthWrap {
async fn try_new(username: &str, password: &str, host: &str) -> crate::Result<Self> {
let channel = Endpoint::from_shared(host.to_string())?.connect().await?;
Ok(AuthWrap {
username: username.to_string(),
password: password.to_string(),
service: AuthClient::new(channel),
})
}
async fn login_workspace(&self, ws: &str) -> crate::Result<Token> {
Ok(self
.service
.clone()
.login(WorkspaceJoinRequest {
username: self.username.clone(),
password: self.password.clone(),
workspace_id: Some(ws.to_string()),
})
.await?
.into_inner())
}
}
/// codemp client manager
///
/// contains all required grpc services and the unique user id
@ -59,15 +28,17 @@ pub struct Client(Arc<ClientInner>);
#[derive(Debug)]
struct ClientInner {
user_id: Uuid,
user: User,
host: String,
workspaces: DashMap<String, Workspace>,
auth: AuthWrap,
auth: AuthClient<Channel>,
session: SessionClient<InterceptedService<Channel, SessionInterceptor>>,
claims: InternallyMutable<Token>,
}
impl Client {
/// instantiate and connect a new client
pub async fn new(
pub async fn connect(
host: impl AsRef<str>,
username: impl AsRef<str>,
password: impl AsRef<str>,
@ -78,26 +49,100 @@ impl Client {
format!("https://{}", host.as_ref())
};
let user_id = uuid::Uuid::new_v4();
let auth = AuthWrap::try_new(username.as_ref(), password.as_ref(), &host).await?;
let channel = Endpoint::from_shared(host.clone())?.connect().await?;
let mut auth = AuthClient::new(channel.clone());
let resp = auth.login(LoginRequest {
username: username.as_ref().to_string(),
password: password.as_ref().to_string(),
})
.await?
.into_inner();
let claims = InternallyMutable::new(resp.token);
let session = SessionClient::with_interceptor(
channel, SessionInterceptor(claims.channel())
);
Ok(Client(Arc::new(ClientInner {
user_id,
host,
user: resp.user.into(),
workspaces: DashMap::default(),
auth,
claims,
auth, session,
})))
}
/// join a workspace, returns an [tokio::sync::RwLock] to interact with it
/// refresh session token
pub async fn refresh(&self) -> crate::Result<()> {
let new_token = self.0.auth.clone().refresh(self.0.claims.get())
.await?
.into_inner();
self.0.claims.set(new_token);
Ok(())
}
/// attempts to create a new workspace with given name
pub async fn create_workspace(&self, name: impl AsRef<str>) -> crate::Result<()> {
self.0.session
.clone()
.create_workspace(WorkspaceRequest { workspace: name.as_ref().to_string() })
.await?;
Ok(())
}
/// delete an existing workspace if possible
pub async fn delete_workspace(&self, name: impl AsRef<str>) -> crate::Result<()> {
self.0.session
.clone()
.delete_workspace(WorkspaceRequest { workspace: name.as_ref().to_string() })
.await?;
Ok(())
}
/// invite user associated with username to workspace, if possible
pub async fn invite_to_workspace(&self, workspace_name: impl AsRef<str>, user_name: impl AsRef<str>) -> crate::Result<()> {
self.0.session
.clone()
.invite_to_workspace(InviteRequest {
workspace: workspace_name.as_ref().to_string(),
user: user_name.as_ref().to_string(),
})
.await?;
Ok(())
}
/// list all available workspaces, filtering between those owned and those invited to
pub async fn list_workspaces(&self, owned: bool, invited: bool) -> crate::Result<Vec<String>> {
let mut workspaces = self.0.session
.clone()
.list_workspaces(Empty {})
.await?
.into_inner();
let mut out = Vec::new();
if owned { out.append(&mut workspaces.owned) }
if invited { out.append(&mut workspaces.invited) }
Ok(out)
}
/// join a workspace, returns [Workspace]
pub async fn join_workspace(&self, workspace: impl AsRef<str>) -> crate::Result<Workspace> {
let token = self.0.auth.login_workspace(workspace.as_ref()).await?;
let token = self.0.session
.clone()
.access_workspace(WorkspaceRequest { workspace: workspace.as_ref().to_string() })
.await?
.into_inner();
let ws = Workspace::try_new(
workspace.as_ref().to_string(),
self.0.user_id,
self.0.user.clone(),
&self.0.host,
token.clone(),
token,
self.0.claims.channel(),
)
.await?;
@ -128,7 +173,22 @@ impl Client {
}
/// accessor for user id
pub fn user_id(&self) -> Uuid {
self.0.user_id
pub fn user(&self) -> &User {
&self.0.user
}
}
#[derive(Debug, Clone)]
struct SessionInterceptor(tokio::sync::watch::Receiver<codemp_proto::common::Token>);
impl tonic::service::Interceptor for SessionInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.0.borrow().token.parse() {
request.metadata_mut().insert("session", token);
}
Ok(request)
}
}

View file

@ -79,6 +79,10 @@ impl<T> InternallyMutable<T> {
pub fn set(&self, state: T) -> T {
self.setter.send_replace(state)
}
pub fn channel(&self) -> tokio::sync::watch::Receiver<T> {
self.getter.clone()
}
}
impl<T: Clone> InternallyMutable<T> {

View file

@ -125,29 +125,28 @@ pub mod cursor;
/// buffer operations, factory, controller and types
pub mod buffer;
/// crate error types and helpers
pub mod errors;
/// underlying client session manager
pub mod client;
/// workspace operations
pub mod workspace;
pub use workspace::Workspace;
/// session
pub mod session;
/// codemp client, wrapping all above
pub mod client;
pub use client::Client;
/// crate error types and helpers
pub mod errors;
pub use errors::Error;
pub use errors::Result;
/// all-in-one imports : `use codemp::prelude::*;`
pub mod prelude;
/// language-specific ffi "glue"
pub mod ffi;
/// common utils used in this library and re-exposed
pub mod ext;
pub use errors::Error;
pub use errors::Result;
pub use client::Client;
pub use workspace::Workspace;
pub use ext::hash;
/// language-specific ffi "glue"
pub mod ffi;

0
src/session.rs Normal file
View file

View file

@ -1,5 +1,5 @@
use codemp_proto::{
auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient,
common::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient,
workspace::workspace_client::WorkspaceClient,
};
use tonic::{
@ -7,55 +7,33 @@ use tonic::{
transport::{Channel, Endpoint},
};
#[derive(Clone)]
pub struct WorkspaceInterceptor {
token: tokio::sync::watch::Receiver<Token>,
}
impl Interceptor for WorkspaceInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.token.borrow().token.parse() {
request.metadata_mut().insert("auth", token);
}
Ok(request)
}
}
type AuthedService = InterceptedService<Channel, WorkspaceInterceptor>;
#[derive(Debug)]
pub struct Services {
token: tokio::sync::watch::Sender<Token>,
workspace: WorkspaceClient<AuthedService>,
buffer: BufferClient<AuthedService>,
cursor: CursorClient<AuthedService>,
}
impl Services {
pub async fn try_new(dest: &str, token: Token) -> crate::Result<Self> {
pub async fn try_new(
dest: &str,
session: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
workspace: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
) -> crate::Result<Self> {
let channel = Endpoint::from_shared(dest.to_string())?.connect().await?;
let (token_tx, token_rx) = tokio::sync::watch::channel(token);
let inter = WorkspaceInterceptor { token: token_rx };
let inter = WorkspaceInterceptor { session, workspace };
Ok(Self {
token: token_tx,
buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()),
cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()),
workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()),
// TODO technically we could keep buffers on separate servers, and thus manage buffer
// connections separately, but for now it's more convenient to bundle them with workspace
buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()),
})
}
pub fn set_token(&self, token: Token) {
if self.token.send(token).is_err() {
tracing::warn!("could not update token: no more auth interceptors active");
}
}
// TODO just make fields pub(crate) ?? idk
pub fn ws(&self) -> WorkspaceClient<AuthedService> {
self.workspace.clone()
}
@ -68,3 +46,26 @@ impl Services {
self.cursor.clone()
}
}
#[derive(Clone)]
pub struct WorkspaceInterceptor {
session: tokio::sync::watch::Receiver<Token>,
workspace: tokio::sync::watch::Receiver<Token>,
}
impl Interceptor for WorkspaceInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.session.borrow().token.parse() {
request.metadata_mut().insert("session", token);
}
if let Ok(token) = self.workspace.borrow().token.parse() {
request.metadata_mut().insert("workspace", token);
}
Ok(request)
}
}

View file

@ -1,13 +1,9 @@
use crate::{
api::{controller::ControllerWorker, Controller, Event, User},
buffer::{self, worker::BufferWorker},
cursor::{self, worker::CursorWorker},
workspace::service::Services,
api::{controller::ControllerWorker, Controller, Event, User}, buffer::{self, worker::BufferWorker}, cursor::{self, worker::CursorWorker}, ext::InternallyMutable, workspace::service::Services
};
use codemp_proto::{
auth::Token,
common::Empty,
common::{Empty, Token},
files::BufferNode,
workspace::{
workspace_event::{
@ -33,8 +29,8 @@ pub struct Workspace(Arc<WorkspaceInner>);
#[derive(Debug)]
struct WorkspaceInner {
id: String,
user_id: Uuid, // reference to global user id
name: String,
user: User, // TODO back-reference to global user id... needed for buffer controllers
cursor: cursor::Controller,
buffers: DashMap<String, buffer::Controller>,
filetree: DashSet<String>,
@ -45,17 +41,18 @@ struct WorkspaceInner {
}
impl Workspace {
/// create a new buffer and perform initial fetch operations
pub(crate) async fn try_new(
id: String,
user_id: Uuid,
name: String,
user: User,
dest: &str,
token: Token,
claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>, // TODO ughh receiving this
) -> crate::Result<Self> {
let services = Services::try_new(dest, token).await?;
let workspace_claim = InternallyMutable::new(token);
let services = Services::try_new(dest, claims, workspace_claim.channel()).await?;
let ws_stream = services.ws().attach(Empty {}).await?.into_inner();
let (tx, rx) = mpsc::channel(256);
let (tx, rx) = mpsc::channel(128);
let (ev_tx, ev_rx) = mpsc::unbounded_channel();
let cur_stream = services
.cur()
@ -72,8 +69,8 @@ impl Workspace {
});
let ws = Self(Arc::new(WorkspaceInner {
id,
user_id,
name,
user,
cursor: controller,
buffers: DashMap::default(),
filetree: DashSet::default(),
@ -112,10 +109,10 @@ impl Workspace {
WorkspaceEventInner::Join(UserJoin { user }) => {
inner
.users
.insert(user.clone().into(), User { id: user.into() });
.insert(user.id.uuid(), user.into());
}
WorkspaceEventInner::Leave(UserLeave { user }) => {
inner.users.remove(&user.into());
inner.users.remove(&user.id.uuid());
}
// buffer
WorkspaceEventInner::Create(FileCreate { path }) => {
@ -171,18 +168,18 @@ impl Workspace {
path: path.to_string(),
});
let credentials = worskspace_client.access_buffer(request).await?.into_inner();
self.0.services.set_token(credentials.token);
let (tx, rx) = mpsc::channel(256);
let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx));
req.metadata_mut().insert(
"path",
tonic::metadata::MetadataValue::try_from(credentials.id.id)
.expect("could not represent path as byte sequence"),
req.metadata_mut()
.insert(
"buffer",
tonic::metadata::MetadataValue::try_from(credentials.token)
.map_err(|e| tonic::Status::internal(format!("failed representing token to string: {e}")))?,
);
let stream = self.0.services.buf().attach(req).await?.into_inner();
let worker = BufferWorker::new(self.0.user_id, path);
let worker = BufferWorker::new(self.0.user.id, path);
let controller = worker.controller();
tokio::spawn(async move {
tracing::debug!("controller worker started");
@ -256,12 +253,12 @@ impl Workspace {
.into_inner()
.users
.into_iter()
.map(Uuid::from),
.map(User::from),
);
self.0.users.clear();
for u in users {
self.0.users.insert(u, User { id: u });
self.0.users.insert(u.id, u);
}
Ok(())
@ -307,7 +304,7 @@ impl Workspace {
/// get the id of the workspace
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
pub fn id(&self) -> String {
self.0.id.clone()
self.0.name.clone()
}
/// return a reference to current cursor controller, if currently in a workspace
@ -349,12 +346,12 @@ impl Drop for WorkspaceInner {
tracing::warn!(
"could not stop buffer worker {} for workspace {}",
entry.value().name(),
self.id
self.name
);
}
}
if !self.cursor.stop() {
tracing::warn!("could not stop cursor worker for workspace {}", self.id);
tracing::warn!("could not stop cursor worker for workspace {}", self.name);
}
}
}