From 14145ec369ed01dd5f70afde16abd9982c915e83 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 7 Sep 2023 20:39:45 +0200 Subject: [PATCH] fix: kind of resolved local race condition... ... but it became so much spaghetti that I'm not eating any more pasta for a while seriously tho must be resolved, maybe moving this logic completely serverside? --- src/buffer/mod.rs | 31 +-------------------- src/buffer/worker.rs | 66 +++++++++++++++++++++++++------------------- 2 files changed, 39 insertions(+), 58 deletions(-) diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index dc799d4..c8c3fa5 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -6,7 +6,7 @@ //! this module contains buffer-related operations and helpers to create Operation Sequences //! (the underlying chunks of changes sent over the wire) -use std::{ops::Range, sync::Arc}; +use std::ops::Range; pub(crate) mod worker; @@ -19,8 +19,6 @@ pub mod factory; pub use factory::OperationFactory; pub use controller::BufferController as Controller; -use crate::proto::RowCol; - /// an editor-friendly representation of a text change in a buffer /// @@ -31,31 +29,4 @@ pub struct TextChange { pub span: Range, /// content of text change, as string pub content: String, - /// reference to previous content of buffer - pub before: Arc, - /// reference to current content of buffer - pub after: Arc, -} - -impl TextChange { - /// convert from byte index to row and column. - /// if `end` is true, span end will be used, otherwise span start - /// if `after` is true, buffer after change will be used, otherwise buffer before change - fn index_to_rowcol(&self, end: bool, after: bool) -> RowCol { - let txt = if after { &self.after } else { &self.before }; - let index = if end { self.span.end } else { self.span.start }; - let row = txt[..index].matches('\n').count() as i32; - let col = txt[..index].split('\n').last().unwrap_or("").len() as i32; - RowCol { row, col } - } - - /// retrn row and column of text change start - pub fn start(&self) -> RowCol { - self.index_to_rowcol(false, false) - } - - /// return row and column of text change end - pub fn end(&self) -> RowCol { - self.index_to_rowcol(true, false) - } } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index ab36c12..537cc21 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,12 +1,12 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::{sync::Arc, collections::VecDeque}; -use operational_transform::{OperationSeq, OTError}; +use operational_transform::OperationSeq; use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tonic::transport::Channel; use tonic::{async_trait, Streaming}; -use crate::errors::{IgnorableError, IgnorableDefaultableError}; +use crate::errors::IgnorableError; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; use crate::api::ControllerWorker; @@ -50,21 +50,6 @@ impl BufferControllerWorker { operation_tick: Arc::new(AtomicU64::new(0)), } } - - fn update(&mut self, op: &OperationSeq) -> Result { - let before = Arc::new(self.buffer.clone()); - let res = op.apply(&before)?; - self.content.send(res.clone()) - .unwrap_or_warn("error showing updated buffer"); - let after = Arc::new(res.clone()); - self.buffer = res; - let skip = leading_noop(op.ops()) as usize; - let before_len = op.base_len(); - let tail = tailing_noop(op.ops()) as usize; - let span = skip..before_len-tail; - let content = after[skip..after.len()-tail].to_string(); - Ok(TextChange { span, content, before, after }) - } } #[async_trait] @@ -87,6 +72,7 @@ impl ControllerWorker for BufferControllerWorker { let mut clientside : VecDeque = VecDeque::new(); let mut serverside : VecDeque = VecDeque::new(); let mut last_seen_tick = 0; + loop { // block until one of these is ready @@ -109,7 +95,6 @@ impl ControllerWorker for BufferControllerWorker { match res { None => return tracing::warn!("client closed operation stream"), Some(op) => { - let _ = self.update(&op); clientside.push_back(op.clone()); last_seen_tick = self.operation_tick.load(Ordering::Acquire); } @@ -130,22 +115,40 @@ impl ControllerWorker for BufferControllerWorker { // our ops with server's ops but server won't transform its ops with ours. We must transform // ALL enqueued client ops: if a new one arrived before we could transform and update, we // should discard our progress and poll again. - while let Some(mut operation) = serverside.get(0).cloned() { + while let Some(operation) = serverside.get(0).cloned() { + let mut transformed_op = operation.clone(); let mut queued_ops = clientside.clone(); + let mut txt_before = self.buffer.clone(); for op in queued_ops.iter_mut() { - (*op, operation) = match op.transform(&operation) { + txt_before = match op.apply(&txt_before) { + Ok(x) => x, + Err(_) => { tracing::error!("could not apply outgoing enqueued opseq to current buffer?"); break; }, + }; + (*op, transformed_op) = match op.transform(&transformed_op) { + Err(e) => { tracing::warn!("could not transform enqueued operation: {}", e); break; }, Ok((x, y)) => (x, y), - Err(e) => { - tracing::warn!("could not transform enqueued operation: {}", e); - break - }, - } + }; } + + let skip = leading_noop(transformed_op.ops()) as usize; + let tail = tailing_noop(transformed_op.ops()) as usize; + let span = skip..(transformed_op.base_len() - tail); + let after = transformed_op.apply(&txt_before).expect("could not apply transformed op"); + let change = TextChange { span, content: after[skip..after.len()-tail].to_string() }; + let tick = self.operation_tick.load(std::sync::atomic::Ordering::Acquire); - if tick != last_seen_tick { break } // there are more ops to see first + if tick != last_seen_tick { + tracing::warn!("skipping downstream because there are ops"); + break + } // there are more ops to see first clientside = queued_ops; - let change = self.update(&operation) - .unwrap_or_warn_default("coult not update with (transformed) remote operation"); + self.buffer = match operation.apply(&self.buffer) { + Ok(x) => x, + Err(_) => { tracing::error!("wtf received op could not be applied?"); break; }, + }; + if clientside.is_empty() { + self.content.send(self.buffer.clone()).expect("could not broadcast new buffer content"); + } self.stream.send(change) .unwrap_or_warn("could not send operation to server"); serverside.pop_front(); @@ -172,8 +175,15 @@ impl ControllerWorker for BufferControllerWorker { tracing::warn!("server rejected operation: {}", e); break; } + self.buffer = match op.apply(&self.buffer) { + Ok(x) => x, + Err(_) => { tracing::error!("wtf accepted remote op could not be applied to our buffer????"); break; }, + }; + self.content.send(self.buffer.clone()).expect("could not broadcast buffer update"); clientside.pop_front(); } + } else { + tracing::warn!("skipping upstream because there are ops"); } } }