diff --git a/src/server/actor/buffer.rs b/src/server/actor/buffer.rs index c6d9fd5..2526ea4 100644 --- a/src/server/actor/buffer.rs +++ b/src/server/actor/buffer.rs @@ -2,7 +2,10 @@ use operational_transform::OperationSeq; use tokio::sync::{broadcast, mpsc, watch}; use tracing::error; +use crate::events::Event; + #[derive(Debug, Clone)] +/// A view of a buffer, with references to access value and send operations pub struct BufferView { pub name: String, pub content: watch::Receiver, @@ -30,7 +33,7 @@ impl Drop for Buffer { } impl Buffer { - pub fn new(name: String, bus: broadcast::Sender<(String, OperationSeq)>) -> Self { + pub fn new(name: String, bus: broadcast::Sender) -> Self { let (op_tx, mut op_rx) = mpsc::channel(32); let (stop_tx, stop_rx) = watch::channel(true); let (content_tx, content_rx) = watch::channel(String::new()); @@ -50,7 +53,7 @@ impl Buffer { // TODO handle these errors!! let op = op_rx.recv().await.unwrap(); content = op.apply(content.as_str()).unwrap(); - bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed + // bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed content_tx.send(content.clone()).unwrap(); } }); diff --git a/src/server/actor/state.rs b/src/server/actor/state.rs index aebb868..3d9142b 100644 --- a/src/server/actor/state.rs +++ b/src/server/actor/state.rs @@ -7,22 +7,22 @@ use crate::actor::workspace::Workspace; #[derive(Debug, Clone)] pub struct UserCursor{ - // buffer: i64, - // x: i32, - // y: i32 + pub buffer: i64, + pub x: i32, + pub y: i32 } #[derive(Debug, Clone)] pub struct User { - // name: String, - // cursor: UserCursor, + pub name: String, + pub cursor: UserCursor, } #[derive(Debug)] pub enum AlterState { ADD { key: String, - w: Workspace + // w: Workspace }, REMOVE { key: String @@ -63,9 +63,9 @@ impl StateManager { while stop_rx.borrow().to_owned() { if let Some(event) = rx.recv().await { match event { - AlterState::ADD { key, w } => { - store.insert(key, Arc::new(w)); // TODO put in hashmap - workspaces_tx.send(store.clone()).unwrap(); + AlterState::ADD { key/*, w */} => { + // store.insert(key, Arc::new(w)); // TODO put in hashmap + // workspaces_tx.send(store.clone()).unwrap(); }, AlterState::REMOVE { key } => { store.remove(&key); diff --git a/src/server/service/buffer.rs b/src/server/service/buffer.rs index 18149fe..8f64b97 100644 --- a/src/server/service/buffer.rs +++ b/src/server/service/buffer.rs @@ -23,6 +23,7 @@ use tonic::Streaming; //use futures::{Stream, StreamExt}; use crate::actor::{buffer::BufferView, state::StateManager}; +use crate::events::Event; use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2! @@ -44,7 +45,7 @@ async fn buffer_worker( bv: BufferView, mut client_rx: Streaming, tx_client: mpsc::Sender>, - mut rx_core: broadcast::Receiver<(String, OperationSeq)>, + mut rx_core: broadcast::Receiver, ) { let mut queue: VecDeque = VecDeque::new(); loop { @@ -77,7 +78,7 @@ async fn buffer_worker( } } if send_op { - tx_client.send(Ok(op_net(&oop.1))).await.unwrap(); + // tx_client.send(Ok(op_net(&oop.1))).await.unwrap(); } } } @@ -115,7 +116,7 @@ impl Buffer for BufferService { let in_stream = req.into_inner(); let (tx_og, rx) = mpsc::channel::>(128); - let b: BufferView = workspace.buffers_ref().get(&path).unwrap().clone(); + let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone(); let w = workspace.clone(); tokio::spawn(async move { buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await;