feat!: ackable changes for buffer cntrls, sync send

This commit is contained in:
əlemi 2024-10-03 02:30:46 +02:00 committed by alemi.dev
parent 1e3b3dc705
commit 907a0329d3
6 changed files with 85 additions and 53 deletions

View file

@ -45,6 +45,22 @@ pub struct TextChange {
pub hash: Option<i64>, pub hash: Option<i64>,
} }
/// This wrapper around a [`TextChange`] contains a handle to Acknowledge correct change
/// application
#[derive(Debug)]
pub struct Delta<T: Acknowledgeable> {
/// The change received
pub change: TextChange,
/// The ack handle, must be called after correctly applying this change
pub ack: T,
}
/// A token which can be used to acknowledge changes
pub trait Acknowledgeable {
/// Send Acknowledgement. This action is idempotent
fn send(&mut self);
}
impl TextChange { impl TextChange {
/// Returns the [`std::ops::Range`] representing this change's span. /// Returns the [`std::ops::Range`] representing this change's span.
pub fn span(&self) -> std::ops::Range<usize> { pub fn span(&self) -> std::ops::Range<usize> {

View file

@ -37,11 +37,9 @@ where
/// See [`Controller`]'s documentation for details. /// See [`Controller`]'s documentation for details.
/// ///
/// Details about the receiving end are left to the implementor. /// Details about the receiving end are left to the implementor.
#[allow(async_fn_in_trait)] pub trait AsyncSender<T : Sized + Send + Sync> : Sized + Send + Sync {
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] /// Enqueue a new value to be sent to all other users without blocking
pub trait AsyncSender<T: Sized + Send + Sync>: Sized + Send + Sync { fn send(&self, x: T) -> ControllerResult<()>;
/// Enqueue a new value to be sent to all other users.
async fn send(&self, x: T) -> ControllerResult<()>;
} }
/// Asynchronous and thread-safe handle to receive data from a stream. /// Asynchronous and thread-safe handle to receive data from a stream.

View file

@ -6,13 +6,27 @@ use std::sync::Arc;
use diamond_types::LocalVersion; use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use crate::api::change::{Acknowledgeable, Delta};
use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback}; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback};
use crate::api::TextChange; use crate::api::TextChange;
use crate::errors::ControllerResult; use crate::errors::ControllerResult;
use crate::ext::InternallyMutable; use crate::ext::IgnorableError;
use super::worker::DeltaRequest; use super::worker::DeltaRequest;
#[derive(Debug)]
pub(crate) struct BufferAck {
pub(crate) tx: mpsc::UnboundedSender<LocalVersion>,
pub(crate) version: LocalVersion,
}
impl Acknowledgeable for BufferAck {
fn send(&mut self) {
self.tx.send(self.version.clone())
.unwrap_or_warn("no worker to receive sent ack");
}
}
/// A [Controller] to asynchronously interact with remote buffers. /// A [Controller] to asynchronously interact with remote buffers.
/// ///
/// Each buffer controller internally tracks the last acknowledged state, remaining always in sync /// Each buffer controller internally tracks the last acknowledged state, remaining always in sync
@ -33,9 +47,6 @@ impl BufferController {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.content_request.send(tx).await?; self.0.content_request.send(tx).await?;
let content = rx.await?; let content = rx.await?;
self.0
.last_update
.set(self.0.latest_version.borrow().clone());
Ok(content) Ok(content)
} }
} }
@ -44,8 +55,8 @@ impl BufferController {
pub(crate) struct BufferControllerInner { pub(crate) struct BufferControllerInner {
pub(crate) name: String, pub(crate) name: String,
pub(crate) latest_version: watch::Receiver<diamond_types::LocalVersion>, pub(crate) latest_version: watch::Receiver<diamond_types::LocalVersion>,
pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>, pub(crate) local_version: watch::Receiver<diamond_types::LocalVersion>,
pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender<LocalVersion>)>, pub(crate) ops_in: mpsc::UnboundedSender<TextChange>,
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>, pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: mpsc::Sender<DeltaRequest>, pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
@ -53,23 +64,19 @@ pub(crate) struct BufferControllerInner {
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<TextChange> for BufferController {} impl Controller<TextChange, Delta<BufferAck>> for BufferController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<TextChange> for BufferController { impl AsyncSender<TextChange> for BufferController {
async fn send(&self, op: TextChange) -> ControllerResult<()> { fn send(&self, op: TextChange) -> ControllerResult<()> {
// we let the worker do the updating to the last version and send it back. self.0.ops_in.send(op)?;
let (tx, rx) = oneshot::channel();
self.0.ops_in.send((op, tx))?;
self.0.last_update.set(rx.await?);
Ok(()) Ok(())
} }
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncReceiver<TextChange> for BufferController { impl AsyncReceiver<Delta<BufferAck>> for BufferController {
async fn poll(&self) -> ControllerResult<()> { async fn poll(&self) -> ControllerResult<()> {
if self.0.last_update.get() != *self.0.latest_version.borrow() { if *self.0.local_version.borrow() != *self.0.latest_version.borrow() {
return Ok(()); return Ok(());
} }
@ -79,8 +86,8 @@ impl AsyncReceiver<TextChange> for BufferController {
Ok(()) Ok(())
} }
async fn try_recv(&self) -> ControllerResult<Option<TextChange>> { async fn try_recv(&self) -> ControllerResult<Option<Delta<BufferAck>>> {
let last_update = self.0.last_update.get(); let last_update = self.0.local_version.borrow().clone();
let latest_version = self.0.latest_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone();
if last_update == latest_version { if last_update == latest_version {
@ -89,16 +96,11 @@ impl AsyncReceiver<TextChange> for BufferController {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.delta_request.send((last_update, tx)).await?; self.0.delta_request.send((last_update, tx)).await?;
let (v, change) = rx.await?; Ok(rx.await?)
self.0.last_update.set(v);
Ok(change)
} }
fn callback(&self, cb: impl Into<ControllerCallback<BufferController>>) { fn callback(&self, cb: impl Into<ControllerCallback<BufferController>>) {
if self.0.callback.send(Some(cb.into())).is_err() { self.0.callback.send_replace(Some(cb.into()));
// TODO should we panic? we failed what we were supposed to do
tracing::error!("no active buffer worker to run registered callback!");
}
} }
fn clear_callback(&self) { fn clear_callback(&self) {

View file

@ -6,22 +6,26 @@ use tokio::sync::{mpsc, oneshot, watch};
use tonic::Streaming; use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
use crate::api::change::Delta;
use crate::api::controller::ControllerCallback; use crate::api::controller::ControllerCallback;
use crate::api::TextChange; use crate::api::TextChange;
use crate::ext::{IgnorableError, InternallyMutable}; use crate::ext::IgnorableError;
use codemp_proto::buffer::{BufferEvent, Operation}; use codemp_proto::buffer::{BufferEvent, Operation};
use super::controller::{BufferController, BufferControllerInner}; use super::controller::{BufferAck, BufferController, BufferControllerInner};
pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>); pub(crate) type DeltaOp = Option<Delta<BufferAck>>;
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>); pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);
struct BufferWorker { struct BufferWorker {
user_id: Uuid, user_id: Uuid,
path: String, path: String,
latest_version: watch::Sender<diamond_types::LocalVersion>, latest_version: watch::Sender<diamond_types::LocalVersion>,
ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>, local_version: watch::Sender<diamond_types::LocalVersion>,
ack_rx: mpsc::UnboundedReceiver<LocalVersion>,
ack_tx: mpsc::UnboundedSender<LocalVersion>,
ops_in: mpsc::UnboundedReceiver<TextChange>,
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
content_checkout: mpsc::Receiver<oneshot::Sender<String>>, content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
@ -43,7 +47,9 @@ impl BufferController {
let init = diamond_types::LocalVersion::default(); let init = diamond_types::LocalVersion::default();
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
let (my_version_tx, my_version_rx) = watch::channel(init.clone());
let (opin_tx, opin_rx) = mpsc::unbounded_channel(); let (opin_tx, opin_rx) = mpsc::unbounded_channel();
let (ack_tx, ack_rx) = mpsc::unbounded_channel();
let (req_tx, req_rx) = mpsc::channel(1); let (req_tx, req_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1);
@ -54,7 +60,7 @@ impl BufferController {
let controller = Arc::new(BufferControllerInner { let controller = Arc::new(BufferControllerInner {
name: path.to_string(), name: path.to_string(),
latest_version: latest_version_rx, latest_version: latest_version_rx,
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()), local_version: my_version_rx,
ops_in: opin_tx, ops_in: opin_tx,
poller: poller_tx, poller: poller_tx,
content_request: req_tx, content_request: req_tx,
@ -68,6 +74,8 @@ impl BufferController {
user_id, user_id,
path: path.to_string(), path: path.to_string(),
latest_version: latest_version_tx, latest_version: latest_version_tx,
local_version: my_version_tx,
ack_tx, ack_rx,
ops_in: opin_rx, ops_in: opin_rx,
poller: poller_rx, poller: poller_rx,
pollers: Vec::new(), pollers: Vec::new(),
@ -106,10 +114,18 @@ impl BufferController {
Some(tx) => worker.pollers.push(tx), Some(tx) => worker.pollers.push(tx),
}, },
// received new change ack, merge editor branch up to that version
res = worker.ack_rx.recv() => match res {
None => break tracing::error!("ack channel closed"),
Some(v) => {
worker.branch.merge(&worker.oplog, &v)
},
},
// received a text change from editor // received a text change from editor
res = worker.ops_in.recv() => match res { res = worker.ops_in.recv() => match res {
None => break tracing::debug!("stopping: editor closed channel"), None => break tracing::debug!("stopping: editor closed channel"),
Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await, Some(change) => worker.handle_editor_change(change, &tx).await,
}, },
// received a message from server: add to oplog and update latest version (+unlock pollers) // received a message from server: add to oplog and update latest version (+unlock pollers)
@ -142,12 +158,7 @@ impl BufferController {
} }
impl BufferWorker { impl BufferWorker {
async fn handle_editor_change( async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender<Operation>) {
&mut self,
change: TextChange,
ack: oneshot::Sender<LocalVersion>,
tx: &mpsc::Sender<Operation>,
) {
let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string());
let last_ver = self.oplog.local_version(); let last_ver = self.oplog.local_version();
// clip to buffer extents // clip to buffer extents
@ -174,9 +185,9 @@ impl BufferWorker {
self.latest_version self.latest_version
.send(self.oplog.local_version()) .send(self.oplog.local_version())
.unwrap_or_warn("failed to update latest version!"); .unwrap_or_warn("failed to update latest version!");
self.local_version.send(self.branch.local_version())
.unwrap_or_warn("failed to update local version!");
} }
ack.send(self.branch.local_version())
.unwrap_or_warn("controller didn't wait for ack");
} }
async fn handle_server_change(&mut self, change: BufferEvent) -> bool { async fn handle_server_change(&mut self, change: BufferEvent) -> bool {
@ -243,11 +254,17 @@ impl BufferWorker {
hash, hash,
}, },
}; };
tx.send((new_local_v, Some(tc))) let delta = Delta {
.unwrap_or_warn("could not update ops channel -- is controller dead?"); change: tc,
ack: BufferAck {
tx: self.ack_tx.clone(),
version: step_ver,
},
};
self.local_version.send(new_local_v).unwrap_or_warn("could not update local version");
tx.send(Some(delta)).unwrap_or_warn("could not update ops channel -- is controller dead?");
} else { } else {
tx.send((last_ver, None)) tx.send(None).unwrap_or_warn("could not update ops channel -- is controller dead?");
.unwrap_or_warn("could not update ops channel -- is controller dead?");
} }
} }
} }

View file

@ -27,7 +27,7 @@ pub struct CursorController(pub(crate) Arc<CursorControllerInner>);
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct CursorControllerInner { pub(crate) struct CursorControllerInner {
pub(crate) op: mpsc::Sender<CursorPosition>, pub(crate) op: mpsc::UnboundedSender<CursorPosition>,
pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>, pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>,
pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>, pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
@ -38,7 +38,7 @@ impl Controller<Cursor> for CursorController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<Cursor> for CursorController { impl AsyncSender<Cursor> for CursorController {
async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { fn send(&self, mut cursor: Cursor) -> ControllerResult<()> {
if cursor.start > cursor.end { if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
} }
@ -57,8 +57,7 @@ impl AsyncSender<Cursor> for CursorController {
row: cursor.end.0, row: cursor.end.0,
col: cursor.end.1, col: cursor.end.1,
}, },
}) })?)
.await?)
} }
} }

View file

@ -13,7 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition};
use super::controller::{CursorController, CursorControllerInner}; use super::controller::{CursorController, CursorControllerInner};
struct CursorWorker { struct CursorWorker {
op: mpsc::Receiver<CursorPosition>, op: mpsc::UnboundedReceiver<CursorPosition>,
map: Arc<dashmap::DashMap<Uuid, User>>, map: Arc<dashmap::DashMap<Uuid, User>>,
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>, stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
@ -30,7 +30,7 @@ impl CursorController {
rx: Streaming<CursorEvent>, rx: Streaming<CursorEvent>,
) -> Self { ) -> Self {
// TODO we should tweak the channel buffer size to better propagate backpressure // TODO we should tweak the channel buffer size to better propagate backpressure
let (op_tx, op_rx) = mpsc::channel(64); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (stream_tx, stream_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1);
let (cb_tx, cb_rx) = watch::channel(None); let (cb_tx, cb_rx) = watch::channel(None);
let (poll_tx, poll_rx) = mpsc::unbounded_channel(); let (poll_tx, poll_rx) = mpsc::unbounded_channel();