feat: made statemanager more friendly in its usage

basically removed the need to manually compose messages and added
methods to handle it.
This commit is contained in:
əlemi 2022-08-29 02:48:09 +02:00
parent 824cb62933
commit 851781b041
4 changed files with 59 additions and 34 deletions

View file

@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, watch::{self, Ref}}; use tokio::sync::{mpsc, watch};
use tracing::error; use tracing::error;
use crate::actor::workspace::Workspace; use crate::actor::workspace::Workspace;
@ -19,26 +19,46 @@ pub struct User {
} }
#[derive(Debug)] #[derive(Debug)]
pub enum AlterState { enum WorkspaceAction {
ADD { ADD {
key: String, key: String,
// w: Workspace w: Box<Workspace>,
}, },
REMOVE { REMOVE {
key: String key: String
}, },
} }
#[derive(Debug, Clone)]
pub struct WorkspacesView {
watch: watch::Receiver<HashMap<String, Arc<Workspace>>>,
op: mpsc::Sender<WorkspaceAction>,
}
impl WorkspacesView {
pub fn borrow(&self) -> watch::Ref<HashMap<String, Arc<Workspace>>> {
self.watch.borrow()
}
pub async fn add(&mut self, w: Workspace) {
self.op.send(WorkspaceAction::ADD { key: w.name.clone(), w: Box::new(w) }).await.unwrap();
}
pub async fn remove(&mut self, key: String) {
self.op.send(WorkspaceAction::REMOVE { key }).await.unwrap();
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct StateManager { pub struct StateManager {
op_tx: mpsc::Sender<AlterState>, // TODO make method for this pub workspaces: WorkspacesView,
workspaces: watch::Receiver<HashMap<String, Arc<Workspace>>>, run_tx: watch::Sender<bool>,
run: watch::Sender<bool>, run_rx: watch::Receiver<bool>,
} }
impl Drop for StateManager { impl Drop for StateManager {
fn drop(&mut self) { fn drop(&mut self) {
self.run.send(false).unwrap_or_else(|e| { self.run_tx.send(false).unwrap_or_else(|e| {
error!("Could not stop StateManager worker: {:?}", e); error!("Could not stop StateManager worker: {:?}", e);
}) })
} }
@ -46,47 +66,52 @@ impl Drop for StateManager {
impl StateManager { impl StateManager {
pub fn new() -> Self { pub fn new() -> Self {
let (tx, mut rx) = mpsc::channel(32); // TODO quantify backpressure let (tx, rx) = mpsc::channel(32); // TODO quantify backpressure
let (workspaces_tx, workspaces_rx) = watch::channel(HashMap::new()); let (workspaces_tx, workspaces_rx) = watch::channel(HashMap::new());
let (stop_tx, stop_rx) = watch::channel(true); let (run_tx, run_rx) = watch::channel(true);
let s = StateManager { let s = StateManager {
workspaces: workspaces_rx, workspaces: WorkspacesView { watch: workspaces_rx, op: tx },
op_tx: tx, run_tx, run_rx,
run: stop_tx,
}; };
s.workspaces_worker(rx, workspaces_tx);
return s;
}
fn workspaces_worker(&self, mut rx: mpsc::Receiver<WorkspaceAction>, tx: watch::Sender<HashMap<String, Arc<Workspace>>>) {
let run = self.run_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut store = HashMap::new(); let mut store = HashMap::new();
let mut _users = HashMap::<String, User>::new();
while stop_rx.borrow().to_owned() { while run.borrow().to_owned() {
if let Some(event) = rx.recv().await { if let Some(event) = rx.recv().await {
match event { match event {
AlterState::ADD { key/*, w */} => { WorkspaceAction::ADD { key, w } => {
// store.insert(key, Arc::new(w)); // TODO put in hashmap store.insert(key, Arc::new(*w)); // TODO put in hashmap
// workspaces_tx.send(store.clone()).unwrap();
}, },
AlterState::REMOVE { key } => { WorkspaceAction::REMOVE { key } => {
store.remove(&key); store.remove(&key);
}, },
} }
workspaces_tx.send(store.clone()).unwrap(); tx.send(store.clone()).unwrap();
} else { } else {
break break
} }
} }
}); });
return s;
} }
pub fn workspaces_ref(&self) -> Ref<HashMap<String, Arc<Workspace>>> { pub fn view(&self) -> WorkspacesView {
self.workspaces.borrow() return self.workspaces.clone();
} }
// TODO wrap result of this func? /// get a workspace Arc directly, without passing by the WorkspacesView
pub async fn op(&self, op: AlterState) -> Result<(), mpsc::error::SendError<AlterState>> { pub fn get(&self, key: &String) -> Option<Arc<Workspace>> {
self.op_tx.send(op).await if let Some(w) = self.workspaces.borrow().get(key) {
return Some(w.clone());
}
return None;
} }
} }

View file

@ -112,7 +112,7 @@ impl Buffer for BufferService {
} }
// TODO make these above nicer? more concise? idk // TODO make these above nicer? more concise? idk
if let Some(workspace) = self.state.workspaces_ref().get(&session_id) { if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) {
let in_stream = req.into_inner(); let in_stream = req.into_inner();
let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128); let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128);

View file

@ -12,7 +12,7 @@ use proto::session_server::Session;
use proto::{SessionRequest, SessionResponse}; use proto::{SessionRequest, SessionResponse};
use crate::actor::{ use crate::actor::{
state::{AlterState, StateManager}, state::StateManager,
workspace::Workspace as WorkspaceInstance, // TODO fuck x2! workspace::Workspace as WorkspaceInstance, // TODO fuck x2!
}; };

View file

@ -15,7 +15,7 @@ use tokio_stream::Stream; // TODO example used this?
use proto::workspace_server::{Workspace, WorkspaceServer}; use proto::workspace_server::{Workspace, WorkspaceServer};
use proto::{BufferList, Event, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest}; use proto::{BufferList, Event, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest};
use crate::actor::{buffer::Buffer, state::StateManager, workspace::{BufferAction, UserAction, Workspace as WorkspaceInstance}}; // TODO fuck x2! use crate::actor::{buffer::Buffer, state::StateManager, workspace::{Workspace as WorkspaceInstance}}; // TODO fuck x2!
type EventStream = Pin<Box<dyn Stream<Item = Result<Event, Status>> + Send>>; type EventStream = Pin<Box<dyn Stream<Item = Result<Event, Status>> + Send>>;
@ -52,7 +52,7 @@ impl Workspace for WorkspaceService {
req: Request<WorkspaceRequest>, req: Request<WorkspaceRequest>,
) -> Result<tonic::Response<EventStream>, Status> { ) -> Result<tonic::Response<EventStream>, Status> {
let r = req.into_inner(); let r = req.into_inner();
match self.state.workspaces_ref().get(&r.session_key) { match self.state.get(&r.session_key) {
Some(w) => { Some(w) => {
let bus_clone = w.bus.clone(); let bus_clone = w.bus.clone();
let (_stop_tx, stop_rx) = watch::channel(true); let (_stop_tx, stop_rx) = watch::channel(true);
@ -79,7 +79,7 @@ impl Workspace for WorkspaceService {
req: Request<WorkspaceRequest>, req: Request<WorkspaceRequest>,
) -> Result<Response<BufferList>, Status> { ) -> Result<Response<BufferList>, Status> {
let r = req.into_inner(); let r = req.into_inner();
match self.state.workspaces_ref().get(&r.session_key) { match self.state.get(&r.session_key) {
Some(w) => { Some(w) => {
let mut out = Vec::new(); let mut out = Vec::new();
for (_k, v) in w.buffers.borrow().iter() { for (_k, v) in w.buffers.borrow().iter() {
@ -99,10 +99,10 @@ impl Workspace for WorkspaceService {
req: Request<BufferRequest>, req: Request<BufferRequest>,
) -> Result<Response<WorkspaceResponse>, Status> { ) -> Result<Response<WorkspaceResponse>, Status> {
let r = req.into_inner(); let r = req.into_inner();
if let Some(w) = self.state.workspaces_ref().get(&r.session_key) { if let Some(w) = self.state.get(&r.session_key) {
let mut view = w.view(); let mut view = w.view();
let buf = Buffer::new(r.path, w.bus.clone()); let buf = Buffer::new(r.path, w.bus.clone());
view.buffers.add(buf); view.buffers.add(buf).await;
Ok(Response::new(WorkspaceResponse { accepted: true })) Ok(Response::new(WorkspaceResponse { accepted: true }))
} else { } else {
@ -118,7 +118,7 @@ impl Workspace for WorkspaceService {
req: Request<BufferRequest>, req: Request<BufferRequest>,
) -> Result<Response<WorkspaceResponse>, Status> { ) -> Result<Response<WorkspaceResponse>, Status> {
let r = req.into_inner(); let r = req.into_inner();
match self.state.workspaces_ref().get(&r.session_key) { match self.state.get(&r.session_key) {
Some(w) => { Some(w) => {
let mut out = Vec::new(); let mut out = Vec::new();
for (_k, v) in w.buffers.borrow().iter() { for (_k, v) in w.buffers.borrow().iter() {