From 907a0329d30c77405a8509d097b635ed1f0b6ab6 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 02:30:46 +0200 Subject: [PATCH] feat!: ackable changes for buffer cntrls, sync send --- src/api/change.rs | 16 ++++++++++++ src/api/controller.rs | 8 +++--- src/buffer/controller.rs | 50 +++++++++++++++++++------------------ src/buffer/worker.rs | 53 ++++++++++++++++++++++++++-------------- src/cursor/controller.rs | 7 +++--- src/cursor/worker.rs | 4 +-- 6 files changed, 85 insertions(+), 53 deletions(-) diff --git a/src/api/change.rs b/src/api/change.rs index 5a2ebfe..f41e342 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -45,6 +45,22 @@ pub struct TextChange { pub hash: Option, } +/// This wrapper around a [`TextChange`] contains a handle to Acknowledge correct change +/// application +#[derive(Debug)] +pub struct Delta { + /// The change received + pub change: TextChange, + /// The ack handle, must be called after correctly applying this change + pub ack: T, +} + +/// A token which can be used to acknowledge changes +pub trait Acknowledgeable { + /// Send Acknowledgement. This action is idempotent + fn send(&mut self); +} + impl TextChange { /// Returns the [`std::ops::Range`] representing this change's span. pub fn span(&self) -> std::ops::Range { diff --git a/src/api/controller.rs b/src/api/controller.rs index dda3b3f..0f19d70 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -37,11 +37,9 @@ where /// See [`Controller`]'s documentation for details. /// /// Details about the receiving end are left to the implementor. -#[allow(async_fn_in_trait)] -#[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait AsyncSender: Sized + Send + Sync { - /// Enqueue a new value to be sent to all other users. - async fn send(&self, x: T) -> ControllerResult<()>; +pub trait AsyncSender : Sized + Send + Sync { + /// Enqueue a new value to be sent to all other users without blocking + fn send(&self, x: T) -> ControllerResult<()>; } /// Asynchronous and thread-safe handle to receive data from a stream. diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index bcf115e..d52b472 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -6,13 +6,27 @@ use std::sync::Arc; use diamond_types::LocalVersion; use tokio::sync::{mpsc, oneshot, watch}; +use crate::api::change::{Acknowledgeable, Delta}; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::TextChange; use crate::errors::ControllerResult; -use crate::ext::InternallyMutable; +use crate::ext::IgnorableError; use super::worker::DeltaRequest; +#[derive(Debug)] +pub(crate) struct BufferAck { + pub(crate) tx: mpsc::UnboundedSender, + pub(crate) version: LocalVersion, +} + +impl Acknowledgeable for BufferAck { + fn send(&mut self) { + self.tx.send(self.version.clone()) + .unwrap_or_warn("no worker to receive sent ack"); + } +} + /// A [Controller] to asynchronously interact with remote buffers. /// /// Each buffer controller internally tracks the last acknowledged state, remaining always in sync @@ -33,9 +47,6 @@ impl BufferController { let (tx, rx) = oneshot::channel(); self.0.content_request.send(tx).await?; let content = rx.await?; - self.0 - .last_update - .set(self.0.latest_version.borrow().clone()); Ok(content) } } @@ -44,8 +55,8 @@ impl BufferController { pub(crate) struct BufferControllerInner { pub(crate) name: String, pub(crate) latest_version: watch::Receiver, - pub(crate) last_update: InternallyMutable, - pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender)>, + pub(crate) local_version: watch::Receiver, + pub(crate) ops_in: mpsc::UnboundedSender, pub(crate) poller: mpsc::UnboundedSender>, pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender, @@ -53,23 +64,19 @@ pub(crate) struct BufferControllerInner { } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl Controller for BufferController {} +impl Controller> for BufferController {} -#[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl AsyncSender for BufferController { - async fn send(&self, op: TextChange) -> ControllerResult<()> { - // we let the worker do the updating to the last version and send it back. - let (tx, rx) = oneshot::channel(); - self.0.ops_in.send((op, tx))?; - self.0.last_update.set(rx.await?); + fn send(&self, op: TextChange) -> ControllerResult<()> { + self.0.ops_in.send(op)?; Ok(()) } } #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -impl AsyncReceiver for BufferController { +impl AsyncReceiver> for BufferController { async fn poll(&self) -> ControllerResult<()> { - if self.0.last_update.get() != *self.0.latest_version.borrow() { + if *self.0.local_version.borrow() != *self.0.latest_version.borrow() { return Ok(()); } @@ -79,8 +86,8 @@ impl AsyncReceiver for BufferController { Ok(()) } - async fn try_recv(&self) -> ControllerResult> { - let last_update = self.0.last_update.get(); + async fn try_recv(&self) -> ControllerResult>> { + let last_update = self.0.local_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone(); if last_update == latest_version { @@ -89,16 +96,11 @@ impl AsyncReceiver for BufferController { let (tx, rx) = oneshot::channel(); self.0.delta_request.send((last_update, tx)).await?; - let (v, change) = rx.await?; - self.0.last_update.set(v); - Ok(change) + Ok(rx.await?) } fn callback(&self, cb: impl Into>) { - if self.0.callback.send(Some(cb.into())).is_err() { - // TODO should we panic? we failed what we were supposed to do - tracing::error!("no active buffer worker to run registered callback!"); - } + self.0.callback.send_replace(Some(cb.into())); } fn clear_callback(&self) { diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index a5a730f..ba1a318 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -6,22 +6,26 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::Streaming; use uuid::Uuid; +use crate::api::change::Delta; use crate::api::controller::ControllerCallback; use crate::api::TextChange; -use crate::ext::{IgnorableError, InternallyMutable}; +use crate::ext::IgnorableError; use codemp_proto::buffer::{BufferEvent, Operation}; -use super::controller::{BufferController, BufferControllerInner}; +use super::controller::{BufferAck, BufferController, BufferControllerInner}; -pub(crate) type DeltaOp = (LocalVersion, Option); +pub(crate) type DeltaOp = Option>; pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender); struct BufferWorker { user_id: Uuid, path: String, latest_version: watch::Sender, - ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender)>, + local_version: watch::Sender, + ack_rx: mpsc::UnboundedReceiver, + ack_tx: mpsc::UnboundedSender, + ops_in: mpsc::UnboundedReceiver, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, @@ -43,7 +47,9 @@ impl BufferController { let init = diamond_types::LocalVersion::default(); let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); + let (my_version_tx, my_version_rx) = watch::channel(init.clone()); let (opin_tx, opin_rx) = mpsc::unbounded_channel(); + let (ack_tx, ack_rx) = mpsc::unbounded_channel(); let (req_tx, req_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); @@ -54,7 +60,7 @@ impl BufferController { let controller = Arc::new(BufferControllerInner { name: path.to_string(), latest_version: latest_version_rx, - last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), + local_version: my_version_rx, ops_in: opin_tx, poller: poller_tx, content_request: req_tx, @@ -68,6 +74,8 @@ impl BufferController { user_id, path: path.to_string(), latest_version: latest_version_tx, + local_version: my_version_tx, + ack_tx, ack_rx, ops_in: opin_rx, poller: poller_rx, pollers: Vec::new(), @@ -106,10 +114,18 @@ impl BufferController { Some(tx) => worker.pollers.push(tx), }, + // received new change ack, merge editor branch up to that version + res = worker.ack_rx.recv() => match res { + None => break tracing::error!("ack channel closed"), + Some(v) => { + worker.branch.merge(&worker.oplog, &v) + }, + }, + // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await, + Some(change) => worker.handle_editor_change(change, &tx).await, }, // received a message from server: add to oplog and update latest version (+unlock pollers) @@ -142,12 +158,7 @@ impl BufferController { } impl BufferWorker { - async fn handle_editor_change( - &mut self, - change: TextChange, - ack: oneshot::Sender, - tx: &mpsc::Sender, - ) { + async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender) { let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); let last_ver = self.oplog.local_version(); // clip to buffer extents @@ -174,9 +185,9 @@ impl BufferWorker { self.latest_version .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); + self.local_version.send(self.branch.local_version()) + .unwrap_or_warn("failed to update local version!"); } - ack.send(self.branch.local_version()) - .unwrap_or_warn("controller didn't wait for ack"); } async fn handle_server_change(&mut self, change: BufferEvent) -> bool { @@ -243,11 +254,17 @@ impl BufferWorker { hash, }, }; - tx.send((new_local_v, Some(tc))) - .unwrap_or_warn("could not update ops channel -- is controller dead?"); + let delta = Delta { + change: tc, + ack: BufferAck { + tx: self.ack_tx.clone(), + version: step_ver, + }, + }; + self.local_version.send(new_local_v).unwrap_or_warn("could not update local version"); + tx.send(Some(delta)).unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { - tx.send((last_ver, None)) - .unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send(None).unwrap_or_warn("could not update ops channel -- is controller dead?"); } } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index a0bced9..bac20cf 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -27,7 +27,7 @@ pub struct CursorController(pub(crate) Arc); #[derive(Debug)] pub(crate) struct CursorControllerInner { - pub(crate) op: mpsc::Sender, + pub(crate) op: mpsc::UnboundedSender, pub(crate) stream: mpsc::Sender>>, pub(crate) poll: mpsc::UnboundedSender>, pub(crate) callback: watch::Sender>>, @@ -38,7 +38,7 @@ impl Controller for CursorController {} #[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl AsyncSender for CursorController { - async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { + fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } @@ -57,8 +57,7 @@ impl AsyncSender for CursorController { row: cursor.end.0, col: cursor.end.1, }, - }) - .await?) + })?) } } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index aab8c34..bd6b35b 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -13,7 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition}; use super::controller::{CursorController, CursorControllerInner}; struct CursorWorker { - op: mpsc::Receiver, + op: mpsc::UnboundedReceiver, map: Arc>, stream: mpsc::Receiver>>, poll: mpsc::UnboundedReceiver>, @@ -30,7 +30,7 @@ impl CursorController { rx: Streaming, ) -> Self { // TODO we should tweak the channel buffer size to better propagate backpressure - let (op_tx, op_rx) = mpsc::channel(64); + let (op_tx, op_rx) = mpsc::unbounded_channel(); let (stream_tx, stream_rx) = mpsc::channel(1); let (cb_tx, cb_rx) = watch::channel(None); let (poll_tx, poll_rx) = mpsc::unbounded_channel();