2023-08-20 00:46:55 +02:00
|
|
|
//! ### controller
|
|
|
|
//!
|
|
|
|
//! a controller implementation for buffer actions
|
|
|
|
|
2023-09-05 20:13:09 +02:00
|
|
|
|
2023-11-09 05:21:24 +01:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
2023-11-23 15:52:36 +01:00
|
|
|
use tokio::sync::oneshot;
|
2023-11-30 03:03:09 +01:00
|
|
|
use tokio::sync::{watch, mpsc};
|
2023-08-16 23:09:47 +02:00
|
|
|
use tonic::async_trait;
|
2023-08-11 15:33:40 +02:00
|
|
|
|
2023-08-19 04:02:21 +02:00
|
|
|
use crate::errors::IgnorableError;
|
2023-11-17 05:47:57 +01:00
|
|
|
use crate::api::Controller;
|
2023-08-16 23:09:47 +02:00
|
|
|
|
2023-11-17 05:45:31 +01:00
|
|
|
use crate::api::TextChange;
|
2023-08-11 15:33:40 +02:00
|
|
|
|
2023-08-20 00:46:55 +02:00
|
|
|
/// the buffer controller implementation
|
|
|
|
///
|
|
|
|
/// for each controller a worker exists, managing outgoing and inbound
|
|
|
|
/// queues, transforming outbound delayed ops and applying remote changes
|
|
|
|
/// to the local buffer
|
|
|
|
///
|
|
|
|
/// upon dropping this handle will stop the associated worker
|
2023-11-09 05:21:24 +01:00
|
|
|
#[derive(Debug, Clone)]
|
2024-08-05 19:13:48 +02:00
|
|
|
pub struct BufferController(Arc<BufferControllerInner>);
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct BufferControllerInner {
|
2023-11-30 03:41:53 +01:00
|
|
|
name: String,
|
2023-08-16 23:09:47 +02:00
|
|
|
content: watch::Receiver<String>,
|
2023-11-30 03:41:53 +01:00
|
|
|
seen: StatusCheck<String>, // internal buffer previous state
|
2023-11-09 05:21:24 +01:00
|
|
|
operations: mpsc::UnboundedSender<TextChange>,
|
2023-11-30 03:03:09 +01:00
|
|
|
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
2023-11-09 05:21:24 +01:00
|
|
|
_stop: Arc<StopOnDrop>, // just exist
|
2023-07-09 03:44:27 +02:00
|
|
|
}
|
|
|
|
|
2023-08-17 00:04:37 +02:00
|
|
|
impl BufferController {
|
|
|
|
pub(crate) fn new(
|
2023-11-17 17:38:47 +01:00
|
|
|
name: String,
|
2023-08-17 00:04:37 +02:00
|
|
|
content: watch::Receiver<String>,
|
2023-11-09 05:21:24 +01:00
|
|
|
operations: mpsc::UnboundedSender<TextChange>,
|
2023-11-30 03:03:09 +01:00
|
|
|
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
2023-08-19 04:02:21 +02:00
|
|
|
stop: mpsc::UnboundedSender<()>,
|
2023-08-17 00:04:37 +02:00
|
|
|
) -> Self {
|
2024-08-05 19:13:48 +02:00
|
|
|
Self(Arc::new(
|
|
|
|
BufferControllerInner {
|
|
|
|
name,
|
|
|
|
content, operations, poller,
|
|
|
|
seen: StatusCheck::default(),
|
|
|
|
_stop: Arc::new(StopOnDrop(stop)),
|
|
|
|
}
|
|
|
|
))
|
2023-09-10 03:01:37 +02:00
|
|
|
}
|
2023-11-23 15:52:36 +01:00
|
|
|
|
2023-11-30 03:41:53 +01:00
|
|
|
/// unique identifier of buffer
|
|
|
|
pub fn name(&self) -> &str {
|
2024-08-05 19:13:48 +02:00
|
|
|
&self.0.name
|
2023-11-30 03:41:53 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/// return buffer whole content, updating internal buffer previous state
|
2023-11-23 15:52:36 +01:00
|
|
|
pub fn content(&self) -> String {
|
2024-08-05 19:13:48 +02:00
|
|
|
self.0.seen.update(self.0.content.borrow().clone());
|
|
|
|
self.0.content.borrow().clone()
|
2023-11-23 15:52:36 +01:00
|
|
|
}
|
2023-08-19 04:02:21 +02:00
|
|
|
}
|
|
|
|
|
2023-11-09 05:21:24 +01:00
|
|
|
#[derive(Debug)]
|
|
|
|
struct StopOnDrop(mpsc::UnboundedSender<()>);
|
|
|
|
|
|
|
|
impl Drop for StopOnDrop {
|
2023-08-19 04:02:21 +02:00
|
|
|
fn drop(&mut self) {
|
2023-11-09 05:21:24 +01:00
|
|
|
self.0.send(()).unwrap_or_warn("could not send stop message to worker");
|
2023-08-17 00:04:37 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-16 23:09:47 +02:00
|
|
|
#[async_trait]
|
2023-11-16 06:52:25 +01:00
|
|
|
impl Controller<TextChange> for BufferController {
|
2023-11-09 05:21:24 +01:00
|
|
|
type Input = TextChange;
|
2023-08-16 23:09:47 +02:00
|
|
|
|
2023-11-30 03:41:53 +01:00
|
|
|
/// block until a text change is available
|
|
|
|
/// this returns immediately if one is already available
|
2023-11-17 05:47:57 +01:00
|
|
|
async fn poll(&self) -> crate::Result<()> {
|
2024-08-05 19:13:48 +02:00
|
|
|
if self.0.seen.check() != *self.0.content.borrow() {
|
2023-11-30 03:03:09 +01:00
|
|
|
return Ok(()); // short circuit: already available!
|
|
|
|
}
|
2023-11-23 15:52:36 +01:00
|
|
|
let (tx, rx) = oneshot::channel::<()>();
|
2024-08-05 19:13:48 +02:00
|
|
|
self.0.poller.send(tx)?;
|
2023-11-30 03:03:09 +01:00
|
|
|
rx.await.map_err(|_| crate::Error::Channel { send: false })?;
|
|
|
|
Ok(())
|
2023-08-21 02:35:56 +02:00
|
|
|
}
|
|
|
|
|
2023-11-30 03:41:53 +01:00
|
|
|
/// if a text change is available, return it immediately
|
2023-11-17 05:47:57 +01:00
|
|
|
fn try_recv(&self) -> crate::Result<Option<TextChange>> {
|
2024-08-05 19:13:48 +02:00
|
|
|
let seen = self.0.seen.check();
|
|
|
|
let actual = self.0.content.borrow().clone();
|
2023-11-17 18:38:29 +01:00
|
|
|
if seen == actual {
|
|
|
|
return Ok(None);
|
2023-11-16 06:52:25 +01:00
|
|
|
}
|
2023-11-17 18:38:29 +01:00
|
|
|
let change = TextChange::from_diff(&seen, &actual);
|
2024-08-05 19:13:48 +02:00
|
|
|
self.0.seen.update(actual);
|
2023-11-17 18:38:29 +01:00
|
|
|
Ok(Some(change))
|
2023-08-21 02:35:56 +02:00
|
|
|
}
|
|
|
|
|
2023-11-30 03:41:53 +01:00
|
|
|
/// block until a new text change is available, and return it
|
2023-11-17 05:47:57 +01:00
|
|
|
async fn recv(&self) -> crate::Result<TextChange> {
|
2023-11-16 06:52:25 +01:00
|
|
|
self.poll().await?;
|
2024-08-05 19:13:48 +02:00
|
|
|
let seen = self.0.seen.check();
|
|
|
|
let actual = self.0.content.borrow().clone();
|
2023-11-30 03:03:09 +01:00
|
|
|
let change = TextChange::from_diff(&seen, &actual);
|
2024-08-05 19:13:48 +02:00
|
|
|
self.0.seen.update(actual);
|
2023-11-17 05:47:57 +01:00
|
|
|
Ok(change)
|
2023-07-09 03:44:27 +02:00
|
|
|
}
|
|
|
|
|
2023-11-30 03:41:53 +01:00
|
|
|
/// enqueue a text change for processing
|
|
|
|
/// this also updates internal buffer previous state
|
2023-11-17 05:47:57 +01:00
|
|
|
fn send(&self, op: TextChange) -> crate::Result<()> {
|
2024-08-05 19:13:48 +02:00
|
|
|
let before = self.0.seen.check();
|
|
|
|
self.0.seen.update(op.apply(&before));
|
|
|
|
Ok(self.0.operations.send(op)?)
|
2023-07-09 03:44:27 +02:00
|
|
|
}
|
2023-08-11 15:33:40 +02:00
|
|
|
}
|
2023-11-30 03:03:09 +01:00
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
2023-11-30 03:41:53 +01:00
|
|
|
struct StatusCheck<T : Clone> {
|
2023-11-30 03:03:09 +01:00
|
|
|
state: watch::Receiver<T>,
|
|
|
|
updater: Arc<watch::Sender<T>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T : Clone + Default> Default for StatusCheck<T> {
|
|
|
|
fn default() -> Self {
|
|
|
|
let (tx, rx) = watch::channel(T::default());
|
|
|
|
StatusCheck { state: rx, updater: Arc::new(tx) }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T : Clone> StatusCheck<T> {
|
2023-11-30 03:41:53 +01:00
|
|
|
fn update(&self, state: T) -> T {
|
2023-11-30 03:03:09 +01:00
|
|
|
self.updater.send_replace(state)
|
|
|
|
}
|
|
|
|
|
2023-11-30 03:41:53 +01:00
|
|
|
fn check(&self) -> T {
|
2023-11-30 03:03:09 +01:00
|
|
|
self.state.borrow().clone()
|
|
|
|
}
|
|
|
|
}
|