From 8316439a3e7c489bb6550ebbd750de26c51a1f31 Mon Sep 17 00:00:00 2001 From: alemidev Date: Sun, 31 Jul 2022 13:46:21 +0200 Subject: [PATCH] feat: moved state managers under 'actor' dir Since we're building (sort of) around the actor model, might as well sort stuff that way. --- src/server/actor/buffer.rs | 64 +++++++++++++++ src/server/actor/mod.rs | 3 + src/server/{ => actor}/state.rs | 2 +- src/server/actor/workspace.rs | 71 ++++++++++++++++ src/server/workspace.rs | 139 -------------------------------- 5 files changed, 139 insertions(+), 140 deletions(-) create mode 100644 src/server/actor/buffer.rs create mode 100644 src/server/actor/mod.rs rename src/server/{ => actor}/state.rs (97%) create mode 100644 src/server/actor/workspace.rs delete mode 100644 src/server/workspace.rs diff --git a/src/server/actor/buffer.rs b/src/server/actor/buffer.rs new file mode 100644 index 0000000..c6d9fd5 --- /dev/null +++ b/src/server/actor/buffer.rs @@ -0,0 +1,64 @@ +use operational_transform::OperationSeq; +use tokio::sync::{broadcast, mpsc, watch}; +use tracing::error; + +#[derive(Debug, Clone)] +pub struct BufferView { + pub name: String, + pub content: watch::Receiver, + op_tx: mpsc::Sender, +} + +impl BufferView { + pub async fn op(&self, op: OperationSeq) -> Result<(), mpsc::error::SendError> { + self.op_tx.send(op).await + } +} + +#[derive(Debug)] +pub struct Buffer { + view: BufferView, + run: watch::Sender, +} + +impl Drop for Buffer { + fn drop(&mut self) { + self.run.send(false).unwrap_or_else(|e| { + error!("Could not stop Buffer worker task: {:?}", e); + }); + } +} + +impl Buffer { + pub fn new(name: String, bus: broadcast::Sender<(String, OperationSeq)>) -> Self { + let (op_tx, mut op_rx) = mpsc::channel(32); + let (stop_tx, stop_rx) = watch::channel(true); + let (content_tx, content_rx) = watch::channel(String::new()); + + let b = Buffer { + run: stop_tx, + view: BufferView { + name: name.clone(), + op_tx, + content: content_rx, + }, + }; + + tokio::spawn(async move { + let mut content = String::new(); + while stop_rx.borrow().to_owned() { + // TODO handle these errors!! + let op = op_rx.recv().await.unwrap(); + content = op.apply(content.as_str()).unwrap(); + bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed + content_tx.send(content.clone()).unwrap(); + } + }); + + return b; + } + + pub fn view(&self) -> BufferView { + return self.view.clone(); + } +} diff --git a/src/server/actor/mod.rs b/src/server/actor/mod.rs new file mode 100644 index 0000000..42110ea --- /dev/null +++ b/src/server/actor/mod.rs @@ -0,0 +1,3 @@ +pub mod buffer; +pub mod workspace; +pub mod state; diff --git a/src/server/state.rs b/src/server/actor/state.rs similarity index 97% rename from src/server/state.rs rename to src/server/actor/state.rs index de9fb24..f1cac64 100644 --- a/src/server/state.rs +++ b/src/server/actor/state.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::{mpsc, watch}; use tracing::error; -use crate::workspace::Workspace; +use crate::actor::workspace::Workspace; #[derive(Debug)] pub enum AlterState { diff --git a/src/server/actor/workspace.rs b/src/server/actor/workspace.rs new file mode 100644 index 0000000..d961c7a --- /dev/null +++ b/src/server/actor/workspace.rs @@ -0,0 +1,71 @@ +use std::collections::HashMap; + +use operational_transform::OperationSeq; +use tokio::sync::{broadcast, mpsc, watch}; + +use super::buffer::{BufferView, Buffer}; + +pub struct WorkspaceView { + pub rx: broadcast::Receiver, + pub tx: mpsc::Sender, +} + +// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk +#[derive(Debug)] +pub struct Workspace { + pub name: String, + pub buffers: watch::Receiver>, + pub bus: broadcast::Sender<(String, OperationSeq)>, + op_tx: mpsc::Sender, + run: watch::Sender, +} + +impl Workspace { + pub fn new(name: String) -> Self { + let (op_tx, mut op_rx) = mpsc::channel(32); + let (stop_tx, stop_rx) = watch::channel(true); + let (buf_tx, buf_rx) = watch::channel(HashMap::new()); + let (broadcast_tx, broadcast_rx) = broadcast::channel(32); + + let w = Workspace { + name, + run: stop_tx, + op_tx, + buffers: buf_rx, + bus: broadcast_tx, + }; + + tokio::spawn(async move { + let mut buffers = HashMap::new(); + while stop_rx.borrow().to_owned() { + // TODO handle these errors!! + let action = op_rx.recv().await.unwrap(); + match action { + BufferAction::ADD { buffer } => { + buffers.insert(buffer.view().name.clone(), buffer); + } + BufferAction::REMOVE { name } => { + buffers.remove(&name); + } + } + buf_tx.send( + buffers.iter() + .map(|(k, v)| (k.clone(), v.view())) + .collect() + ).unwrap(); + } + }); + + return w; + } +} + +pub enum BufferAction { + ADD { + buffer: Buffer, + }, + REMOVE { + name: String, // TODO remove by id? + }, +} + diff --git a/src/server/workspace.rs b/src/server/workspace.rs deleted file mode 100644 index c662260..0000000 --- a/src/server/workspace.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::collections::HashMap; - -use operational_transform::OperationSeq; -use tokio::sync::{broadcast, mpsc, watch}; -use tracing::error; - -#[derive(Debug, Clone)] -pub struct BufferView { - pub name: String, - pub content: watch::Receiver, - op_tx: mpsc::Sender, -} - -impl BufferView { - pub async fn op(&self, op: OperationSeq) -> Result<(), mpsc::error::SendError> { - self.op_tx.send(op).await - } -} - -#[derive(Debug)] -pub struct Buffer { - view: BufferView, - run: watch::Sender, -} - -impl Drop for Buffer { - fn drop(&mut self) { - self.run.send(false).unwrap_or_else(|e| { - error!("Could not stop Buffer worker task: {:?}", e); - }); - } -} - -impl Buffer { - pub fn new(name: String, bus: broadcast::Sender<(String, OperationSeq)>) -> Self { - let (op_tx, mut op_rx) = mpsc::channel(32); - let (stop_tx, stop_rx) = watch::channel(true); - let (content_tx, content_rx) = watch::channel(String::new()); - - let b = Buffer { - run: stop_tx, - view: BufferView { - name: name.clone(), - op_tx, - content: content_rx, - }, - }; - - tokio::spawn(async move { - let mut content = String::new(); - while stop_rx.borrow().to_owned() { - // TODO handle these errors!! - let op = op_rx.recv().await.unwrap(); - content = op.apply(content.as_str()).unwrap(); - bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed - content_tx.send(content.clone()).unwrap(); - } - }); - - return b; - } - - pub fn view(&self) -> BufferView { - return self.view.clone(); - } -} - -pub struct WorkspaceView { - pub rx: broadcast::Receiver, - pub tx: mpsc::Sender, -} - -// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk -#[derive(Debug)] -pub struct Workspace { - pub name: String, - pub buffers: watch::Receiver>, - pub bus: broadcast::Sender<(String, OperationSeq)>, - op_tx: mpsc::Sender, - run: watch::Sender, -} - -impl Workspace { - pub fn new(name: String) -> Self { - let (op_tx, mut op_rx) = mpsc::channel(32); - let (stop_tx, stop_rx) = watch::channel(true); - let (buf_tx, buf_rx) = watch::channel(HashMap::new()); - let (broadcast_tx, broadcast_rx) = broadcast::channel(32); - - let w = Workspace { - name, - run: stop_tx, - op_tx, - buffers: buf_rx, - bus: broadcast_tx, - }; - - tokio::spawn(async move { - let mut buffers = HashMap::new(); - while stop_rx.borrow().to_owned() { - // TODO handle these errors!! - let action = op_rx.recv().await.unwrap(); - match action { - BufferAction::ADD { buffer } => { - buffers.insert(buffer.view.name.clone(), buffer); - } - BufferAction::REMOVE { name } => { - buffers.remove(&name); - } - } - buf_tx.send( - buffers.iter() - .map(|(k, v)| (k.clone(), v.view())) - .collect() - ).unwrap(); - } - }); - - return w; - } -} - -pub enum BufferAction { - ADD { - buffer: Buffer, - }, - REMOVE { - name: String, // TODO remove by id? - }, -} - -// impl Default for Workspace { -// fn default() -> Self { -// Workspace { -// name: "fuck you".to_string(), -// content: "too".to_string(), -// } -// } -// }