diff --git a/src/server/actor/state.rs b/src/server/actor/state.rs index f1cac64..2b9b49c 100644 --- a/src/server/actor/state.rs +++ b/src/server/actor/state.rs @@ -5,13 +5,28 @@ use tracing::error; use crate::actor::workspace::Workspace; +#[derive(Debug, Clone)] +pub struct UserCursor{ + buffer: i64, + x: i32, + y: i32 +} + +#[derive(Debug, Clone)] +pub struct User { + name: String, + cursor: UserCursor, +} + #[derive(Debug)] pub enum AlterState { ADD { key: String, w: Workspace }, - REMOVE { key: String }, + REMOVE { + key: String + }, } #[derive(Debug)] @@ -32,28 +47,31 @@ impl Drop for StateManager { impl StateManager { pub fn new() -> Self { let (tx, mut rx) = mpsc::channel(32); // TODO quantify backpressure - let (watch_tx, watch_rx) = watch::channel(HashMap::new()); + let (workspaces_tx, workspaces_rx) = watch::channel(HashMap::new()); let (stop_tx, stop_rx) = watch::channel(true); let s = StateManager { - workspaces: watch_rx, + workspaces: workspaces_rx, op_tx: tx, run: stop_tx, }; tokio::spawn(async move { let mut store = HashMap::new(); + let mut users = HashMap::::new(); + while stop_rx.borrow().to_owned() { if let Some(event) = rx.recv().await { match event { AlterState::ADD { key, w } => { store.insert(key, Arc::new(w)); // TODO put in hashmap + workspaces_tx.send(store.clone()).unwrap(); }, AlterState::REMOVE { key } => { store.remove(&key); }, } - watch_tx.send(store.clone()).unwrap(); + workspaces_tx.send(store.clone()).unwrap(); } else { break } diff --git a/src/server/actor/workspace.rs b/src/server/actor/workspace.rs index d961c7a..e6225df 100644 --- a/src/server/actor/workspace.rs +++ b/src/server/actor/workspace.rs @@ -1,45 +1,64 @@ use std::collections::HashMap; use operational_transform::OperationSeq; -use tokio::sync::{broadcast, mpsc, watch}; +use tokio::sync::{broadcast, mpsc, watch::{self, Ref}}; -use super::buffer::{BufferView, Buffer}; +use super::{buffer::{BufferView, Buffer}, state::User}; + +type Event = (String, OperationSeq); // TODO jank! + +pub enum UserAction { + ADD {}, + REMOVE {}, +} pub struct WorkspaceView { - pub rx: broadcast::Receiver, - pub tx: mpsc::Sender, + pub rx: broadcast::Receiver, + pub tx: mpsc::Sender, + pub users: watch::Receiver>, + pub buffers: watch::Receiver>, } // Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk #[derive(Debug)] pub struct Workspace { pub name: String, - pub buffers: watch::Receiver>, - pub bus: broadcast::Sender<(String, OperationSeq)>, - op_tx: mpsc::Sender, + + buffers: watch::Receiver>, + users: watch::Receiver>, + + pub bus: broadcast::Sender, + + buf_tx: mpsc::Sender, + usr_tx: mpsc::Sender, + run: watch::Sender, } impl Workspace { pub fn new(name: String) -> Self { - let (op_tx, mut op_rx) = mpsc::channel(32); + let (buf_tx, mut buf_rx) = mpsc::channel(32); + let (usr_tx, mut usr_rx) = mpsc::channel(32); let (stop_tx, stop_rx) = watch::channel(true); - let (buf_tx, buf_rx) = watch::channel(HashMap::new()); + let (buffer_tx, buffer_rx) = watch::channel(HashMap::new()); let (broadcast_tx, broadcast_rx) = broadcast::channel(32); + let (users_tx, users_rx) = watch::channel(HashMap::new()); let w = Workspace { name, run: stop_tx, - op_tx, - buffers: buf_rx, + buf_tx, + usr_tx, + buffers: buffer_rx, bus: broadcast_tx, + users: users_rx, }; tokio::spawn(async move { let mut buffers = HashMap::new(); while stop_rx.borrow().to_owned() { // TODO handle these errors!! - let action = op_rx.recv().await.unwrap(); + let action = buf_rx.recv().await.unwrap(); match action { BufferAction::ADD { buffer } => { buffers.insert(buffer.view().name.clone(), buffer); @@ -48,7 +67,7 @@ impl Workspace { buffers.remove(&name); } } - buf_tx.send( + buffer_tx.send( buffers.iter() .map(|(k, v)| (k.clone(), v.view())) .collect() @@ -58,6 +77,23 @@ impl Workspace { return w; } + + pub fn buffers_ref(&self) -> Ref> { + self.buffers.borrow() + } + + pub fn users_ref(&self) -> Ref> { + self.users.borrow() + } + + pub fn view(&self) -> WorkspaceView { + WorkspaceView { + rx: self.bus.subscribe(), + tx: self.buf_tx.clone(), + users: self.users.clone(), + buffers: self.buffers.clone(), + } + } } pub enum BufferAction { diff --git a/src/server/main.rs b/src/server/main.rs index 8ad6054..cc380f9 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -3,13 +3,13 @@ pub mod service; use std::sync::Arc; -use tracing::{debug, error, info, warn}; +use tracing::info; use tonic::transport::Server; use crate::{ actor::state::StateManager, - service::{buffer::BufferService, session::SessionService, workspace::WorkspaceService}, + service::{buffer::BufferService, workspace::WorkspaceService}, }; #[tokio::main] @@ -28,5 +28,6 @@ async fn main() -> Result<(), Box> { .serve(addr) .await?; + Ok(()) } diff --git a/src/server/service/workspace.rs b/src/server/service/workspace.rs index 5bd1d17..6530d9e 100644 --- a/src/server/service/workspace.rs +++ b/src/server/service/workspace.rs @@ -37,7 +37,20 @@ impl Workspace for WorkspaceService { &self, req: Request, ) -> Result, Status> { - todo!() + let r = req.into_inner(); + match self.state.workspaces.borrow().get(&r.session_key) { + Some(w) => { + let out = Vec::new(); + for (_k, v) in w.buffers_ref().iter() { + out.push(v.name.clone()); + } + Ok(Response::new(BufferList { path: out })) + } + None => Err(Status::not_found(format!( + "No active workspace with session_key '{}'", + r.session_key + ))), + } } }