mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-22 15:24:48 +01:00
feat: added back .callback() to controller api
This commit is contained in:
parent
2f955ecef4
commit
7900ca08a7
5 changed files with 57 additions and 3 deletions
|
@ -45,6 +45,10 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn callback(&self, cb: ControllerCallback);
|
||||||
|
|
||||||
|
fn clear_callback(&self);
|
||||||
|
|
||||||
/// block until next value is available without consuming it
|
/// block until next value is available without consuming it
|
||||||
///
|
///
|
||||||
/// this is just an async trait function wrapped by `async_trait`:
|
/// this is just an async trait function wrapped by `async_trait`:
|
||||||
|
@ -64,3 +68,11 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
|
||||||
/// (likely if worker is already stopped)
|
/// (likely if worker is already stopped)
|
||||||
fn stop(&self) -> bool;
|
fn stop(&self) -> bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// type alias for Boxed dyn callback
|
||||||
|
pub type ControllerCallback = Box<dyn ControllerCallbackTrait>;
|
||||||
|
|
||||||
|
/// underlying trait for controller callback: must be a threadsafe repeatable non-mut closure which
|
||||||
|
/// can be debug printed
|
||||||
|
pub trait ControllerCallbackTrait : Sync + Send + std::fmt::Debug + Fn() {}
|
||||||
|
|
|
@ -8,6 +8,7 @@ use diamond_types::LocalVersion;
|
||||||
use tokio::sync::{oneshot, mpsc, watch};
|
use tokio::sync::{oneshot, mpsc, watch};
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
|
|
||||||
|
use crate::api::controller::ControllerCallback;
|
||||||
use crate::api::Controller;
|
use crate::api::Controller;
|
||||||
|
|
||||||
use crate::api::TextChange;
|
use crate::api::TextChange;
|
||||||
|
@ -52,6 +53,7 @@ pub(crate) struct BufferControllerInner {
|
||||||
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
|
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
|
||||||
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
|
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
|
||||||
pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
|
pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
|
||||||
|
pub(crate) callback: watch::Sender<Option<ControllerCallback>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
@ -96,6 +98,19 @@ impl Controller<TextChange> for BufferController {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn callback(&self, cb: ControllerCallback) {
|
||||||
|
if self.0.callback.send(Some(cb)).is_err() {
|
||||||
|
// TODO should we panic? we failed what we were supposed to do
|
||||||
|
tracing::error!("no active buffer worker to run registered callback!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear_callback(&self) {
|
||||||
|
if self.0.callback.send(None).is_err() {
|
||||||
|
tracing::warn!("no active buffer worker to clear callback");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn stop(&self) -> bool {
|
fn stop(&self) -> bool {
|
||||||
self.0.stopper.send(()).is_ok()
|
self.0.stopper.send(()).is_ok()
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch};
|
||||||
use tonic::{async_trait, Streaming};
|
use tonic::{async_trait, Streaming};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::api::controller::ControllerWorker;
|
use crate::api::controller::{ControllerCallback, ControllerWorker};
|
||||||
use crate::api::TextChange;
|
use crate::api::TextChange;
|
||||||
|
|
||||||
use crate::errors::IgnorableError;
|
use crate::errors::IgnorableError;
|
||||||
|
@ -24,6 +24,7 @@ pub(crate) struct BufferWorker {
|
||||||
delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
|
delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
|
||||||
stop: mpsc::UnboundedReceiver<()>,
|
stop: mpsc::UnboundedReceiver<()>,
|
||||||
controller: BufferController,
|
controller: BufferController,
|
||||||
|
callback: watch::Receiver<Option<ControllerCallback>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufferWorker {
|
impl BufferWorker {
|
||||||
|
@ -35,6 +36,7 @@ impl BufferWorker {
|
||||||
|
|
||||||
let (req_tx, req_rx) = mpsc::channel(1);
|
let (req_tx, req_rx) = mpsc::channel(1);
|
||||||
let (recv_tx, recv_rx) = mpsc::channel(1);
|
let (recv_tx, recv_rx) = mpsc::channel(1);
|
||||||
|
let (cb_tx, cb_rx) = watch::channel(None);
|
||||||
|
|
||||||
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
@ -49,6 +51,7 @@ impl BufferWorker {
|
||||||
stopper: end_tx,
|
stopper: end_tx,
|
||||||
content_request: req_tx,
|
content_request: req_tx,
|
||||||
delta_request: recv_tx,
|
delta_request: recv_tx,
|
||||||
|
callback: cb_tx,
|
||||||
};
|
};
|
||||||
|
|
||||||
BufferWorker {
|
BufferWorker {
|
||||||
|
@ -61,6 +64,7 @@ impl BufferWorker {
|
||||||
controller: BufferController(Arc::new(controller)),
|
controller: BufferController(Arc::new(controller)),
|
||||||
content_checkout: req_rx,
|
content_checkout: req_rx,
|
||||||
delta_req: recv_rx,
|
delta_req: recv_rx,
|
||||||
|
callback: cb_rx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -130,6 +134,9 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
for tx in self.pollers.drain(..) {
|
for tx in self.pollers.drain(..) {
|
||||||
tx.send(()).unwrap_or_warn("could not wake up poller");
|
tx.send(()).unwrap_or_warn("could not wake up poller");
|
||||||
}
|
}
|
||||||
|
if let Some(cb) = self.callback.borrow().as_ref() {
|
||||||
|
cb(); // TODO should we run this on another task/thread?
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ use std::sync::Arc;
|
||||||
use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex};
|
use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex};
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
|
|
||||||
use crate::api::{Controller, Cursor};
|
use crate::api::{controller::ControllerCallback, Controller, Cursor};
|
||||||
use codemp_proto::cursor::{CursorEvent, CursorPosition};
|
use codemp_proto::cursor::{CursorEvent, CursorPosition};
|
||||||
/// the cursor controller implementation
|
/// the cursor controller implementation
|
||||||
///
|
///
|
||||||
|
@ -63,6 +63,19 @@ impl Controller<Cursor> for CursorController {
|
||||||
Ok(self.0.last_op.lock().await.changed().await?)
|
Ok(self.0.last_op.lock().await.changed().await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn callback(&self, cb: ControllerCallback) {
|
||||||
|
if self.0.callback.send(Some(cb)).is_err() {
|
||||||
|
// TODO should we panic? we failed what we were supposed to do
|
||||||
|
tracing::error!("no active cursor worker to run registered callback!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear_callback(&self) {
|
||||||
|
if self.0.callback.send(None).is_err() {
|
||||||
|
tracing::warn!("no active cursor worker to clear callback");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn stop(&self) -> bool {
|
fn stop(&self) -> bool {
|
||||||
self.0.stop.send(()).is_ok()
|
self.0.stop.send(()).is_ok()
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||||
use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch};
|
use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch};
|
||||||
use tonic::{Streaming, async_trait};
|
use tonic::{Streaming, async_trait};
|
||||||
|
|
||||||
use crate::{api::{controller::ControllerWorker, Cursor}, errors::IgnorableError};
|
use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, errors::IgnorableError};
|
||||||
use codemp_proto::cursor::{CursorPosition, CursorEvent};
|
use codemp_proto::cursor::{CursorPosition, CursorEvent};
|
||||||
|
|
||||||
use super::controller::{CursorController, CursorControllerInner};
|
use super::controller::{CursorController, CursorControllerInner};
|
||||||
|
@ -14,6 +14,7 @@ pub(crate) struct CursorWorker {
|
||||||
channel: broadcast::Sender<CursorEvent>,
|
channel: broadcast::Sender<CursorEvent>,
|
||||||
stop: mpsc::UnboundedReceiver<()>,
|
stop: mpsc::UnboundedReceiver<()>,
|
||||||
controller: CursorController,
|
controller: CursorController,
|
||||||
|
callback: watch::Receiver<Option<ControllerCallback>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for CursorWorker {
|
impl Default for CursorWorker {
|
||||||
|
@ -22,11 +23,13 @@ impl Default for CursorWorker {
|
||||||
let (cur_tx, _cur_rx) = broadcast::channel(64);
|
let (cur_tx, _cur_rx) = broadcast::channel(64);
|
||||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
||||||
let (change_tx, change_rx) = watch::channel(CursorEvent::default());
|
let (change_tx, change_rx) = watch::channel(CursorEvent::default());
|
||||||
|
let (cb_tx, cb_rx) = watch::channel(None);
|
||||||
let controller = CursorControllerInner {
|
let controller = CursorControllerInner {
|
||||||
op: op_tx,
|
op: op_tx,
|
||||||
last_op: Mutex::new(change_rx),
|
last_op: Mutex::new(change_rx),
|
||||||
stream: Mutex::new(cur_tx.subscribe()),
|
stream: Mutex::new(cur_tx.subscribe()),
|
||||||
stop: end_tx,
|
stop: end_tx,
|
||||||
|
callback: cb_tx,
|
||||||
};
|
};
|
||||||
Self {
|
Self {
|
||||||
op: op_rx,
|
op: op_rx,
|
||||||
|
@ -34,6 +37,7 @@ impl Default for CursorWorker {
|
||||||
channel: cur_tx,
|
channel: cur_tx,
|
||||||
stop: end_rx,
|
stop: end_rx,
|
||||||
controller: CursorController(Arc::new(controller)),
|
controller: CursorController(Arc::new(controller)),
|
||||||
|
callback: cb_rx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,6 +61,9 @@ impl ControllerWorker<Cursor> for CursorWorker {
|
||||||
Ok(Some(cur)) = rx.message() => {
|
Ok(Some(cur)) = rx.message() => {
|
||||||
self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event");
|
self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event");
|
||||||
self.changed.send(cur).unwrap_or_warn("could not update last event");
|
self.changed.send(cur).unwrap_or_warn("could not update last event");
|
||||||
|
if let Some(cb) = self.callback.borrow().as_ref() {
|
||||||
|
cb(); // TODO should this run in its own task/thread?
|
||||||
|
}
|
||||||
},
|
},
|
||||||
else => break,
|
else => break,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue