alemi ca42874590 feat: refactored and improved operation controller
now polling for changes returns span and text so that it's possible to
edit just the changed region. greatly improved controller internal api
with crate-level traits keeping error handling localized
2023-07-09 03:44:27 +02:00

139 lines
3.8 KiB

use std::{sync::Arc, collections::VecDeque, ops::Range};
use operational_transform::OperationSeq;
use tokio::sync::{watch, mpsc, broadcast};
use tonic::async_trait;
use super::{leading_noop, tailing_noop, ControllerWorker};
use crate::errors::IgnorableError;
use crate::factory::OperationFactory;
/// A replacement of one region of text, as returned by polling a controller.
///
/// `span` identifies the region of the previous text that was edited and
/// `content` is the text that now occupies that region.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TextChange {
    /// Range of the previous text covered by this change.
    pub span: Range<usize>,
    /// Text that replaces the region described by `span`.
    pub content: String,
}
pub trait OperationControllerSubscriber {
async fn poll(&mut self) -> Option<TextChange>;
async fn apply(&self, op: OperationSeq);
pub struct OperationControllerHandle {
content: watch::Receiver<String>,
operations: mpsc::Sender<OperationSeq>,
original: Arc<broadcast::Sender<OperationSeq>>,
stream: broadcast::Receiver<OperationSeq>,
impl Clone for OperationControllerHandle {
fn clone(&self) -> Self {
OperationControllerHandle {
content: self.content.clone(),
operations: self.operations.clone(),
original: self.original.clone(),
stream: self.original.subscribe(),
// Implements `OperationFactory` for the handle.
// NOTE(review): this impl is truncated in the reviewed text. The body of
// `content` and the closing braces are missing. Presumably it returns a copy
// of the value held by the `content` watch receiver, which is how `poll`
// reads it. Confirm against the upstream file.
impl OperationFactory for OperationControllerHandle {
/// Returns the content as an owned `String`.
fn content(&self) -> String {
// NOTE(review): this impl is damaged in the reviewed text. The right-hand side
// of `let op =` and the expression that `.unwrap_or_warn(...)` is chained on
// are missing, as are all closing braces. Restore them from the upstream file
// before building.
impl OperationControllerSubscriber for OperationControllerHandle {
/// Describes an operation as a [`TextChange`].
///
/// The returned `span` is a range over the text the operation applies to and
/// `content` is the matching slice of the content currently held by the watch
/// receiver.
async fn poll(&mut self) -> Option<TextChange> {
// NOTE(review): the source of `op` is missing here. Presumably it is received
// from `self.stream`. Confirm.
let op =;
// Copy of the content currently held by the watch receiver.
let after = self.content.borrow().clone();
// Presumably the size of the leading part of the text that `op` leaves untouched.
let skip = leading_noop(op.ops()) as usize;
// Length of the text `op` is meant to be applied to.
let before_len = op.base_len();
// Presumably the size of the trailing part of the text that `op` leaves untouched.
let tail = tailing_noop(op.ops()) as usize;
// Region of the pre-operation text between the two untouched parts.
let span = skip..before_len-tail;
// Same region measured on the current content.
// NOTE(review): `after[..]` indexes by byte offset and panics when a bound is
// out of range or not on a char boundary. Confirm that `skip` and `tail` are
// byte counts and that `after` already includes the effect of `op`.
let content = after[skip..after.len()-tail].to_string();
Some(TextChange { span, content })
/// Submits `op`. On failure the error is handed to `unwrap_or_warn`, not
/// returned.
async fn apply(&self, op: OperationSeq) {
// NOTE(review): the receiver expression of this call is missing. Presumably
// `op` is sent over `self.operations`. Confirm.
.unwrap_or_warn("could not apply+send operation")
pub(crate) trait OperationControllerEditor {
async fn edit(&mut self, path: String, op: OperationSeq) -> bool;
async fn recv(&mut self) -> Option<OperationSeq>;
pub(crate) struct OperationControllerWorker<C : OperationControllerEditor> {
pub(crate) content: watch::Sender<String>,
pub(crate) operations: mpsc::Receiver<OperationSeq>,
pub(crate) stream: Arc<broadcast::Sender<OperationSeq>>,
pub(crate) queue: VecDeque<OperationSeq>,
receiver: watch::Receiver<String>,
sender: mpsc::Sender<OperationSeq>,
client: C,
buffer: String,
path: String,
// NOTE(review): this impl is damaged in the reviewed text. Closing braces are
// missing throughout, the struct literal in `subscribe` lacks its `original`
// and `stream` fields, and the `select!` arms and the loops have lost part of
// their bodies. Restore them from the upstream file before building.
impl<C : OperationControllerEditor + Send> ControllerWorker<OperationControllerHandle> for OperationControllerWorker<C> {
/// Creates a handle that shares this worker's watch receiver and operation
/// sender.
fn subscribe(&self) -> OperationControllerHandle {
OperationControllerHandle {
content: self.receiver.clone(),
operations: self.sender.clone(),
/// Runs the worker loop, consuming the worker. The loop ends when neither the
/// client nor the operation channel yields another operation.
async fn work(mut self) {
loop {
// Wait for whichever source produces an operation first.
let op = tokio::select! {
// Operation received from the client.
Some(operation) = self.client.recv() => {
// Transform each queued operation against the incoming one. The transformed
// incoming operation is carried on to the next queue entry.
let mut out = operation;
for op in self.queue.iter_mut() {
// NOTE(review): `unwrap` panics when `transform` fails. Confirm that is acceptable.
(*op, out) = op.transform(&out).unwrap();
// Operation received from the `operations` channel fed by handles.
Some(op) = self.operations.recv() => {
// Neither pattern matched, i.e. both sources returned `None`: stop working.
else => break
// Apply the selected operation to the worker's buffer.
// NOTE(review): `unwrap` panics when `op` cannot be applied to the buffer.
self.buffer = op.apply(&self.buffer).unwrap();
// Send queued operations to the client starting from the front, stopping at
// the first one the client does not accept.
// NOTE(review): nothing visible removes the front entry after a successful
// `edit`. Presumably a `pop_front` was lost. Confirm.
while let Some(op) = self.queue.get(0) {
if !self.client.edit(self.path.clone(), op.clone()).await { break }
impl<C : OperationControllerEditor> OperationControllerWorker<C> {
pub fn new(client: C, buffer: String, path: String) -> Self {
let (txt_tx, txt_rx) = watch::channel(buffer.clone());
let (op_tx, op_rx) = mpsc::channel(64);
let (s_tx, _s_rx) = broadcast::channel(64);
OperationControllerWorker {
content: txt_tx,
operations: op_rx,
stream: Arc::new(s_tx),
receiver: txt_rx,
sender: op_tx,
queue: VecDeque::new(),
client, buffer, path