diff --git a/proto/session.proto b/proto/session.proto index 69e9890..a13674d 100644 --- a/proto/session.proto +++ b/proto/session.proto @@ -2,9 +2,8 @@ syntax = "proto3"; package session; service Session { - rpc Create (SessionRequest) returns (SessionResponse); - rpc Join (SessionRequest) returns (SessionResponse); - rpc Leave (SessionRequest) returns (SessionResponse); + rpc Authenticate(SessionRequest) returns (SessionResponse); + rpc ListWorkspaces(SessionRequest) returns (WorkspaceList); } message SessionRequest { @@ -15,3 +14,7 @@ message SessionResponse { string sessionKey = 1; bool accepted = 2; } + +message WorkspaceList { + repeated string name = 1; // TODO add more fields +} diff --git a/proto/workspace.proto b/proto/workspace.proto index 7691444..855aa17 100644 --- a/proto/workspace.proto +++ b/proto/workspace.proto @@ -2,8 +2,12 @@ syntax = "proto3"; package workspace; service Workspace { - rpc Subscribe (WorkspaceRequest) returns (stream Event); - rpc Buffers (WorkspaceRequest) returns (BufferList); + rpc Create (WorkspaceRequest) returns (WorkspaceResponse); + rpc Subscribe (WorkspaceRequest) returns (stream Event); + rpc Buffers (WorkspaceRequest) returns (BufferList); + rpc ListUsers (WorkspaceRequest) returns (UsersList); + rpc NewBuffer (BufferRequest) returns (WorkspaceResponse); + rpc RemoveBuffer (BufferRequest) returns (WorkspaceResponse); } message Event { @@ -15,7 +19,20 @@ message WorkspaceRequest { string sessionKey = 1; } +message BufferRequest { + string sessionKey = 1; + string path = 2; +} + +message WorkspaceResponse { + bool accepted = 1; +} + message BufferList { repeated string path = 1; } +message UsersList { + repeated string name = 1; +} + diff --git a/src/server/service/session.rs b/src/server/service/session.rs index 3099775..b47831a 100644 --- a/src/server/service/session.rs +++ b/src/server/service/session.rs @@ -21,64 +21,6 @@ pub struct SessionService { state: Arc, } -#[tonic::async_trait] -impl Session for SessionService { - async fn create( - &self, - request: Request, - ) -> Result, Status> { - debug!("create request: {:?}", request); - let r = request.into_inner(); - - let _w = WorkspaceInstance::new(r.session_key.clone()); - - let reply = proto::SessionResponse { - session_key: r.session_key.clone(), - accepted: true, - }; - - // self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap(); - - Ok(Response::new(reply)) - } - - async fn join( - &self, - request: Request, - ) -> Result, Status> { - debug!("join request: {:?}", request); - - let reply = proto::SessionResponse { - session_key: request.into_inner().session_key, - accepted: true, - }; - - Ok(Response::new(reply)) - } - - async fn leave( - &self, - request: Request, - ) -> Result, Status> { - debug!("leave request: {:?}", request); - let r = request.into_inner(); - let mut removed = false; - - if self.state.workspaces_ref().get(&r.session_key).is_some() { - self.state - .op(AlterState::REMOVE { - key: r.session_key.clone(), - }) - .await - .unwrap(); - removed = true; // TODO this is a lie! Verify it - } - - let reply = proto::SessionResponse { - session_key: r.session_key, - accepted: removed, - }; - - Ok(Response::new(reply)) - } -} +// #[tonic::async_trait] +// impl Session for SessionService { +// } diff --git a/src/server/service/workspace.rs b/src/server/service/workspace.rs index 79b58e3..e7de6bd 100644 --- a/src/server/service/workspace.rs +++ b/src/server/service/workspace.rs @@ -1,8 +1,10 @@ use std::{pin::Pin, sync::Arc}; -// use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, warn}; +use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; +use tokio::sync::{watch, mpsc}; pub mod proto { tonic::include_proto!("workspace"); @@ -11,9 +13,9 @@ pub mod proto { use tokio_stream::Stream; // TODO example used this? use proto::workspace_server::{Workspace, WorkspaceServer}; -use proto::{BufferList, Event, WorkspaceRequest}; +use proto::{BufferList, Event, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest}; -use crate::actor::state::StateManager; +use crate::actor::{buffer::Buffer, state::StateManager, workspace::{BufferAction, UserAction, Workspace as WorkspaceInstance}}; // TODO fuck x2! type EventStream = Pin> + Send>>; @@ -26,11 +28,50 @@ pub struct WorkspaceService { impl Workspace for WorkspaceService { type SubscribeStream = EventStream; + async fn create( + &self, + request: Request, + ) -> Result, Status> { + debug!("create request: {:?}", request); + let r = request.into_inner(); + + let _w = WorkspaceInstance::new(r.session_key.clone()); + + let reply = WorkspaceResponse { + // session_key: r.session_key.clone(), + accepted: true, + }; + + // self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap(); + + Ok(Response::new(reply)) + } + async fn subscribe( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + let r = req.into_inner(); + match self.state.workspaces_ref().get(&r.session_key) { + Some(w) => { + let bus_clone = w.bus.clone(); + let (_stop_tx, stop_rx) = watch::channel(true); + let (tx, rx) = mpsc::channel::>(128); + tokio::spawn(async move { + let mut event_receiver = bus_clone.subscribe(); + while stop_rx.borrow().to_owned() { + let _res = event_receiver.recv().await.unwrap(); + let broadcasting = Event { id: 1, body: Some("".to_string()) }; // TODO actually process packet + tx.send(Ok(broadcasting)).await.unwrap(); + } + }); + return Ok(Response::new(Box::pin(ReceiverStream::new(rx)))); + }, + None => Err(Status::not_found(format!( + "No active workspace with session_key '{}'", + r.session_key + ))) + } } async fn buffers( @@ -41,7 +82,7 @@ impl Workspace for WorkspaceService { match self.state.workspaces_ref().get(&r.session_key) { Some(w) => { let mut out = Vec::new(); - for (_k, v) in w.buffers_ref().iter() { + for (_k, v) in w.buffers.borrow().iter() { out.push(v.name.clone()); } Ok(Response::new(BufferList { path: out })) @@ -52,6 +93,53 @@ impl Workspace for WorkspaceService { ))), } } + + async fn new_buffer( + &self, + req: Request, + ) -> Result, Status> { + let r = req.into_inner(); + if let Some(w) = self.state.workspaces_ref().get(&r.session_key) { + let mut view = w.view(); + let buf = Buffer::new(r.path, w.bus.clone()); + view.buffers.add(buf); + + Ok(Response::new(WorkspaceResponse { accepted: true })) + } else { + return Err(Status::not_found(format!( + "No active workspace with session_key '{}'", + r.session_key + ))); + } + } + + async fn remove_buffer( + &self, + req: Request, + ) -> Result, Status> { + let r = req.into_inner(); + match self.state.workspaces_ref().get(&r.session_key) { + Some(w) => { + let mut out = Vec::new(); + for (_k, v) in w.buffers.borrow().iter() { + out.push(v.name.clone()); + } + Ok(Response::new(WorkspaceResponse { accepted: true })) + } + None => Err(Status::not_found(format!( + "No active workspace with session_key '{}'", + r.session_key + ))), + } + } + + async fn list_users( + &self, + req: Request, + ) -> Result, Status> { + todo!() + } + } impl WorkspaceService {