diff --git a/src/server/actor/workspace.rs b/src/server/actor/workspace.rs index 30f2a26..5f204af 100644 --- a/src/server/actor/workspace.rs +++ b/src/server/actor/workspace.rs @@ -1,114 +1,202 @@ use std::collections::HashMap; -use operational_transform::OperationSeq; use tokio::sync::{broadcast, mpsc, watch::{self, Ref}}; use tracing::warn; -use super::{buffer::{BufferView, Buffer}, state::User}; +use crate::events::Event; -type Event = (String, OperationSeq); // TODO jank! +use super::{buffer::{BufferView, Buffer}, state::{User, UserCursor}}; -pub enum UserAction { - ADD {}, - REMOVE {}, +#[derive(Debug, Clone)] +pub struct UsersView { + watch: watch::Receiver>, + op: mpsc::Sender, +} + +impl UsersView { // TODO don't unwrap everything! + pub fn borrow(&self) -> Ref> { + return self.watch.borrow(); + } + + pub async fn add(&mut self, user: User) { + self.op.send(UserAction::ADD{ user }).await.unwrap(); + } + + pub async fn remove(&mut self, name: String) { + self.op.send(UserAction::REMOVE{ name }).await.unwrap(); + } + + pub async fn update(&mut self, user_name: String, cursor: UserCursor) { + self.op.send(UserAction::CURSOR { name: user_name, cursor }).await.unwrap(); + } +} + +#[derive(Debug, Clone)] +pub struct BuffersTreeView { + watch: watch::Receiver>, + op: mpsc::Sender, +} + +impl BuffersTreeView { + pub fn borrow(&self) -> Ref> { + return self.watch.borrow(); + } + + pub async fn add(&mut self, buffer: Buffer) { + self.op.send(BufferAction::ADD { buffer }).await.unwrap(); + } + + pub async fn remove(&mut self, path: String) { + self.op.send(BufferAction::REMOVE { path }).await.unwrap(); + } } pub struct WorkspaceView { - pub rx: broadcast::Receiver, - pub tx: mpsc::Sender, - pub users: watch::Receiver>, - pub buffers: watch::Receiver>, + rx: broadcast::Receiver, + pub users: UsersView, + pub buffers: BuffersTreeView, +} + +impl WorkspaceView { + pub async fn event(&mut self) -> Result { + self.rx.recv().await + } } // Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk #[derive(Debug)] pub struct Workspace { pub name: String, - - buffers: watch::Receiver>, - users: watch::Receiver>, - pub bus: broadcast::Sender, - buf_tx: mpsc::Sender, - pub usr_tx: mpsc::Sender, + pub buffers: BuffersTreeView, + pub users: UsersView, - run: watch::Sender, + run_tx: watch::Sender, + run_rx: watch::Receiver, } impl Drop for Workspace { fn drop(&mut self) { - self.run.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e)); + self.run_tx.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e)); } } impl Workspace { pub fn new(name: String) -> Self { - let (buf_tx, mut buf_rx) = mpsc::channel(32); - let (usr_tx, mut _usr_rx) = mpsc::channel(32); - let (stop_tx, stop_rx) = watch::channel(true); - let (buffer_tx, buffer_rx) = watch::channel(HashMap::new()); - let (broadcast_tx, _broadcast_rx) = broadcast::channel(32); - let (_users_tx, users_rx) = watch::channel(HashMap::new()); + let (op_buf_tx, mut op_buf_rx) = mpsc::channel::(32); + let (op_usr_tx, mut op_usr_rx) = mpsc::channel::(32); + let (run_tx, run_rx) = watch::channel::(true); + let (buffer_tx, buffer_rx) = watch::channel::>(HashMap::new()); + let (users_tx, users_rx) = watch::channel(HashMap::new()); + let (broadcast_tx, _broadcast_rx) = broadcast::channel::(32); let w = Workspace { name, - run: stop_tx, - buf_tx, - usr_tx, - buffers: buffer_rx, bus: broadcast_tx, - users: users_rx, + buffers: BuffersTreeView{ op: op_buf_tx, watch: buffer_rx }, + users: UsersView{ op: op_usr_tx, watch: users_rx }, + run_tx, + run_rx, }; + w.users_worker(op_usr_rx, users_tx); // spawn worker to handle users + w.buffers_worker(op_buf_rx, buffer_tx); // spawn worker to handle buffers + + return w; + } + + fn buffers_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>) { + let bus = self.bus.clone(); + let run = self.run_rx.clone(); tokio::spawn(async move { - let mut buffers = HashMap::new(); - while stop_rx.borrow().to_owned() { + let mut buffers : HashMap = HashMap::new(); + + while run.borrow().to_owned() { // TODO handle these errors!! - let action = buf_rx.recv().await.unwrap(); + let action = rx.recv().await.unwrap(); match action { BufferAction::ADD { buffer } => { buffers.insert(buffer.view().name.clone(), buffer); } - BufferAction::REMOVE { name } => { - buffers.remove(&name); + BufferAction::REMOVE { path } => { + buffers.remove(&path); } } - buffer_tx.send( + tx.send( buffers.iter() .map(|(k, v)| (k.clone(), v.view())) .collect() ).unwrap(); } }); - - return w; } - pub fn buffers_ref(&self) -> Ref> { - self.buffers.borrow() - } + fn users_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>) { + let bus = self.bus.clone(); + let run = self.run_rx.clone(); + tokio::spawn(async move { + let mut users : HashMap = HashMap::new(); - pub fn users_ref(&self) -> Ref> { - self.users.borrow() + while run.borrow().to_owned() { + match rx.recv().await.unwrap() { + UserAction::ADD { user } => { + users.insert(user.name.clone(), user); + }, + UserAction::REMOVE { name } => { + if let None = users.remove(&name) { + continue; // don't update channel since this was a no-op + } + }, + UserAction::CURSOR { name, cursor } => { + if let Some(user) = users.get_mut(&name) { + user.cursor = cursor.clone(); + bus.send(Event::Cursor{user: name, cursor}).unwrap(); + } else { + continue; // don't update channel since this was a no-op + } + }, + } + + tx.send( + users.iter() + .map(|(k, u)| (k.clone(), u.clone())) + .collect() + ).unwrap(); + } + }); } pub fn view(&self) -> WorkspaceView { WorkspaceView { rx: self.bus.subscribe(), - tx: self.buf_tx.clone(), users: self.users.clone(), buffers: self.buffers.clone(), } } } +#[derive(Debug)] +pub enum UserAction { + ADD { + user: User, + }, + REMOVE { + name: String, + }, + CURSOR { + name: String, + cursor: UserCursor, + }, +} + +#[derive(Debug)] pub enum BufferAction { ADD { buffer: Buffer, }, REMOVE { - name: String, // TODO remove by id? + path: String, // TODO remove by id? }, }