feat: include references to buffer before and after

This commit is contained in:
əlemi 2023-09-04 03:09:32 +02:00
parent adf6009472
commit 690a1915c9
3 changed files with 47 additions and 34 deletions

View file

@ -9,7 +9,7 @@ use tonic::async_trait;
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
use crate::{api::Controller, Error}; use crate::{api::Controller, Error};
use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; use crate::buffer::factory::OperationFactory;
use super::TextChange; use super::TextChange;
@ -34,7 +34,7 @@ pub struct BufferController {
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<OperationSeq>, operations: mpsc::UnboundedSender<OperationSeq>,
last_op: Mutex<watch::Receiver<String>>, last_op: Mutex<watch::Receiver<String>>,
stream: Mutex<broadcast::Receiver<OperationSeq>>, stream: Mutex<broadcast::Receiver<TextChange>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
} }
@ -42,7 +42,7 @@ impl BufferController {
pub(crate) fn new( pub(crate) fn new(
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<OperationSeq>, operations: mpsc::UnboundedSender<OperationSeq>,
stream: Mutex<broadcast::Receiver<OperationSeq>>, stream: Mutex<broadcast::Receiver<TextChange>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
BufferController { BufferController {
@ -50,16 +50,6 @@ impl BufferController {
content, operations, stream, stop, content, operations, stream, stop,
} }
} }
fn op_to_change(&self, op: OperationSeq) -> TextChange {
let after = self.content.borrow().clone();
let skip = leading_noop(op.ops()) as usize;
let before_len = op.base_len();
let tail = tailing_noop(op.ops()) as usize;
let span = skip..before_len-tail;
let content = after[skip..after.len()-tail].to_string();
TextChange { span, content }
}
} }
impl Drop for BufferController { impl Drop for BufferController {
@ -85,7 +75,7 @@ impl Controller<TextChange> for BufferController {
fn try_recv(&self) -> Result<Option<TextChange>, Error> { fn try_recv(&self) -> Result<Option<TextChange>, Error> {
match self.stream.blocking_lock().try_recv() { match self.stream.blocking_lock().try_recv() {
Ok(op) => Ok(Some(self.op_to_change(op))), Ok(op) => Ok(Some(op)),
Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Closed) => Err(Error::Channel { send: false }), Err(TryRecvError::Closed) => Err(Error::Channel { send: false }),
Err(TryRecvError::Lagged(n)) => { Err(TryRecvError::Lagged(n)) => {
@ -98,7 +88,7 @@ impl Controller<TextChange> for BufferController {
/// receive an operation seq and transform it into a TextChange from buffer content /// receive an operation seq and transform it into a TextChange from buffer content
async fn recv(&self) -> Result<TextChange, Error> { async fn recv(&self) -> Result<TextChange, Error> {
let op = self.stream.lock().await.recv().await?; let op = self.stream.lock().await.recv().await?;
Ok(self.op_to_change(op)) Ok(op)
} }
/// enqueue an opseq for processing /// enqueue an opseq for processing

View file

@ -6,7 +6,7 @@
//! this module contains buffer-related operations and helpers to create Operation Sequences //! this module contains buffer-related operations and helpers to create Operation Sequences
//! (the underlying chunks of changes sent over the wire) //! (the underlying chunks of changes sent over the wire)
use std::ops::Range; use std::{ops::Range, sync::Arc};
pub(crate) mod worker; pub(crate) mod worker;
@ -23,10 +23,15 @@ pub use controller::BufferController as Controller;
/// an editor-friendly representation of a text change in a buffer /// an editor-friendly representation of a text change in a buffer
/// ///
/// TODO move in proto /// TODO move in proto
#[derive(Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct TextChange { pub struct TextChange {
/// range of text change, as byte indexes in buffer /// range of text change, as byte indexes in buffer
pub span: Range<usize>, pub span: Range<usize>,
/// content of text change, as string /// content of text change, as string
pub content: String, pub content: String,
/// reference to previous content of buffer
pub before: Arc<String>,
/// reference to current content of buffer
pub after: Arc<String>,
}
} }

View file

@ -1,24 +1,25 @@
use std::{sync::Arc, collections::VecDeque}; use std::{sync::Arc, collections::VecDeque};
use operational_transform::OperationSeq; use operational_transform::{OperationSeq, OTError};
use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tokio::sync::{watch, mpsc, broadcast, Mutex};
use tonic::transport::Channel; use tonic::transport::Channel;
use tonic::{async_trait, Streaming}; use tonic::{async_trait, Streaming};
use crate::errors::IgnorableError; use crate::errors::{IgnorableError, IgnorableDefaultableError};
use crate::proto::{OperationRequest, RawOp}; use crate::proto::{OperationRequest, RawOp};
use crate::proto::buffer_client::BufferClient; use crate::proto::buffer_client::BufferClient;
use crate::api::ControllerWorker; use crate::api::ControllerWorker;
use super::TextChange; use super::TextChange;
use super::controller::BufferController; use super::controller::BufferController;
use super::factory::{leading_noop, tailing_noop};
pub(crate) struct BufferControllerWorker { pub(crate) struct BufferControllerWorker {
uid: String, uid: String,
pub(crate) content: watch::Sender<String>, pub(crate) content: watch::Sender<String>,
pub(crate) operations: mpsc::UnboundedReceiver<OperationSeq>, pub(crate) operations: mpsc::UnboundedReceiver<OperationSeq>,
pub(crate) stream: Arc<broadcast::Sender<OperationSeq>>, pub(crate) stream: Arc<broadcast::Sender<TextChange>>,
pub(crate) queue: VecDeque<OperationSeq>, pub(crate) queue: VecDeque<OperationSeq>,
receiver: watch::Receiver<String>, receiver: watch::Receiver<String>,
sender: mpsc::UnboundedSender<OperationSeq>, sender: mpsc::UnboundedSender<OperationSeq>,
@ -48,6 +49,21 @@ impl BufferControllerWorker {
stop_control: end_tx, stop_control: end_tx,
} }
} }
fn update(&mut self, op: OperationSeq) -> Result<TextChange, OTError> {
let before = Arc::new(self.buffer.clone());
let res = op.apply(&before)?;
self.content.send(res.clone())
.unwrap_or_warn("error showing updated buffer");
let after = Arc::new(res.clone());
self.buffer = res;
let skip = leading_noop(op.ops()) as usize;
let before_len = op.base_len();
let tail = tailing_noop(op.ops()) as usize;
let span = skip..before_len-tail;
let content = after[skip..after.len()-tail].to_string();
Ok(TextChange { span, content, before, after })
}
} }
#[async_trait] #[async_trait]
@ -67,7 +83,8 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
loop { loop {
let op = tokio::select! { tokio::select! {
Some(operation) = recv_opseq(&mut rx) => { Some(operation) = recv_opseq(&mut rx) => {
let mut out = operation; let mut out = operation;
for op in self.queue.iter_mut() { for op in self.queue.iter_mut() {
@ -79,27 +96,28 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
}, },
} }
} }
self.stream.send(out.clone()).unwrap_or_warn("could not send operation to server"); let change = self.update(out)
out .unwrap_or_warn_default("coult not update with (transformed) remote operation");
self.stream.send(change)
.unwrap_or_warn("could not send operation to server");
}, },
Some(op) = self.operations.recv() => { Some(op) = self.operations.recv() => {
self.queue.push_back(op.clone()); self.queue.push_back(op.clone());
op self.update(op)
.unwrap_or_warn("could not apply enqueued operation to current buffer");
while let Some(op) = self.queue.get(0) {
if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break }
self.queue.pop_front();
}
}, },
Some(()) = self.stop.recv() => { Some(()) = self.stop.recv() => {
break; break;
} }
else => break
};
self.buffer = op.apply(&self.buffer).unwrap_or_else(|e| {
tracing::error!("could not update buffer string: {}", e);
self.buffer
});
self.content.send(self.buffer.clone()).unwrap_or_warn("error showing updated buffer");
while let Some(op) = self.queue.get(0) { else => break
if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break }
self.queue.pop_front();
} }
} }
} }