From 18d86020c0c69ba201df18ef95531ad2ea21a2e1 Mon Sep 17 00:00:00 2001 From: alemidev Date: Wed, 3 Aug 2022 18:51:56 +0200 Subject: [PATCH] fix: add getter to state workspaces, imports fix --- src/server/actor/state.rs | 27 ++++++++++++++++++--------- src/server/actor/workspace.rs | 15 +++++++++++---- src/server/service/buffer.rs | 22 ++++++++++------------ src/server/service/session.rs | 16 ++++++---------- src/server/service/workspace.rs | 6 +++--- 5 files changed, 48 insertions(+), 38 deletions(-) diff --git a/src/server/actor/state.rs b/src/server/actor/state.rs index 2b9b49c..aebb868 100644 --- a/src/server/actor/state.rs +++ b/src/server/actor/state.rs @@ -1,21 +1,21 @@ use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch::{self, Ref}}; use tracing::error; use crate::actor::workspace::Workspace; #[derive(Debug, Clone)] pub struct UserCursor{ - buffer: i64, - x: i32, - y: i32 + // buffer: i64, + // x: i32, + // y: i32 } #[derive(Debug, Clone)] pub struct User { - name: String, - cursor: UserCursor, + // name: String, + // cursor: UserCursor, } #[derive(Debug)] @@ -31,8 +31,8 @@ pub enum AlterState { #[derive(Debug)] pub struct StateManager { - pub workspaces: watch::Receiver>>, - pub op_tx: mpsc::Sender, // TODO make method for this + op_tx: mpsc::Sender, // TODO make method for this + workspaces: watch::Receiver>>, run: watch::Sender, } @@ -58,7 +58,7 @@ impl StateManager { tokio::spawn(async move { let mut store = HashMap::new(); - let mut users = HashMap::::new(); + let mut _users = HashMap::::new(); while stop_rx.borrow().to_owned() { if let Some(event) = rx.recv().await { @@ -80,4 +80,13 @@ impl StateManager { return s; } + + pub fn workspaces_ref(&self) -> Ref>> { + self.workspaces.borrow() + } + + // TODO wrap result of this func? + pub async fn op(&self, op: AlterState) -> Result<(), mpsc::error::SendError> { + self.op_tx.send(op).await + } } diff --git a/src/server/actor/workspace.rs b/src/server/actor/workspace.rs index e6225df..30f2a26 100644 --- a/src/server/actor/workspace.rs +++ b/src/server/actor/workspace.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use operational_transform::OperationSeq; use tokio::sync::{broadcast, mpsc, watch::{self, Ref}}; +use tracing::warn; use super::{buffer::{BufferView, Buffer}, state::User}; @@ -30,19 +31,25 @@ pub struct Workspace { pub bus: broadcast::Sender, buf_tx: mpsc::Sender, - usr_tx: mpsc::Sender, + pub usr_tx: mpsc::Sender, run: watch::Sender, } +impl Drop for Workspace { + fn drop(&mut self) { + self.run.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e)); + } +} + impl Workspace { pub fn new(name: String) -> Self { let (buf_tx, mut buf_rx) = mpsc::channel(32); - let (usr_tx, mut usr_rx) = mpsc::channel(32); + let (usr_tx, mut _usr_rx) = mpsc::channel(32); let (stop_tx, stop_rx) = watch::channel(true); let (buffer_tx, buffer_rx) = watch::channel(HashMap::new()); - let (broadcast_tx, broadcast_rx) = broadcast::channel(32); - let (users_tx, users_rx) = watch::channel(HashMap::new()); + let (broadcast_tx, _broadcast_rx) = broadcast::channel(32); + let (_users_tx, users_rx) = watch::channel(HashMap::new()); let w = Workspace { name, diff --git a/src/server/service/buffer.rs b/src/server/service/buffer.rs index fb07ae4..18149fe 100644 --- a/src/server/service/buffer.rs +++ b/src/server/service/buffer.rs @@ -2,10 +2,10 @@ use std::collections::VecDeque; use std::{pin::Pin, sync::Arc}; use tokio_stream::wrappers::ReceiverStream; -use tracing::{debug, error, info, warn}; +use tracing::error; use operational_transform::OperationSeq; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::{Request, Response, Status}; pub mod proto { tonic::include_proto!("session"); @@ -17,14 +17,12 @@ use tokio::sync::{broadcast, mpsc}; use tokio_stream::{Stream, StreamExt}; // TODO example used this? use proto::buffer_server::{Buffer, BufferServer}; -use proto::session_server::{Session, SessionServer}; -use proto::workspace_server::{Workspace, WorkspaceServer}; -use proto::{BufferList, Event, Operation, SessionRequest, SessionResponse, WorkspaceRequest}; +use proto::Operation; use tonic::Streaming; //use futures::{Stream, StreamExt}; -use crate::actor::{buffer::BufferView, state::{AlterState, StateManager}, workspace::Workspace as WorkspaceInstance}; +use crate::actor::{buffer::BufferView, state::StateManager}; use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2! @@ -34,10 +32,10 @@ pub struct BufferService { state: Arc, } -fn op_seq(o: &Operation) -> OperationSeq { +fn op_seq(_o: &Operation) -> OperationSeq { todo!() } -fn op_net(o: &OperationSeq) -> Operation { +fn op_net(_o: &OperationSeq) -> Operation { todo!() } @@ -113,11 +111,11 @@ impl Buffer for BufferService { } // TODO make these above nicer? more concise? idk - if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) { + if let Some(workspace) = self.state.workspaces_ref().get(&session_id) { let in_stream = req.into_inner(); let (tx_og, rx) = mpsc::channel::>(128); - let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone(); + let b: BufferView = workspace.buffers_ref().get(&path).unwrap().clone(); let w = workspace.clone(); tokio::spawn(async move { buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await; @@ -135,11 +133,11 @@ impl Buffer for BufferService { } } - async fn push(&self, req:Request) -> Result, Status> { + async fn push(&self, _req:Request) -> Result, Status> { todo!() } - async fn pull(&self, req:Request) -> Result, Status> { + async fn pull(&self, _req:Request) -> Result, Status> { todo!() } diff --git a/src/server/service/session.rs b/src/server/service/session.rs index ff5ae1c..3099775 100644 --- a/src/server/service/session.rs +++ b/src/server/service/session.rs @@ -4,17 +4,14 @@ pub mod proto { use std::sync::Arc; -use tracing::{debug, error, info, warn}; +use tracing::debug; -use tokio_stream::{Stream, StreamExt}; // TODO example used this? +use tonic::{Request, Response, Status}; -use tonic::{transport::Server, Request, Response, Status}; - -use proto::session_server::{Session, SessionServer}; +use proto::session_server::Session; use proto::{SessionRequest, SessionResponse}; use crate::actor::{ - buffer::BufferView, state::{AlterState, StateManager}, workspace::Workspace as WorkspaceInstance, // TODO fuck x2! }; @@ -33,7 +30,7 @@ impl Session for SessionService { debug!("create request: {:?}", request); let r = request.into_inner(); - let w = WorkspaceInstance::new(r.session_key.clone()); + let _w = WorkspaceInstance::new(r.session_key.clone()); let reply = proto::SessionResponse { session_key: r.session_key.clone(), @@ -67,10 +64,9 @@ impl Session for SessionService { let r = request.into_inner(); let mut removed = false; - if self.state.workspaces.borrow().get(&r.session_key).is_some() { + if self.state.workspaces_ref().get(&r.session_key).is_some() { self.state - .op_tx - .send(AlterState::REMOVE { + .op(AlterState::REMOVE { key: r.session_key.clone(), }) .await diff --git a/src/server/service/workspace.rs b/src/server/service/workspace.rs index 6530d9e..79b58e3 100644 --- a/src/server/service/workspace.rs +++ b/src/server/service/workspace.rs @@ -28,7 +28,7 @@ impl Workspace for WorkspaceService { async fn subscribe( &self, - req: Request, + _req: Request, ) -> Result, Status> { todo!() } @@ -38,9 +38,9 @@ impl Workspace for WorkspaceService { req: Request, ) -> Result, Status> { let r = req.into_inner(); - match self.state.workspaces.borrow().get(&r.session_key) { + match self.state.workspaces_ref().get(&r.session_key) { Some(w) => { - let out = Vec::new(); + let mut out = Vec::new(); for (_k, v) in w.buffers_ref().iter() { out.push(v.name.clone()); }