diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index c9acc64..0e65583 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -2,17 +2,13 @@ //! //! a controller implementation for buffer actions -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; use operational_transform::OperationSeq; -use tokio::sync::broadcast::error::TryRecvError; -use tokio::sync::{watch, mpsc, broadcast, Mutex}; +use tokio::sync::{watch, mpsc, Mutex, oneshot}; use tonic::async_trait; use crate::errors::IgnorableError; use crate::{api::Controller, Error}; -use crate::buffer::factory::OperationFactory; use super::TextChange; @@ -36,26 +32,27 @@ use super::TextChange; pub struct BufferController { content: watch::Receiver, operations: mpsc::UnboundedSender, - last_op: Mutex>, - stream: Mutex>, + last_op: Mutex>, + stream: mpsc::UnboundedSender>>, stop: mpsc::UnboundedSender<()>, - operation_tick: Arc, } impl BufferController { pub(crate) fn new( content: watch::Receiver, operations: mpsc::UnboundedSender, - stream: Mutex>, + stream: mpsc::UnboundedSender>>, stop: mpsc::UnboundedSender<()>, - operation_tick: Arc, + last_op: Mutex>, ) -> Self { BufferController { - last_op: Mutex::new(content.clone()), - content, operations, stream, stop, - operation_tick, + last_op, content, operations, stream, stop, } } + + pub fn content(&self) -> String { + self.content.borrow().clone() + } } impl Drop for BufferController { @@ -64,13 +61,6 @@ impl Drop for BufferController { } } -#[async_trait] -impl OperationFactory for BufferController { - fn content(&self) -> String { - self.content.borrow().clone() - } -} - #[async_trait] impl Controller for BufferController { type Input = OperationSeq; @@ -80,28 +70,25 @@ impl Controller for BufferController { } fn try_recv(&self) -> Result, Error> { - match self.stream.blocking_lock().try_recv() { - Ok(op) => Ok(Some(op)), - Err(TryRecvError::Empty) => Ok(None), - Err(TryRecvError::Closed) => Err(Error::Channel { send: false }), - Err(TryRecvError::Lagged(n)) => { - tracing::warn!("buffer channel lagged, skipping {} events", n); - Ok(self.try_recv()?) - }, - } + let (tx, rx) = oneshot::channel(); + self.stream.send(tx)?; + rx.blocking_recv() + .map_err(|_| Error::Channel { send: false }) } - /// receive an operation seq and transform it into a TextChange from buffer content async fn recv(&self) -> Result { - let op = self.stream.lock().await.recv().await?; - Ok(op) + self.poll().await?; + let (tx, rx) = oneshot::channel(); + self.stream.send(tx)?; + Ok( + rx.await + .map_err(|_| Error::Channel { send: false })? + .expect("empty channel after polling") + ) } /// enqueue an opseq for processing fn send(&self, op: OperationSeq) -> Result<(), Error> { - let tick = self.operation_tick.load(Ordering::Acquire); - self.operations.send(op)?; - self.operation_tick.store(tick + 1, Ordering::Release); - Ok(()) + Ok(self.operations.send(op)?) } } diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 044547e..6dde13c 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -6,15 +6,11 @@ //! this module contains buffer-related operations and helpers to create Operation Sequences //! (the underlying chunks of changes sent over the wire) -use std::ops::Range; - -pub(crate) mod worker; - /// buffer controller implementation pub mod controller; +pub(crate) mod worker; -pub use factory::OperationFactory; pub use controller::BufferController as Controller; @@ -24,7 +20,10 @@ pub use controller::BufferController as Controller; #[derive(Clone, Debug, Default)] pub struct TextChange { /// range of text change, as byte indexes in buffer - pub span: Range, + pub span: std::ops::Range, /// content of text change, as string pub content: String, + /// content after this text change + /// note that this field will probably be dropped, don't rely on it + pub after: String } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 49e364f..72accfa 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,12 +1,10 @@ -use std::sync::atomic::{AtomicU64, Ordering}; -use std::{sync::Arc, collections::VecDeque}; +use std::collections::VecDeque; use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, broadcast, Mutex}; +use tokio::sync::{watch, mpsc, oneshot, Mutex}; use tonic::transport::Channel; use tonic::{async_trait, Streaming}; -use crate::errors::IgnorableError; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; use crate::api::controller::ControllerWorker; @@ -20,36 +18,54 @@ pub(crate) struct BufferControllerWorker { uid: String, content: watch::Sender, operations: mpsc::UnboundedReceiver, - stream: Arc>, + stream: mpsc::UnboundedReceiver>>, + stream_requestor: mpsc::UnboundedSender>>, receiver: watch::Receiver, sender: mpsc::UnboundedSender, buffer: String, path: String, stop: mpsc::UnboundedReceiver<()>, stop_control: mpsc::UnboundedSender<()>, - operation_tick: Arc, + new_op_tx: watch::Sender<()>, + new_op_rx: watch::Receiver<()>, } impl BufferControllerWorker { pub fn new(uid: String, buffer: &str, path: &str) -> Self { let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (op_tx, op_rx) = mpsc::unbounded_channel(); - let (s_tx, _s_rx) = broadcast::channel(64); + let (s_tx, s_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel(); + let (notx, norx) = watch::channel(()); BufferControllerWorker { uid, content: txt_tx, operations: op_rx, - stream: Arc::new(s_tx), + stream: s_rx, + stream_requestor: s_tx, receiver: txt_rx, sender: op_tx, buffer: buffer.to_string(), path: path.to_string(), stop: end_rx, stop_control: end_tx, - operation_tick: Arc::new(AtomicU64::new(0)), + new_op_tx: notx, + new_op_rx: norx, } } + + async fn send_op(&self, tx: &mut BufferClient, outbound: &OperationSeq) -> crate::Result<()> { + let opseq = serde_json::to_string(outbound).expect("could not serialize opseq"); + let req = OperationRequest { + path: self.path.clone(), + hash: format!("{:x}", md5::compute(&self.buffer)), + op: Some(RawOp { + opseq, user: self.uid.clone(), + }), + }; + let _ = tx.edit(req).await?; + Ok(()) + } } #[async_trait] @@ -62,128 +78,122 @@ impl ControllerWorker for BufferControllerWorker { BufferController::new( self.receiver.clone(), self.sender.clone(), - Mutex::new(self.stream.subscribe()), + self.stream_requestor.clone(), self.stop_control.clone(), - self.operation_tick.clone(), + Mutex::new(self.new_op_rx.clone()), ) } async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { let mut clientside : VecDeque = VecDeque::new(); let mut serverside : VecDeque = VecDeque::new(); - let mut last_seen_tick = 0; loop { // block until one of these is ready tokio::select! { - - // received a new message from server (or an error) - res = rx.message() => { - match res { - Err(e) => return tracing::error!("error receiving op from server: {}", e), - Ok(None) => return tracing::warn!("server closed operation stream"), - Ok(Some(msg)) => serverside.push_back( - serde_json::from_str(&msg.opseq) - .expect("could not deserialize server opseq") - ), - } - }, - - // received a new operation from client (or channel got closed) - res = self.operations.recv() => { - match res { - None => return tracing::warn!("client closed operation stream"), - Some(op) => { - clientside.push_back(op.clone()); - last_seen_tick = self.operation_tick.load(Ordering::Acquire); - } - } - }, + biased; // received a stop request (or channel got closed) res = self.stop.recv() => { + tracing::info!("received stop signal"); match res { None => return tracing::warn!("stop channel closed, stopping worker"), Some(()) => return tracing::debug!("buffer worker stopping cleanly"), } } - } - - // we must give priority to operations received from remote server, because we can transform - // our ops with server's ops but server won't transform its ops with ours. We must transform - // ALL enqueued client ops: if a new one arrived before we could transform and update, we - // should discard our progress and poll again. - while let Some(operation) = serverside.get(0).cloned() { - let mut transformed_op = operation.clone(); - let mut queued_ops = clientside.clone(); - let mut txt_before = self.buffer.clone(); - for op in queued_ops.iter_mut() { - txt_before = match op.apply(&txt_before) { - Ok(x) => x, - Err(_) => { tracing::error!("could not apply outgoing enqueued opseq to current buffer?"); break; }, + // received a new message from server (or an error) + res = rx.message() => { + tracing::info!("received msg from server"); + let inbound : OperationSeq = match res { + Err(e) => return tracing::error!("error receiving op from server: {}", e), + Ok(None) => return tracing::warn!("server closed operation stream"), + Ok(Some(msg)) => serde_json::from_str(&msg.opseq) + .expect("could not deserialize server opseq"), }; - (*op, transformed_op) = match op.transform(&transformed_op) { - Err(e) => { tracing::warn!("could not transform enqueued operation: {}", e); break; }, - Ok((x, y)) => (x, y), - }; - } - - let skip = leading_noop(transformed_op.ops()) as usize; - let tail = tailing_noop(transformed_op.ops()) as usize; - let span = skip..(transformed_op.base_len() - tail); - let after = transformed_op.apply(&txt_before).expect("could not apply transformed op"); - let change = TextChange { span, content: after[skip..after.len()-tail].to_string() }; - - let tick = self.operation_tick.load(std::sync::atomic::Ordering::Acquire); - if tick != last_seen_tick { - tracing::warn!("skipping downstream because there are ops"); - break - } // there are more ops to see first - clientside = queued_ops; - self.buffer = match operation.apply(&self.buffer) { - Ok(x) => x, - Err(_) => { tracing::error!("wtf received op could not be applied?"); break; }, - }; - if clientside.is_empty() { - self.content.send(self.buffer.clone()).expect("could not broadcast new buffer content"); - } - self.stream.send(change) - .unwrap_or_warn("could not send operation to server"); - serverside.pop_front(); - } - - // if there are still serverside operations to be applied, we can't dispatch our local ones - // yet because we need them to transform the ones sent by the server before applying them on - // our local buffer. We may get here if a new local operation arrived before we could process - // and transform all received server ops. since the buffer is different, it isn't safe to - // apply them and we must transform them again. If the loop above left its queue not empty, - // we should be guaranteed to unblock immediately in the select above because we have a new - // client operation waiting for us to be enqueued - if serverside.is_empty() { - while let Some(op) = clientside.get(0) { - let opseq = serde_json::to_string(&op).expect("could not serialize opseq"); - let req = OperationRequest { - path: self.path.clone(), - hash: format!("{:x}", md5::compute(&self.buffer)), - op: Some(RawOp { - opseq, user: self.uid.clone(), - }), - }; - if let Err(e) = tx.edit(req).await { - tracing::warn!("server rejected operation: {}", e); - break; + self.buffer = inbound.apply(&self.buffer).expect("could not apply remote opseq???"); + serverside.push_back(inbound); + while let Some(mut outbound) = clientside.get(0).cloned() { + let mut serverside_tmp = serverside.clone(); + for server_op in serverside_tmp.iter_mut() { + tracing::info!("transforming {:?} <-> {:?}", outbound, server_op); + (outbound, *server_op) = outbound.transform(server_op) + .expect("could not transform enqueued out with just received"); + } + match self.send_op(&mut tx, &outbound).await { + Err(e) => { tracing::warn!("could not send op even after transforming: {}", e); break; }, + Ok(()) => { + tracing::info!("back in sync"); + serverside = serverside_tmp; + self.buffer = outbound.apply(&self.buffer).expect("could not apply op after synching back"); + clientside.pop_front(); + }, + } } - self.buffer = match op.apply(&self.buffer) { - Ok(x) => x, - Err(_) => { tracing::error!("wtf accepted remote op could not be applied to our buffer????"); break; }, - }; self.content.send(self.buffer.clone()).expect("could not broadcast buffer update"); - clientside.pop_front(); - } - } else { - tracing::warn!("skipping upstream because there are ops"); + self.new_op_tx.send(()).expect("could not activate client after new server event"); + }, + + // received a new operation from client (or channel got closed) + res = self.operations.recv() => { + tracing::info!("received op from client"); + match res { + None => return tracing::warn!("client closed operation stream"), + Some(op) => { + if clientside.is_empty() { + match self.send_op(&mut tx, &op).await { + Ok(()) => { + self.buffer = op.apply(&self.buffer).expect("could not apply op"); + self.content.send(self.buffer.clone()).expect("could not update buffer view"); + }, + Err(e) => { + tracing::warn!("server rejected op: {}", e); + clientside.push_back(op); + }, + } + } else { // I GET STUCK IN THIS BRANCH AND NOTHING HAPPENS AAAAAAAAAA + clientside.push_back(op); + } + } + } + }, + + // client requested a server operation, transform it and send it + res = self.stream.recv() => { + tracing::info!("received op REQUEST from client"); + match res { + None => return tracing::error!("client closed requestor stream"), + Some(tx) => tx.send(match serverside.pop_front() { + None => { + tracing::warn!("requested change but none is available"); + None + }, + Some(mut operation) => { + let mut after = self.buffer.clone(); + for op in clientside.iter_mut() { + (*op, operation) = match op.transform(&operation) { + Err(e) => return tracing::warn!("could not transform enqueued operation: {}", e), + Ok((x, y)) => (x, y), + }; + after = match op.apply(&after) { + Err(_) => return tracing::error!("could not apply outgoing enqueued opseq to current buffer?"), + Ok(x) => x, + }; + } + + let skip = leading_noop(operation.ops()) as usize; + let tail = tailing_noop(operation.ops()) as usize; + let span = skip..(operation.base_len() - tail); + let content = if after.len() - tail < skip { "".into() } else { after[skip..after.len()-tail].to_string() }; + let change = TextChange { span, content, after }; + + Some(change) + }, + }).expect("client did not wait????"), + } + }, + } } }