From 851781b0413fec5814d71985f5ffd4469dcebc96 Mon Sep 17 00:00:00 2001 From: alemidev Date: Mon, 29 Aug 2022 02:48:09 +0200 Subject: [PATCH] feat: made statemanager more friendly in its usage basically removed the need to manually compose messages and added methods to handle it. --- src/server/actor/state.rs | 77 ++++++++++++++++++++++----------- src/server/service/buffer.rs | 2 +- src/server/service/session.rs | 2 +- src/server/service/workspace.rs | 12 ++--- 4 files changed, 59 insertions(+), 34 deletions(-) diff --git a/src/server/actor/state.rs b/src/server/actor/state.rs index 3d9142b..85c4c62 100644 --- a/src/server/actor/state.rs +++ b/src/server/actor/state.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{mpsc, watch::{self, Ref}}; +use tokio::sync::{mpsc, watch}; use tracing::error; use crate::actor::workspace::Workspace; @@ -19,26 +19,46 @@ pub struct User { } #[derive(Debug)] -pub enum AlterState { +enum WorkspaceAction { ADD { key: String, - // w: Workspace + w: Box, }, REMOVE { key: String }, } +#[derive(Debug, Clone)] +pub struct WorkspacesView { + watch: watch::Receiver>>, + op: mpsc::Sender, +} + +impl WorkspacesView { + pub fn borrow(&self) -> watch::Ref>> { + self.watch.borrow() + } + + pub async fn add(&mut self, w: Workspace) { + self.op.send(WorkspaceAction::ADD { key: w.name.clone(), w: Box::new(w) }).await.unwrap(); + } + + pub async fn remove(&mut self, key: String) { + self.op.send(WorkspaceAction::REMOVE { key }).await.unwrap(); + } +} + #[derive(Debug)] pub struct StateManager { - op_tx: mpsc::Sender, // TODO make method for this - workspaces: watch::Receiver>>, - run: watch::Sender, + pub workspaces: WorkspacesView, + run_tx: watch::Sender, + run_rx: watch::Receiver, } impl Drop for StateManager { fn drop(&mut self) { - self.run.send(false).unwrap_or_else(|e| { + self.run_tx.send(false).unwrap_or_else(|e| { error!("Could not stop StateManager worker: {:?}", e); }) } @@ -46,47 +66,52 @@ impl Drop for StateManager { impl StateManager { pub fn new() -> Self { - let (tx, mut rx) = mpsc::channel(32); // TODO quantify backpressure + let (tx, rx) = mpsc::channel(32); // TODO quantify backpressure let (workspaces_tx, workspaces_rx) = watch::channel(HashMap::new()); - let (stop_tx, stop_rx) = watch::channel(true); + let (run_tx, run_rx) = watch::channel(true); let s = StateManager { - workspaces: workspaces_rx, - op_tx: tx, - run: stop_tx, + workspaces: WorkspacesView { watch: workspaces_rx, op: tx }, + run_tx, run_rx, }; + s.workspaces_worker(rx, workspaces_tx); + + return s; + } + + fn workspaces_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>>) { + let run = self.run_rx.clone(); tokio::spawn(async move { let mut store = HashMap::new(); - let mut _users = HashMap::::new(); - while stop_rx.borrow().to_owned() { + while run.borrow().to_owned() { if let Some(event) = rx.recv().await { match event { - AlterState::ADD { key/*, w */} => { - // store.insert(key, Arc::new(w)); // TODO put in hashmap - // workspaces_tx.send(store.clone()).unwrap(); + WorkspaceAction::ADD { key, w } => { + store.insert(key, Arc::new(*w)); // TODO put in hashmap }, - AlterState::REMOVE { key } => { + WorkspaceAction::REMOVE { key } => { store.remove(&key); }, } - workspaces_tx.send(store.clone()).unwrap(); + tx.send(store.clone()).unwrap(); } else { break } } }); - - return s; } - pub fn workspaces_ref(&self) -> Ref>> { - self.workspaces.borrow() + pub fn view(&self) -> WorkspacesView { + return self.workspaces.clone(); } - // TODO wrap result of this func? - pub async fn op(&self, op: AlterState) -> Result<(), mpsc::error::SendError> { - self.op_tx.send(op).await + /// get a workspace Arc directly, without passing by the WorkspacesView + pub fn get(&self, key: &String) -> Option> { + if let Some(w) = self.workspaces.borrow().get(key) { + return Some(w.clone()); + } + return None; } } diff --git a/src/server/service/buffer.rs b/src/server/service/buffer.rs index 8f64b97..fc9a8d3 100644 --- a/src/server/service/buffer.rs +++ b/src/server/service/buffer.rs @@ -112,7 +112,7 @@ impl Buffer for BufferService { } // TODO make these above nicer? more concise? idk - if let Some(workspace) = self.state.workspaces_ref().get(&session_id) { + if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) { let in_stream = req.into_inner(); let (tx_og, rx) = mpsc::channel::>(128); diff --git a/src/server/service/session.rs b/src/server/service/session.rs index b47831a..2da2a06 100644 --- a/src/server/service/session.rs +++ b/src/server/service/session.rs @@ -12,7 +12,7 @@ use proto::session_server::Session; use proto::{SessionRequest, SessionResponse}; use crate::actor::{ - state::{AlterState, StateManager}, + state::StateManager, workspace::Workspace as WorkspaceInstance, // TODO fuck x2! }; diff --git a/src/server/service/workspace.rs b/src/server/service/workspace.rs index e7de6bd..2292d58 100644 --- a/src/server/service/workspace.rs +++ b/src/server/service/workspace.rs @@ -15,7 +15,7 @@ use tokio_stream::Stream; // TODO example used this? use proto::workspace_server::{Workspace, WorkspaceServer}; use proto::{BufferList, Event, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest}; -use crate::actor::{buffer::Buffer, state::StateManager, workspace::{BufferAction, UserAction, Workspace as WorkspaceInstance}}; // TODO fuck x2! +use crate::actor::{buffer::Buffer, state::StateManager, workspace::{Workspace as WorkspaceInstance}}; // TODO fuck x2! type EventStream = Pin> + Send>>; @@ -52,7 +52,7 @@ impl Workspace for WorkspaceService { req: Request, ) -> Result, Status> { let r = req.into_inner(); - match self.state.workspaces_ref().get(&r.session_key) { + match self.state.get(&r.session_key) { Some(w) => { let bus_clone = w.bus.clone(); let (_stop_tx, stop_rx) = watch::channel(true); @@ -79,7 +79,7 @@ impl Workspace for WorkspaceService { req: Request, ) -> Result, Status> { let r = req.into_inner(); - match self.state.workspaces_ref().get(&r.session_key) { + match self.state.get(&r.session_key) { Some(w) => { let mut out = Vec::new(); for (_k, v) in w.buffers.borrow().iter() { @@ -99,10 +99,10 @@ impl Workspace for WorkspaceService { req: Request, ) -> Result, Status> { let r = req.into_inner(); - if let Some(w) = self.state.workspaces_ref().get(&r.session_key) { + if let Some(w) = self.state.get(&r.session_key) { let mut view = w.view(); let buf = Buffer::new(r.path, w.bus.clone()); - view.buffers.add(buf); + view.buffers.add(buf).await; Ok(Response::new(WorkspaceResponse { accepted: true })) } else { @@ -118,7 +118,7 @@ impl Workspace for WorkspaceService { req: Request, ) -> Result, Status> { let r = req.into_inner(); - match self.state.workspaces_ref().get(&r.session_key) { + match self.state.get(&r.session_key) { Some(w) => { let mut out = Vec::new(); for (_k, v) in w.buffers.borrow().iter() {