diff --git a/src/server/state.rs b/src/server/state.rs index 902dbf8..de9fb24 100644 --- a/src/server/state.rs +++ b/src/server/state.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::{mpsc, watch}; +use tracing::error; use crate::workspace::Workspace; @@ -13,49 +14,52 @@ pub enum AlterState { REMOVE { key: String }, } +#[derive(Debug)] pub struct StateManager { - store: HashMap>, - rx: mpsc::Receiver, - tx: watch::Sender>> + pub workspaces: watch::Receiver>>, + pub op_tx: mpsc::Sender, // TODO make method for this + run: watch::Sender, +} + +impl Drop for StateManager { + fn drop(&mut self) { + self.run.send(false).unwrap_or_else(|e| { + error!("Could not stop StateManager worker: {:?}", e); + }) + } } impl StateManager { - pub fn new(rx: mpsc::Receiver, tx: watch::Sender>>) -> StateManager { - StateManager { - store: HashMap::new(), - rx, - tx - } - } + pub fn new() -> Self { + let (tx, mut rx) = mpsc::channel(32); // TODO quantify backpressure + let (watch_tx, watch_rx) = watch::channel(HashMap::new()); + let (stop_tx, stop_rx) = watch::channel(true); - pub async fn run(mut self) { - loop { - if let Some(event) = self.rx.recv().await { - match event { - AlterState::ADD { key, w } => { - self.store.insert(key, Arc::new(w)); // TODO put in hashmap - }, - AlterState::REMOVE { key } => { - self.store.remove(&key); - }, + let s = StateManager { + workspaces: watch_rx, + op_tx: tx, + run: stop_tx, + }; + + tokio::spawn(async move { + let mut store = HashMap::new(); + while stop_rx.borrow().to_owned() { + if let Some(event) = rx.recv().await { + match event { + AlterState::ADD { key, w } => { + store.insert(key, Arc::new(w)); // TODO put in hashmap + }, + AlterState::REMOVE { key } => { + store.remove(&key); + }, + } + watch_tx.send(store.clone()).unwrap(); + } else { + break } - self.tx.send(self.store.clone()).unwrap(); - } else { - break } - } + }); + + return s; } } - - -pub fn run_state_manager() -> (mpsc::Sender, watch::Receiver>>) { - let (tx, rx) = mpsc::channel(32); // TODO quantify backpressure - let (watch_tx, watch_rx) = watch::channel(HashMap::new()); - let state = StateManager::new(rx, watch_tx); - - let _task = tokio::spawn(async move { - state.run().await; - }); - - return (tx, watch_rx); -} diff --git a/src/server/workspace.rs b/src/server/workspace.rs index 9aec82b..c662260 100644 --- a/src/server/workspace.rs +++ b/src/server/workspace.rs @@ -1,7 +1,69 @@ -use std::sync::Arc; +use std::collections::HashMap; use operational_transform::OperationSeq; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, watch}; +use tracing::error; + +#[derive(Debug, Clone)] +pub struct BufferView { + pub name: String, + pub content: watch::Receiver, + op_tx: mpsc::Sender, +} + +impl BufferView { + pub async fn op(&self, op: OperationSeq) -> Result<(), mpsc::error::SendError> { + self.op_tx.send(op).await + } +} + +#[derive(Debug)] +pub struct Buffer { + view: BufferView, + run: watch::Sender, +} + +impl Drop for Buffer { + fn drop(&mut self) { + self.run.send(false).unwrap_or_else(|e| { + error!("Could not stop Buffer worker task: {:?}", e); + }); + } +} + +impl Buffer { + pub fn new(name: String, bus: broadcast::Sender<(String, OperationSeq)>) -> Self { + let (op_tx, mut op_rx) = mpsc::channel(32); + let (stop_tx, stop_rx) = watch::channel(true); + let (content_tx, content_rx) = watch::channel(String::new()); + + let b = Buffer { + run: stop_tx, + view: BufferView { + name: name.clone(), + op_tx, + content: content_rx, + }, + }; + + tokio::spawn(async move { + let mut content = String::new(); + while stop_rx.borrow().to_owned() { + // TODO handle these errors!! + let op = op_rx.recv().await.unwrap(); + content = op.apply(content.as_str()).unwrap(); + bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed + content_tx.send(content.clone()).unwrap(); + } + }); + + return b; + } + + pub fn view(&self) -> BufferView { + return self.view.clone(); + } +} pub struct WorkspaceView { pub rx: broadcast::Receiver, @@ -12,47 +74,59 @@ pub struct WorkspaceView { #[derive(Debug)] pub struct Workspace { pub name: String, - pub content: String, - pub tx: mpsc::Sender, - w_tx: Arc>, + pub buffers: watch::Receiver>, + pub bus: broadcast::Sender<(String, OperationSeq)>, + op_tx: mpsc::Sender, + run: watch::Sender, } impl Workspace { - pub fn new( - name: String, - content: String, - tx: mpsc::Sender, - w_tx: Arc>, - ) -> Self { - Workspace { - name, - content, - tx, - w_tx, - } - } + pub fn new(name: String) -> Self { + let (op_tx, mut op_rx) = mpsc::channel(32); + let (stop_tx, stop_rx) = watch::channel(true); + let (buf_tx, buf_rx) = watch::channel(HashMap::new()); + let (broadcast_tx, broadcast_rx) = broadcast::channel(32); - pub fn view(&self) -> WorkspaceView { - WorkspaceView { - rx: self.w_tx.subscribe(), - tx: self.tx.clone(), - } + let w = Workspace { + name, + run: stop_tx, + op_tx, + buffers: buf_rx, + bus: broadcast_tx, + }; + + tokio::spawn(async move { + let mut buffers = HashMap::new(); + while stop_rx.borrow().to_owned() { + // TODO handle these errors!! + let action = op_rx.recv().await.unwrap(); + match action { + BufferAction::ADD { buffer } => { + buffers.insert(buffer.view.name.clone(), buffer); + } + BufferAction::REMOVE { name } => { + buffers.remove(&name); + } + } + buf_tx.send( + buffers.iter() + .map(|(k, v)| (k.clone(), v.view())) + .collect() + ).unwrap(); + } + }); + + return w; } } -pub async fn worker( - mut w: Workspace, - tx: Arc>, - mut rx: mpsc::Receiver, -) { - loop { - if let Some(op) = rx.recv().await { - w.content = op.apply(&w.content).unwrap(); - tx.send(op).unwrap(); - } else { - break; - } - } +pub enum BufferAction { + ADD { + buffer: Buffer, + }, + REMOVE { + name: String, // TODO remove by id? + }, } // impl Default for Workspace {