diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index d394717..8bafd61 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -19,17 +19,21 @@ use super::controller::BufferController; pub(crate) struct BufferControllerWorker { uid: String, + name: String, + buffer: Woot, content: watch::Sender, operations: mpsc::UnboundedReceiver, - receiver: watch::Receiver, - sender: mpsc::UnboundedSender, - buffer: Woot, - name: String, - stop: mpsc::UnboundedReceiver<()>, - stop_control: mpsc::UnboundedSender<()>, - poller_rx: mpsc::Receiver>, - poller_tx: mpsc::Sender>, + poller: mpsc::Receiver>, pollers: Vec>, + handles: ClonableHandlesForController, + stop: mpsc::UnboundedReceiver<()>, +} + +struct ClonableHandlesForController { + operations: mpsc::UnboundedSender, + poller: mpsc::Sender>, + stop: mpsc::UnboundedSender<()>, + content: watch::Receiver, } impl BufferControllerWorker { @@ -42,16 +46,20 @@ impl BufferControllerWorker { uid.hash(&mut hasher); let site_id = hasher.finish() as usize; BufferControllerWorker { - uid, poller_rx, poller_tx, - pollers: Vec::new(), + uid, + name: path.to_string(), + buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging! content: txt_tx, operations: op_rx, - receiver: txt_rx, - sender: op_tx, - buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging! - name: path.to_string(), + poller: poller_rx, + pollers: Vec::new(), + handles: ClonableHandlesForController { + operations: op_tx, + poller: poller_tx, + stop: end_tx, + content: txt_rx, + }, stop: end_rx, - stop_control: end_tx, } } @@ -78,10 +86,10 @@ impl ControllerWorker for BufferControllerWorker { fn subscribe(&self) -> BufferController { BufferController::new( self.name.clone(), - self.receiver.clone(), - self.sender.clone(), - self.poller_tx.clone(), - self.stop_control.clone(), + self.handles.content.clone(), + self.handles.operations.clone(), + self.handles.poller.clone(), + self.handles.stop.clone(), ) } @@ -95,7 +103,7 @@ impl ControllerWorker for BufferControllerWorker { _ = self.stop.recv() => break, // received a new poller, add it to collection - res = self.poller_rx.recv() => match res { + res = self.poller.recv() => match res { None => break tracing::error!("poller channel closed"), Some(tx) => self.pollers.push(tx), },