feat: propagate range of buffer affected by change

added some const functions to calculate leading and tailing noops, made
apply() and process() return Range<u64>. actors implementation is still
awful but will do them properly in the future
This commit is contained in:
əlemi 2023-04-19 20:13:36 +02:00
parent 7f41127f8b
commit 243f23a04b
4 changed files with 44 additions and 22 deletions

View file

@ -147,7 +147,8 @@ impl Handler for NeovimHandler {
Ok(()) => { Ok(()) => {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
_controller.wait().await; let _span = _controller.wait().await;
// TODO only change lines affected!
let lines : Vec<String> = _controller.content().split("\n").map(|x| x.to_string()).collect(); let lines : Vec<String> = _controller.content().split("\n").map(|x| x.to_string()).collect();
if let Err(e) = buffer.set_lines(0, -1, false, lines).await { if let Err(e) = buffer.set_lines(0, -1, false, lines).await {
error!("could not update buffer: {}", e); error!("could not update buffer: {}", e);

View file

@ -24,10 +24,6 @@ impl From::<BufferClient<Channel>> for CodempClient {
} }
impl CodempClient { impl CodempClient {
pub fn new(id: String, client: BufferClient<Channel>) -> Self {
CodempClient { id, client }
}
pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> { pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> {
let req = BufferPayload { let req = BufferPayload {
path, content, path, content,
@ -90,9 +86,7 @@ impl CodempClient {
Err(e) => break error!("error deserializing opseq: {}", e), Err(e) => break error!("error deserializing opseq: {}", e),
Ok(v) => match _factory.process(v).await { Ok(v) => match _factory.process(v).await {
Err(e) => break error!("could not apply operation from server: {}", e), Err(e) => break error!("could not apply operation from server: {}", e),
Ok(_txt) => { Ok(_range) => { } // user gets this range by awaiting wait() so we can drop it here
// send event containing where the change happened
}
} }
}, },
} }

View file

@ -1,5 +1,27 @@
pub mod factory; pub mod factory;
pub mod processor; pub mod processor;
use std::ops::Range;
use operational_transform::{Operation, OperationSeq};
pub use processor::{OperationController, OperationProcessor}; pub use processor::{OperationController, OperationProcessor};
pub use factory::OperationFactory; pub use factory::OperationFactory;
pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) }
pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) }
const fn count_noop(op: Option<&Operation>) -> u64 {
match op {
None => 0,
Some(op) => match op {
Operation::Retain(n) => *n,
_ => 0,
}
}
}
pub fn op_effective_range(op: &OperationSeq) -> Range<u64> {
let first = leading_noop(op.ops());
let last = op.base_len() as u64 - tailing_noop(op.ops());
first..last
}

View file

@ -1,4 +1,4 @@
use std::{sync::Mutex, collections::VecDeque}; use std::{sync::Mutex, collections::VecDeque, ops::Range};
use operational_transform::{OperationSeq, OTError}; use operational_transform::{OperationSeq, OTError};
use tokio::sync::watch; use tokio::sync::watch;
@ -6,15 +6,17 @@ use tracing::warn;
use crate::operation::factory::OperationFactory; use crate::operation::factory::OperationFactory;
use super::op_effective_range;
#[tonic::async_trait] #[tonic::async_trait]
pub trait OperationProcessor : OperationFactory { pub trait OperationProcessor : OperationFactory {
async fn apply(&self, op: OperationSeq) -> Result<String, OTError>; async fn apply(&self, op: OperationSeq) -> Result<Range<u64>, OTError>;
async fn process(&self, op: OperationSeq) -> Result<String, OTError>; async fn process(&self, op: OperationSeq) -> Result<Range<u64>, OTError>;
async fn poll(&self) -> Option<OperationSeq>; async fn poll(&self) -> Option<OperationSeq>;
async fn ack(&self) -> Option<OperationSeq>; async fn ack(&self) -> Option<OperationSeq>;
async fn wait(&self); async fn wait(&self) -> Range<u64>;
} }
@ -23,14 +25,14 @@ pub struct OperationController {
queue: Mutex<VecDeque<OperationSeq>>, queue: Mutex<VecDeque<OperationSeq>>,
last: Mutex<watch::Receiver<OperationSeq>>, last: Mutex<watch::Receiver<OperationSeq>>,
notifier: watch::Sender<OperationSeq>, notifier: watch::Sender<OperationSeq>,
changed: Mutex<watch::Receiver<()>>, changed: Mutex<watch::Receiver<Range<u64>>>,
changed_notifier: watch::Sender<()>, changed_notifier: watch::Sender<Range<u64>>,
} }
impl OperationController { impl OperationController {
pub fn new(content: String) -> Self { pub fn new(content: String) -> Self {
let (tx, rx) = watch::channel(OperationSeq::default()); let (tx, rx) = watch::channel(OperationSeq::default());
let (done, wait) = watch::channel(()); let (done, wait) = watch::channel(0..0);
OperationController { OperationController {
text: Mutex::new(content), text: Mutex::new(content),
queue: Mutex::new(VecDeque::new()), queue: Mutex::new(VecDeque::new()),
@ -60,32 +62,35 @@ fn ignore_and_log<T, E : std::fmt::Display>(x: Result<T, E>, msg: &str) {
#[tonic::async_trait] #[tonic::async_trait]
impl OperationProcessor for OperationController { impl OperationProcessor for OperationController {
async fn apply(&self, op: OperationSeq) -> Result<String, OTError> { async fn apply(&self, op: OperationSeq) -> Result<Range<u64>, OTError> {
let txt = self.content(); let txt = self.content();
let res = op.apply(&txt)?; let res = op.apply(&txt)?;
*self.text.lock().unwrap() = res.clone(); *self.text.lock().unwrap() = res.clone();
self.queue.lock().unwrap().push_back(op.clone()); self.queue.lock().unwrap().push_back(op.clone());
ignore_and_log(self.notifier.send(op), "notifying of applied change"); ignore_and_log(self.notifier.send(op.clone()), "notifying of applied change");
Ok(res) Ok(op_effective_range(&op))
} }
async fn wait(&self) { async fn wait(&self) -> Range<u64> {
let mut blocker = self.changed.lock().unwrap().clone(); let mut blocker = self.changed.lock().unwrap().clone();
// TODO less jank way // TODO less jank way
ignore_and_log(blocker.changed().await, "waiting for changed content #1"); ignore_and_log(blocker.changed().await, "waiting for changed content #1");
ignore_and_log(blocker.changed().await, "waiting for changed content #2"); ignore_and_log(blocker.changed().await, "waiting for changed content #2");
let span = blocker.borrow().clone();
span
} }
async fn process(&self, mut op: OperationSeq) -> Result<String, OTError> { async fn process(&self, mut op: OperationSeq) -> Result<Range<u64>, OTError> {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
for el in queue.iter_mut() { for el in queue.iter_mut() {
(op, *el) = op.transform(el)?; (op, *el) = op.transform(el)?;
} }
let txt = self.content(); let txt = self.content();
let res = op.apply(&txt)?; let res = op.apply(&txt)?;
let span = op_effective_range(&op);
*self.text.lock().unwrap() = res.clone(); *self.text.lock().unwrap() = res.clone();
ignore_and_log(self.changed_notifier.send(()), "notifying of changed content"); ignore_and_log(self.changed_notifier.send(span.clone()), "notifying of changed content");
Ok(res) Ok(span)
} }
async fn poll(&self) -> Option<OperationSeq> { async fn poll(&self) -> Option<OperationSeq> {