diff --git a/src/server/actor/state.rs b/src/server/actor/state.rs index 4d27aca..f8fbad8 100644 --- a/src/server/actor/state.rs +++ b/src/server/actor/state.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, sync::Arc, fmt::Display}; use tokio::sync::{mpsc, watch}; use tracing::error; +use uuid::Uuid; use crate::actor::workspace::Workspace; @@ -9,30 +10,30 @@ use crate::actor::workspace::Workspace; #[derive(Debug)] enum WorkspaceAction { ADD { - key: String, + key: Uuid, w: Box, }, REMOVE { - key: String + key: Uuid }, } #[derive(Debug, Clone)] pub struct WorkspacesView { - watch: watch::Receiver>>, + watch: watch::Receiver>>, op: mpsc::Sender, } impl WorkspacesView { - pub fn borrow(&self) -> watch::Ref>> { + pub fn borrow(&self) -> watch::Ref>> { self.watch.borrow() } pub async fn add(&mut self, w: Workspace) { - self.op.send(WorkspaceAction::ADD { key: w.id.to_string(), w: Box::new(w) }).await.unwrap(); + self.op.send(WorkspaceAction::ADD { key: w.id, w: Box::new(w) }).await.unwrap(); } - pub async fn remove(&mut self, key: String) { + pub async fn remove(&mut self, key: Uuid) { self.op.send(WorkspaceAction::REMOVE { key }).await.unwrap(); } } @@ -68,7 +69,7 @@ impl StateManager { return s; } - fn workspaces_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>>) { + fn workspaces_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>>) { let run = self.run.clone(); tokio::spawn(async move { let mut store = HashMap::new(); @@ -96,7 +97,7 @@ impl StateManager { } /// get a workspace Arc directly, without passing by the WorkspacesView - pub fn get(&self, key: &String) -> Option> { + pub fn get(&self, key: &Uuid) -> Option> { if let Some(w) = self.workspaces.borrow().get(key) { return Some(w.clone()); } diff --git a/src/server/service/buffer.rs b/src/server/service/buffer.rs index 3f90732..70b7d3d 100644 --- a/src/server/service/buffer.rs +++ b/src/server/service/buffer.rs @@ -1,6 +1,8 @@ use std::collections::VecDeque; use std::{pin::Pin, sync::Arc}; +use uuid::Uuid; + use tokio_stream::wrappers::ReceiverStream; use tracing::error; @@ -111,7 +113,7 @@ impl Buffer for BufferService { } // TODO make these above nicer? more concise? idk - if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) { + if let Some(workspace) = self.state.workspaces.borrow().get(&Uuid::parse_str(session_id.as_str()).unwrap()) { let in_stream = req.into_inner(); let (tx_og, rx) = mpsc::channel::>(128); diff --git a/src/server/service/workspace.rs b/src/server/service/workspace.rs index 9503ba0..b573bdf 100644 --- a/src/server/service/workspace.rs +++ b/src/server/service/workspace.rs @@ -1,8 +1,9 @@ use std::{pin::Pin, sync::Arc}; +use uuid::Uuid; use tonic::codegen::InterceptedService; use tonic::service::Interceptor; -use tracing::debug; +use tracing::info; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, Streaming}; @@ -40,6 +41,7 @@ impl Interceptor for WorkspaceInterceptor { // tree... match req.metadata().get("workspace") { Some(value) => { + info!("Metadata: {:?}", value); match value.to_str() { Ok(w_id) => { id = w_id.to_string(); @@ -50,7 +52,14 @@ impl Interceptor for WorkspaceInterceptor { None => return Err(Status::unauthenticated("No workspace key included in request")) } - if !self.state.workspaces.borrow().contains_key(&id) { + info!("checking request : {}", id); + + let uid = match Uuid::parse_str(id.as_str()) { + Ok(id) => id, + Err(e) => { return Err(Status::invalid_argument(format!("Invalid uuid : {}", e))); }, + }; + + if !self.state.workspaces.borrow().contains_key(&uid) { return Err(Status::not_found(format!("Workspace '{}' could not be found", id))); } @@ -77,7 +86,7 @@ impl Workspace for WorkspaceService { &self, req: Request, ) -> Result, Status> { - let session_id = req.extensions().get::().unwrap().id.clone(); + let session_id = Uuid::parse_str(req.extensions().get::().unwrap().id.as_str()).unwrap(); let r = req.into_inner(); let run = self.state.run.clone(); let user_name = r.name.clone(); @@ -87,17 +96,18 @@ impl Workspace for WorkspaceService { tokio::spawn(async move { let mut event_receiver = w.bus.subscribe(); w.view().users.add( - crate::actor::state::User { - name: "some-name".to_string(), // get from request + User { + name: r.name.clone(), cursor: UserCursor { buffer:0, x:0, y:0 } } - ); + ).await; + info!("User {} joined workspace {}", r.name, w.id); while run.borrow().to_owned() { let res = event_receiver.recv().await.unwrap(); let broadcasting = WorkspaceEvent { id: 1, body: Some(res.to_string()) }; // TODO actually process packet tx.send(Ok(broadcasting)).await.unwrap(); } - w.view().users.remove(user_name); + w.view().users.remove(user_name).await; }); return Ok(Response::new(Box::pin(ReceiverStream::new(rx)))); }, @@ -112,7 +122,7 @@ impl Workspace for WorkspaceService { &self, req: tonic::Request>, ) -> Result, Status> { - let s_id = req.extensions().get::().unwrap().id.clone(); + let s_id = Uuid::parse_str(req.extensions().get::().unwrap().id.as_str()).unwrap(); let mut r = req.into_inner(); match self.state.get(&s_id) { Some(w) => { @@ -125,17 +135,19 @@ impl Workspace for WorkspaceService { tokio::select!{ remote = workspace_bus.recv() => { if let Ok(cur) = remote { + info!("Sending cursor update : {:?}", cur); tx.send(Ok(cur)).await.unwrap(); } }, local = r.next() => { match local { Some(request) => { + info!("Received cursor update : {:?}", request); match request { Ok(cur) => { cursors_ref.send(cur).unwrap(); }, - Err(e) => {}, + Err(_e) => {}, } }, None => {}, @@ -158,7 +170,7 @@ impl Workspace for WorkspaceService { req: Request, ) -> Result, Status> { let r = req.into_inner(); - match self.state.get(&r.session_key) { + match self.state.get(&Uuid::parse_str(r.session_key.as_str()).unwrap()) { Some(w) => { let mut out = Vec::new(); for (_k, v) in w.buffers.borrow().iter() { @@ -179,7 +191,7 @@ impl Workspace for WorkspaceService { ) -> Result, Status> { let session_id = req.extensions().get::().unwrap().id.clone(); let r = req.into_inner(); - if let Some(w) = self.state.get(&session_id) { + if let Some(w) = self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) { let mut view = w.view(); let buf = Buffer::new(r.path, w.bus.clone()); view.buffers.add(buf).await; @@ -199,9 +211,9 @@ impl Workspace for WorkspaceService { ) -> Result, Status> { let session_id = req.extensions().get::().unwrap().id.clone(); let r = req.into_inner(); - match self.state.get(&session_id) { + match self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) { Some(w) => { - w.view().buffers.remove(r.path); + w.view().buffers.remove(r.path).await; Ok(Response::new(WorkspaceResponse { accepted: true })) } None => Err(Status::not_found(format!( @@ -217,7 +229,7 @@ impl Workspace for WorkspaceService { ) -> Result, Status> { let session_id = req.extensions().get::().unwrap().id.clone(); let r = req.into_inner(); - match self.state.get(&session_id) { + match self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) { Some(w) => { let mut out = Vec::new(); for (_k, v) in w.users.borrow().iter() {