diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 61aa3c2..4760693 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -147,7 +147,8 @@ impl Handler for NeovimHandler { Ok(()) => { tokio::spawn(async move { loop { - _controller.wait().await; + let _span = _controller.wait().await; + // TODO only change lines affected! let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); if let Err(e) = buffer.set_lines(0, -1, false, lines).await { error!("could not update buffer: {}", e); diff --git a/src/lib/client.rs b/src/lib/client.rs index f16ce15..50d407d 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -24,10 +24,6 @@ impl From::> for CodempClient { } impl CodempClient { - pub fn new(id: String, client: BufferClient) -> Self { - CodempClient { id, client } - } - pub async fn create(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { path, content, @@ -90,9 +86,7 @@ impl CodempClient { Err(e) => break error!("error deserializing opseq: {}", e), Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), - Ok(_txt) => { - // send event containing where the change happened - } + Ok(_range) => { } // user gets this range by awaiting wait() so we can drop it here } }, } diff --git a/src/lib/operation/mod.rs b/src/lib/operation/mod.rs index 8796b59..39f22da 100644 --- a/src/lib/operation/mod.rs +++ b/src/lib/operation/mod.rs @@ -1,5 +1,27 @@ pub mod factory; pub mod processor; +use std::ops::Range; + +use operational_transform::{Operation, OperationSeq}; pub use processor::{OperationController, OperationProcessor}; pub use factory::OperationFactory; + +pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } +pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } + +const fn count_noop(op: Option<&Operation>) -> u64 { + match op { + None => 0, + Some(op) => match op { + Operation::Retain(n) => *n, + _ => 0, + } + } +} + +pub fn op_effective_range(op: &OperationSeq) -> Range { + let first = leading_noop(op.ops()); + let last = op.base_len() as u64 - tailing_noop(op.ops()); + first..last +} diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs index 9786341..2832dfe 100644 --- a/src/lib/operation/processor.rs +++ b/src/lib/operation/processor.rs @@ -1,4 +1,4 @@ -use std::{sync::Mutex, collections::VecDeque}; +use std::{sync::Mutex, collections::VecDeque, ops::Range}; use operational_transform::{OperationSeq, OTError}; use tokio::sync::watch; @@ -6,15 +6,17 @@ use tracing::warn; use crate::operation::factory::OperationFactory; +use super::op_effective_range; + #[tonic::async_trait] pub trait OperationProcessor : OperationFactory { - async fn apply(&self, op: OperationSeq) -> Result; - async fn process(&self, op: OperationSeq) -> Result; + async fn apply(&self, op: OperationSeq) -> Result, OTError>; + async fn process(&self, op: OperationSeq) -> Result, OTError>; async fn poll(&self) -> Option; async fn ack(&self) -> Option; - async fn wait(&self); + async fn wait(&self) -> Range; } @@ -23,14 +25,14 @@ pub struct OperationController { queue: Mutex>, last: Mutex>, notifier: watch::Sender, - changed: Mutex>, - changed_notifier: watch::Sender<()>, + changed: Mutex>>, + changed_notifier: watch::Sender>, } impl OperationController { pub fn new(content: String) -> Self { let (tx, rx) = watch::channel(OperationSeq::default()); - let (done, wait) = watch::channel(()); + let (done, wait) = watch::channel(0..0); OperationController { text: Mutex::new(content), queue: Mutex::new(VecDeque::new()), @@ -60,32 +62,35 @@ fn ignore_and_log(x: Result, msg: &str) { #[tonic::async_trait] impl OperationProcessor for OperationController { - async fn apply(&self, op: OperationSeq) -> Result { + async fn apply(&self, op: OperationSeq) -> Result, OTError> { let txt = self.content(); let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); self.queue.lock().unwrap().push_back(op.clone()); - ignore_and_log(self.notifier.send(op), "notifying of applied change"); - Ok(res) + ignore_and_log(self.notifier.send(op.clone()), "notifying of applied change"); + Ok(op_effective_range(&op)) } - async fn wait(&self) { + async fn wait(&self) -> Range { let mut blocker = self.changed.lock().unwrap().clone(); // TODO less jank way ignore_and_log(blocker.changed().await, "waiting for changed content #1"); ignore_and_log(blocker.changed().await, "waiting for changed content #2"); + let span = blocker.borrow().clone(); + span } - async fn process(&self, mut op: OperationSeq) -> Result { + async fn process(&self, mut op: OperationSeq) -> Result, OTError> { let mut queue = self.queue.lock().unwrap(); for el in queue.iter_mut() { (op, *el) = op.transform(el)?; } let txt = self.content(); let res = op.apply(&txt)?; + let span = op_effective_range(&op); *self.text.lock().unwrap() = res.clone(); - ignore_and_log(self.changed_notifier.send(()), "notifying of changed content"); - Ok(res) + ignore_and_log(self.changed_notifier.send(span.clone()), "notifying of changed content"); + Ok(span) } async fn poll(&self) -> Option {