diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 3250e7e..239a38b 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -36,7 +36,9 @@ impl BufferController { pub async fn content(&self) -> crate::Result { let (tx, rx) = oneshot::channel(); self.0.content_request.send(tx).await?; - Ok(rx.await?) + let content = rx.await?; + self.0.last_update.set(self.0.latest_version.borrow().clone()); + Ok(content) } } @@ -45,7 +47,7 @@ pub(crate) struct BufferControllerInner { pub(crate) name: String, pub(crate) latest_version: watch::Receiver, pub(crate) last_update: InternallyMutable, - pub(crate) ops_in: mpsc::UnboundedSender, + pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender)>, pub(crate) poller: mpsc::UnboundedSender>, pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) content_request: mpsc::Sender>, @@ -86,9 +88,12 @@ impl Controller for BufferController { /// enqueue a text change for processing /// this also updates internal buffer previous state - fn send(&self, op: TextChange) -> crate::Result<()> { + async fn send(&self, op: TextChange) -> crate::Result<()> { // we let the worker do the updating to the last version and send it back. - Ok(self.0.ops_in.send(op)?) + let (tx, rx) = oneshot::channel(); + self.0.ops_in.send((op, tx))?; + self.0.last_update.set(rx.await?); + Ok(()) } fn stop(&self) -> bool { diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 99776b8..82ec367 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -17,7 +17,7 @@ use super::controller::{BufferController, BufferControllerInner}; pub(crate) struct BufferWorker { user_id: Uuid, latest_version: watch::Sender, - ops_in: mpsc::UnboundedReceiver, + ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender)>, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, @@ -96,21 +96,21 @@ impl ControllerWorker for BufferWorker { // received a text change from editor res = self.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some(change) => { + Some((change, ack)) => { let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string()); let last_ver = oplog.local_version(); if change.is_insert() { - oplog.add_insert(agent_id, change.start as usize, &change.content) + branch.insert(&mut oplog, agent_id, change.start as usize, &change.content) } else if change.is_delete() { - oplog.add_delete_without_content(1, change.span()) + branch.delete_without_content(&mut oplog, 1, change.span()) } else { continue; }; tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await .unwrap_or_warn("failed to send change!"); self.latest_version.send(oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); - + ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); }, }, @@ -141,11 +141,10 @@ impl ControllerWorker for BufferWorker { // this step_ver will be the version after we apply the operation // we give it to the controller so that he knows where it's at. let step_ver = oplog.version_union(&[lv.start], &last_ver); + branch.merge(&oplog, &step_ver); + let new_local_v = branch.local_version(); - // moved the merging inside as we only need - // an up to date content when we hash the content. let hash = if timer.step() { - branch.merge(&oplog, &step_ver); let hash = xxhash_rust::xxh3::xxh3_64(branch.content().to_string().as_bytes()); Some(i64::from_ne_bytes(hash.to_ne_bytes())) } else { None }; @@ -156,7 +155,7 @@ impl ControllerWorker for BufferWorker { content: dtop.content_as_str().unwrap_or_default().to_string(), hash }; - tx.send((step_ver, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send((new_local_v, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?"); } }, },