mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-22 15:24:48 +01:00
fix: attempt to solve client edits race condition
basically send increments a counter and we compare that before transforming operations, allowing to stop processing, undo and try to receive again. This won't solve all issues but should make them much more rare? I hope
This commit is contained in:
parent
ed2f6688c3
commit
eeececf1b1
3 changed files with 85 additions and 48 deletions
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "codemp"
|
name = "codemp"
|
||||||
version = "0.4.6"
|
version = "0.5.0-pre"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
|
|
|
@ -2,6 +2,9 @@
|
||||||
//!
|
//!
|
||||||
//! a controller implementation for buffer actions
|
//! a controller implementation for buffer actions
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
|
||||||
use operational_transform::OperationSeq;
|
use operational_transform::OperationSeq;
|
||||||
use tokio::sync::broadcast::error::TryRecvError;
|
use tokio::sync::broadcast::error::TryRecvError;
|
||||||
use tokio::sync::{watch, mpsc, broadcast, Mutex};
|
use tokio::sync::{watch, mpsc, broadcast, Mutex};
|
||||||
|
@ -36,6 +39,7 @@ pub struct BufferController {
|
||||||
last_op: Mutex<watch::Receiver<String>>,
|
last_op: Mutex<watch::Receiver<String>>,
|
||||||
stream: Mutex<broadcast::Receiver<TextChange>>,
|
stream: Mutex<broadcast::Receiver<TextChange>>,
|
||||||
stop: mpsc::UnboundedSender<()>,
|
stop: mpsc::UnboundedSender<()>,
|
||||||
|
operation_tick: Arc<AtomicU64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufferController {
|
impl BufferController {
|
||||||
|
@ -44,10 +48,12 @@ impl BufferController {
|
||||||
operations: mpsc::UnboundedSender<OperationSeq>,
|
operations: mpsc::UnboundedSender<OperationSeq>,
|
||||||
stream: Mutex<broadcast::Receiver<TextChange>>,
|
stream: Mutex<broadcast::Receiver<TextChange>>,
|
||||||
stop: mpsc::UnboundedSender<()>,
|
stop: mpsc::UnboundedSender<()>,
|
||||||
|
operation_tick: Arc<AtomicU64>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
BufferController {
|
BufferController {
|
||||||
last_op: Mutex::new(content.clone()),
|
last_op: Mutex::new(content.clone()),
|
||||||
content, operations, stream, stop,
|
content, operations, stream, stop,
|
||||||
|
operation_tick,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -93,6 +99,9 @@ impl Controller<TextChange> for BufferController {
|
||||||
|
|
||||||
/// enqueue an opseq for processing
|
/// enqueue an opseq for processing
|
||||||
fn send(&self, op: OperationSeq) -> Result<(), Error> {
|
fn send(&self, op: OperationSeq) -> Result<(), Error> {
|
||||||
Ok(self.operations.send(op)?)
|
let tick = self.operation_tick.load(Ordering::Acquire);
|
||||||
|
self.operations.send(op)?;
|
||||||
|
self.operation_tick.store(tick + 1, Ordering::Release);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::{sync::Arc, collections::VecDeque};
|
use std::{sync::Arc, collections::VecDeque};
|
||||||
|
|
||||||
use operational_transform::{OperationSeq, OTError};
|
use operational_transform::{OperationSeq, OTError};
|
||||||
|
@ -17,16 +18,16 @@ use super::factory::{leading_noop, tailing_noop};
|
||||||
|
|
||||||
pub(crate) struct BufferControllerWorker {
|
pub(crate) struct BufferControllerWorker {
|
||||||
uid: String,
|
uid: String,
|
||||||
pub(crate) content: watch::Sender<String>,
|
content: watch::Sender<String>,
|
||||||
pub(crate) operations: mpsc::UnboundedReceiver<OperationSeq>,
|
operations: mpsc::UnboundedReceiver<OperationSeq>,
|
||||||
pub(crate) stream: Arc<broadcast::Sender<TextChange>>,
|
stream: Arc<broadcast::Sender<TextChange>>,
|
||||||
pub(crate) queue: VecDeque<OperationSeq>,
|
|
||||||
receiver: watch::Receiver<String>,
|
receiver: watch::Receiver<String>,
|
||||||
sender: mpsc::UnboundedSender<OperationSeq>,
|
sender: mpsc::UnboundedSender<OperationSeq>,
|
||||||
buffer: String,
|
buffer: String,
|
||||||
path: String,
|
path: String,
|
||||||
stop: mpsc::UnboundedReceiver<()>,
|
stop: mpsc::UnboundedReceiver<()>,
|
||||||
stop_control: mpsc::UnboundedSender<()>,
|
stop_control: mpsc::UnboundedSender<()>,
|
||||||
|
operation_tick: Arc<AtomicU64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufferControllerWorker {
|
impl BufferControllerWorker {
|
||||||
|
@ -42,15 +43,15 @@ impl BufferControllerWorker {
|
||||||
stream: Arc::new(s_tx),
|
stream: Arc::new(s_tx),
|
||||||
receiver: txt_rx,
|
receiver: txt_rx,
|
||||||
sender: op_tx,
|
sender: op_tx,
|
||||||
queue: VecDeque::new(),
|
|
||||||
buffer: buffer.to_string(),
|
buffer: buffer.to_string(),
|
||||||
path: path.to_string(),
|
path: path.to_string(),
|
||||||
stop: end_rx,
|
stop: end_rx,
|
||||||
stop_control: end_tx,
|
stop_control: end_tx,
|
||||||
|
operation_tick: Arc::new(AtomicU64::new(0)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update(&mut self, op: OperationSeq) -> Result<TextChange, OTError> {
|
fn update(&mut self, op: &OperationSeq) -> Result<TextChange, OTError> {
|
||||||
let before = Arc::new(self.buffer.clone());
|
let before = Arc::new(self.buffer.clone());
|
||||||
let res = op.apply(&before)?;
|
let res = op.apply(&before)?;
|
||||||
self.content.send(res.clone())
|
self.content.send(res.clone())
|
||||||
|
@ -78,46 +79,90 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
|
||||||
self.sender.clone(),
|
self.sender.clone(),
|
||||||
Mutex::new(self.stream.subscribe()),
|
Mutex::new(self.stream.subscribe()),
|
||||||
self.stop_control.clone(),
|
self.stop_control.clone(),
|
||||||
|
self.operation_tick.clone(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
|
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
|
||||||
|
let mut clientside : VecDeque<OperationSeq> = VecDeque::new();
|
||||||
|
let mut serverside : VecDeque<OperationSeq> = VecDeque::new();
|
||||||
|
let mut last_seen_tick = 0;
|
||||||
loop {
|
loop {
|
||||||
|
|
||||||
|
// block until one of these is ready
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
|
||||||
Some(operation) = recv_opseq(&mut rx) => {
|
// received a new message from server (or an error)
|
||||||
let mut out = operation;
|
res = rx.message() => {
|
||||||
for op in self.queue.iter_mut() {
|
match res {
|
||||||
(*op, out) = match op.transform(&out) {
|
Err(e) => return tracing::error!("error receiving op from server: {}", e),
|
||||||
Ok((x, y)) => (x, y),
|
Ok(None) => return tracing::warn!("server closed operation stream"),
|
||||||
Err(e) => {
|
Ok(Some(msg)) => match serde_json::from_str(&msg.opseq) {
|
||||||
tracing::warn!("could not transform enqueued operation: {}", e);
|
Err(e) => return tracing::error!("error deserializing op from server: {}", e),
|
||||||
break
|
Ok(op) => serverside.push_back(op),
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let change = self.update(out)
|
|
||||||
.unwrap_or_warn_default("coult not update with (transformed) remote operation");
|
|
||||||
self.stream.send(change)
|
|
||||||
.unwrap_or_warn("could not send operation to server");
|
|
||||||
},
|
},
|
||||||
|
|
||||||
Some(op) = self.operations.recv() => {
|
// received a new operation from client (or channel got closed)
|
||||||
self.queue.push_back(op.clone());
|
res = self.operations.recv() => {
|
||||||
self.update(op)
|
match res {
|
||||||
.unwrap_or_warn("could not apply enqueued operation to current buffer");
|
None => return tracing::warn!("client closed operation stream"),
|
||||||
while let Some(op) = self.queue.get(0) {
|
Some(op) => {
|
||||||
if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break }
|
clientside.push_back(op.clone());
|
||||||
self.queue.pop_front();
|
last_seen_tick = self.operation_tick.load(Ordering::Acquire);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
Some(()) = self.stop.recv() => {
|
// received a stop request (or channel got closed)
|
||||||
break;
|
res = self.stop.recv() => {
|
||||||
|
match res {
|
||||||
|
None => return tracing::warn!("stop channel closed, stopping worker"),
|
||||||
|
Some(()) => return tracing::debug!("buffer worker stopping cleanly"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
else => break
|
}
|
||||||
|
|
||||||
|
// we must give priority to operations received from remote server, because we can transform
|
||||||
|
// our ops with server's ops but server won't transform its ops with ours. We must transform
|
||||||
|
// ALL enqueued client ops: if a new one arrived before we could transform and update, we
|
||||||
|
// should discard our progress and poll again.
|
||||||
|
while let Some(mut operation) = serverside.get(0).cloned() {
|
||||||
|
let mut queued_ops = clientside.clone();
|
||||||
|
for op in queued_ops.iter_mut() {
|
||||||
|
(*op, operation) = match op.transform(&operation) {
|
||||||
|
Ok((x, y)) => (x, y),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("could not transform enqueued operation: {}", e);
|
||||||
|
break
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let tick = self.operation_tick.load(std::sync::atomic::Ordering::Acquire);
|
||||||
|
if tick != last_seen_tick { break } // there are more ops to see first
|
||||||
|
clientside = queued_ops;
|
||||||
|
let change = self.update(&operation)
|
||||||
|
.unwrap_or_warn_default("coult not update with (transformed) remote operation");
|
||||||
|
self.stream.send(change)
|
||||||
|
.unwrap_or_warn("could not send operation to server");
|
||||||
|
serverside.pop_front();
|
||||||
|
}
|
||||||
|
|
||||||
|
// if there are still serverside operations to be applied, we can't dispatch our local ones
|
||||||
|
// yet because we need them to transform the ones sent by the server before applying them on
|
||||||
|
// our local buffer. We may get here if a new local operation arrived before we could process
|
||||||
|
// and transform all received server ops. since the buffer is different, it isn't safe to
|
||||||
|
// apply them and we must transform them again. If the loop above left its queue not empty,
|
||||||
|
// we should be guaranteed to unblock immediately in the select above because we have a new
|
||||||
|
// client operation waiting for us to be enqueued
|
||||||
|
if serverside.is_empty() {
|
||||||
|
while let Some(op) = clientside.get(0) {
|
||||||
|
self.update(op);
|
||||||
|
if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break }
|
||||||
|
clientside.pop_front();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -145,20 +190,3 @@ async fn send_opseq(tx: &mut BufferClient<Channel>, uid: String, path: String, o
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn recv_opseq(rx: &mut Streaming<RawOp>) -> Option<OperationSeq> {
|
|
||||||
match rx.message().await {
|
|
||||||
Ok(Some(op)) => match serde_json::from_str(&op.opseq) {
|
|
||||||
Ok(x) => Some(x),
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!("could not deserialize opseq: {}", e);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Ok(None) => None,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("could not receive edit from server: {}", e);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue