diff --git a/build.rs b/build.rs index c035671..504944e 100644 --- a/build.rs +++ b/build.rs @@ -5,14 +5,12 @@ fn main() -> Result<(), Box> { // .build_transport(cfg!(feature = "transport")) .compile( &[ - "proto/user.proto", + "proto/common.proto", "proto/cursor.proto", "proto/files.proto", + "proto/auth.proto", "proto/workspace.proto", - "proto/buffer_service.proto", - "proto/cursor_service.proto", - "proto/workspace_service.proto", - "proto/auth_service.proto", + "proto/buffer.proto", ], &["proto"], )?; diff --git a/proto/auth.proto b/proto/auth.proto index c62fe72..39f7a20 100644 --- a/proto/auth.proto +++ b/proto/auth.proto @@ -2,21 +2,19 @@ syntax = "proto2"; package auth; - // authenticates users, issuing tokens service Auth { - // send credentials and join a workspace + // send credentials and join a workspace, returns ready to use token rpc Login (WorkspaceJoinRequest) returns (Token); } - message Token { required string token = 1; } - +// TODO one-request-to-do-it-all from login to workspace access message WorkspaceJoinRequest { - required string workspace_id = 1; - required string username = 2; - required string password = 3; + required string username = 1; + required string password = 2; + optional string workspace_id = 3; } diff --git a/proto/buffer.proto b/proto/buffer.proto index 1a3ba47..e018440 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -1,15 +1,20 @@ syntax = "proto2"; +import "common.proto"; + package buffer; // handle buffer changes, keep in sync users service Buffer { // attach to a buffer and receive operations - rpc Attach (stream Operation) returns (stream Operation); + rpc Attach (stream Operation) returns (stream BufferEvent); } message Operation { required bytes data = 1; - optional string user = 2; - optional string path = 3; -} \ No newline at end of file +} + +message BufferEvent { + required Operation op = 1; + required common.Identity user = 2; +} diff --git a/proto/common.proto b/proto/common.proto index 982f738..6b7ce31 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -3,16 +3,16 @@ syntax = "proto2"; package common; -// payload identifying user -message UserIdentity { - // user identifier +// a wrapper payload representing an uuid +message Identity { + // uuid bytes, as string required string id = 1; } -message UserList { - repeated UserIdentity users = 1; +// a collection of identities +message IdentityList { + repeated Identity users = 1; } -message Empty{ - //generic Empty message -} +//generic Empty message +message Empty { } diff --git a/proto/cursor.proto b/proto/cursor.proto index 9667c4f..0b4861b 100644 --- a/proto/cursor.proto +++ b/proto/cursor.proto @@ -2,17 +2,15 @@ syntax = "proto2"; package cursor; import "common.proto"; +import "files.proto"; // handle cursor events and broadcast to all users service Cursor { // subscribe to a workspace's cursor events - rpc Attach (stream cursor.CursorEvent) returns (stream cursor.CursorEvent); + rpc Attach (stream cursor.CursorPosition) returns (stream cursor.CursorEvent); } -// empty request -message MovedResponse {} - // a tuple indicating row and column message RowCol { required int32 row = 1; @@ -22,7 +20,7 @@ message RowCol { // cursor position object message CursorPosition { // path of current buffer this cursor is into - required string buffer = 1; + required files.BufferNode buffer = 1; // cursor start position required RowCol start = 2; // cursor end position @@ -32,7 +30,7 @@ message CursorPosition { // cursor event, with user id and cursor position message CursorEvent { // user moving the cursor - required common.UserIdentity user = 1; + required common.Identity user = 1; // new cursor position required CursorPosition position = 2; -} \ No newline at end of file +} diff --git a/proto/files.proto b/proto/files.proto index 4423582..5df3461 100644 --- a/proto/files.proto +++ b/proto/files.proto @@ -9,8 +9,3 @@ message BufferNode { message BufferTree { repeated BufferNode buffers = 1; } - -message WorkspaceFileTree { - // list of strings may be more efficient but it's a lot more hassle - required string payload = 1; // spappolata di json -} \ No newline at end of file diff --git a/proto/user.proto b/proto/user.proto deleted file mode 100644 index 322a935..0000000 --- a/proto/user.proto +++ /dev/null @@ -1,10 +0,0 @@ -syntax = "proto2"; - -package user; - - -// payload identifying user -message UserIdentity { - // user identifier - required string id = 1; -} diff --git a/proto/workspace.proto b/proto/workspace.proto index 3405dfa..6fedcae 100644 --- a/proto/workspace.proto +++ b/proto/workspace.proto @@ -1,29 +1,29 @@ syntax = "proto2"; package workspace; + +import "common.proto"; import "files.proto"; import "auth.proto"; -import "common.proto"; service Workspace { - rpc CreateWorkspace (workspace.WorkspaceId) returns (common.Empty); + rpc Attach (common.Empty) returns (stream WorkspaceEvent); - rpc RequestAccess (workspace.BufferPath) returns (auth.Token); - rpc LeaveWorkspace (workspace.WorkspaceId) returns (common.Empty); - rpc CreateBuffer (workspace.BufferPath) returns (common.Empty); + rpc CreateBuffer (files.BufferNode) returns (common.Empty); + rpc AccessBuffer (files.BufferNode) returns (BufferCredentials); + rpc DeleteBuffer (files.BufferNode) returns (common.Empty); + rpc ListBuffers (common.Empty) returns (files.BufferTree); - rpc ListUsers (common.Empty) returns (common.UserList); - rpc ListBufferUsers (workspace.BufferPath) returns (common.UserList); //TODO discuss - rpc Attach (common.Empty) returns (stream workspace.WorkspaceEvent); - rpc Delete (workspace.BufferPath) returns (common.Empty); //deletes buffer + rpc ListUsers (common.Empty) returns (common.IdentityList); + rpc ListBufferUsers (files.BufferNode) returns (common.IdentityList); //TODO discuss } message WorkspaceEvent { message UserJoin { - required common.UserIdentity id = 1; + required common.Identity user = 1; } message UserLeave { - required common.UserIdentity id = 1; + required common.Identity user = 1; } message FileCreate { required string path = 1; @@ -45,12 +45,7 @@ message WorkspaceEvent { } } -message BufferPath { - // buffer path to operate onto - required string path = 1; -} - - -message WorkspaceId { - required string id = 1; +message BufferCredentials { + required common.Identity id = 1; + required auth.Token token = 2; } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 031fc57..cccf1d9 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -11,7 +11,7 @@ use woot::woot::Woot; use crate::errors::IgnorableError; use crate::api::controller::ControllerWorker; use crate::api::TextChange; -use crate::proto::buffer_service::Operation; +use crate::proto::buffer::{BufferEvent, Operation}; use super::controller::BufferController; @@ -66,7 +66,7 @@ impl BufferWorker { impl ControllerWorker for BufferWorker { type Controller = BufferController; type Tx = mpsc::Sender; - type Rx = Streaming; + type Rx = Streaming; fn subscribe(&self) -> BufferController { BufferController::new( @@ -130,8 +130,6 @@ impl ControllerWorker for BufferWorker { for op in ops { let operation = Operation { data: postcard::to_extend(&op, Vec::new()).unwrap(), - user: None, - path: Some(self.name.clone()) }; match tx.send(operation).await { @@ -151,8 +149,8 @@ impl ControllerWorker for BufferWorker { res = rx.message() => match res { Err(_e) => break, Ok(None) => break, - Ok(Some(change)) => match postcard::from_bytes::(&change.data) { - Ok(op) => { + Ok(Some(change)) => match postcard::from_bytes::(&change.op.data) { + Ok(op) => { // TODO here in change we receive info about the author, maybe propagate? self.buffer.merge(op); self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); for tx in self.pollers.drain(..) { diff --git a/src/client.rs b/src/client.rs index 35b3faa..6b72c00 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,13 +11,19 @@ use tonic::service::Interceptor; use tonic::transport::{Channel, Endpoint}; use uuid::Uuid; -use crate::api::controller::ControllerWorker; -use crate::cursor::worker::CursorWorker; -use crate::proto::buffer_service::buffer_client::BufferClient; -use crate::proto::cursor_service::cursor_client::CursorClient; -use crate::proto::workspace::{JoinRequest, Token, WorkspaceDetails}; -use crate::proto::workspace_service::workspace_client::WorkspaceClient; -use crate::workspace::Workspace; +use crate::proto::auth::auth_client::AuthClient; +use crate::{ + api::controller::ControllerWorker, + cursor::worker::CursorWorker, + proto::{ + common::Empty, + buffer::buffer_client::BufferClient, + cursor::cursor_client::CursorClient, + auth::{Token, WorkspaceJoinRequest}, + workspace::workspace_client::WorkspaceClient, + }, + workspace::Workspace +}; /// codemp client manager /// @@ -49,9 +55,10 @@ impl Interceptor for ClientInterceptor { #[derive(Debug, Clone)] pub(crate) struct Services { - pub(crate) workspace: crate::proto::workspace_service::workspace_client::WorkspaceClient>, - pub(crate) buffer: crate::proto::buffer_service::buffer_client::BufferClient>, - pub(crate) cursor: crate::proto::cursor_service::cursor_client::CursorClient>, + pub(crate) workspace: WorkspaceClient>, + pub(crate) buffer: BufferClient>, + pub(crate) cursor: CursorClient>, + pub(crate) auth: AuthClient, } // TODO meno losco @@ -90,14 +97,13 @@ impl Client { }) } - /// creates a new workspace (and joins it implicitly), returns an [tokio::sync::RwLock] to interact with it - pub async fn create_workspace(&mut self, workspace_id: &str) -> crate::Result>> { - let mut workspace_client = self.services.workspace.clone(); - workspace_client.create_workspace( - tonic::Request::new(WorkspaceDetails { id: workspace_id.to_string() }) - ).await?; - - self.join_workspace(workspace_id).await + pub async fn login(&self, username: String, password: String, workspace_id: Option) -> crate::Result<()> { + Ok(self.token_tx.send( + self.services.auth.clone() + .login(WorkspaceJoinRequest { username, password, workspace_id}) + .await? + .into_inner() + )?) } /// join a workspace, returns an [tokio::sync::RwLock] to interact with it diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index af1f783..a311150 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -6,7 +6,7 @@ use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mut use tonic::async_trait; use uuid::Uuid; -use crate::{api::Controller, errors::IgnorableError, proto::{cursor::{CursorEvent, CursorPosition}, user::UserIdentity}}; +use crate::{api::Controller, errors::IgnorableError, proto::cursor::{CursorEvent, CursorPosition}}; /// the cursor controller implementation /// @@ -22,7 +22,7 @@ use crate::{api::Controller, errors::IgnorableError, proto::{cursor::{CursorEven #[derive(Debug)] pub struct CursorController { user_id: Uuid, - op: mpsc::UnboundedSender, + op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, @@ -37,7 +37,7 @@ impl Drop for CursorController { impl CursorController { pub(crate) fn new( user_id: Uuid, - op: mpsc::UnboundedSender, + op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, stop: mpsc::UnboundedSender<()>, @@ -56,10 +56,7 @@ impl Controller for CursorController { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } - Ok(self.op.send(CursorEvent { - user: UserIdentity { id: self.user_id.to_string() }, - position: cursor, - })?) + Ok(self.op.send(cursor)?) } /// try to receive without blocking, but will still block on stream mutex diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 1d27c6b..f4c0072 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -4,14 +4,14 @@ use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tonic::{Streaming, async_trait}; use uuid::Uuid; -use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::CursorEvent}; +use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::{CursorPosition, CursorEvent}}; use super::controller::CursorController; pub(crate) struct CursorWorker { user_id: Uuid, - producer: mpsc::UnboundedSender, - op: mpsc::UnboundedReceiver, + producer: mpsc::UnboundedSender, + op: mpsc::UnboundedReceiver, changed: watch::Sender, last_op: watch::Receiver, channel: Arc>, @@ -41,7 +41,7 @@ impl CursorWorker { #[async_trait] impl ControllerWorker for CursorWorker { type Controller = CursorController; - type Tx = mpsc::Sender; + type Tx = mpsc::Sender; type Rx = Streaming; fn subscribe(&self) -> CursorController { diff --git a/src/lib.rs b/src/lib.rs index 1b8e3be..5b602ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -163,14 +163,55 @@ pub use woot; #[cfg(feature = "transport")] #[allow(non_snake_case)] pub mod proto { - pub mod user { tonic::include_proto!("user"); } + pub mod common { + tonic::include_proto!("common"); + + impl From for Identity { + fn from(id: uuid::Uuid) -> Self { + Identity { id: id.to_string() } + } + } + + impl From<&uuid::Uuid> for Identity { + fn from(id: &uuid::Uuid) -> Self { + Identity { id: id.to_string() } + } + } + + impl From for uuid::Uuid { + fn from(value: Identity) -> Self { + uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity") + } + } + + impl From<&Identity> for uuid::Uuid { + fn from(value: &Identity) -> Self { + uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity") + } + } + } + + + pub mod files { + tonic::include_proto!("files"); + + impl From for BufferNode { + fn from(value: String) -> Self { + BufferNode { path: value } + } + } + + impl From for String { + fn from(value: BufferNode) -> Self { + value.path + } + } + } + + pub mod buffer { tonic::include_proto!("buffer"); } pub mod cursor { tonic::include_proto!("cursor"); } - pub mod files { tonic::include_proto!("files"); } pub mod workspace { tonic::include_proto!("workspace"); } - pub mod buffer_service { tonic::include_proto!("buffer_service"); } - pub mod cursor_service { tonic::include_proto!("cursor_service"); } - pub mod workspace_service { tonic::include_proto!("workspace_service"); } - pub mod auth_service { tonic::include_proto!("auth_service"); } + pub mod auth { tonic::include_proto!("auth"); } } pub use errors::Error; diff --git a/src/workspace.rs b/src/workspace.rs index 1e00f5e..a964eeb 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -2,11 +2,8 @@ use std::{collections::{BTreeMap, BTreeSet}, str::FromStr, sync::Arc}; use tokio::sync::mpsc; use uuid::Uuid; use crate::{ - proto::{user::UserIdentity, workspace::{AttachRequest, BufferListRequest, BufferPayload, Token, UserListRequest}}, - api::controller::ControllerWorker, - buffer::{self, worker::BufferWorker}, - client::Services, - cursor + api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor, + proto::{auth::Token, common::{Identity, Empty}, files::BufferNode, workspace::{WorkspaceEvent, workspace_event::{Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave}}} }; //TODO may contain more info in the future @@ -15,20 +12,6 @@ pub struct UserInfo { pub uuid: Uuid } -impl From for UserInfo { - fn from(uuid: Uuid) -> Self { - UserInfo { - uuid - } - } -} - -impl From for Uuid { - fn from(uid: UserIdentity) -> Uuid { - Uuid::from_str(&uid.id).expect("expected an uuid") - } -} - pub struct Workspace { id: String, user_id: Uuid, @@ -70,7 +53,7 @@ impl Workspace { pub async fn create(&mut self, path: &str) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); workspace_client.create_buffer( - tonic::Request::new(BufferPayload { path: path.to_string() }) + tonic::Request::new(BufferNode { path: path.to_string() }) ).await?; // add to filetree @@ -115,7 +98,7 @@ impl Workspace { pub async fn fetch_buffers(&mut self) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); let buffers = workspace_client.list_buffers( - tonic::Request::new(BufferListRequest {}) + tonic::Request::new(Empty {}) ).await?.into_inner().buffers; self.filetree.clear(); @@ -130,7 +113,7 @@ impl Workspace { pub async fn fetch_users(&mut self) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); let users = BTreeSet::from_iter(workspace_client.list_users( - tonic::Request::new(UserListRequest {}) + tonic::Request::new(Empty {}) ).await?.into_inner().users.into_iter().map(Uuid::from)); // only keep userinfo for users that still exist @@ -150,7 +133,7 @@ impl Workspace { pub async fn list_buffer_users(&mut self, path: &str) -> crate::Result> { let mut workspace_client = self.services.workspace.clone(); let buffer_users = workspace_client.list_buffer_users( - tonic::Request::new(BufferPayload { path: path.to_string() }) + tonic::Request::new(BufferNode { path: path.to_string() }) ).await?.into_inner().users; Ok(buffer_users) @@ -167,8 +150,8 @@ impl Workspace { /// delete a buffer pub async fn delete(&mut self, path: &str) -> crate::Result<()> { let mut workspace_client = self.services.workspace.clone(); - workspace_client.delete( - tonic::Request::new(BufferPayload { path: path.to_string() }) + workspace_client.delete_buffer( + tonic::Request::new(BufferNode { path: path.to_string() }) ).await?; self.filetree.remove(path);