From ca04601bea6d5af26c72c28f36e1e1da5c771bca Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 00:59:11 +0200 Subject: [PATCH] chore: refactor worker move stuff out of tokio select so that RA doesnt choke --- src/buffer/worker.rs | 192 ++++++++++++++++++++++++------------------- 1 file changed, 106 insertions(+), 86 deletions(-) diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 9e6bfa7..e1f568a 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use diamond_types::list::{Branch, OpLog}; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; @@ -27,6 +28,9 @@ struct BufferWorker { delta_req: mpsc::Receiver, controller: std::sync::Weak, callback: watch::Receiver>>, + oplog: OpLog, + branch: Branch, + timer: Timer, } impl BufferController { @@ -66,6 +70,9 @@ impl BufferController { content_checkout: req_rx, delta_req: recv_rx, callback: cb_rx, + oplog: OpLog::new(), + branch: Branch::new(), + timer: Timer::new(10), // TODO configurable! }; tokio::spawn(async move { @@ -76,11 +83,7 @@ impl BufferController { } async fn work(mut worker: BufferWorker, tx: mpsc::Sender, mut rx: Streaming) { - let mut branch = diamond_types::list::Branch::new(); - let mut oplog = diamond_types::list::OpLog::new(); - let mut timer = Timer::new(10); // TODO configurable!! tracing::debug!("controller worker started"); - loop { if worker.controller.upgrade().is_none() { break }; @@ -97,105 +100,28 @@ impl BufferController { // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some((change, ack)) => { - let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string()); - let last_ver = oplog.local_version(); - // clip to buffer extents - let clip_end = std::cmp::min(branch.len(), change.end as usize); - let clip_start = std::cmp::max(0, change.start as usize); - - // in case we have a "replace" span - if change.is_delete() { - branch.delete_without_content(&mut oplog, agent_id, clip_start..clip_end); - } - - if change.is_insert() { - branch.insert(&mut oplog, agent_id, clip_start, &change.content); - } - - if change.is_delete() || change.is_insert() { - tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await - .unwrap_or_warn("failed to send change!"); - worker.latest_version.send(oplog.local_version()) - .unwrap_or_warn("failed to update latest version!"); - } - ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); - }, + Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await, }, // received a message from server: add to oplog and update latest version (+unlock pollers) res = rx.message() => match res { Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path), Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path), - Ok(Some(change)) => match worker.controller.upgrade() { - None => break, // clean exit actually, just weird we caught it here - Some(controller) => match oplog.decode_and_add(&change.op.data) { - Ok(local_version) => { - worker.latest_version.send(local_version) - .unwrap_or_warn("failed to update latest version!"); - for tx in worker.pollers.drain(..) { - tx.send(()).unwrap_or_warn("could not wake up poller"); - } - if let Some(cb) = worker.callback.borrow().as_ref() { - cb.call(BufferController(controller)); // TODO should we run this on another task/thread? - } - }, - Err(e) => tracing::error!("could not deserialize operation from server: {}", e), - } - }, + Ok(Some(change)) => if worker.handle_server_change(change).await { break }, }, // controller is ready to apply change and recv(), calculate it and send it back res = worker.delta_req.recv() => match res { None => break tracing::error!("no more active controllers: can't send changes"), - Some((last_ver, tx)) => { - if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() { - // x.0.start should always be after lastver! - // this step_ver will be the version after we apply the operation - // we give it to the controller so that he knows where it's at. - let step_ver = oplog.version_union(&[lv.end-1], &last_ver); - branch.merge(&oplog, &step_ver); - let new_local_v = branch.local_version(); - - let hash = if timer.step() { - Some(crate::ext::hash(branch.content().to_string())) - } else { None }; - - let tc = match dtop.kind { - diamond_types::list::operation::OpKind::Ins => { - if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() { - tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); - } - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.start() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), - hash - } - }, - - diamond_types::list::operation::OpKind::Del => { - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.end() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), - hash - } - } - }; - tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); - } else { - tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); - } - }, + Some((last_ver, tx)) => worker.handle_delta_request(last_ver, tx).await, }, // received a request for full CRDT content res = worker.content_checkout.recv() => match res { None => break tracing::error!("no more active controllers: can't update content"), Some(tx) => { - branch.merge(&oplog, oplog.local_version_ref()); - let content = branch.content().to_string(); + worker.branch.merge(&worker.oplog, worker.oplog.local_version_ref()); + let content = worker.branch.content().to_string(); tx.send(content).unwrap_or_warn("checkout request dropped"); }, } @@ -206,6 +132,100 @@ impl BufferController { } } +impl BufferWorker { + async fn handle_editor_change(&mut self, change: TextChange, ack: oneshot::Sender, tx: &mpsc::Sender) { + let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); + let last_ver = self.oplog.local_version(); + // clip to buffer extents + let clip_end = std::cmp::min(self.branch.len(), change.end as usize); + let clip_start = std::cmp::max(0, change.start as usize); + + // in case we have a "replace" span + if change.is_delete() { + self.branch.delete_without_content(&mut self.oplog, agent_id, clip_start..clip_end); + } + + if change.is_insert() { + self.branch.insert(&mut self.oplog, agent_id, clip_start, &change.content); + } + + if change.is_delete() || change.is_insert() { + tx.send(Operation { data: self.oplog.encode_from(Default::default(), &last_ver) }).await + .unwrap_or_warn("failed to send change!"); + self.latest_version.send(self.oplog.local_version()) + .unwrap_or_warn("failed to update latest version!"); + } + ack.send(self.branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); + } + + async fn handle_server_change(&mut self, change: BufferEvent) -> bool { + match self.controller.upgrade() { + None => true, // clean exit actually, just weird we caught it here + Some(controller) => match self.oplog.decode_and_add(&change.op.data) { + Ok(local_version) => { + self.latest_version.send(local_version) + .unwrap_or_warn("failed to update latest version!"); + for tx in self.pollers.drain(..) { + tx.send(()).unwrap_or_warn("could not wake up poller"); + } + if let Some(cb) = self.callback.borrow().as_ref() { + cb.call(BufferController(controller)); // TODO should we run this on another task/thread? + } + false + }, + Err(e) => { + tracing::error!("could not deserialize operation from server: {}", e); + true + }, + } + } + } + + async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender) { + if let Some((lv, Some(dtop))) = self.oplog + .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) + .next() + { + // x.0.start should always be after lastver! + // this step_ver will be the version after we apply the operation + // we give it to the controller so that he knows where it's at. + let step_ver = self.oplog.version_union(&[lv.end-1], &last_ver); + self.branch.merge(&self.oplog, &step_ver); + let new_local_v = self.branch.local_version(); + + let hash = if self.timer.step() { + Some(crate::ext::hash(self.branch.content().to_string())) + } else { None }; + + let tc = match dtop.kind { + diamond_types::list::operation::OpKind::Ins => { + if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() { + tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); + } + crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.start() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash + } + }, + + diamond_types::list::operation::OpKind::Del => { + crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash + } + } + }; + tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); + } else { + tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + } + } +} + struct Timer(u32, u32); impl Timer { fn new(period: u32) -> Self {