From 1cf17dc151ff5eb171c071b41d74643f088bc983 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 7 Feb 2024 01:09:28 +0100 Subject: [PATCH] chore: proto cleanup and simplification reuse as much as possible, keep rpc messages close with their rpc, helper struct for uuid with into() and from(). also replaced the simple things, such as imports and struct fields --- build.rs | 8 +++--- proto/auth.proto | 12 ++++----- proto/buffer.proto | 13 +++++++--- proto/common.proto | 16 ++++++------ proto/cursor.proto | 12 ++++----- proto/files.proto | 5 ---- proto/user.proto | 10 -------- proto/workspace.proto | 33 +++++++++++-------------- src/buffer/worker.rs | 10 +++----- src/client.rs | 42 +++++++++++++++++-------------- src/cursor/controller.rs | 11 +++------ src/cursor/worker.rs | 8 +++--- src/lib.rs | 53 +++++++++++++++++++++++++++++++++++----- src/workspace.rs | 33 ++++++------------------- 14 files changed, 135 insertions(+), 131 deletions(-) delete mode 100644 proto/user.proto diff --git a/build.rs b/build.rs index c035671..504944e 100644 --- a/build.rs +++ b/build.rs @@ -5,14 +5,12 @@ fn main() -> Result<(), Box> { // .build_transport(cfg!(feature = "transport")) .compile( &[ - "proto/user.proto", + "proto/common.proto", "proto/cursor.proto", "proto/files.proto", + "proto/auth.proto", "proto/workspace.proto", - "proto/buffer_service.proto", - "proto/cursor_service.proto", - "proto/workspace_service.proto", - "proto/auth_service.proto", + "proto/buffer.proto", ], &["proto"], )?; diff --git a/proto/auth.proto b/proto/auth.proto index c62fe72..39f7a20 100644 --- a/proto/auth.proto +++ b/proto/auth.proto @@ -2,21 +2,19 @@ syntax = "proto2"; package auth; - // authenticates users, issuing tokens service Auth { - // send credentials and join a workspace + // send credentials and join a workspace, returns ready to use token rpc Login (WorkspaceJoinRequest) returns (Token); } - message Token { required string token = 1; } - +// TODO one-request-to-do-it-all from login to workspace access message WorkspaceJoinRequest { - required string workspace_id = 1; - required string username = 2; - required string password = 3; + required string username = 1; + required string password = 2; + optional string workspace_id = 3; } diff --git a/proto/buffer.proto b/proto/buffer.proto index 1a3ba47..e018440 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -1,15 +1,20 @@ syntax = "proto2"; +import "common.proto"; + package buffer; // handle buffer changes, keep in sync users service Buffer { // attach to a buffer and receive operations - rpc Attach (stream Operation) returns (stream Operation); + rpc Attach (stream Operation) returns (stream BufferEvent); } message Operation { required bytes data = 1; - optional string user = 2; - optional string path = 3; -} \ No newline at end of file +} + +message BufferEvent { + required Operation op = 1; + required common.Identity user = 2; +} diff --git a/proto/common.proto b/proto/common.proto index 982f738..6b7ce31 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -3,16 +3,16 @@ syntax = "proto2"; package common; -// payload identifying user -message UserIdentity { - // user identifier +// a wrapper payload representing an uuid +message Identity { + // uuid bytes, as string required string id = 1; } -message UserList { - repeated UserIdentity users = 1; +// a collection of identities +message IdentityList { + repeated Identity users = 1; } -message Empty{ - //generic Empty message -} +//generic Empty message +message Empty { } diff --git a/proto/cursor.proto b/proto/cursor.proto index 9667c4f..0b4861b 100644 --- a/proto/cursor.proto +++ b/proto/cursor.proto @@ -2,17 +2,15 @@ syntax = "proto2"; package cursor; import "common.proto"; +import "files.proto"; // handle cursor events and broadcast to all users service Cursor { // subscribe to a workspace's cursor events - rpc Attach (stream cursor.CursorEvent) returns (stream cursor.CursorEvent); + rpc Attach (stream cursor.CursorPosition) returns (stream cursor.CursorEvent); } -// empty request -message MovedResponse {} - // a tuple indicating row and column message RowCol { required int32 row = 1; @@ -22,7 +20,7 @@ message RowCol { // cursor position object message CursorPosition { // path of current buffer this cursor is into - required string buffer = 1; + required files.BufferNode buffer = 1; // cursor start position required RowCol start = 2; // cursor end position @@ -32,7 +30,7 @@ message CursorPosition { // cursor event, with user id and cursor position message CursorEvent { // user moving the cursor - required common.UserIdentity user = 1; + required common.Identity user = 1; // new cursor position required CursorPosition position = 2; -} \ No newline at end of file +} diff --git a/proto/files.proto b/proto/files.proto index 4423582..5df3461 100644 --- a/proto/files.proto +++ b/proto/files.proto @@ -9,8 +9,3 @@ message BufferNode { message BufferTree { repeated BufferNode buffers = 1; } - -message WorkspaceFileTree { - // list of strings may be more efficient but it's a lot more hassle - required string payload = 1; // spappolata di json -} \ No newline at end of file diff --git a/proto/user.proto b/proto/user.proto deleted file mode 100644 index 322a935..0000000 --- a/proto/user.proto +++ /dev/null @@ -1,10 +0,0 @@ -syntax = "proto2"; - -package user; - - -// payload identifying user -message UserIdentity { - // user identifier - required string id = 1; -} diff --git a/proto/workspace.proto b/proto/workspace.proto index 3405dfa..6fedcae 100644 --- a/proto/workspace.proto +++ b/proto/workspace.proto @@ -1,29 +1,29 @@ syntax = "proto2"; package workspace; + +import "common.proto"; import "files.proto"; import "auth.proto"; -import "common.proto"; service Workspace { - rpc CreateWorkspace (workspace.WorkspaceId) returns (common.Empty); + rpc Attach (common.Empty) returns (stream WorkspaceEvent); - rpc RequestAccess (workspace.BufferPath) returns (auth.Token); - rpc LeaveWorkspace (workspace.WorkspaceId) returns (common.Empty); - rpc CreateBuffer (workspace.BufferPath) returns (common.Empty); + rpc CreateBuffer (files.BufferNode) returns (common.Empty); + rpc AccessBuffer (files.BufferNode) returns (BufferCredentials); + rpc DeleteBuffer (files.BufferNode) returns (common.Empty); + rpc ListBuffers (common.Empty) returns (files.BufferTree); - rpc ListUsers (common.Empty) returns (common.UserList); - rpc ListBufferUsers (workspace.BufferPath) returns (common.UserList); //TODO discuss - rpc Attach (common.Empty) returns (stream workspace.WorkspaceEvent); - rpc Delete (workspace.BufferPath) returns (common.Empty); //deletes buffer + rpc ListUsers (common.Empty) returns (common.IdentityList); + rpc ListBufferUsers (files.BufferNode) returns (common.IdentityList); //TODO discuss } message WorkspaceEvent { message UserJoin { - required common.UserIdentity id = 1; + required common.Identity user = 1; } message UserLeave { - required common.UserIdentity id = 1; + required common.Identity user = 1; } message FileCreate { required string path = 1; @@ -45,12 +45,7 @@ message WorkspaceEvent { } } -message BufferPath { - // buffer path to operate onto - required string path = 1; -} - - -message WorkspaceId { - required string id = 1; +message BufferCredentials { + required common.Identity id = 1; + required auth.Token token = 2; } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 031fc57..cccf1d9 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -11,7 +11,7 @@ use woot::woot::Woot; use crate::errors::IgnorableError; use crate::api::controller::ControllerWorker; use crate::api::TextChange; -use crate::proto::buffer_service::Operation; +use crate::proto::buffer::{BufferEvent, Operation}; use super::controller::BufferController; @@ -66,7 +66,7 @@ impl BufferWorker { impl ControllerWorker for BufferWorker { type Controller = BufferController; type Tx = mpsc::Sender; - type Rx = Streaming; + type Rx = Streaming; fn subscribe(&self) -> BufferController { BufferController::new( @@ -130,8 +130,6 @@ impl ControllerWorker for BufferWorker { for op in ops { let operation = Operation { data: postcard::to_extend(&op, Vec::new()).unwrap(), - user: None, - path: Some(self.name.clone()) }; match tx.send(operation).await { @@ -151,8 +149,8 @@ impl ControllerWorker for BufferWorker { res = rx.message() => match res { Err(_e) => break, Ok(None) => break, - Ok(Some(change)) => match postcard::from_bytes::(&change.data) { - Ok(op) => { + Ok(Some(change)) => match postcard::from_bytes::(&change.op.data) { + Ok(op) => { // TODO here in change we receive info about the author, maybe propagate? self.buffer.merge(op); self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); for tx in self.pollers.drain(..) { diff --git a/src/client.rs b/src/client.rs index 35b3faa..6b72c00 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,13 +11,19 @@ use tonic::service::Interceptor; use tonic::transport::{Channel, Endpoint}; use uuid::Uuid; -use crate::api::controller::ControllerWorker; -use crate::cursor::worker::CursorWorker; -use crate::proto::buffer_service::buffer_client::BufferClient; -use crate::proto::cursor_service::cursor_client::CursorClient; -use crate::proto::workspace::{JoinRequest, Token, WorkspaceDetails}; -use crate::proto::workspace_service::workspace_client::WorkspaceClient; -use crate::workspace::Workspace; +use crate::proto::auth::auth_client::AuthClient; +use crate::{ + api::controller::ControllerWorker, + cursor::worker::CursorWorker, + proto::{ + common::Empty, + buffer::buffer_client::BufferClient, + cursor::cursor_client::CursorClient, + auth::{Token, WorkspaceJoinRequest}, + workspace::workspace_client::WorkspaceClient, + }, + workspace::Workspace +}; /// codemp client manager /// @@ -49,9 +55,10 @@ impl Interceptor for ClientInterceptor { #[derive(Debug, Clone)] pub(crate) struct Services { - pub(crate) workspace: crate::proto::workspace_service::workspace_client::WorkspaceClient>, - pub(crate) buffer: crate::proto::buffer_service::buffer_client::BufferClient>, - pub(crate) cursor: crate::proto::cursor_service::cursor_client::CursorClient>, + pub(crate) workspace: WorkspaceClient>, + pub(crate) buffer: BufferClient>, + pub(crate) cursor: CursorClient>, + pub(crate) auth: AuthClient, } // TODO meno losco @@ -90,14 +97,13 @@ impl Client { }) } - /// creates a new workspace (and joins it implicitly), returns an [tokio::sync::RwLock] to interact with it - pub async fn create_workspace(&mut self, workspace_id: &str) -> crate::Result>> { - let mut workspace_client = self.services.workspace.clone(); - workspace_client.create_workspace( - tonic::Request::new(WorkspaceDetails { id: workspace_id.to_string() }) - ).await?; - - self.join_workspace(workspace_id).await + pub async fn login(&self, username: String, password: String, workspace_id: Option) -> crate::Result<()> { + Ok(self.token_tx.send( + self.services.auth.clone() + .login(WorkspaceJoinRequest { username, password, workspace_id}) + .await? + .into_inner() + )?) } /// join a workspace, returns an [tokio::sync::RwLock] to interact with it diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index af1f783..a311150 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -6,7 +6,7 @@ use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mut use tonic::async_trait; use uuid::Uuid; -use crate::{api::Controller, errors::IgnorableError, proto::{cursor::{CursorEvent, CursorPosition}, user::UserIdentity}}; +use crate::{api::Controller, errors::IgnorableError, proto::cursor::{CursorEvent, CursorPosition}}; /// the cursor controller implementation /// @@ -22,7 +22,7 @@ use crate::{api::Controller, errors::IgnorableError, proto::{cursor::{CursorEven #[derive(Debug)] pub struct CursorController { user_id: Uuid, - op: mpsc::UnboundedSender, + op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, @@ -37,7 +37,7 @@ impl Drop for CursorController { impl CursorController { pub(crate) fn new( user_id: Uuid, - op: mpsc::UnboundedSender, + op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, @@ -56,10 +56,7 @@ impl Controller for CursorController { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } - Ok(self.op.send(CursorEvent { - user: UserIdentity { id: self.user_id.to_string() }, - position: cursor, - })?) + Ok(self.op.send(cursor)?) } /// try to receive without blocking, but will still block on stream mutex diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 1d27c6b..f4c0072 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -4,14 +4,14 @@ use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tonic::{Streaming, async_trait}; use uuid::Uuid; -use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::CursorEvent}; +use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::{CursorPosition, CursorEvent}}; use super::controller::CursorController; pub(crate) struct CursorWorker { user_id: Uuid, - producer: mpsc::UnboundedSender, - op: mpsc::UnboundedReceiver, + producer: mpsc::UnboundedSender, + op: mpsc::UnboundedReceiver, changed: watch::Sender, last_op: watch::Receiver, channel: Arc>, @@ -41,7 +41,7 @@ impl CursorWorker { #[async_trait] impl ControllerWorker for CursorWorker { type Controller = CursorController; - type Tx = mpsc::Sender; + type Tx = mpsc::Sender; type Rx = Streaming; fn subscribe(&self) -> CursorController { diff --git a/src/lib.rs b/src/lib.rs index 1b8e3be..5b602ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -163,14 +163,55 @@ pub use woot; #[cfg(feature = "transport")] #[allow(non_snake_case)] pub mod proto { - pub mod user { tonic::include_proto!("user"); } + pub mod common { + tonic::include_proto!("common"); + + impl From for Identity { + fn from(id: uuid::Uuid) -> Self { + Identity { id: id.to_string() } + } + } + + impl From<&uuid::Uuid> for Identity { + fn from(id: &uuid::Uuid) -> Self { + Identity { id: id.to_string() } + } + } + + impl From for uuid::Uuid { + fn from(value: Identity) -> Self { + uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity") + } + } + + impl From<&Identity> for uuid::Uuid { + fn from(value: &Identity) -> Self { + uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity") + } + } + } + + + pub mod files { + tonic::include_proto!("files"); + + impl From for BufferNode { + fn from(value: String) -> Self { + BufferNode { path: value } + } + } + + impl From for String { + fn from(value: BufferNode) -> Self { + value.path + } + } + } + + pub mod buffer { tonic::include_proto!("buffer"); } pub mod cursor { tonic::include_proto!("cursor"); } - pub mod files { tonic::include_proto!("files"); } pub mod workspace { tonic::include_proto!("workspace"); } - pub mod buffer_service { tonic::include_proto!("buffer_service"); } - pub mod cursor_service { tonic::include_proto!("cursor_service"); } - pub mod workspace_service { tonic::include_proto!("workspace_service"); } - pub mod auth_service { tonic::include_proto!("auth_service"); } + pub mod auth { tonic::include_proto!("auth"); } } pub use errors::Error; diff --git a/src/workspace.rs b/src/workspace.rs index 1e00f5e..a964eeb 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -2,11 +2,8 @@ use std::{collections::{BTreeMap, BTreeSet}, str::FromStr, sync::Arc}; use tokio::sync::mpsc; use uuid::Uuid; use crate::{ - proto::{user::UserIdentity, workspace::{AttachRequest, BufferListRequest, BufferPayload, Token, UserListRequest}}, - api::controller::ControllerWorker, - buffer::{self, worker::BufferWorker}, - client::Services, - cursor + api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor, + proto::{auth::Token, common::{Identity, Empty}, files::BufferNode, workspace::{WorkspaceEvent, workspace_event::{Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave}}} }; //TODO may contain more info in the future @@ -15,20 +12,6 @@ pub struct UserInfo { pub uuid: Uuid } -impl From for UserInfo { - fn from(uuid: Uuid) -> Self { - UserInfo { - uuid - } - } -} - -impl From for Uuid { - fn from(uid: UserIdentity) -> Uuid { - Uuid::from_str(&uid.id).expect("expected an uuid") - } -} - pub struct Workspace { id: String, user_id: Uuid, @@ -70,7 +53,7 @@ impl Workspace { pub async fn create(&mut self, path: &str) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); workspace_client.create_buffer( - tonic::Request::new(BufferPayload { path: path.to_string() }) + tonic::Request::new(BufferNode { path: path.to_string() }) ).await?; // add to filetree @@ -115,7 +98,7 @@ impl Workspace { pub async fn fetch_buffers(&mut self) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); let buffers = workspace_client.list_buffers( - tonic::Request::new(BufferListRequest {}) + tonic::Request::new(Empty {}) ).await?.into_inner().buffers; self.filetree.clear(); @@ -130,7 +113,7 @@ impl Workspace { pub async fn fetch_users(&mut self) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); let users = BTreeSet::from_iter(workspace_client.list_users( - tonic::Request::new(UserListRequest {}) + tonic::Request::new(Empty {}) ).await?.into_inner().users.into_iter().map(Uuid::from)); // only keep userinfo for users that still exist @@ -150,7 +133,7 @@ impl Workspace { pub async fn list_buffer_users(&mut self, path: &str) -> crate::Result> { let mut workspace_client = self.services.workspace.clone(); let buffer_users = workspace_client.list_buffer_users( - tonic::Request::new(BufferPayload { path: path.to_string() }) + tonic::Request::new(BufferNode { path: path.to_string() }) ).await?.into_inner().users; Ok(buffer_users) @@ -167,8 +150,8 @@ impl Workspace { /// delete a buffer pub async fn delete(&mut self, path: &str) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); - workspace_client.delete( - tonic::Request::new(BufferPayload { path: path.to_string() }) + workspace_client.delete_buffer( + tonic::Request::new(BufferNode { path: path.to_string() }) ).await?; self.filetree.remove(path);