2023-08-20 00:46:55 +02:00
|
|
|
//! ### controller
|
|
|
|
//!
|
|
|
|
//! a controller implementation for buffer actions
|
|
|
|
|
2023-09-05 20:13:09 +02:00
|
|
|
|
2023-11-09 05:21:24 +01:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
2023-11-17 03:15:13 +01:00
|
|
|
use tokio::sync::{watch, mpsc, RwLock};
|
2023-08-16 23:09:47 +02:00
|
|
|
use tonic::async_trait;
|
2023-08-11 15:33:40 +02:00
|
|
|
|
2023-08-19 04:02:21 +02:00
|
|
|
use crate::errors::IgnorableError;
|
2023-09-03 23:04:08 +02:00
|
|
|
use crate::{api::Controller, Error};
|
2023-08-16 23:09:47 +02:00
|
|
|
|
|
|
|
use super::TextChange;
|
2023-08-11 15:33:40 +02:00
|
|
|
|
2023-08-20 00:46:55 +02:00
|
|
|
/// the buffer controller implementation
|
|
|
|
///
|
|
|
|
/// this contains
|
|
|
|
/// * a watch channel which always contains an updated view of the buffer content
|
|
|
|
/// * a sink to send buffer operations into
|
|
|
|
/// * a mutexed broadcast receiver for buffer operations
|
|
|
|
/// * a channel to stop the associated worker
|
|
|
|
///
|
|
|
|
/// for each controller a worker exists, managing outgoing and inbound
|
|
|
|
/// queues, transforming outbound delayed ops and applying remote changes
|
|
|
|
/// to the local buffer
|
|
|
|
///
|
2023-09-10 03:06:47 +02:00
|
|
|
/// this controller implements [crate::api::OperationFactory], allowing to produce
|
2023-08-20 00:46:55 +02:00
|
|
|
/// Operation Sequences easily
|
|
|
|
///
|
|
|
|
/// upon dropping this handle will stop the associated worker
|
2023-11-09 05:21:24 +01:00
|
|
|
#[derive(Debug, Clone)]
|
2023-08-11 15:33:40 +02:00
|
|
|
pub struct BufferController {
|
2023-08-16 23:09:47 +02:00
|
|
|
content: watch::Receiver<String>,
|
2023-11-16 06:52:25 +01:00
|
|
|
seen: Arc<RwLock<String>>,
|
2023-11-09 05:21:24 +01:00
|
|
|
operations: mpsc::UnboundedSender<TextChange>,
|
|
|
|
_stop: Arc<StopOnDrop>, // just exist
|
2023-07-09 03:44:27 +02:00
|
|
|
}
|
|
|
|
|
2023-08-17 00:04:37 +02:00
|
|
|
impl BufferController {
|
|
|
|
pub(crate) fn new(
|
|
|
|
content: watch::Receiver<String>,
|
2023-11-09 05:21:24 +01:00
|
|
|
operations: mpsc::UnboundedSender<TextChange>,
|
2023-08-19 04:02:21 +02:00
|
|
|
stop: mpsc::UnboundedSender<()>,
|
2023-08-17 00:04:37 +02:00
|
|
|
) -> Self {
|
2023-11-16 06:52:25 +01:00
|
|
|
BufferController {
|
|
|
|
content, operations,
|
|
|
|
_stop: Arc::new(StopOnDrop(stop)),
|
|
|
|
seen: Arc::new(RwLock::new("".into())),
|
|
|
|
}
|
2023-09-10 03:01:37 +02:00
|
|
|
}
|
2023-08-19 04:02:21 +02:00
|
|
|
}
|
|
|
|
|
2023-11-09 05:21:24 +01:00
|
|
|
#[derive(Debug)]
|
|
|
|
struct StopOnDrop(mpsc::UnboundedSender<()>);
|
|
|
|
|
|
|
|
impl Drop for StopOnDrop {
|
2023-08-19 04:02:21 +02:00
|
|
|
fn drop(&mut self) {
|
2023-11-09 05:21:24 +01:00
|
|
|
self.0.send(()).unwrap_or_warn("could not send stop message to worker");
|
2023-08-17 00:04:37 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-16 23:09:47 +02:00
|
|
|
#[async_trait]
|
2023-11-16 06:52:25 +01:00
|
|
|
impl Controller<TextChange> for BufferController {
|
2023-11-09 05:21:24 +01:00
|
|
|
type Input = TextChange;
|
2023-08-16 23:09:47 +02:00
|
|
|
|
2023-08-21 02:35:56 +02:00
|
|
|
async fn poll(&self) -> Result<(), Error> {
|
2023-11-16 06:52:25 +01:00
|
|
|
let mut poller = self.content.clone();
|
|
|
|
loop {
|
|
|
|
poller.changed().await?;
|
|
|
|
let seen = self.seen.read().await.clone();
|
|
|
|
if *poller.borrow() != seen {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
2023-08-21 02:35:56 +02:00
|
|
|
}
|
|
|
|
|
2023-11-16 06:52:25 +01:00
|
|
|
fn try_recv(&self) -> Result<Option<TextChange>, Error> {
|
|
|
|
let cur = match self.seen.try_read() {
|
|
|
|
Err(e) => {
|
|
|
|
tracing::error!("try_recv invoked while being mutated: {}", e);
|
|
|
|
return Ok(None);
|
|
|
|
},
|
|
|
|
Ok(x) => x.clone(),
|
|
|
|
};
|
|
|
|
if *self.content.borrow() != cur {
|
|
|
|
match self.seen.try_write() {
|
|
|
|
Err(e) => {
|
|
|
|
tracing::error!("try_recv mutating while being mutated: {}", e);
|
|
|
|
return Ok(None);
|
|
|
|
},
|
|
|
|
Ok(mut w) => {
|
|
|
|
*w = self.content.borrow().clone();
|
|
|
|
// TODO it's not the whole buffer that changed
|
|
|
|
return Ok(Some(TextChange {
|
|
|
|
span: 0..cur.len(),
|
|
|
|
content: self.content.borrow().clone(),
|
|
|
|
after: "".to_string(),
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return Ok(None);
|
2023-08-21 02:35:56 +02:00
|
|
|
}
|
|
|
|
|
2023-11-16 06:52:25 +01:00
|
|
|
async fn recv(&self) -> Result<TextChange, Error> {
|
|
|
|
self.poll().await?;
|
|
|
|
match self.try_recv()? {
|
|
|
|
Some(x) => Ok(x),
|
|
|
|
None => Err(crate::Error::Filler { message: "wtfff".into() }),
|
|
|
|
}
|
2023-07-09 03:44:27 +02:00
|
|
|
}
|
|
|
|
|
2023-08-20 00:46:55 +02:00
|
|
|
/// enqueue an opseq for processing
|
2023-11-09 05:21:24 +01:00
|
|
|
fn send(&self, op: TextChange) -> Result<(), Error> {
|
2023-09-10 03:01:37 +02:00
|
|
|
Ok(self.operations.send(op)?)
|
2023-07-09 03:44:27 +02:00
|
|
|
}
|
2023-08-11 15:33:40 +02:00
|
|
|
}
|