feat: added buffer object, reworked objects with internal worker

This commit is contained in:
əlemi 2022-07-30 03:02:04 +02:00
parent 97e9b1f737
commit 60e6f4640c
2 changed files with 151 additions and 73 deletions

View file

@ -1,6 +1,7 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use tracing::error;
use crate::workspace::Workspace; use crate::workspace::Workspace;
@ -13,49 +14,52 @@ pub enum AlterState {
REMOVE { key: String }, REMOVE { key: String },
} }
#[derive(Debug)]
pub struct StateManager { pub struct StateManager {
store: HashMap<String, Arc<Workspace>>, pub workspaces: watch::Receiver<HashMap<String, Arc<Workspace>>>,
rx: mpsc::Receiver<AlterState>, pub op_tx: mpsc::Sender<AlterState>, // TODO make method for this
tx: watch::Sender<HashMap<String, Arc<Workspace>>> run: watch::Sender<bool>,
}
impl Drop for StateManager {
fn drop(&mut self) {
self.run.send(false).unwrap_or_else(|e| {
error!("Could not stop StateManager worker: {:?}", e);
})
}
} }
impl StateManager { impl StateManager {
pub fn new(rx: mpsc::Receiver<AlterState>, tx: watch::Sender<HashMap<String, Arc<Workspace>>>) -> StateManager { pub fn new() -> Self {
StateManager { let (tx, mut rx) = mpsc::channel(32); // TODO quantify backpressure
store: HashMap::new(), let (watch_tx, watch_rx) = watch::channel(HashMap::new());
rx, let (stop_tx, stop_rx) = watch::channel(true);
tx
}
}
pub async fn run(mut self) { let s = StateManager {
loop { workspaces: watch_rx,
if let Some(event) = self.rx.recv().await { op_tx: tx,
run: stop_tx,
};
tokio::spawn(async move {
let mut store = HashMap::new();
while stop_rx.borrow().to_owned() {
if let Some(event) = rx.recv().await {
match event { match event {
AlterState::ADD { key, w } => { AlterState::ADD { key, w } => {
self.store.insert(key, Arc::new(w)); // TODO put in hashmap store.insert(key, Arc::new(w)); // TODO put in hashmap
}, },
AlterState::REMOVE { key } => { AlterState::REMOVE { key } => {
self.store.remove(&key); store.remove(&key);
}, },
} }
self.tx.send(self.store.clone()).unwrap(); watch_tx.send(store.clone()).unwrap();
} else { } else {
break break
} }
} }
}
}
pub fn run_state_manager() -> (mpsc::Sender<AlterState>, watch::Receiver<HashMap<String, Arc<Workspace>>>) {
let (tx, rx) = mpsc::channel(32); // TODO quantify backpressure
let (watch_tx, watch_rx) = watch::channel(HashMap::new());
let state = StateManager::new(rx, watch_tx);
let _task = tokio::spawn(async move {
state.run().await;
}); });
return (tx, watch_rx); return s;
}
} }

View file

@ -1,7 +1,69 @@
use std::sync::Arc; use std::collections::HashMap;
use operational_transform::OperationSeq; use operational_transform::OperationSeq;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc, watch};
use tracing::error;
#[derive(Debug, Clone)]
pub struct BufferView {
pub name: String,
pub content: watch::Receiver<String>,
op_tx: mpsc::Sender<OperationSeq>,
}
impl BufferView {
pub async fn op(&self, op: OperationSeq) -> Result<(), mpsc::error::SendError<OperationSeq>> {
self.op_tx.send(op).await
}
}
#[derive(Debug)]
pub struct Buffer {
view: BufferView,
run: watch::Sender<bool>,
}
impl Drop for Buffer {
fn drop(&mut self) {
self.run.send(false).unwrap_or_else(|e| {
error!("Could not stop Buffer worker task: {:?}", e);
});
}
}
impl Buffer {
pub fn new(name: String, bus: broadcast::Sender<(String, OperationSeq)>) -> Self {
let (op_tx, mut op_rx) = mpsc::channel(32);
let (stop_tx, stop_rx) = watch::channel(true);
let (content_tx, content_rx) = watch::channel(String::new());
let b = Buffer {
run: stop_tx,
view: BufferView {
name: name.clone(),
op_tx,
content: content_rx,
},
};
tokio::spawn(async move {
let mut content = String::new();
while stop_rx.borrow().to_owned() {
// TODO handle these errors!!
let op = op_rx.recv().await.unwrap();
content = op.apply(content.as_str()).unwrap();
bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed
content_tx.send(content.clone()).unwrap();
}
});
return b;
}
pub fn view(&self) -> BufferView {
return self.view.clone();
}
}
pub struct WorkspaceView { pub struct WorkspaceView {
pub rx: broadcast::Receiver<OperationSeq>, pub rx: broadcast::Receiver<OperationSeq>,
@ -12,47 +74,59 @@ pub struct WorkspaceView {
#[derive(Debug)] #[derive(Debug)]
pub struct Workspace { pub struct Workspace {
pub name: String, pub name: String,
pub content: String, pub buffers: watch::Receiver<HashMap<String, BufferView>>,
pub tx: mpsc::Sender<OperationSeq>, pub bus: broadcast::Sender<(String, OperationSeq)>,
w_tx: Arc<broadcast::Sender<OperationSeq>>, op_tx: mpsc::Sender<BufferAction>,
run: watch::Sender<bool>,
} }
impl Workspace { impl Workspace {
pub fn new( pub fn new(name: String) -> Self {
name: String, let (op_tx, mut op_rx) = mpsc::channel(32);
content: String, let (stop_tx, stop_rx) = watch::channel(true);
tx: mpsc::Sender<OperationSeq>, let (buf_tx, buf_rx) = watch::channel(HashMap::new());
w_tx: Arc<broadcast::Sender<OperationSeq>>, let (broadcast_tx, broadcast_rx) = broadcast::channel(32);
) -> Self {
Workspace {
name,
content,
tx,
w_tx,
}
}
pub fn view(&self) -> WorkspaceView { let w = Workspace {
WorkspaceView { name,
rx: self.w_tx.subscribe(), run: stop_tx,
tx: self.tx.clone(), op_tx,
buffers: buf_rx,
bus: broadcast_tx,
};
tokio::spawn(async move {
let mut buffers = HashMap::new();
while stop_rx.borrow().to_owned() {
// TODO handle these errors!!
let action = op_rx.recv().await.unwrap();
match action {
BufferAction::ADD { buffer } => {
buffers.insert(buffer.view.name.clone(), buffer);
} }
BufferAction::REMOVE { name } => {
buffers.remove(&name);
}
}
buf_tx.send(
buffers.iter()
.map(|(k, v)| (k.clone(), v.view()))
.collect()
).unwrap();
}
});
return w;
} }
} }
pub async fn worker( pub enum BufferAction {
mut w: Workspace, ADD {
tx: Arc<broadcast::Sender<OperationSeq>>, buffer: Buffer,
mut rx: mpsc::Receiver<OperationSeq>, },
) { REMOVE {
loop { name: String, // TODO remove by id?
if let Some(op) = rx.recv().await { },
w.content = op.apply(&w.content).unwrap();
tx.send(op).unwrap();
} else {
break;
}
}
} }
// impl Default for Workspace { // impl Default for Workspace {