chore: proto cleanup and simplification

reuse as much as possible, keep rpc messages close with their rpc,
helper struct for uuid with into() and from(). also replaced the simple
things, such as imports and struct fields
This commit is contained in:
əlemi 2024-02-07 01:09:28 +01:00
parent 3738f7beb4
commit 1cf17dc151
14 changed files with 135 additions and 131 deletions

View file

@ -5,14 +5,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// .build_transport(cfg!(feature = "transport"))
.compile(
&[
"proto/user.proto",
"proto/common.proto",
"proto/cursor.proto",
"proto/files.proto",
"proto/auth.proto",
"proto/workspace.proto",
"proto/buffer_service.proto",
"proto/cursor_service.proto",
"proto/workspace_service.proto",
"proto/auth_service.proto",
"proto/buffer.proto",
],
&["proto"],
)?;

View file

@ -2,21 +2,19 @@ syntax = "proto2";
package auth;
// authenticates users, issuing tokens
service Auth {
// send credentials and join a workspace
// send credentials and join a workspace, returns ready to use token
rpc Login (WorkspaceJoinRequest) returns (Token);
}
message Token {
required string token = 1;
}
// TODO one-request-to-do-it-all from login to workspace access
message WorkspaceJoinRequest {
required string workspace_id = 1;
required string username = 2;
required string password = 3;
required string username = 1;
required string password = 2;
optional string workspace_id = 3;
}

View file

@ -1,15 +1,20 @@
syntax = "proto2";
import "common.proto";
package buffer;
// handle buffer changes, keep in sync users
service Buffer {
// attach to a buffer and receive operations
rpc Attach (stream Operation) returns (stream Operation);
rpc Attach (stream Operation) returns (stream BufferEvent);
}
message Operation {
required bytes data = 1;
optional string user = 2;
optional string path = 3;
}
message BufferEvent {
required Operation op = 1;
required common.Identity user = 2;
}

View file

@ -3,16 +3,16 @@ syntax = "proto2";
package common;
// payload identifying user
message UserIdentity {
// user identifier
// a wrapper payload representing an uuid
message Identity {
// uuid bytes, as string
required string id = 1;
}
message UserList {
repeated UserIdentity users = 1;
// a collection of identities
message IdentityList {
repeated Identity users = 1;
}
message Empty{
//generic Empty message
}
//generic Empty message
message Empty { }

View file

@ -2,17 +2,15 @@ syntax = "proto2";
package cursor;
import "common.proto";
import "files.proto";
// handle cursor events and broadcast to all users
service Cursor {
// subscribe to a workspace's cursor events
rpc Attach (stream cursor.CursorEvent) returns (stream cursor.CursorEvent);
rpc Attach (stream cursor.CursorPosition) returns (stream cursor.CursorEvent);
}
// empty request
message MovedResponse {}
// a tuple indicating row and column
message RowCol {
required int32 row = 1;
@ -22,7 +20,7 @@ message RowCol {
// cursor position object
message CursorPosition {
// path of current buffer this cursor is into
required string buffer = 1;
required files.BufferNode buffer = 1;
// cursor start position
required RowCol start = 2;
// cursor end position
@ -32,7 +30,7 @@ message CursorPosition {
// cursor event, with user id and cursor position
message CursorEvent {
// user moving the cursor
required common.UserIdentity user = 1;
required common.Identity user = 1;
// new cursor position
required CursorPosition position = 2;
}

View file

@ -9,8 +9,3 @@ message BufferNode {
message BufferTree {
repeated BufferNode buffers = 1;
}
message WorkspaceFileTree {
// list of strings may be more efficient but it's a lot more hassle
required string payload = 1; // spappolata di json
}

View file

@ -1,10 +0,0 @@
syntax = "proto2";
package user;
// payload identifying user
message UserIdentity {
// user identifier
required string id = 1;
}

View file

@ -1,29 +1,29 @@
syntax = "proto2";
package workspace;
import "common.proto";
import "files.proto";
import "auth.proto";
import "common.proto";
service Workspace {
rpc CreateWorkspace (workspace.WorkspaceId) returns (common.Empty);
rpc Attach (common.Empty) returns (stream WorkspaceEvent);
rpc CreateBuffer (files.BufferNode) returns (common.Empty);
rpc AccessBuffer (files.BufferNode) returns (BufferCredentials);
rpc DeleteBuffer (files.BufferNode) returns (common.Empty);
rpc RequestAccess (workspace.BufferPath) returns (auth.Token);
rpc LeaveWorkspace (workspace.WorkspaceId) returns (common.Empty);
rpc CreateBuffer (workspace.BufferPath) returns (common.Empty);
rpc ListBuffers (common.Empty) returns (files.BufferTree);
rpc ListUsers (common.Empty) returns (common.UserList);
rpc ListBufferUsers (workspace.BufferPath) returns (common.UserList); //TODO discuss
rpc Attach (common.Empty) returns (stream workspace.WorkspaceEvent);
rpc Delete (workspace.BufferPath) returns (common.Empty); //deletes buffer
rpc ListUsers (common.Empty) returns (common.IdentityList);
rpc ListBufferUsers (files.BufferNode) returns (common.IdentityList); //TODO discuss
}
message WorkspaceEvent {
message UserJoin {
required common.UserIdentity id = 1;
required common.Identity user = 1;
}
message UserLeave {
required common.UserIdentity id = 1;
required common.Identity user = 1;
}
message FileCreate {
required string path = 1;
@ -45,12 +45,7 @@ message WorkspaceEvent {
}
}
message BufferPath {
// buffer path to operate onto
required string path = 1;
}
message WorkspaceId {
required string id = 1;
message BufferCredentials {
required common.Identity id = 1;
required auth.Token token = 2;
}

View file

@ -11,7 +11,7 @@ use woot::woot::Woot;
use crate::errors::IgnorableError;
use crate::api::controller::ControllerWorker;
use crate::api::TextChange;
use crate::proto::buffer_service::Operation;
use crate::proto::buffer::{BufferEvent, Operation};
use super::controller::BufferController;
@ -66,7 +66,7 @@ impl BufferWorker {
impl ControllerWorker<TextChange> for BufferWorker {
type Controller = BufferController;
type Tx = mpsc::Sender<Operation>;
type Rx = Streaming<Operation>;
type Rx = Streaming<BufferEvent>;
fn subscribe(&self) -> BufferController {
BufferController::new(
@ -130,8 +130,6 @@ impl ControllerWorker<TextChange> for BufferWorker {
for op in ops {
let operation = Operation {
data: postcard::to_extend(&op, Vec::new()).unwrap(),
user: None,
path: Some(self.name.clone())
};
match tx.send(operation).await {
@ -151,8 +149,8 @@ impl ControllerWorker<TextChange> for BufferWorker {
res = rx.message() => match res {
Err(_e) => break,
Ok(None) => break,
Ok(Some(change)) => match postcard::from_bytes::<Op>(&change.data) {
Ok(op) => {
Ok(Some(change)) => match postcard::from_bytes::<Op>(&change.op.data) {
Ok(op) => { // TODO here in change we receive info about the author, maybe propagate?
self.buffer.merge(op);
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
for tx in self.pollers.drain(..) {

View file

@ -11,13 +11,19 @@ use tonic::service::Interceptor;
use tonic::transport::{Channel, Endpoint};
use uuid::Uuid;
use crate::api::controller::ControllerWorker;
use crate::cursor::worker::CursorWorker;
use crate::proto::buffer_service::buffer_client::BufferClient;
use crate::proto::cursor_service::cursor_client::CursorClient;
use crate::proto::workspace::{JoinRequest, Token, WorkspaceDetails};
use crate::proto::workspace_service::workspace_client::WorkspaceClient;
use crate::workspace::Workspace;
use crate::proto::auth::auth_client::AuthClient;
use crate::{
api::controller::ControllerWorker,
cursor::worker::CursorWorker,
proto::{
common::Empty,
buffer::buffer_client::BufferClient,
cursor::cursor_client::CursorClient,
auth::{Token, WorkspaceJoinRequest},
workspace::workspace_client::WorkspaceClient,
},
workspace::Workspace
};
/// codemp client manager
///
@ -49,9 +55,10 @@ impl Interceptor for ClientInterceptor {
#[derive(Debug, Clone)]
pub(crate) struct Services {
pub(crate) workspace: crate::proto::workspace_service::workspace_client::WorkspaceClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) buffer: crate::proto::buffer_service::buffer_client::BufferClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) cursor: crate::proto::cursor_service::cursor_client::CursorClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) workspace: WorkspaceClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) buffer: BufferClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) cursor: CursorClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) auth: AuthClient<Channel>,
}
// TODO meno losco
@ -90,14 +97,13 @@ impl Client {
})
}
/// creates a new workspace (and joins it implicitly), returns an [tokio::sync::RwLock] to interact with it
pub async fn create_workspace(&mut self, workspace_id: &str) -> crate::Result<Arc<RwLock<Workspace>>> {
let mut workspace_client = self.services.workspace.clone();
workspace_client.create_workspace(
tonic::Request::new(WorkspaceDetails { id: workspace_id.to_string() })
).await?;
self.join_workspace(workspace_id).await
pub async fn login(&self, username: String, password: String, workspace_id: Option<String>) -> crate::Result<()> {
Ok(self.token_tx.send(
self.services.auth.clone()
.login(WorkspaceJoinRequest { username, password, workspace_id})
.await?
.into_inner()
)?)
}
/// join a workspace, returns an [tokio::sync::RwLock] to interact with it

View file

@ -6,7 +6,7 @@ use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mut
use tonic::async_trait;
use uuid::Uuid;
use crate::{api::Controller, errors::IgnorableError, proto::{cursor::{CursorEvent, CursorPosition}, user::UserIdentity}};
use crate::{api::Controller, errors::IgnorableError, proto::cursor::{CursorEvent, CursorPosition}};
/// the cursor controller implementation
///
@ -22,7 +22,7 @@ use crate::{api::Controller, errors::IgnorableError, proto::{cursor::{CursorEven
#[derive(Debug)]
pub struct CursorController {
user_id: Uuid,
op: mpsc::UnboundedSender<CursorEvent>,
op: mpsc::UnboundedSender<CursorPosition>,
last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>,
@ -37,7 +37,7 @@ impl Drop for CursorController {
impl CursorController {
pub(crate) fn new(
user_id: Uuid,
op: mpsc::UnboundedSender<CursorEvent>,
op: mpsc::UnboundedSender<CursorPosition>,
last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>,
@ -56,10 +56,7 @@ impl Controller<CursorEvent> for CursorController {
if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end);
}
Ok(self.op.send(CursorEvent {
user: UserIdentity { id: self.user_id.to_string() },
position: cursor,
})?)
Ok(self.op.send(cursor)?)
}
/// try to receive without blocking, but will still block on stream mutex

View file

@ -4,14 +4,14 @@ use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch};
use tonic::{Streaming, async_trait};
use uuid::Uuid;
use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::CursorEvent};
use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::{CursorPosition, CursorEvent}};
use super::controller::CursorController;
pub(crate) struct CursorWorker {
user_id: Uuid,
producer: mpsc::UnboundedSender<CursorEvent>,
op: mpsc::UnboundedReceiver<CursorEvent>,
producer: mpsc::UnboundedSender<CursorPosition>,
op: mpsc::UnboundedReceiver<CursorPosition>,
changed: watch::Sender<CursorEvent>,
last_op: watch::Receiver<CursorEvent>,
channel: Arc<broadcast::Sender<CursorEvent>>,
@ -41,7 +41,7 @@ impl CursorWorker {
#[async_trait]
impl ControllerWorker<CursorEvent> for CursorWorker {
type Controller = CursorController;
type Tx = mpsc::Sender<CursorEvent>;
type Tx = mpsc::Sender<CursorPosition>;
type Rx = Streaming<CursorEvent>;
fn subscribe(&self) -> CursorController {

View file

@ -163,14 +163,55 @@ pub use woot;
#[cfg(feature = "transport")]
#[allow(non_snake_case)]
pub mod proto {
pub mod user { tonic::include_proto!("user"); }
pub mod common {
tonic::include_proto!("common");
impl From<uuid::Uuid> for Identity {
fn from(id: uuid::Uuid) -> Self {
Identity { id: id.to_string() }
}
}
impl From<&uuid::Uuid> for Identity {
fn from(id: &uuid::Uuid) -> Self {
Identity { id: id.to_string() }
}
}
impl From<Identity> for uuid::Uuid {
fn from(value: Identity) -> Self {
uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity")
}
}
impl From<&Identity> for uuid::Uuid {
fn from(value: &Identity) -> Self {
uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity")
}
}
}
pub mod files {
tonic::include_proto!("files");
impl From<String> for BufferNode {
fn from(value: String) -> Self {
BufferNode { path: value }
}
}
impl From<BufferNode> for String {
fn from(value: BufferNode) -> Self {
value.path
}
}
}
pub mod buffer { tonic::include_proto!("buffer"); }
pub mod cursor { tonic::include_proto!("cursor"); }
pub mod files { tonic::include_proto!("files"); }
pub mod workspace { tonic::include_proto!("workspace"); }
pub mod buffer_service { tonic::include_proto!("buffer_service"); }
pub mod cursor_service { tonic::include_proto!("cursor_service"); }
pub mod workspace_service { tonic::include_proto!("workspace_service"); }
pub mod auth_service { tonic::include_proto!("auth_service"); }
pub mod auth { tonic::include_proto!("auth"); }
}
pub use errors::Error;

View file

@ -2,11 +2,8 @@ use std::{collections::{BTreeMap, BTreeSet}, str::FromStr, sync::Arc};
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::{
proto::{user::UserIdentity, workspace::{AttachRequest, BufferListRequest, BufferPayload, Token, UserListRequest}},
api::controller::ControllerWorker,
buffer::{self, worker::BufferWorker},
client::Services,
cursor
api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor,
proto::{auth::Token, common::{Identity, Empty}, files::BufferNode, workspace::{WorkspaceEvent, workspace_event::{Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave}}}
};
//TODO may contain more info in the future
@ -15,20 +12,6 @@ pub struct UserInfo {
pub uuid: Uuid
}
impl From<Uuid> for UserInfo {
fn from(uuid: Uuid) -> Self {
UserInfo {
uuid
}
}
}
impl From<UserIdentity> for Uuid {
fn from(uid: UserIdentity) -> Uuid {
Uuid::from_str(&uid.id).expect("expected an uuid")
}
}
pub struct Workspace {
id: String,
user_id: Uuid,
@ -70,7 +53,7 @@ impl Workspace {
pub async fn create(&mut self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
workspace_client.create_buffer(
tonic::Request::new(BufferPayload { path: path.to_string() })
tonic::Request::new(BufferNode { path: path.to_string() })
).await?;
// add to filetree
@ -115,7 +98,7 @@ impl Workspace {
pub async fn fetch_buffers(&mut self) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
let buffers = workspace_client.list_buffers(
tonic::Request::new(BufferListRequest {})
tonic::Request::new(Empty {})
).await?.into_inner().buffers;
self.filetree.clear();
@ -130,7 +113,7 @@ impl Workspace {
pub async fn fetch_users(&mut self) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
let users = BTreeSet::from_iter(workspace_client.list_users(
tonic::Request::new(UserListRequest {})
tonic::Request::new(Empty {})
).await?.into_inner().users.into_iter().map(Uuid::from));
// only keep userinfo for users that still exist
@ -150,7 +133,7 @@ impl Workspace {
pub async fn list_buffer_users(&mut self, path: &str) -> crate::Result<Vec<UserIdentity>> {
let mut workspace_client = self.services.workspace.clone();
let buffer_users = workspace_client.list_buffer_users(
tonic::Request::new(BufferPayload { path: path.to_string() })
tonic::Request::new(BufferNode { path: path.to_string() })
).await?.into_inner().users;
Ok(buffer_users)
@ -167,8 +150,8 @@ impl Workspace {
/// delete a buffer
pub async fn delete(&mut self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
workspace_client.delete(
tonic::Request::new(BufferPayload { path: path.to_string() })
workspace_client.delete_buffer(
tonic::Request::new(BufferNode { path: path.to_string() })
).await?;
self.filetree.remove(path);