feat: reworked buffer controller

basically now calling recv assumes we have locked the editor state, and
no more operations will be enqueued. this allows to safely transform and
send server operations. the way local ops are transformed and sent is
still kinda buggy but it mostly works? "dead"locks sometimes until more
stuff arrives. also buffercontroller no longer implements operation
factory, you gotta make a factory yourself
This commit is contained in:
əlemi 2023-09-10 03:01:37 +02:00
parent b2520898b5
commit 5277eceb01
3 changed files with 145 additions and 149 deletions

View file

@ -2,17 +2,13 @@
//!
//! a controller implementation for buffer actions
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use operational_transform::OperationSeq;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{watch, mpsc, broadcast, Mutex};
use tokio::sync::{watch, mpsc, Mutex, oneshot};
use tonic::async_trait;
use crate::errors::IgnorableError;
use crate::{api::Controller, Error};
use crate::buffer::factory::OperationFactory;
use super::TextChange;
@ -36,26 +32,27 @@ use super::TextChange;
pub struct BufferController {
content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<OperationSeq>,
last_op: Mutex<watch::Receiver<String>>,
stream: Mutex<broadcast::Receiver<TextChange>>,
last_op: Mutex<watch::Receiver<()>>,
stream: mpsc::UnboundedSender<oneshot::Sender<Option<TextChange>>>,
stop: mpsc::UnboundedSender<()>,
operation_tick: Arc<AtomicU64>,
}
impl BufferController {
pub(crate) fn new(
content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<OperationSeq>,
stream: Mutex<broadcast::Receiver<TextChange>>,
stream: mpsc::UnboundedSender<oneshot::Sender<Option<TextChange>>>,
stop: mpsc::UnboundedSender<()>,
operation_tick: Arc<AtomicU64>,
last_op: Mutex<watch::Receiver<()>>,
) -> Self {
BufferController {
last_op: Mutex::new(content.clone()),
content, operations, stream, stop,
operation_tick,
last_op, content, operations, stream, stop,
}
}
pub fn content(&self) -> String {
self.content.borrow().clone()
}
}
impl Drop for BufferController {
@ -64,13 +61,6 @@ impl Drop for BufferController {
}
}
#[async_trait]
impl OperationFactory for BufferController {
fn content(&self) -> String {
self.content.borrow().clone()
}
}
#[async_trait]
impl Controller<TextChange> for BufferController {
type Input = OperationSeq;
@ -80,28 +70,25 @@ impl Controller<TextChange> for BufferController {
}
fn try_recv(&self) -> Result<Option<TextChange>, Error> {
match self.stream.blocking_lock().try_recv() {
Ok(op) => Ok(Some(op)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Closed) => Err(Error::Channel { send: false }),
Err(TryRecvError::Lagged(n)) => {
tracing::warn!("buffer channel lagged, skipping {} events", n);
Ok(self.try_recv()?)
},
}
let (tx, rx) = oneshot::channel();
self.stream.send(tx)?;
rx.blocking_recv()
.map_err(|_| Error::Channel { send: false })
}
/// receive an operation seq and transform it into a TextChange from buffer content
async fn recv(&self) -> Result<TextChange, Error> {
let op = self.stream.lock().await.recv().await?;
Ok(op)
self.poll().await?;
let (tx, rx) = oneshot::channel();
self.stream.send(tx)?;
Ok(
rx.await
.map_err(|_| Error::Channel { send: false })?
.expect("empty channel after polling")
)
}
/// enqueue an opseq for processing
fn send(&self, op: OperationSeq) -> Result<(), Error> {
let tick = self.operation_tick.load(Ordering::Acquire);
self.operations.send(op)?;
self.operation_tick.store(tick + 1, Ordering::Release);
Ok(())
Ok(self.operations.send(op)?)
}
}

View file

@ -6,15 +6,11 @@
//! this module contains buffer-related operations and helpers to create Operation Sequences
//! (the underlying chunks of changes sent over the wire)
use std::ops::Range;
pub(crate) mod worker;
/// buffer controller implementation
pub mod controller;
pub(crate) mod worker;
pub use factory::OperationFactory;
pub use controller::BufferController as Controller;
@ -24,7 +20,10 @@ pub use controller::BufferController as Controller;
#[derive(Clone, Debug, Default)]
pub struct TextChange {
/// range of text change, as byte indexes in buffer
pub span: Range<usize>,
pub span: std::ops::Range<usize>,
/// content of text change, as string
pub content: String,
/// content after this text change
/// note that this field will probably be dropped, don't rely on it
pub after: String
}

View file

@ -1,12 +1,10 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::{sync::Arc, collections::VecDeque};
use std::collections::VecDeque;
use operational_transform::OperationSeq;
use tokio::sync::{watch, mpsc, broadcast, Mutex};
use tokio::sync::{watch, mpsc, oneshot, Mutex};
use tonic::transport::Channel;
use tonic::{async_trait, Streaming};
use crate::errors::IgnorableError;
use crate::proto::{OperationRequest, RawOp};
use crate::proto::buffer_client::BufferClient;
use crate::api::controller::ControllerWorker;
@ -20,36 +18,54 @@ pub(crate) struct BufferControllerWorker {
uid: String,
content: watch::Sender<String>,
operations: mpsc::UnboundedReceiver<OperationSeq>,
stream: Arc<broadcast::Sender<TextChange>>,
stream: mpsc::UnboundedReceiver<oneshot::Sender<Option<TextChange>>>,
stream_requestor: mpsc::UnboundedSender<oneshot::Sender<Option<TextChange>>>,
receiver: watch::Receiver<String>,
sender: mpsc::UnboundedSender<OperationSeq>,
buffer: String,
path: String,
stop: mpsc::UnboundedReceiver<()>,
stop_control: mpsc::UnboundedSender<()>,
operation_tick: Arc<AtomicU64>,
new_op_tx: watch::Sender<()>,
new_op_rx: watch::Receiver<()>,
}
impl BufferControllerWorker {
pub fn new(uid: String, buffer: &str, path: &str) -> Self {
let (txt_tx, txt_rx) = watch::channel(buffer.to_string());
let (op_tx, op_rx) = mpsc::unbounded_channel();
let (s_tx, _s_rx) = broadcast::channel(64);
let (s_tx, s_rx) = mpsc::unbounded_channel();
let (end_tx, end_rx) = mpsc::unbounded_channel();
let (notx, norx) = watch::channel(());
BufferControllerWorker {
uid,
content: txt_tx,
operations: op_rx,
stream: Arc::new(s_tx),
stream: s_rx,
stream_requestor: s_tx,
receiver: txt_rx,
sender: op_tx,
buffer: buffer.to_string(),
path: path.to_string(),
stop: end_rx,
stop_control: end_tx,
operation_tick: Arc::new(AtomicU64::new(0)),
new_op_tx: notx,
new_op_rx: norx,
}
}
async fn send_op(&self, tx: &mut BufferClient<Channel>, outbound: &OperationSeq) -> crate::Result<()> {
let opseq = serde_json::to_string(outbound).expect("could not serialize opseq");
let req = OperationRequest {
path: self.path.clone(),
hash: format!("{:x}", md5::compute(&self.buffer)),
op: Some(RawOp {
opseq, user: self.uid.clone(),
}),
};
let _ = tx.edit(req).await?;
Ok(())
}
}
#[async_trait]
@ -62,128 +78,122 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
BufferController::new(
self.receiver.clone(),
self.sender.clone(),
Mutex::new(self.stream.subscribe()),
self.stream_requestor.clone(),
self.stop_control.clone(),
self.operation_tick.clone(),
Mutex::new(self.new_op_rx.clone()),
)
}
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
let mut clientside : VecDeque<OperationSeq> = VecDeque::new();
let mut serverside : VecDeque<OperationSeq> = VecDeque::new();
let mut last_seen_tick = 0;
loop {
// block until one of these is ready
tokio::select! {
// received a new message from server (or an error)
res = rx.message() => {
match res {
Err(e) => return tracing::error!("error receiving op from server: {}", e),
Ok(None) => return tracing::warn!("server closed operation stream"),
Ok(Some(msg)) => serverside.push_back(
serde_json::from_str(&msg.opseq)
.expect("could not deserialize server opseq")
),
}
},
// received a new operation from client (or channel got closed)
res = self.operations.recv() => {
match res {
None => return tracing::warn!("client closed operation stream"),
Some(op) => {
clientside.push_back(op.clone());
last_seen_tick = self.operation_tick.load(Ordering::Acquire);
}
}
},
biased;
// received a stop request (or channel got closed)
res = self.stop.recv() => {
tracing::info!("received stop signal");
match res {
None => return tracing::warn!("stop channel closed, stopping worker"),
Some(()) => return tracing::debug!("buffer worker stopping cleanly"),
}
}
}
// we must give priority to operations received from remote server, because we can transform
// our ops with server's ops but server won't transform its ops with ours. We must transform
// ALL enqueued client ops: if a new one arrived before we could transform and update, we
// should discard our progress and poll again.
while let Some(operation) = serverside.get(0).cloned() {
let mut transformed_op = operation.clone();
let mut queued_ops = clientside.clone();
let mut txt_before = self.buffer.clone();
for op in queued_ops.iter_mut() {
txt_before = match op.apply(&txt_before) {
Ok(x) => x,
Err(_) => { tracing::error!("could not apply outgoing enqueued opseq to current buffer?"); break; },
// received a new message from server (or an error)
res = rx.message() => {
tracing::info!("received msg from server");
let inbound : OperationSeq = match res {
Err(e) => return tracing::error!("error receiving op from server: {}", e),
Ok(None) => return tracing::warn!("server closed operation stream"),
Ok(Some(msg)) => serde_json::from_str(&msg.opseq)
.expect("could not deserialize server opseq"),
};
(*op, transformed_op) = match op.transform(&transformed_op) {
Err(e) => { tracing::warn!("could not transform enqueued operation: {}", e); break; },
Ok((x, y)) => (x, y),
};
}
let skip = leading_noop(transformed_op.ops()) as usize;
let tail = tailing_noop(transformed_op.ops()) as usize;
let span = skip..(transformed_op.base_len() - tail);
let after = transformed_op.apply(&txt_before).expect("could not apply transformed op");
let change = TextChange { span, content: after[skip..after.len()-tail].to_string() };
let tick = self.operation_tick.load(std::sync::atomic::Ordering::Acquire);
if tick != last_seen_tick {
tracing::warn!("skipping downstream because there are ops");
break
} // there are more ops to see first
clientside = queued_ops;
self.buffer = match operation.apply(&self.buffer) {
Ok(x) => x,
Err(_) => { tracing::error!("wtf received op could not be applied?"); break; },
};
if clientside.is_empty() {
self.content.send(self.buffer.clone()).expect("could not broadcast new buffer content");
}
self.stream.send(change)
.unwrap_or_warn("could not send operation to server");
serverside.pop_front();
}
// if there are still serverside operations to be applied, we can't dispatch our local ones
// yet because we need them to transform the ones sent by the server before applying them on
// our local buffer. We may get here if a new local operation arrived before we could process
// and transform all received server ops. since the buffer is different, it isn't safe to
// apply them and we must transform them again. If the loop above left its queue not empty,
// we should be guaranteed to unblock immediately in the select above because we have a new
// client operation waiting for us to be enqueued
if serverside.is_empty() {
while let Some(op) = clientside.get(0) {
let opseq = serde_json::to_string(&op).expect("could not serialize opseq");
let req = OperationRequest {
path: self.path.clone(),
hash: format!("{:x}", md5::compute(&self.buffer)),
op: Some(RawOp {
opseq, user: self.uid.clone(),
}),
};
if let Err(e) = tx.edit(req).await {
tracing::warn!("server rejected operation: {}", e);
break;
self.buffer = inbound.apply(&self.buffer).expect("could not apply remote opseq???");
serverside.push_back(inbound);
while let Some(mut outbound) = clientside.get(0).cloned() {
let mut serverside_tmp = serverside.clone();
for server_op in serverside_tmp.iter_mut() {
tracing::info!("transforming {:?} <-> {:?}", outbound, server_op);
(outbound, *server_op) = outbound.transform(server_op)
.expect("could not transform enqueued out with just received");
}
match self.send_op(&mut tx, &outbound).await {
Err(e) => { tracing::warn!("could not send op even after transforming: {}", e); break; },
Ok(()) => {
tracing::info!("back in sync");
serverside = serverside_tmp;
self.buffer = outbound.apply(&self.buffer).expect("could not apply op after synching back");
clientside.pop_front();
},
}
}
self.buffer = match op.apply(&self.buffer) {
Ok(x) => x,
Err(_) => { tracing::error!("wtf accepted remote op could not be applied to our buffer????"); break; },
};
self.content.send(self.buffer.clone()).expect("could not broadcast buffer update");
clientside.pop_front();
}
} else {
tracing::warn!("skipping upstream because there are ops");
self.new_op_tx.send(()).expect("could not activate client after new server event");
},
// received a new operation from client (or channel got closed)
res = self.operations.recv() => {
tracing::info!("received op from client");
match res {
None => return tracing::warn!("client closed operation stream"),
Some(op) => {
if clientside.is_empty() {
match self.send_op(&mut tx, &op).await {
Ok(()) => {
self.buffer = op.apply(&self.buffer).expect("could not apply op");
self.content.send(self.buffer.clone()).expect("could not update buffer view");
},
Err(e) => {
tracing::warn!("server rejected op: {}", e);
clientside.push_back(op);
},
}
} else { // I GET STUCK IN THIS BRANCH AND NOTHING HAPPENS AAAAAAAAAA
clientside.push_back(op);
}
}
}
},
// client requested a server operation, transform it and send it
res = self.stream.recv() => {
tracing::info!("received op REQUEST from client");
match res {
None => return tracing::error!("client closed requestor stream"),
Some(tx) => tx.send(match serverside.pop_front() {
None => {
tracing::warn!("requested change but none is available");
None
},
Some(mut operation) => {
let mut after = self.buffer.clone();
for op in clientside.iter_mut() {
(*op, operation) = match op.transform(&operation) {
Err(e) => return tracing::warn!("could not transform enqueued operation: {}", e),
Ok((x, y)) => (x, y),
};
after = match op.apply(&after) {
Err(_) => return tracing::error!("could not apply outgoing enqueued opseq to current buffer?"),
Ok(x) => x,
};
}
let skip = leading_noop(operation.ops()) as usize;
let tail = tailing_noop(operation.ops()) as usize;
let span = skip..(operation.base_len() - tail);
let content = if after.len() - tail < skip { "".into() } else { after[skip..after.len()-tail].to_string() };
let change = TextChange { span, content, after };
Some(change)
},
}).expect("client did not wait????"),
}
},
}
}
}