diff --git a/src/api/controller.rs b/src/api/controller.rs index 5f41bed..1e0e736 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -45,6 +45,10 @@ pub trait Controller : Sized + Send + Sync { } } + fn callback(&self, cb: ControllerCallback); + + fn clear_callback(&self); + /// block until next value is available without consuming it /// /// this is just an async trait function wrapped by `async_trait`: @@ -64,3 +68,11 @@ pub trait Controller : Sized + Send + Sync { /// (likely if worker is already stopped) fn stop(&self) -> bool; } + + +/// type alias for Boxed dyn callback +pub type ControllerCallback = Box; + +/// underlying trait for controller callback: must be a threadsafe repeatable non-mut closure which +/// can be debug printed +pub trait ControllerCallbackTrait : Sync + Send + std::fmt::Debug + Fn() {} diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 239a38b..7f4977d 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -8,6 +8,7 @@ use diamond_types::LocalVersion; use tokio::sync::{oneshot, mpsc, watch}; use tonic::async_trait; +use crate::api::controller::ControllerCallback; use crate::api::Controller; use crate::api::TextChange; @@ -52,6 +53,7 @@ pub(crate) struct BufferControllerInner { pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, + pub(crate) callback: watch::Sender>, } #[async_trait] @@ -96,6 +98,19 @@ impl Controller for BufferController { Ok(()) } + fn callback(&self, cb: ControllerCallback) { + if self.0.callback.send(Some(cb)).is_err() { + // TODO should we panic? we failed what we were supposed to do + tracing::error!("no active buffer worker to run registered callback!"); + } + } + + fn clear_callback(&self) { + if self.0.callback.send(None).is_err() { + tracing::warn!("no active buffer worker to clear callback"); + } + } + fn stop(&self) -> bool { self.0.stopper.send(()).is_ok() } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 14e3913..d59a207 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use tonic::{async_trait, Streaming}; use uuid::Uuid; -use crate::api::controller::ControllerWorker; +use crate::api::controller::{ControllerCallback, ControllerWorker}; use crate::api::TextChange; use crate::errors::IgnorableError; @@ -24,6 +24,7 @@ pub(crate) struct BufferWorker { delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, stop: mpsc::UnboundedReceiver<()>, controller: BufferController, + callback: watch::Receiver>, } impl BufferWorker { @@ -35,6 +36,7 @@ impl BufferWorker { let (req_tx, req_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); + let (cb_tx, cb_rx) = watch::channel(None); let (poller_tx, poller_rx) = mpsc::unbounded_channel(); @@ -49,6 +51,7 @@ impl BufferWorker { stopper: end_tx, content_request: req_tx, delta_request: recv_tx, + callback: cb_tx, }; BufferWorker { @@ -61,6 +64,7 @@ impl BufferWorker { controller: BufferController(Arc::new(controller)), content_checkout: req_rx, delta_req: recv_rx, + callback: cb_rx, } } } @@ -130,6 +134,9 @@ impl ControllerWorker for BufferWorker { for tx in self.pollers.drain(..) { tx.send(()).unwrap_or_warn("could not wake up poller"); } + if let Some(cb) = self.callback.borrow().as_ref() { + cb(); // TODO should we run this on another task/thread? + } }, Err(e) => tracing::error!("could not deserialize operation from server: {}", e), } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 51d1ab6..0e46055 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex}; use tonic::async_trait; -use crate::api::{Controller, Cursor}; +use crate::api::{controller::ControllerCallback, Controller, Cursor}; use codemp_proto::cursor::{CursorEvent, CursorPosition}; /// the cursor controller implementation /// @@ -63,6 +63,19 @@ impl Controller for CursorController { Ok(self.0.last_op.lock().await.changed().await?) } + fn callback(&self, cb: ControllerCallback) { + if self.0.callback.send(Some(cb)).is_err() { + // TODO should we panic? we failed what we were supposed to do + tracing::error!("no active cursor worker to run registered callback!"); + } + } + + fn clear_callback(&self) { + if self.0.callback.send(None).is_err() { + tracing::warn!("no active cursor worker to clear callback"); + } + } + fn stop(&self) -> bool { self.0.stop.send(()).is_ok() } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 89456a2..4dddd6e 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tonic::{Streaming, async_trait}; -use crate::{api::{controller::ControllerWorker, Cursor}, errors::IgnorableError}; +use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, errors::IgnorableError}; use codemp_proto::cursor::{CursorPosition, CursorEvent}; use super::controller::{CursorController, CursorControllerInner}; @@ -14,6 +14,7 @@ pub(crate) struct CursorWorker { channel: broadcast::Sender, stop: mpsc::UnboundedReceiver<()>, controller: CursorController, + callback: watch::Receiver>, } impl Default for CursorWorker { @@ -22,11 +23,13 @@ impl Default for CursorWorker { let (cur_tx, _cur_rx) = broadcast::channel(64); let (end_tx, end_rx) = mpsc::unbounded_channel(); let (change_tx, change_rx) = watch::channel(CursorEvent::default()); + let (cb_tx, cb_rx) = watch::channel(None); let controller = CursorControllerInner { op: op_tx, last_op: Mutex::new(change_rx), stream: Mutex::new(cur_tx.subscribe()), stop: end_tx, + callback: cb_tx, }; Self { op: op_rx, @@ -34,6 +37,7 @@ impl Default for CursorWorker { channel: cur_tx, stop: end_rx, controller: CursorController(Arc::new(controller)), + callback: cb_rx, } } } @@ -57,6 +61,9 @@ impl ControllerWorker for CursorWorker { Ok(Some(cur)) = rx.message() => { self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); self.changed.send(cur).unwrap_or_warn("could not update last event"); + if let Some(cb) = self.callback.borrow().as_ref() { + cb(); // TODO should this run in its own task/thread? + } }, else => break, }