feat: swap in our WOOT implementation

it's kinda crude and buggy but some things might just work?
This commit is contained in:
əlemi 2023-11-09 05:21:24 +01:00
parent c6abc33c53
commit 8686715e9d
7 changed files with 76 additions and 336 deletions

View file

@ -9,8 +9,8 @@ name = "codemp"
[dependencies] [dependencies]
# core # core
tracing = "0.1" tracing = "0.1"
# ot # woot
operational-transform = { version = "0.6", features = ["serde"], optional = true } codemp-woot = { path = "../woot", optional = true }
# proto # proto
tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true } tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true }
prost = { version = "0.11.8", optional = true } prost = { version = "0.11.8", optional = true }
@ -31,8 +31,8 @@ tonic-build = "0.9"
[features] [features]
default = ["client"] default = ["client"]
api = ["ot", "dep:similar", "dep:tokio", "dep:async-trait"] api = ["woot", "dep:similar", "dep:tokio", "dep:async-trait"]
ot = ["dep:operational-transform"] woot = ["dep:codemp-woot"]
proto = ["dep:prost", "dep:tonic"] proto = ["dep:prost", "dep:tonic"]
client = ["proto", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "dep:md5", "dep:serde_json"] client = ["proto", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "dep:md5", "dep:serde_json"]
global = ["client", "dep:lazy_static"] global = ["client", "dep:lazy_static"]

View file

@ -1,165 +0,0 @@
//! ### factory
//!
//! a helper trait that any string container can implement, which generates opseqs
//!
//! an OperationFactory trait implementation is provided for `String` and `Arc<str>`, but plugin developers
//! should implement their own operation factory interfacing directly with the editor
//! buffer when possible.
use std::ops::Range;
use operational_transform::{OperationSeq, Operation};
use similar::{TextDiff, ChangeTag};
/// calculate leading no-ops in given opseq
pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) }
/// calculate tailing no-ops in given opseq
pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) }
const fn count_noop(op: Option<&Operation>) -> u64 {
match op {
None => 0,
Some(Operation::Retain(n)) => *n,
Some(_) => 0,
}
}
/// return the range on which the operation seq is actually applying its changes
pub fn op_effective_range(op: &OperationSeq) -> Range<u64> {
let first = leading_noop(op.ops());
let last = op.base_len() as u64 - tailing_noop(op.ops());
first..last
}
/// a helper trait that any string container can implement, which generates opseqs
///
/// all operations are to be considered mutating current state, obtainable with
/// [OperationFactory::content]. generating an operation has no effect on internal state
///
/// ### examples
///
/// ```rust
/// use codemp::api::OperationFactory;
///
/// let mut factory = String::new();
/// let operation = factory.ins("asd", 0);
/// factory = operation.apply(&factory)?;
/// assert_eq!(factory, "asd");
/// # Ok::<(), codemp::ot::OTError>(())
/// ```
///
/// use [OperationFactory::ins] to add new characters at a specific index
///
/// ```rust
/// # use codemp::api::OperationFactory;
/// # let mut factory = String::from("asd");
/// factory = factory.ins(" dsa", 3).apply(&factory)?;
/// assert_eq!(factory, "asd dsa");
/// # Ok::<(), codemp::ot::OTError>(())
/// ```
///
/// use [OperationFactory::diff] to arbitrarily change text at any position
///
/// ```rust
/// # use codemp::api::OperationFactory;
/// # let mut factory = String::from("asd dsa");
/// factory = factory
/// .diff(2, " xxx ", 5)
/// .expect("replaced region is equal to origin")
/// .apply(&factory)?;
/// assert_eq!(factory, "as xxx sa");
/// # Ok::<(), codemp::ot::OTError>(())
/// ```
///
/// use [OperationFactory::del] to remove characters from given index
///
/// ```rust
/// # use codemp::api::OperationFactory;
/// # let mut factory = String::from("as xxx sa");
/// factory = factory.del(2, 5).apply(&factory)?;
/// assert_eq!(factory, "assa");
/// # Ok::<(), codemp::ot::OTError>(())
/// ```
///
/// use [OperationFactory::replace] to completely replace buffer content
///
/// ```rust
/// # use codemp::api::OperationFactory;
/// # let mut factory = String::from("assa");
/// factory = factory.replace("from scratch")
/// .expect("replace is equal to origin")
/// .apply(&factory)?;
/// assert_eq!(factory, "from scratch");
/// # Ok::<(), codemp::ot::OTError>(())
/// ```
pub trait OperationFactory {
/// the current content of the buffer
fn content(&self) -> String;
/// completely replace the buffer with given text
fn replace(&self, txt: &str) -> Option<OperationSeq> {
self.diff(0, txt, self.content().len())
}
/// transform buffer in range [start..end] with given text
fn diff(&self, start: usize, txt: &str, end: usize) -> Option<OperationSeq> {
let mut out = OperationSeq::default();
let content = self.content();
let tail_skip = content.len() - end; // TODO len is number of bytes, not chars
let content_slice = &content[start..end];
if content_slice == txt {
// if slice equals given text, no operation should be taken
return None;
}
out.retain(start as u64);
let diff = TextDiff::from_chars(content_slice, txt);
for change in diff.iter_all_changes() {
match change.tag() {
ChangeTag::Equal => out.retain(1),
ChangeTag::Delete => out.delete(1),
ChangeTag::Insert => out.insert(change.value()),
}
}
out.retain(tail_skip as u64);
Some(out)
}
/// insert given chars at target position
fn ins(&self, txt: &str, pos: u64) -> OperationSeq {
let mut out = OperationSeq::default();
let total = self.content().len() as u64;
out.retain(pos);
out.insert(txt);
out.retain(total - pos);
out
}
/// delete n characters forward at given position
fn del(&self, pos: u64, count: u64) -> OperationSeq {
let mut out = OperationSeq::default();
let len = self.content().len() as u64;
out.retain(pos);
out.delete(count);
out.retain(len - (pos+count));
out
}
}
impl OperationFactory for String {
fn content(&self) -> String {
self.clone()
}
}
impl OperationFactory for std::sync::Arc<str> {
fn content(&self) -> String {
self.to_string()
}
}

View file

@ -7,8 +7,4 @@
/// a generic async provider for bidirectional communication /// a generic async provider for bidirectional communication
pub mod controller; pub mod controller;
/// a helper trait to generate operation sequences
pub mod factory;
pub use controller::Controller; pub use controller::Controller;
pub use factory::OperationFactory;

View file

@ -3,8 +3,9 @@
//! a controller implementation for buffer actions //! a controller implementation for buffer actions
use operational_transform::OperationSeq; use std::sync::Arc;
use tokio::sync::{watch, mpsc, Mutex, oneshot};
use tokio::sync::{watch, mpsc};
use tonic::async_trait; use tonic::async_trait;
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
@ -28,67 +29,50 @@ use super::TextChange;
/// Operation Sequences easily /// Operation Sequences easily
/// ///
/// upon dropping this handle will stop the associated worker /// upon dropping this handle will stop the associated worker
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct BufferController { pub struct BufferController {
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<OperationSeq>, operations: mpsc::UnboundedSender<TextChange>,
last_op: Mutex<watch::Receiver<()>>, _stop: Arc<StopOnDrop>, // just exist
stream: mpsc::UnboundedSender<oneshot::Sender<Option<TextChange>>>,
stop: mpsc::UnboundedSender<()>,
} }
impl BufferController { impl BufferController {
pub(crate) fn new( pub(crate) fn new(
content: watch::Receiver<String>, content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<OperationSeq>, operations: mpsc::UnboundedSender<TextChange>,
stream: mpsc::UnboundedSender<oneshot::Sender<Option<TextChange>>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
last_op: Mutex<watch::Receiver<()>>,
) -> Self { ) -> Self {
BufferController { BufferController { content, operations, _stop: Arc::new(StopOnDrop(stop)) }
last_op, content, operations, stream, stop,
}
}
pub fn content(&self) -> String {
self.content.borrow().clone()
} }
} }
impl Drop for BufferController { #[derive(Debug)]
struct StopOnDrop(mpsc::UnboundedSender<()>);
impl Drop for StopOnDrop {
fn drop(&mut self) { fn drop(&mut self) {
self.stop.send(()).unwrap_or_warn("could not send stop message to worker"); self.0.send(()).unwrap_or_warn("could not send stop message to worker");
} }
} }
#[async_trait] #[async_trait]
impl Controller<TextChange> for BufferController { impl Controller<String> for BufferController {
type Input = OperationSeq; type Input = TextChange;
async fn poll(&self) -> Result<(), Error> { async fn poll(&self) -> Result<(), Error> {
Ok(self.last_op.lock().await.changed().await?) Ok(self.content.clone().changed().await?)
} }
fn try_recv(&self) -> Result<Option<TextChange>, Error> { fn try_recv(&self) -> Result<Option<String>, Error> {
let (tx, rx) = oneshot::channel(); Ok(Some(self.content.borrow().clone()))
self.stream.send(tx)?;
rx.blocking_recv()
.map_err(|_| Error::Channel { send: false })
} }
async fn recv(&self) -> Result<TextChange, Error> { async fn recv(&self) -> Result<String, Error> {
self.poll().await?; Ok(self.content.borrow().clone())
let (tx, rx) = oneshot::channel();
self.stream.send(tx)?;
Ok(
rx.await
.map_err(|_| Error::Channel { send: false })?
.expect("empty channel after polling")
)
} }
/// enqueue an opseq for processing /// enqueue an opseq for processing
fn send(&self, op: OperationSeq) -> Result<(), Error> { fn send(&self, op: TextChange) -> Result<(), Error> {
Ok(self.operations.send(op)?) Ok(self.operations.send(op)?)
} }
} }

View file

@ -1,14 +1,13 @@
use std::collections::VecDeque; use similar::{TextDiff, ChangeTag};
use tokio::sync::{watch, mpsc};
use operational_transform::OperationSeq;
use tokio::sync::{watch, mpsc, oneshot, Mutex};
use tonic::transport::Channel; use tonic::transport::Channel;
use tonic::{async_trait, Streaming}; use tonic::{async_trait, Streaming};
use woot::crdt::{Op, CRDT, TextEditor};
use woot::woot::Woot;
use crate::proto::{OperationRequest, RawOp}; use crate::proto::{OperationRequest, RawOp};
use crate::proto::buffer_client::BufferClient; use crate::proto::buffer_client::BufferClient;
use crate::api::controller::ControllerWorker; use crate::api::controller::ControllerWorker;
use crate::api::factory::{leading_noop, tailing_noop};
use super::TextChange; use super::TextChange;
use super::controller::BufferController; use super::controller::BufferController;
@ -17,48 +16,38 @@ use super::controller::BufferController;
pub(crate) struct BufferControllerWorker { pub(crate) struct BufferControllerWorker {
uid: String, uid: String,
content: watch::Sender<String>, content: watch::Sender<String>,
operations: mpsc::UnboundedReceiver<OperationSeq>, operations: mpsc::UnboundedReceiver<TextChange>,
stream: mpsc::UnboundedReceiver<oneshot::Sender<Option<TextChange>>>,
stream_requestor: mpsc::UnboundedSender<oneshot::Sender<Option<TextChange>>>,
receiver: watch::Receiver<String>, receiver: watch::Receiver<String>,
sender: mpsc::UnboundedSender<OperationSeq>, sender: mpsc::UnboundedSender<TextChange>,
buffer: String, buffer: Woot,
path: String, path: String,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
stop_control: mpsc::UnboundedSender<()>, stop_control: mpsc::UnboundedSender<()>,
new_op_tx: watch::Sender<()>,
new_op_rx: watch::Receiver<()>,
} }
impl BufferControllerWorker { impl BufferControllerWorker {
pub fn new(uid: String, buffer: &str, path: &str) -> Self { pub fn new(uid: String, buffer: &str, path: &str) -> Self {
let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (txt_tx, txt_rx) = watch::channel(buffer.to_string());
let (op_tx, op_rx) = mpsc::unbounded_channel(); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (s_tx, s_rx) = mpsc::unbounded_channel();
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (notx, norx) = watch::channel(());
BufferControllerWorker { BufferControllerWorker {
uid, uid,
content: txt_tx, content: txt_tx,
operations: op_rx, operations: op_rx,
stream: s_rx,
stream_requestor: s_tx,
receiver: txt_rx, receiver: txt_rx,
sender: op_tx, sender: op_tx,
buffer: buffer.to_string(), buffer: Woot::new(42069), // TODO initialize with buffer!
path: path.to_string(), path: path.to_string(),
stop: end_rx, stop: end_rx,
stop_control: end_tx, stop_control: end_tx,
new_op_tx: notx,
new_op_rx: norx,
} }
} }
async fn send_op(&self, tx: &mut BufferClient<Channel>, outbound: &OperationSeq) -> crate::Result<()> { async fn send_op(&self, tx: &mut BufferClient<Channel>, outbound: &Op) -> crate::Result<()> {
let opseq = serde_json::to_string(outbound).expect("could not serialize opseq"); let opseq = serde_json::to_string(outbound).expect("could not serialize opseq");
let req = OperationRequest { let req = OperationRequest {
path: self.path.clone(), path: self.path.clone(),
hash: format!("{:x}", md5::compute(&self.buffer)), hash: format!("{:x}", md5::compute(self.buffer.view())),
op: Some(RawOp { op: Some(RawOp {
opseq, user: self.uid.clone(), opseq, user: self.uid.clone(),
}), }),
@ -69,7 +58,7 @@ impl BufferControllerWorker {
} }
#[async_trait] #[async_trait]
impl ControllerWorker<TextChange> for BufferControllerWorker { impl ControllerWorker<String> for BufferControllerWorker {
type Controller = BufferController; type Controller = BufferController;
type Tx = BufferClient<Channel>; type Tx = BufferClient<Channel>;
type Rx = Streaming<RawOp>; type Rx = Streaming<RawOp>;
@ -78,123 +67,62 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
BufferController::new( BufferController::new(
self.receiver.clone(), self.receiver.clone(),
self.sender.clone(), self.sender.clone(),
self.stream_requestor.clone(),
self.stop_control.clone(), self.stop_control.clone(),
Mutex::new(self.new_op_rx.clone()),
) )
} }
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
let mut clientside : VecDeque<OperationSeq> = VecDeque::new();
let mut serverside : VecDeque<OperationSeq> = VecDeque::new();
loop { loop {
// block until one of these is ready // block until one of these is ready
tokio::select! { tokio::select! {
biased; biased;
// received a stop request (or channel got closed) // received stop signal
res = self.stop.recv() => { _ = self.stop.recv() => break,
tracing::info!("received stop signal");
match res {
None => return tracing::warn!("stop channel closed, stopping worker"),
Some(()) => return tracing::debug!("buffer worker stopping cleanly"),
}
}
// received a new message from server (or an error) // received a text change from editor
res = rx.message() => { res = self.operations.recv() => match res {
tracing::info!("received msg from server"); None => break,
let inbound : OperationSeq = match res { Some(change) => {
Err(e) => return tracing::error!("error receiving op from server: {}", e), let span = &self.buffer.view()[change.span.clone()];
Ok(None) => return tracing::warn!("server closed operation stream"), let diff = TextDiff::from_chars(span, &change.content);
Ok(Some(msg)) => serde_json::from_str(&msg.opseq)
.expect("could not deserialize server opseq"),
};
self.buffer = inbound.apply(&self.buffer).expect("could not apply remote opseq???");
serverside.push_back(inbound);
while let Some(mut outbound) = clientside.get(0).cloned() {
let mut serverside_tmp = serverside.clone();
for server_op in serverside_tmp.iter_mut() {
tracing::info!("transforming {:?} <-> {:?}", outbound, server_op);
(outbound, *server_op) = outbound.transform(server_op)
.expect("could not transform enqueued out with just received");
}
match self.send_op(&mut tx, &outbound).await {
Err(e) => { tracing::warn!("could not send op even after transforming: {}", e); break; },
Ok(()) => {
tracing::info!("back in sync");
serverside = serverside_tmp;
self.buffer = outbound.apply(&self.buffer).expect("could not apply op after synching back");
clientside.pop_front();
},
}
}
self.content.send(self.buffer.clone()).expect("could not broadcast buffer update");
self.new_op_tx.send(()).expect("could not activate client after new server event");
},
// received a new operation from client (or channel got closed) let mut i = 0;
res = self.operations.recv() => { let mut ops = Vec::new();
tracing::info!("received op from client"); for diff in diff.iter_all_changes() {
match res { match diff.tag() {
None => return tracing::warn!("client closed operation stream"), ChangeTag::Equal => i += 1,
Some(op) => { ChangeTag::Delete => ops.push(self.buffer.delete(change.span.start + i).unwrap()),
if clientside.is_empty() { ChangeTag::Insert => {
match self.send_op(&mut tx, &op).await { for c in diff.value().chars() {
Ok(()) => { ops.push(self.buffer.insert(change.span.start + i, c).unwrap());
self.buffer = op.apply(&self.buffer).expect("could not apply op"); i += 1;
self.content.send(self.buffer.clone()).expect("could not update buffer view"); }
}, },
Err(e) => { }
tracing::warn!("server rejected op: {}", e); }
clientside.push_back(op);
}, for op in ops {
} match self.send_op(&mut tx, &op).await {
} else { // I GET STUCK IN THIS BRANCH AND NOTHING HAPPENS AAAAAAAAAA Ok(()) => self.buffer.enqueue(op),
clientside.push_back(op); Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e),
} }
} }
} }
}, },
// client requested a server operation, transform it and send it
res = self.stream.recv() => {
tracing::info!("received op REQUEST from client");
match res {
None => return tracing::error!("client closed requestor stream"),
Some(tx) => tx.send(match serverside.pop_front() {
None => {
tracing::warn!("requested change but none is available");
None
},
Some(mut operation) => {
let mut after = self.buffer.clone();
for op in clientside.iter_mut() {
(*op, operation) = match op.transform(&operation) {
Err(e) => return tracing::warn!("could not transform enqueued operation: {}", e),
Ok((x, y)) => (x, y),
};
after = match op.apply(&after) {
Err(_) => return tracing::error!("could not apply outgoing enqueued opseq to current buffer?"),
Ok(x) => x,
};
}
let skip = leading_noop(operation.ops()) as usize; // received a stop request (or channel got closed)
let tail = tailing_noop(operation.ops()) as usize; res = rx.message() => match res {
let span = skip..(operation.base_len() - tail); Err(_e) => break,
let content = if after.len() - tail < skip { "".into() } else { after[skip..after.len()-tail].to_string() }; Ok(None) => break,
let change = TextChange { span, content, after }; Ok(Some(change)) => {
let op : Op = serde_json::from_str(&change.opseq).unwrap();
Some(change) self.buffer.enqueue(op);
}, self.content.send(self.buffer.view()).unwrap();
}).expect("client did not wait????"), },
}
}, },
} }
} }
} }
} }

View file

@ -172,8 +172,8 @@ pub mod instance;
pub mod prelude; pub mod prelude;
/// underlying OperationalTransform library used, re-exported /// underlying OperationalTransform library used, re-exported
#[cfg(feature = "ot")] #[cfg(feature = "woot")]
pub use operational_transform as ot; pub use woot;
/// protocol types and services auto-generated by grpc /// protocol types and services auto-generated by grpc
#[cfg(feature = "proto")] #[cfg(feature = "proto")]

View file

@ -11,10 +11,7 @@ pub use crate::{
pub use crate::ot::OperationSeq as CodempOperationSeq; pub use crate::ot::OperationSeq as CodempOperationSeq;
#[cfg(feature = "api")] #[cfg(feature = "api")]
pub use crate::{ pub use crate::api::Controller as CodempController;
api::Controller as CodempController,
api::OperationFactory as CodempOperationFactory,
};
#[cfg(feature = "client")] #[cfg(feature = "client")]
pub use crate::{ pub use crate::{