From 243f23a04bb8910588d7430af0b6913fe189e658 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 19 Apr 2023 20:13:36 +0200 Subject: [PATCH] feat: propagate range of buffer affected by change added some const functions to calculate leading and tailing noops, made apply() and process() return Range. actors implementation is still awful but will do them properly in the future --- src/client/nvim/main.rs | 3 ++- src/lib/client.rs | 8 +------- src/lib/operation/mod.rs | 22 ++++++++++++++++++++++ src/lib/operation/processor.rs | 33 +++++++++++++++++++-------------- 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 61aa3c2..4760693 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -147,7 +147,8 @@ impl Handler for NeovimHandler { Ok(()) => { tokio::spawn(async move { loop { - _controller.wait().await; + let _span = _controller.wait().await; + // TODO only change lines affected! let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); if let Err(e) = buffer.set_lines(0, -1, false, lines).await { error!("could not update buffer: {}", e); diff --git a/src/lib/client.rs b/src/lib/client.rs index f16ce15..50d407d 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -24,10 +24,6 @@ impl From::> for CodempClient { } impl CodempClient { - pub fn new(id: String, client: BufferClient) -> Self { - CodempClient { id, client } - } - pub async fn create(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { path, content, @@ -90,9 +86,7 @@ impl CodempClient { Err(e) => break error!("error deserializing opseq: {}", e), Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), - Ok(_txt) => { - // send event containing where the change happened - } + Ok(_range) => { } // user gets this range by awaiting wait() so we can drop it here } }, } diff --git a/src/lib/operation/mod.rs b/src/lib/operation/mod.rs index 8796b59..39f22da 100644 --- a/src/lib/operation/mod.rs +++ b/src/lib/operation/mod.rs @@ -1,5 +1,27 @@ pub mod factory; pub mod processor; +use std::ops::Range; + +use operational_transform::{Operation, OperationSeq}; pub use processor::{OperationController, OperationProcessor}; pub use factory::OperationFactory; + +pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } +pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } + +const fn count_noop(op: Option<&Operation>) -> u64 { + match op { + None => 0, + Some(op) => match op { + Operation::Retain(n) => *n, + _ => 0, + } + } +} + +pub fn op_effective_range(op: &OperationSeq) -> Range { + let first = leading_noop(op.ops()); + let last = op.base_len() as u64 - tailing_noop(op.ops()); + first..last +} diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs index 9786341..2832dfe 100644 --- a/src/lib/operation/processor.rs +++ b/src/lib/operation/processor.rs @@ -1,4 +1,4 @@ -use std::{sync::Mutex, collections::VecDeque}; +use std::{sync::Mutex, collections::VecDeque, ops::Range}; use operational_transform::{OperationSeq, OTError}; use tokio::sync::watch; @@ -6,15 +6,17 @@ use tracing::warn; use crate::operation::factory::OperationFactory; +use super::op_effective_range; + #[tonic::async_trait] pub trait OperationProcessor : OperationFactory { - async fn apply(&self, op: OperationSeq) -> Result; - async fn process(&self, op: OperationSeq) -> Result; + async fn apply(&self, op: OperationSeq) -> Result, OTError>; + async fn process(&self, op: OperationSeq) -> Result, OTError>; async fn poll(&self) -> Option; async fn ack(&self) -> Option; - async fn wait(&self); + async fn wait(&self) -> Range; } @@ -23,14 +25,14 @@ pub struct OperationController { queue: Mutex>, last: Mutex>, notifier: watch::Sender, - changed: Mutex>, - changed_notifier: watch::Sender<()>, + changed: Mutex>>, + changed_notifier: watch::Sender>, } impl OperationController { pub fn new(content: String) -> Self { let (tx, rx) = watch::channel(OperationSeq::default()); - let (done, wait) = watch::channel(()); + let (done, wait) = watch::channel(0..0); OperationController { text: Mutex::new(content), queue: Mutex::new(VecDeque::new()), @@ -60,32 +62,35 @@ fn ignore_and_log(x: Result, msg: &str) { #[tonic::async_trait] impl OperationProcessor for OperationController { - async fn apply(&self, op: OperationSeq) -> Result { + async fn apply(&self, op: OperationSeq) -> Result, OTError> { let txt = self.content(); let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); self.queue.lock().unwrap().push_back(op.clone()); - ignore_and_log(self.notifier.send(op), "notifying of applied change"); - Ok(res) + ignore_and_log(self.notifier.send(op.clone()), "notifying of applied change"); + Ok(op_effective_range(&op)) } - async fn wait(&self) { + async fn wait(&self) -> Range { let mut blocker = self.changed.lock().unwrap().clone(); // TODO less jank way ignore_and_log(blocker.changed().await, "waiting for changed content #1"); ignore_and_log(blocker.changed().await, "waiting for changed content #2"); + let span = blocker.borrow().clone(); + span } - async fn process(&self, mut op: OperationSeq) -> Result { + async fn process(&self, mut op: OperationSeq) -> Result, OTError> { let mut queue = self.queue.lock().unwrap(); for el in queue.iter_mut() { (op, *el) = op.transform(el)?; } let txt = self.content(); let res = op.apply(&txt)?; + let span = op_effective_range(&op); *self.text.lock().unwrap() = res.clone(); - ignore_and_log(self.changed_notifier.send(()), "notifying of changed content"); - Ok(res) + ignore_and_log(self.changed_notifier.send(span.clone()), "notifying of changed content"); + Ok(span) } async fn poll(&self) -> Option {