diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 0065e38..689f74c 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -56,7 +56,7 @@ pub(crate) struct BufferControllerInner { pub(crate) ops_in: mpsc::UnboundedSender, pub(crate) poller: mpsc::UnboundedSender>, pub(crate) content_request: mpsc::Sender>, - pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender>)>, + pub(crate) delta_request: mpsc::Sender>>, pub(crate) callback: watch::Sender>>, pub(crate) ack_tx: mpsc::UnboundedSender, } @@ -93,7 +93,7 @@ impl AsyncReceiver for BufferController { } let (tx, rx) = oneshot::channel(); - self.0.delta_request.send((last_update, tx)).await?; + self.0.delta_request.send(tx).await?; Ok(rx.await?) } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 21a4746..0124107 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use diamond_types::list::encoding::EncodeOptions; use diamond_types::list::{Branch, OpLog}; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; @@ -25,7 +26,7 @@ struct BufferWorker { poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, - delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender>)>, + delta_req: mpsc::Receiver>>, controller: std::sync::Weak, callback: watch::Receiver>>, oplog: OpLog, @@ -107,20 +108,22 @@ impl BufferController { tokio::select! { biased; - // received a new poller, add it to collection - res = worker.poller.recv() => match res { - None => break tracing::error!("poller channel closed"), - Some(tx) => worker.pollers.push(tx), - }, - // received new change ack, merge editor branch up to that version res = worker.ack_rx.recv() => match res { None => break tracing::error!("ack channel closed"), Some(v) => { - worker.branch.merge(&worker.oplog, &v) + worker.branch.merge(&worker.oplog, &v); + worker.local_version.send(worker.branch.local_version()) + .unwrap_or_warn("could not ack local version"); }, }, + // received a new poller, add it to collection + res = worker.poller.recv() => match res { + None => break tracing::error!("poller channel closed"), + Some(tx) => worker.pollers.push(tx), + }, + // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), @@ -137,7 +140,7 @@ impl BufferController { // controller is ready to apply change and recv(), calculate it and send it back res = worker.delta_req.recv() => match res { None => break tracing::error!("no more active controllers: can't send changes"), - Some((last_ver, tx)) => worker.handle_delta_request(last_ver, tx).await, + Some(tx) => worker.handle_delta_request(tx).await, }, // received a request for full CRDT content @@ -145,8 +148,11 @@ impl BufferController { None => break tracing::error!("no more active controllers: can't update content"), Some(tx) => { worker.branch.merge(&worker.oplog, worker.oplog.local_version_ref()); + worker.local_version.send(worker.branch.local_version()) + .unwrap_or_warn("could not checkout local version"); let content = worker.branch.content().to_string(); - tx.send(content).unwrap_or_warn("checkout request dropped"); + tx.send(content) + .unwrap_or_warn("checkout request dropped"); }, } } @@ -184,7 +190,7 @@ impl BufferWorker { if change.is_delete() || change.is_insert() { tx.send(Operation { - data: self.oplog.encode_from(Default::default(), &last_ver), + data: self.oplog.encode_from(EncodeOptions::default(), &last_ver), }) .await .unwrap_or_warn("failed to send change!"); @@ -221,11 +227,8 @@ impl BufferWorker { } } - async fn handle_delta_request( - &mut self, - last_ver: LocalVersion, - tx: oneshot::Sender>, - ) { + async fn handle_delta_request(&mut self, tx: oneshot::Sender>) { + let last_ver = self.branch.local_version(); if let Some((lv, Some(dtop))) = self .oplog .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) @@ -235,8 +238,6 @@ impl BufferWorker { // this step_ver will be the version after we apply the operation // we give it to the controller so that he knows where it's at. let step_ver = self.oplog.version_union(&[lv.end - 1], &last_ver); - self.branch.merge(&self.oplog, &step_ver); - let new_local_v = self.branch.local_version(); let hash = if self.timer.step() { Some(crate::ext::hash(self.branch.content().to_string())) @@ -282,9 +283,6 @@ impl BufferWorker { }, }, }; - self.local_version - .send(new_local_v) - .unwrap_or_warn("could not update local version"); tx.send(Some(tc)) .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else {