From e9500afd557fad69ca6f0a9c18c29f9594832fa1 Mon Sep 17 00:00:00 2001 From: alemidev Date: Sat, 24 Sep 2022 01:14:12 +0200 Subject: [PATCH] feat: did some plumbing for events and cursors channels --- Cargo.toml | 1 + proto/session.proto | 9 ++- proto/workspace.proto | 25 +++++-- src/server/actor/state.rs | 26 +++++-- src/server/actor/workspace.rs | 51 ++++++++----- src/server/events.rs | 15 +++- src/server/main.rs | 3 +- src/server/service/buffer.rs | 2 - src/server/service/session.rs | 54 +++++++++++--- src/server/service/workspace.rs | 123 ++++++++++++++++++++------------ 10 files changed, 219 insertions(+), 90 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1ef1128..70283ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ tokio-stream = "0.1" rmpv = "1" operational-transform = "0.6" nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature +uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] } [build-dependencies] tonic-build = "0.7" diff --git a/proto/session.proto b/proto/session.proto index a13674d..7273946 100644 --- a/proto/session.proto +++ b/proto/session.proto @@ -2,8 +2,9 @@ syntax = "proto3"; package session; service Session { - rpc Authenticate(SessionRequest) returns (SessionResponse); - rpc ListWorkspaces(SessionRequest) returns (WorkspaceList); + // rpc Authenticate(SessionRequest) returns (SessionResponse); + // rpc ListWorkspaces(SessionRequest) returns (WorkspaceList); + rpc CreateWorkspace(WorkspaceBuilderRequest) returns (SessionResponse); } message SessionRequest { @@ -15,6 +16,10 @@ message SessionResponse { bool accepted = 2; } +message WorkspaceBuilderRequest { + string name = 1; +} + message WorkspaceList { repeated string name = 1; // TODO add more fields } diff --git a/proto/workspace.proto b/proto/workspace.proto index 855aa17..e45a68e 100644 --- a/proto/workspace.proto +++ b/proto/workspace.proto @@ -2,19 +2,30 @@ syntax = "proto3"; package workspace; service Workspace { - rpc Create (WorkspaceRequest) returns (WorkspaceResponse); - rpc Subscribe (WorkspaceRequest) returns (stream Event); - rpc Buffers (WorkspaceRequest) returns (BufferList); - rpc ListUsers (WorkspaceRequest) returns (UsersList); - rpc NewBuffer (BufferRequest) returns (WorkspaceResponse); - rpc RemoveBuffer (BufferRequest) returns (WorkspaceResponse); + rpc Join (JoinRequest) returns (stream WorkspaceEvent); + rpc Subscribe (stream CursorUpdate) returns (stream CursorUpdate); + rpc ListUsers (WorkspaceRequest) returns (UsersList); + rpc Buffers (WorkspaceRequest) returns (BufferList); + rpc NewBuffer (BufferRequest) returns (WorkspaceResponse); + rpc RemoveBuffer (BufferRequest) returns (WorkspaceResponse); } -message Event { +message JoinRequest { + string name = 1; +} + +message WorkspaceEvent { int32 id = 1; optional string body = 2; } +message CursorUpdate { + string username = 1; + int32 buffer = 2; + int32 col = 3; + int32 row = 4; +} + message WorkspaceRequest { string sessionKey = 1; } diff --git a/src/server/actor/state.rs b/src/server/actor/state.rs index 85c4c62..e2094d6 100644 --- a/src/server/actor/state.rs +++ b/src/server/actor/state.rs @@ -1,5 +1,5 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, fmt::Display}; use tokio::sync::{mpsc, watch}; use tracing::error; @@ -7,17 +7,31 @@ use crate::actor::workspace::Workspace; #[derive(Debug, Clone)] pub struct UserCursor{ - pub buffer: i64, + pub buffer: i32, pub x: i32, pub y: i32 } +impl Display for UserCursor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Cursor(buffer:{}, x:{}, y:{})", self.buffer, self.x, self.y) + } +} + + #[derive(Debug, Clone)] pub struct User { pub name: String, pub cursor: UserCursor, } +impl Display for User { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "User(name:{}, cursor:{})", self.name, self.cursor) + } +} + + #[derive(Debug)] enum WorkspaceAction { ADD { @@ -41,7 +55,7 @@ impl WorkspacesView { } pub async fn add(&mut self, w: Workspace) { - self.op.send(WorkspaceAction::ADD { key: w.name.clone(), w: Box::new(w) }).await.unwrap(); + self.op.send(WorkspaceAction::ADD { key: w.id.to_string(), w: Box::new(w) }).await.unwrap(); } pub async fn remove(&mut self, key: String) { @@ -52,8 +66,8 @@ impl WorkspacesView { #[derive(Debug)] pub struct StateManager { pub workspaces: WorkspacesView, + pub run: watch::Receiver, run_tx: watch::Sender, - run_rx: watch::Receiver, } impl Drop for StateManager { @@ -72,7 +86,7 @@ impl StateManager { let s = StateManager { workspaces: WorkspacesView { watch: workspaces_rx, op: tx }, - run_tx, run_rx, + run_tx, run: run_rx, }; s.workspaces_worker(rx, workspaces_tx); @@ -81,7 +95,7 @@ impl StateManager { } fn workspaces_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>>) { - let run = self.run_rx.clone(); + let run = self.run.clone(); tokio::spawn(async move { let mut store = HashMap::new(); diff --git a/src/server/actor/workspace.rs b/src/server/actor/workspace.rs index 5b19890..d7e84d9 100644 --- a/src/server/actor/workspace.rs +++ b/src/server/actor/workspace.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use tokio::sync::{broadcast, mpsc, watch::{self, Ref}}; use tracing::warn; -use crate::events::Event; +use crate::{events::Event, service::workspace::proto::CursorUpdate}; use super::{buffer::{BufferView, Buffer}, state::{User, UserCursor}}; @@ -66,8 +66,10 @@ impl WorkspaceView { // Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk #[derive(Debug)] pub struct Workspace { + pub id: uuid::Uuid, pub name: String, pub bus: broadcast::Sender, + pub cursors: broadcast::Sender, pub buffers: BuffersTreeView, pub users: UsersView, @@ -90,10 +92,13 @@ impl Workspace { let (buffer_tx, buffer_rx) = watch::channel::>(HashMap::new()); let (users_tx, users_rx) = watch::channel(HashMap::new()); let (broadcast_tx, _broadcast_rx) = broadcast::channel::(32); + let (cursors_tx, _cursors_rx) = broadcast::channel::(32); let w = Workspace { + id: uuid::Uuid::new_v4(), name, bus: broadcast_tx, + cursors: cursors_tx, buffers: BuffersTreeView{ op: op_buf_tx, watch: buffer_rx }, users: UsersView{ op: op_usr_tx, watch: users_rx }, run_tx, @@ -137,28 +142,42 @@ impl Workspace { fn users_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>) { let bus = self.bus.clone(); + let cursors_tx = self.cursors.clone(); let run = self.run_rx.clone(); tokio::spawn(async move { + let mut cursors_rx = cursors_tx.subscribe(); let mut users : HashMap = HashMap::new(); while run.borrow().to_owned() { - match rx.recv().await.unwrap() { - UserAction::ADD { user } => { - users.insert(user.name.clone(), user); + tokio::select!{ + action = rx.recv() => { + match action.unwrap() { + UserAction::ADD { user } => { + users.insert(user.name.clone(), user.clone()); + bus.send(Event::UserJoin { user }).unwrap(); + }, + UserAction::REMOVE { name } => { + if let None = users.remove(&name) { + continue; // don't update channel since this was a no-op + } else { + bus.send(Event::UserLeave { name }).unwrap(); + } + }, + UserAction::CURSOR { name, cursor } => { + if let Some(user) = users.get_mut(&name) { + user.cursor = cursor.clone(); + } else { + continue; // don't update channel since this was a no-op + } + }, + }; }, - UserAction::REMOVE { name } => { - if let None = users.remove(&name) { - continue; // don't update channel since this was a no-op + cursor = cursors_rx.recv() => { + let cursor = cursor.unwrap(); + if let Some(user) = users.get_mut(&cursor.username) { + user.cursor = UserCursor { buffer: cursor.buffer, x:cursor.col, y:cursor.row }; } - }, - UserAction::CURSOR { name, cursor } => { - if let Some(user) = users.get_mut(&name) { - user.cursor = cursor.clone(); - bus.send(Event::Cursor{user: name, cursor}).unwrap(); - } else { - continue; // don't update channel since this was a no-op - } - }, + } } tx.send( diff --git a/src/server/events.rs b/src/server/events.rs index 23e7d12..53551a6 100644 --- a/src/server/events.rs +++ b/src/server/events.rs @@ -1,14 +1,25 @@ -use crate::actor::state::{User, UserCursor}; +use std::fmt::Display; +use crate::actor::state::User; #[derive(Debug, Clone)] pub enum Event { UserJoin { user: User }, UserLeave { name: String }, - Cursor { user: String, cursor: UserCursor }, BufferNew { path: String }, BufferDelete { path: String }, } +impl Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::UserJoin { user } => write!(f, "UserJoin(user:{})", user), + Self::UserLeave { name } => write!(f, "UserLeave(user:{})", name), + Self::BufferNew { path } => write!(f, "BufferNew(path:{})", path), + Self::BufferDelete { path } => write!(f, "BufferDelete(path:{})", path), + } + } +} + // pub type Event = Box; // // pub trait EventInterface { diff --git a/src/server/main.rs b/src/server/main.rs index 1bf0bfc..a217e14 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -16,7 +16,7 @@ use tonic::transport::Server; use crate::{ actor::state::StateManager, - service::{buffer::BufferService, workspace::WorkspaceService}, + service::{buffer::BufferService, workspace::WorkspaceService, session::SessionService}, }; #[tokio::main] @@ -30,6 +30,7 @@ async fn main() -> Result<(), Box> { info!("Starting server"); Server::builder() + .add_service(SessionService::new(state.clone()).server()) .add_service(WorkspaceService::new(state.clone()).server()) .add_service(BufferService::new(state.clone()).server()) .serve(addr) diff --git a/src/server/service/buffer.rs b/src/server/service/buffer.rs index 0da5181..723bf47 100644 --- a/src/server/service/buffer.rs +++ b/src/server/service/buffer.rs @@ -8,8 +8,6 @@ use operational_transform::OperationSeq; use tonic::{Request, Response, Status}; pub mod proto { - tonic::include_proto!("session"); - tonic::include_proto!("workspace"); tonic::include_proto!("buffer"); } diff --git a/src/server/service/session.rs b/src/server/service/session.rs index 2da2a06..bf23f80 100644 --- a/src/server/service/session.rs +++ b/src/server/service/session.rs @@ -4,23 +4,59 @@ pub mod proto { use std::sync::Arc; -use tracing::debug; - +use proto::{session_server::{Session}, WorkspaceBuilderRequest, SessionRequest, SessionResponse, WorkspaceList}; use tonic::{Request, Response, Status}; -use proto::session_server::Session; -use proto::{SessionRequest, SessionResponse}; use crate::actor::{ - state::StateManager, - workspace::Workspace as WorkspaceInstance, // TODO fuck x2! + state::StateManager, workspace::Workspace, // TODO fuck x2! }; +use self::proto::session_server::SessionServer; + +use super::workspace::WorkspaceExtension; + #[derive(Debug)] pub struct SessionService { state: Arc, } -// #[tonic::async_trait] -// impl Session for SessionService { -// } +#[tonic::async_trait] +impl Session for SessionService { + async fn create_workspace( + &self, + req: Request, + ) -> Result, Status> { + let name = req.extensions().get::().unwrap().id.clone(); + + let w = Workspace::new(name); + let res = SessionResponse { accepted:true, session_key: w.id.to_string() }; + + self.state.view().add(w).await; + Ok(Response::new(res)) + } + + // async fn authenticate( + // &self, + // req: Request, + // ) -> Result, Status> { + // todo!() + // } + + // async fn list_workspaces( + // &self, + // req: Request, + // ) -> Result, Status> { + // todo!() + // } +} + +impl SessionService { + pub fn new(state: Arc) -> SessionService { + SessionService { state } + } + + pub fn server(self) -> SessionServer { + SessionServer::new(self) + } +} diff --git a/src/server/service/workspace.rs b/src/server/service/workspace.rs index 3318975..9897b22 100644 --- a/src/server/service/workspace.rs +++ b/src/server/service/workspace.rs @@ -5,22 +5,23 @@ use tonic::service::Interceptor; use tracing::debug; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status}; +use tonic::{Request, Response, Status, Streaming}; use tokio::sync::{watch, mpsc}; pub mod proto { tonic::include_proto!("workspace"); } -use tokio_stream::Stream; // TODO example used this? +use tokio_stream::{Stream, StreamExt}; // TODO example used this? use proto::workspace_server::{Workspace, WorkspaceServer}; -use proto::{BufferList, Event, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest}; +use proto::{BufferList, WorkspaceEvent, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest, CursorUpdate, JoinRequest}; +use crate::actor::state::UserCursor; use crate::actor::{buffer::Buffer, state::StateManager, workspace::Workspace as WorkspaceInstance}; // TODO fuck x2! -struct WorkspaceExtension { - id: String +pub struct WorkspaceExtension { + pub id: String } #[derive(Debug, Clone)] @@ -57,9 +58,8 @@ impl Interceptor for WorkspaceInterceptor { } - - -type EventStream = Pin> + Send>>; +type EventStream = Pin> + Send>>; +type CursorUpdateStream = Pin> + Send>>; #[derive(Debug)] pub struct WorkspaceService { @@ -68,52 +68,85 @@ pub struct WorkspaceService { #[tonic::async_trait] impl Workspace for WorkspaceService { - type SubscribeStream = EventStream; + type JoinStream = EventStream; + type SubscribeStream = CursorUpdateStream; - async fn create( + async fn join( &self, - request: Request, - ) -> Result, Status> { - debug!("create request: {:?}", request); - // We should always have an extension because of the interceptor but maybe don't unwrap? - let ext = request.extensions().get::().unwrap(); - let r = request.into_inner(); - - let _w = WorkspaceInstance::new(ext.id); - - let reply = WorkspaceResponse { - // session_key: r.session_key.clone(), - accepted: true, - }; - - // self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap(); - - Ok(Response::new(reply)) + req: Request, + ) -> Result, Status> { + let session_id = req.extensions().get::().unwrap().id.clone(); + let r = req.into_inner(); + let run = self.state.run.clone(); + let user_name = r.name.clone(); + match self.state.get(&session_id) { + Some(w) => { + let (tx, rx) = mpsc::channel::>(128); + tokio::spawn(async move { + let mut event_receiver = w.bus.subscribe(); + w.view().users.add( + crate::actor::state::User { + name: r.name.clone(), + cursor: UserCursor { buffer:0, x:0, y:0 } + } + ); + while run.borrow().to_owned() { + let res = event_receiver.recv().await.unwrap(); + let broadcasting = WorkspaceEvent { id: 1, body: Some(res.to_string()) }; // TODO actually process packet + tx.send(Ok(broadcasting)).await.unwrap(); + } + w.view().users.remove(user_name); + }); + return Ok(Response::new(Box::pin(ReceiverStream::new(rx)))); + }, + None => Err(Status::not_found(format!( + "No active workspace with session_key '{}'", + session_id + ))) + } } async fn subscribe( &self, - req: Request, - ) -> Result, Status> { - let r = req.into_inner(); - match self.state.get(&r.session_key) { + req: tonic::Request>, + ) -> Result, Status> { + let s_id = req.extensions().get::().unwrap().id.clone(); + let mut r = req.into_inner(); + match self.state.get(&s_id) { Some(w) => { - let bus_clone = w.bus.clone(); + let cursors_ref = w.cursors.clone(); let (_stop_tx, stop_rx) = watch::channel(true); - let (tx, rx) = mpsc::channel::>(128); + let (tx, rx) = mpsc::channel::>(128); tokio::spawn(async move { - let mut event_receiver = bus_clone.subscribe(); + let mut workspace_bus = cursors_ref.subscribe(); while stop_rx.borrow().to_owned() { - let _res = event_receiver.recv().await.unwrap(); - let broadcasting = Event { id: 1, body: Some("".to_string()) }; // TODO actually process packet - tx.send(Ok(broadcasting)).await.unwrap(); + tokio::select!{ + remote = workspace_bus.recv() => { + if let Ok(cur) = remote { + tx.send(Ok(cur)).await.unwrap(); + } + }, + local = r.next() => { + match local { + Some(request) => { + match request { + Ok(cur) => { + cursors_ref.send(cur).unwrap(); + }, + Err(e) => {}, + } + }, + None => {}, + } + }, + } } }); return Ok(Response::new(Box::pin(ReceiverStream::new(rx)))); }, None => Err(Status::not_found(format!( "No active workspace with session_key '{}'", - r.session_key + s_id ))) } } @@ -142,8 +175,9 @@ impl Workspace for WorkspaceService { &self, req: Request, ) -> Result, Status> { + let session_id = req.extensions().get::().unwrap().id.clone(); let r = req.into_inner(); - if let Some(w) = self.state.get(&r.session_key) { + if let Some(w) = self.state.get(&session_id) { let mut view = w.view(); let buf = Buffer::new(r.path, w.bus.clone()); view.buffers.add(buf).await; @@ -161,13 +195,11 @@ impl Workspace for WorkspaceService { &self, req: Request, ) -> Result, Status> { + let session_id = req.extensions().get::().unwrap().id.clone(); let r = req.into_inner(); - match self.state.get(&r.session_key) { + match self.state.get(&session_id) { Some(w) => { - let mut out = Vec::new(); - for (_k, v) in w.buffers.borrow().iter() { - out.push(v.name.clone()); - } + w.view().buffers.remove(r.path); Ok(Response::new(WorkspaceResponse { accepted: true })) } None => Err(Status::not_found(format!( @@ -181,8 +213,9 @@ impl Workspace for WorkspaceService { &self, req: Request, ) -> Result, Status> { + let session_id = req.extensions().get::().unwrap().id.clone(); let r = req.into_inner(); - match self.state.get(&r.session_key) { + match self.state.get(&session_id) { Some(w) => { let mut out = Vec::new(); for (_k, v) in w.users.borrow().iter() {