diff --git a/Cargo.toml b/Cargo.toml index fc279d7..94f9210 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,8 @@ name = "codemp" [dependencies] # core tracing = "0.1" -# ot -operational-transform = { version = "0.6", features = ["serde"], optional = true } +# woot +codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", tag = "v0.1.0", optional = true } # proto tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true } prost = { version = "0.11.8", optional = true } @@ -31,8 +31,8 @@ tonic-build = "0.9" [features] default = ["client"] -api = ["ot", "dep:similar", "dep:tokio", "dep:async-trait"] -ot = ["dep:operational-transform"] +api = ["woot", "dep:similar", "dep:tokio", "dep:async-trait"] +woot = ["dep:codemp-woot"] proto = ["dep:prost", "dep:tonic"] client = ["proto", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "dep:md5", "dep:serde_json"] global = ["client", "dep:lazy_static"] diff --git a/src/api/change.rs b/src/api/change.rs new file mode 100644 index 0000000..95e34fa --- /dev/null +++ b/src/api/change.rs @@ -0,0 +1,59 @@ +//! # TextChange +//! +//! an editor-friendly representation of a text change in a buffer +//! to easily interface with codemp from various editors + +/// an editor-friendly representation of a text change in a buffer +/// +/// this represent a range in the previous state of the string and a new content which should be +/// replaced to it, allowing to represent any combination of deletions, insertions or replacements +/// +/// bulk and widespread operations will result in a TextChange effectively sending the whole new +/// buffer, but small changes are efficient and easy to create or apply +/// +/// ### examples +/// to insert 'a' after 4th character we should send a +/// `TextChange { span: 4..4, content: "a".into() }` +/// +/// to delete a the fourth character we should send a +/// `TextChange { span: 3..4, content: "".into() }` +/// +#[derive(Clone, Debug, Default)] +pub struct TextChange { + /// range of text change, as char indexes in buffer previous state + pub span: std::ops::Range, + /// new content of text inside span + pub content: String, +} + +impl TextChange { + /// create a new TextChange from the difference of given strings + pub fn from_diff(before: &str, after: &str) -> TextChange { + let diff = similar::TextDiff::from_chars(before, after); + let mut start = 0; + let mut end = 0; + let mut from_beginning = true; + for op in diff.ops() { + match op { + similar::DiffOp::Equal { .. } => { + if from_beginning { + start += 1 + } else { + end += 1 + } + }, + _ => { + end = 0; + from_beginning = false; + } + } + } + let end_before = before.len() - end; + let end_after = after.len() - end; + + TextChange { + span: start..end_before, + content: after[start..end_after].to_string(), + } + } +} diff --git a/src/api/controller.rs b/src/api/controller.rs index 07a0d8f..5866f6b 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -4,7 +4,6 @@ //! server use crate::Result; -use std::sync::Arc; #[async_trait::async_trait] pub(crate) trait ControllerWorker { @@ -21,20 +20,22 @@ pub(crate) trait ControllerWorker { /// this generic trait is implemented by actors managing stream procedures. /// events can be enqueued for dispatching without blocking ([Controller::send]), and an async blocking /// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking -/// ([Controller::blocking_recv]) and callback-based ([Controller::callback]) are implemented. +/// ([Controller::blocking_recv]) is implemented if feature `sync` is enabled. /// -/// * if possible, prefer a pure [Controller::recv] consumer -/// * a second possibility in preference is using a [Controller::callback] -/// * if neither is feasible a [Controller::poll]/[Controller::try_recv] approach is available +/// * if possible, prefer a pure [Controller::recv] consumer, awaiting for events +/// * if async is not feasible a [Controller::poll]/[Controller::try_recv] approach is possible #[async_trait::async_trait] pub trait Controller : Sized + Send + Sync { /// type of upstream values, used in [Self::send] type Input; - /// enqueue a new value to be sent + /// enqueue a new value to be sent to all other users + /// + /// success or failure of this function does not imply validity of sent operation, + /// because it's integrated asynchronously on the background worker fn send(&self, x: Self::Input) -> Result<()>; - /// get next value from stream, blocking until one is available + /// get next value from other users, blocking until one is available /// /// this is just an async trait function wrapped by `async_trait`: /// @@ -48,7 +49,7 @@ pub trait Controller : Sized + Send + Sync { Ok(self.try_recv()?.expect("no message available after polling")) } - /// block until next value is added to the stream without removing any element + /// block until next value is available without consuming it /// /// this is just an async trait function wrapped by `async_trait`: /// @@ -56,40 +57,15 @@ pub trait Controller : Sized + Send + Sync { async fn poll(&self) -> Result<()>; /// attempt to receive a value without blocking, return None if nothing is available + /// + /// note that this function does not circumvent race conditions, returning errors if it would + /// block. it's usually safe to ignore such errors and retry fn try_recv(&self) -> Result>; /// sync variant of [Self::recv], blocking invoking thread + /// this calls [Controller::recv] inside a [tokio::runtime::Runtime::block_on] + #[cfg(feature = "sync")] fn blocking_recv(&self, rt: &tokio::runtime::Handle) -> Result { rt.block_on(self.recv()) } - - /// register a callback to be called for each received stream value - /// - /// this will spawn a new task on given runtime invoking [Self::recv] in loop and calling given - /// callback for each received value. a stop channel should be provided, and first value sent - /// into it will stop the worker loop. - /// - /// note: creating a callback handler will hold an Arc reference to the given controller, - /// preventing it from being dropped (and likely disconnecting). using the stop channel is - /// important for proper cleanup - fn callback( - self: &Arc, - rt: &tokio::runtime::Handle, - mut stop: tokio::sync::mpsc::UnboundedReceiver<()>, - mut cb: F - ) where - Self : 'static, - F : FnMut(T) + Sync + Send + 'static - { - let _self = self.clone(); - rt.spawn(async move { - loop { - tokio::select! { - Ok(data) = _self.recv() => cb(data), - Some(()) = stop.recv() => break, - else => break, - } - } - }); - } } diff --git a/src/api/factory.rs b/src/api/factory.rs deleted file mode 100644 index 693c179..0000000 --- a/src/api/factory.rs +++ /dev/null @@ -1,165 +0,0 @@ -//! ### factory -//! -//! a helper trait that any string container can implement, which generates opseqs -//! -//! an OperationFactory trait implementation is provided for `String` and `Arc`, but plugin developers -//! should implement their own operation factory interfacing directly with the editor -//! buffer when possible. - -use std::ops::Range; - -use operational_transform::{OperationSeq, Operation}; -use similar::{TextDiff, ChangeTag}; - -/// calculate leading no-ops in given opseq -pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } - -/// calculate tailing no-ops in given opseq -pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } - -const fn count_noop(op: Option<&Operation>) -> u64 { - match op { - None => 0, - Some(Operation::Retain(n)) => *n, - Some(_) => 0, - } -} - -/// return the range on which the operation seq is actually applying its changes -pub fn op_effective_range(op: &OperationSeq) -> Range { - let first = leading_noop(op.ops()); - let last = op.base_len() as u64 - tailing_noop(op.ops()); - first..last -} - -/// a helper trait that any string container can implement, which generates opseqs -/// -/// all operations are to be considered mutating current state, obtainable with -/// [OperationFactory::content]. generating an operation has no effect on internal state -/// -/// ### examples -/// -/// ```rust -/// use codemp::api::OperationFactory; -/// -/// let mut factory = String::new(); -/// let operation = factory.ins("asd", 0); -/// factory = operation.apply(&factory)?; -/// assert_eq!(factory, "asd"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -/// -/// use [OperationFactory::ins] to add new characters at a specific index -/// -/// ```rust -/// # use codemp::api::OperationFactory; -/// # let mut factory = String::from("asd"); -/// factory = factory.ins(" dsa", 3).apply(&factory)?; -/// assert_eq!(factory, "asd dsa"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -/// -/// use [OperationFactory::diff] to arbitrarily change text at any position -/// -/// ```rust -/// # use codemp::api::OperationFactory; -/// # let mut factory = String::from("asd dsa"); -/// factory = factory -/// .diff(2, " xxx ", 5) -/// .expect("replaced region is equal to origin") -/// .apply(&factory)?; -/// assert_eq!(factory, "as xxx sa"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -/// -/// use [OperationFactory::del] to remove characters from given index -/// -/// ```rust -/// # use codemp::api::OperationFactory; -/// # let mut factory = String::from("as xxx sa"); -/// factory = factory.del(2, 5).apply(&factory)?; -/// assert_eq!(factory, "assa"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -/// -/// use [OperationFactory::replace] to completely replace buffer content -/// -/// ```rust -/// # use codemp::api::OperationFactory; -/// # let mut factory = String::from("assa"); -/// factory = factory.replace("from scratch") -/// .expect("replace is equal to origin") -/// .apply(&factory)?; -/// assert_eq!(factory, "from scratch"); -/// # Ok::<(), codemp::ot::OTError>(()) -/// ``` -pub trait OperationFactory { - /// the current content of the buffer - fn content(&self) -> String; - - /// completely replace the buffer with given text - fn replace(&self, txt: &str) -> Option { - self.diff(0, txt, self.content().len()) - } - - /// transform buffer in range [start..end] with given text - fn diff(&self, start: usize, txt: &str, end: usize) -> Option { - let mut out = OperationSeq::default(); - let content = self.content(); - let tail_skip = content.len() - end; // TODO len is number of bytes, not chars - let content_slice = &content[start..end]; - - if content_slice == txt { - // if slice equals given text, no operation should be taken - return None; - } - - out.retain(start as u64); - - let diff = TextDiff::from_chars(content_slice, txt); - - for change in diff.iter_all_changes() { - match change.tag() { - ChangeTag::Equal => out.retain(1), - ChangeTag::Delete => out.delete(1), - ChangeTag::Insert => out.insert(change.value()), - } - } - - out.retain(tail_skip as u64); - - Some(out) - } - - /// insert given chars at target position - fn ins(&self, txt: &str, pos: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let total = self.content().len() as u64; - out.retain(pos); - out.insert(txt); - out.retain(total - pos); - out - } - - /// delete n characters forward at given position - fn del(&self, pos: u64, count: u64) -> OperationSeq { - let mut out = OperationSeq::default(); - let len = self.content().len() as u64; - out.retain(pos); - out.delete(count); - out.retain(len - (pos+count)); - out - } -} - -impl OperationFactory for String { - fn content(&self) -> String { - self.clone() - } -} - -impl OperationFactory for std::sync::Arc { - fn content(&self) -> String { - self.to_string() - } -} diff --git a/src/api/mod.rs b/src/api/mod.rs index 4a69729..fcb5f3e 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -7,8 +7,8 @@ /// a generic async provider for bidirectional communication pub mod controller; -/// a helper trait to generate operation sequences -pub mod factory; +/// a generic representation of a text change +pub mod change; pub use controller::Controller; -pub use factory::OperationFactory; +pub use change::TextChange; diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 4dcad19..70ba39f 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -3,14 +3,16 @@ //! a controller implementation for buffer actions -use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, Mutex, oneshot}; +use std::sync::Arc; + +use tokio::sync::oneshot; +use tokio::sync::{watch, mpsc, RwLock}; use tonic::async_trait; use crate::errors::IgnorableError; -use crate::{api::Controller, Error}; +use crate::api::Controller; -use super::TextChange; +use crate::api::TextChange; /// the buffer controller implementation /// @@ -24,29 +26,31 @@ use super::TextChange; /// queues, transforming outbound delayed ops and applying remote changes /// to the local buffer /// -/// this controller implements [crate::api::OperationFactory], allowing to produce -/// Operation Sequences easily -/// /// upon dropping this handle will stop the associated worker -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BufferController { + /// unique identifier of buffer + pub name: String, content: watch::Receiver, - operations: mpsc::UnboundedSender, - last_op: Mutex>, - stream: mpsc::UnboundedSender>>, - stop: mpsc::UnboundedSender<()>, + seen: Arc>, + operations: mpsc::UnboundedSender, + poller: mpsc::Sender>, + _stop: Arc, // just exist } impl BufferController { pub(crate) fn new( + name: String, content: watch::Receiver, - operations: mpsc::UnboundedSender, - stream: mpsc::UnboundedSender>>, + operations: mpsc::UnboundedSender, + poller: mpsc::Sender>, stop: mpsc::UnboundedSender<()>, - last_op: Mutex>, ) -> Self { BufferController { - last_op, content, operations, stream, stop, + name, + content, operations, poller, + _stop: Arc::new(StopOnDrop(stop)), + seen: Arc::new(RwLock::new("".into())), } } @@ -55,40 +59,53 @@ impl BufferController { } } -impl Drop for BufferController { +#[derive(Debug)] +struct StopOnDrop(mpsc::UnboundedSender<()>); + +impl Drop for StopOnDrop { fn drop(&mut self) { - self.stop.send(()).unwrap_or_warn("could not send stop message to worker"); + self.0.send(()).unwrap_or_warn("could not send stop message to worker"); } } #[async_trait] impl Controller for BufferController { - type Input = OperationSeq; + type Input = TextChange; - async fn poll(&self) -> Result<(), Error> { - Ok(self.last_op.lock().await.changed().await?) + async fn poll(&self) -> crate::Result<()> { + let (tx, rx) = oneshot::channel::<()>(); + self.poller.send(tx); + Ok(rx.await.map_err(|_| crate::Error::Channel { send: false })?) } - fn try_recv(&self) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - self.stream.send(tx)?; - rx.blocking_recv() - .map_err(|_| Error::Channel { send: false }) + fn try_recv(&self) -> crate::Result> { + let seen = match self.seen.try_read() { + Err(_) => return Err(crate::Error::Deadlocked), + Ok(x) => x.clone(), + }; + let actual = self.content.borrow().clone(); + if seen == actual { + return Ok(None); + } + let change = TextChange::from_diff(&seen, &actual); + match self.seen.try_write() { + Err(_) => return Err(crate::Error::Deadlocked), + Ok(mut w) => *w = actual, + }; + Ok(Some(change)) } - async fn recv(&self) -> Result { + async fn recv(&self) -> crate::Result { self.poll().await?; - let (tx, rx) = oneshot::channel(); - self.stream.send(tx)?; - Ok( - rx.await - .map_err(|_| Error::Channel { send: false })? - .expect("empty channel after polling") - ) + let cur = self.seen.read().await.clone(); + let change = TextChange::from_diff(&cur, &self.content.borrow()); + let mut seen = self.seen.write().await; + *seen = self.content.borrow().clone(); + Ok(change) } /// enqueue an opseq for processing - fn send(&self, op: OperationSeq) -> Result<(), Error> { + fn send(&self, op: TextChange) -> crate::Result<()> { Ok(self.operations.send(op)?) } } diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 6dde13c..1610400 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -12,18 +12,3 @@ pub mod controller; pub(crate) mod worker; pub use controller::BufferController as Controller; - - -/// an editor-friendly representation of a text change in a buffer -/// -/// TODO move in proto -#[derive(Clone, Debug, Default)] -pub struct TextChange { - /// range of text change, as byte indexes in buffer - pub span: std::ops::Range, - /// content of text change, as string - pub content: String, - /// content after this text change - /// note that this field will probably be dropped, don't rely on it - pub after: String -} diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 72accfa..0dd900a 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -1,64 +1,65 @@ -use std::collections::VecDeque; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; -use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, oneshot, Mutex}; +use similar::{TextDiff, ChangeTag}; +use tokio::sync::{watch, mpsc, oneshot}; use tonic::transport::Channel; use tonic::{async_trait, Streaming}; +use woot::crdt::{Op, CRDT, TextEditor}; +use woot::woot::Woot; +use crate::errors::IgnorableError; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; use crate::api::controller::ControllerWorker; -use crate::api::factory::{leading_noop, tailing_noop}; +use crate::api::TextChange; -use super::TextChange; use super::controller::BufferController; pub(crate) struct BufferControllerWorker { uid: String, content: watch::Sender, - operations: mpsc::UnboundedReceiver, - stream: mpsc::UnboundedReceiver>>, - stream_requestor: mpsc::UnboundedSender>>, + operations: mpsc::UnboundedReceiver, receiver: watch::Receiver, - sender: mpsc::UnboundedSender, - buffer: String, - path: String, + sender: mpsc::UnboundedSender, + buffer: Woot, + name: String, stop: mpsc::UnboundedReceiver<()>, stop_control: mpsc::UnboundedSender<()>, - new_op_tx: watch::Sender<()>, - new_op_rx: watch::Receiver<()>, + poller_rx: mpsc::Receiver>, + poller_tx: mpsc::Sender>, + pollers: Vec>, } impl BufferControllerWorker { - pub fn new(uid: String, buffer: &str, path: &str) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); + pub fn new(uid: String, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel("".to_string()); let (op_tx, op_rx) = mpsc::unbounded_channel(); - let (s_tx, s_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel(); - let (notx, norx) = watch::channel(()); + let (poller_tx, poller_rx) = mpsc::channel(10); + let mut hasher = DefaultHasher::new(); + uid.hash(&mut hasher); + let site_id = hasher.finish() as usize; BufferControllerWorker { - uid, + uid, poller_rx, poller_tx, + pollers: Vec::new(), content: txt_tx, operations: op_rx, - stream: s_rx, - stream_requestor: s_tx, receiver: txt_rx, sender: op_tx, - buffer: buffer.to_string(), - path: path.to_string(), + buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging! + name: path.to_string(), stop: end_rx, stop_control: end_tx, - new_op_tx: notx, - new_op_rx: norx, } } - async fn send_op(&self, tx: &mut BufferClient, outbound: &OperationSeq) -> crate::Result<()> { + async fn send_op(&self, tx: &mut BufferClient, outbound: &Op) -> crate::Result<()> { let opseq = serde_json::to_string(outbound).expect("could not serialize opseq"); let req = OperationRequest { - path: self.path.clone(), - hash: format!("{:x}", md5::compute(&self.buffer)), + path: self.name.clone(), + hash: format!("{:x}", md5::compute(self.buffer.view())), op: Some(RawOp { opseq, user: self.uid.clone(), }), @@ -76,125 +77,91 @@ impl ControllerWorker for BufferControllerWorker { fn subscribe(&self) -> BufferController { BufferController::new( + self.name.clone(), self.receiver.clone(), self.sender.clone(), - self.stream_requestor.clone(), + self.poller_tx.clone(), self.stop_control.clone(), - Mutex::new(self.new_op_rx.clone()), ) } async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { - let mut clientside : VecDeque = VecDeque::new(); - let mut serverside : VecDeque = VecDeque::new(); - loop { - // block until one of these is ready tokio::select! { biased; - // received a stop request (or channel got closed) - res = self.stop.recv() => { - tracing::info!("received stop signal"); - match res { - None => return tracing::warn!("stop channel closed, stopping worker"), - Some(()) => return tracing::debug!("buffer worker stopping cleanly"), - } - } + // received stop signal + _ = self.stop.recv() => break, - // received a new message from server (or an error) - res = rx.message() => { - tracing::info!("received msg from server"); - let inbound : OperationSeq = match res { - Err(e) => return tracing::error!("error receiving op from server: {}", e), - Ok(None) => return tracing::warn!("server closed operation stream"), - Ok(Some(msg)) => serde_json::from_str(&msg.opseq) - .expect("could not deserialize server opseq"), - }; - self.buffer = inbound.apply(&self.buffer).expect("could not apply remote opseq???"); - serverside.push_back(inbound); - while let Some(mut outbound) = clientside.get(0).cloned() { - let mut serverside_tmp = serverside.clone(); - for server_op in serverside_tmp.iter_mut() { - tracing::info!("transforming {:?} <-> {:?}", outbound, server_op); - (outbound, *server_op) = outbound.transform(server_op) - .expect("could not transform enqueued out with just received"); - } - match self.send_op(&mut tx, &outbound).await { - Err(e) => { tracing::warn!("could not send op even after transforming: {}", e); break; }, - Ok(()) => { - tracing::info!("back in sync"); - serverside = serverside_tmp; - self.buffer = outbound.apply(&self.buffer).expect("could not apply op after synching back"); - clientside.pop_front(); + // received a new poller, add it to collection + res = self.poller_rx.recv() => match res { + None => break tracing::error!("poller channel closed"), + Some(tx) => self.pollers.push(tx), + }, + + // received a text change from editor + res = self.operations.recv() => match res { + None => break, + Some(change) => { + match self.buffer.view().get(change.span.clone()) { + None => tracing::error!("received illegal span from client"), + Some(span) => { + let diff = TextDiff::from_chars(span, &change.content); + + let mut i = 0; + let mut ops = Vec::new(); + for diff in diff.iter_all_changes() { + match diff.tag() { + ChangeTag::Equal => i += 1, + ChangeTag::Delete => match self.buffer.delete(change.span.start + i) { + Ok(op) => ops.push(op), + Err(e) => tracing::error!("could not apply deletion: {}", e), + }, + ChangeTag::Insert => { + for c in diff.value().chars() { + match self.buffer.insert(change.span.start + i, c) { + Ok(op) => { + ops.push(op); + i += 1; + }, + Err(e) => tracing::error!("could not apply insertion: {}", e), + } + } + }, + } + } + + for op in ops { + match self.send_op(&mut tx, &op).await { + Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e), + Ok(()) => { + // self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); + }, + } + } }, } } - self.content.send(self.buffer.clone()).expect("could not broadcast buffer update"); - self.new_op_tx.send(()).expect("could not activate client after new server event"); }, - // received a new operation from client (or channel got closed) - res = self.operations.recv() => { - tracing::info!("received op from client"); - match res { - None => return tracing::warn!("client closed operation stream"), - Some(op) => { - if clientside.is_empty() { - match self.send_op(&mut tx, &op).await { - Ok(()) => { - self.buffer = op.apply(&self.buffer).expect("could not apply op"); - self.content.send(self.buffer.clone()).expect("could not update buffer view"); - }, - Err(e) => { - tracing::warn!("server rejected op: {}", e); - clientside.push_back(op); - }, - } - } else { // I GET STUCK IN THIS BRANCH AND NOTHING HAPPENS AAAAAAAAAA - clientside.push_back(op); + // received a message from server + res = rx.message() => match res { + Err(_e) => break, + Ok(None) => break, + Ok(Some(change)) => match serde_json::from_str::(&change.opseq) { + Ok(op) => { + self.buffer.merge(op); + self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); + for tx in self.pollers.drain(0..self.pollers.len()) { + tx.send(()).unwrap_or_warn("could not wake up poller"); } - } - } + }, + Err(e) => tracing::error!("could not deserialize operation from server: {}", e), + }, }, - - // client requested a server operation, transform it and send it - res = self.stream.recv() => { - tracing::info!("received op REQUEST from client"); - match res { - None => return tracing::error!("client closed requestor stream"), - Some(tx) => tx.send(match serverside.pop_front() { - None => { - tracing::warn!("requested change but none is available"); - None - }, - Some(mut operation) => { - let mut after = self.buffer.clone(); - for op in clientside.iter_mut() { - (*op, operation) = match op.transform(&operation) { - Err(e) => return tracing::warn!("could not transform enqueued operation: {}", e), - Ok((x, y)) => (x, y), - }; - after = match op.apply(&after) { - Err(_) => return tracing::error!("could not apply outgoing enqueued opseq to current buffer?"), - Ok(x) => x, - }; - } - - let skip = leading_noop(operation.ops()) as usize; - let tail = tailing_noop(operation.ops()) as usize; - let span = skip..(operation.base_len() - tail); - let content = if after.len() - tail < skip { "".into() } else { after[skip..after.len()-tail].to_string() }; - let change = TextChange { span, content, after }; - - Some(change) - }, - }).expect("client did not wait????"), - } - }, - } + } } } diff --git a/src/client.rs b/src/client.rs index 7038dde..9b1aca2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,9 +4,12 @@ use std::{sync::Arc, collections::BTreeMap}; +use tokio::sync::mpsc; +use tokio_stream::StreamExt; use tonic::transport::Channel; use crate::{ + api::Controller, cursor::{worker::CursorControllerWorker, controller::CursorController}, proto::{ buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, @@ -76,7 +79,7 @@ impl Client { /// /// to interact with such workspace [crate::api::Controller::send] cursor events or /// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller]. - pub async fn join(&mut self, _session: &str) -> Result, Error> { + pub async fn join(&mut self, _session: &str) -> crate::Result> { // TODO there is no real workspace handling in codemp server so it behaves like one big global // session. I'm still creating this to start laying out the proper use flow let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner(); @@ -103,7 +106,7 @@ impl Client { } /// create a new buffer in current workspace, with optional given content - pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), Error> { + pub async fn create(&mut self, path: &str, content: Option<&str>) -> crate::Result<()> { if let Some(_workspace) = &self.workspace { self.client.buffer .create(BufferPayload { @@ -120,23 +123,18 @@ impl Client { /// attach to a buffer, starting a buffer controller and returning a new reference to it /// - /// to interact with such buffer [crate::api::Controller::send] operation sequences - /// or [crate::api::Controller::recv] for text events using its [crate::buffer::Controller]. - /// to generate operation sequences use the [crate::api::OperationFactory] - /// methods, which are implemented on [crate::buffer::Controller], such as - /// [crate::api::OperationFactory::diff]. - pub async fn attach(&mut self, path: &str) -> Result, Error> { + /// to interact with such buffer use [crate::api::Controller::send] or + /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] + pub async fn attach(&mut self, path: &str) -> crate::Result> { if let Some(workspace) = &mut self.workspace { let mut client = self.client.buffer.clone(); let req = BufferPayload { path: path.to_string(), user: self.id.clone(), content: None }; - let content = client.sync(req.clone()).await?.into_inner().content; - let stream = client.attach(req).await?.into_inner(); - let controller = BufferControllerWorker::new(self.id.clone(), &content, path); + let controller = BufferControllerWorker::new(self.id.clone(), path); let handler = Arc::new(controller.subscribe()); let _path = path.to_string(); @@ -153,4 +151,38 @@ impl Client { Err(Error::InvalidState { msg: "join a workspace first".into() }) } } + + + pub async fn select_buffer(&self) -> crate::Result { + match &self.workspace { + None => Err(Error::InvalidState { msg: "join workspace first".into() }), + Some(workspace) => { + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut tasks = Vec::new(); + for (id, buffer) in workspace.buffers.iter() { + let _tx = tx.clone(); + let _id = id.clone(); + let _buffer = buffer.clone(); + tasks.push(tokio::spawn(async move { + match _buffer.poll().await { + Ok(()) => _tx.send(Ok(_id)), + Err(_) => _tx.send(Err(Error::Channel { send: true })), + } + })) + } + loop { + match rx.recv().await { + None => return Err(Error::Channel { send: false }), + Some(Err(_)) => continue, // TODO log errors + Some(Ok(x)) => { + for t in tasks { + t.abort(); + } + return Ok(x.clone()); + }, + } + } + } + } + } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 443574e..c77784e 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -52,7 +52,7 @@ impl Controller for CursorController { /// enqueue a cursor event to be broadcast to current workspace /// will automatically invert cursor start/end if they are inverted fn send(&self, mut cursor: CursorPosition) -> Result<(), Error> { - if cursor.start() < cursor.end() { + if cursor.start() > cursor.end() { std::mem::swap(&mut cursor.start, &mut cursor.end); } Ok(self.op.send(CursorEvent { diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 16d0950..0e34aa7 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -26,13 +26,20 @@ impl From::<(i32, i32)> for RowCol { } } +impl RowCol { + /// create a RowCol and wrap into an Option, to help build protocol packets + pub fn wrap(row: i32, col: i32) -> Option { + Some(RowCol { row, col }) + } +} + impl CursorPosition { - /// extract start position, defaulting to (0,0) + /// extract start position, defaulting to (0,0), to help build protocol packets pub fn start(&self) -> RowCol { self.start.clone().unwrap_or((0, 0).into()) } - /// extract end position, defaulting to (0,0) + /// extract end position, defaulting to (0,0), to help build protocol packets pub fn end(&self) -> RowCol { self.end.clone().unwrap_or((0, 0).into()) } diff --git a/src/errors.rs b/src/errors.rs index c076cc8..3ae14d8 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -61,6 +61,9 @@ pub enum Error { msg: String, }, + /// errors caused by wrong interlocking, safe to retry + Deadlocked, + /// if you see these errors someone is being lazy (: Filler { // TODO filler error, remove later message: String, diff --git a/src/instance.rs b/src/instance.rs index d89bc5e..410038c 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -46,13 +46,13 @@ pub mod a_sync { impl Instance { /// connect to remote address instantiating a new client [crate::client::Client::new] - pub async fn connect(&self, addr: &str) -> Result<(), Error> { + pub async fn connect(&self, addr: &str) -> crate::Result<()> { *self.client.lock().await = Some(Client::new(addr).await?); Ok(()) } /// threadsafe version of [crate::client::Client::join] - pub async fn join(&self, session: &str) -> Result, Error> { + pub async fn join(&self, session: &str) -> crate::Result> { self.client .lock().await .as_mut() @@ -62,7 +62,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::create] - pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> { + pub async fn create(&self, path: &str, content: Option<&str>) -> crate::Result<()> { self.client .lock().await .as_mut() @@ -72,7 +72,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::attach] - pub async fn attach(&self, path: &str) -> Result, Error> { + pub async fn attach(&self, path: &str) -> crate::Result> { self.client .lock().await .as_mut() @@ -82,7 +82,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::get_cursor] - pub async fn get_cursor(&self) -> Result, Error> { + pub async fn get_cursor(&self) -> crate::Result> { self.client .lock().await .as_mut() @@ -92,7 +92,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::get_buffer] - pub async fn get_buffer(&self, path: &str) -> Result, Error> { + pub async fn get_buffer(&self, path: &str) -> crate::Result> { self.client .lock().await .as_mut() @@ -102,7 +102,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::leave_workspace] - pub async fn leave_workspace(&self) -> Result<(), Error> { + pub async fn leave_workspace(&self) -> crate::Result<()> { self.client .lock().await .as_mut() @@ -112,7 +112,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::disconnect_buffer] - pub async fn disconnect_buffer(&self, path: &str) -> Result { + pub async fn disconnect_buffer(&self, path: &str) -> crate::Result { let res = self.client .lock().await .as_mut() @@ -120,6 +120,16 @@ pub mod a_sync { .disconnect_buffer(path); Ok(res) } + + pub async fn select_buffer(&self) -> crate::Result { + let res = self.client + .lock().await + .as_ref() + .ok_or(Error::InvalidState { msg: "connect first".into() })? + .select_buffer() + .await?; + Ok(res) + } } } @@ -135,7 +145,7 @@ pub mod sync { buffer::controller::BufferController }; - /// persistant session manager for codemp client + /// persistent session manager for codemp client /// /// will hold a std mutex over an optional client, and drop its reference when disconnecting. /// also contains a tokio runtime to execute async futures on @@ -157,7 +167,7 @@ pub mod sync { } impl Instance { - fn if_client(&self, op: impl FnOnce(&mut Client) -> T) -> Result { + fn if_client(&self, op: impl FnOnce(&mut Client) -> T) -> crate::Result { if let Some(c) = self.client.lock().expect("client mutex poisoned").as_mut() { Ok(op(c)) } else { @@ -175,38 +185,42 @@ pub mod sync { } /// threadsafe and sync version of [crate::client::Client::join] - pub fn join(&self, session: &str) -> Result, Error> { + pub fn join(&self, session: &str) -> crate::Result> { self.if_client(|c| self.rt().block_on(c.join(session)))? } /// threadsafe and sync version of [crate::client::Client::create] - pub fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> { + pub fn create(&self, path: &str, content: Option<&str>) -> crate::Result<()> { self.if_client(|c| self.rt().block_on(c.create(path, content)))? } /// threadsafe and sync version of [crate::client::Client::attach] - pub fn attach(&self, path: &str) -> Result, Error> { + pub fn attach(&self, path: &str) -> crate::Result> { self.if_client(|c| self.rt().block_on(c.attach(path)))? } /// threadsafe and sync version of [crate::client::Client::get_cursor] - pub fn get_cursor(&self) -> Result, Error> { + pub fn get_cursor(&self) -> crate::Result> { self.if_client(|c| c.get_cursor().ok_or(Error::InvalidState { msg: "join workspace first".into() }))? } /// threadsafe and sync version of [crate::client::Client::get_buffer] - pub fn get_buffer(&self, path: &str) -> Result, Error> { + pub fn get_buffer(&self, path: &str) -> crate::Result> { self.if_client(|c| c.get_buffer(path).ok_or(Error::InvalidState { msg: "join workspace or create requested buffer first".into() }))? } /// threadsafe and sync version of [crate::client::Client::leave_workspace] - pub fn leave_workspace(&self) -> Result<(), Error> { + pub fn leave_workspace(&self) -> crate::Result<()> { self.if_client(|c| c.leave_workspace()) } /// threadsafe and sync version of [crate::client::Client::disconnect_buffer] - pub fn disconnect_buffer(&self, path: &str) -> Result { + pub fn disconnect_buffer(&self, path: &str) -> crate::Result { self.if_client(|c| c.disconnect_buffer(path)) } + + pub fn select_buffer(&self) -> crate::Result { + self.if_client(|c| self.rt().block_on(c.select_buffer()))? + } } } diff --git a/src/lib.rs b/src/lib.rs index 23e87a9..0ae3765 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ //! //! ![just a nice pic](https://alemi.dev/img/about-slice-1.png) //! -//! This is the core library of the codemp project. +//! > the core library of the codemp project, driving all editor plugins //! //! ## structure //! The main entrypoint is the [Instance] object, that maintains a connection and can @@ -14,17 +14,14 @@ //! Blocking and callback variants are also implemented. The [api::Controller] can also be used to send new //! events to the server ([api::Controller::send]). //! -//! Each operation on a buffer is represented as an [ot::OperationSeq]. -//! A visualization about how OperationSeqs work is available -//! [here](http://operational-transformation.github.io/index.html), -//! but to use this library it's only sufficient to know that they can only -//! be applied on buffers of some length and are transformable to be able to be -//! applied in a different order while maintaining the same result. -//! -//! To generate Operation Sequences use helper methods from module [api::factory] (trait [api::OperationFactory]). +//! Each operation on a buffer is represented as an [woot::crdt::Op]. The underlying buffer is a +//! [WOOT CRDT](https://inria.hal.science/file/index/docid/71240/filename/RR-5580.pdf), +//! but to use this library it's only sufficient to know that all WOOT buffers that have received +//! the same operations converge to the same state, and that operations might not get integrated +//! immediately but instead deferred until compatible. //! //! ## features -//! * `ot` : include the underlying operational transform library (default enabled) +//! * `woot` : include the underlying CRDT library and re-exports it (default enabled) //! * `api` : include traits for core interfaces under [api] (default enabled) //! * `proto` : include GRCP protocol definitions under [proto] (default enabled) //! * `client`: include the local [client] implementation (default enabled) @@ -40,7 +37,7 @@ //! [instance::a_sync::Instance] //! //! ```rust,no_run -//! use codemp::api::{Controller, OperationFactory}; +//! use codemp::api::{Controller, TextChange}; //! # use codemp::instance::a_sync::Instance; //! //! # async fn async_example() -> codemp::Result<()> { @@ -62,12 +59,9 @@ //! // attach to a new buffer and execute operations on it //! session.create("test.txt", None).await?; // create new buffer //! let buffer = session.attach("test.txt").await?; // attach to it -//! let text = buffer.content(); // any string can be used as operation factory -//! buffer.send(text.ins("hello", 0))?; // insert some text -//! if let Some(operation) = text.diff(4, "o world", 5) { -//! buffer.send(operation)?; // replace with precision, if valid -//! } -//! assert_eq!(buffer.content(), "hello world"); +//! let local_change = TextChange { span: 0..0, content: "hello!".into() }; +//! buffer.send(local_change)?; // insert some text +//! let remote_change = buffer.recv().await?; //! # //! # Ok(()) //! # } @@ -85,16 +79,6 @@ //! let session = Instance::default(); // instantiate sync variant //! session.connect("http://alemi.dev:50051")?; // connect to server //! -//! // join remote workspace and handle cursor events with a callback -//! let cursor = session.join("some_workspace")?; // join workspace -//! let (stop, stop_rx) = tokio::sync::mpsc::unbounded_channel(); // create stop channel -//! Arc::new(cursor).callback( // register callback -//! session.rt(), stop_rx, // pass instance runtime and stop channel receiver -//! | cursor_event | { -//! println!("received cursor event: {:?}", cursor_event); -//! } -//! ); -//! //! // attach to buffer and blockingly receive events //! let buffer = session.attach("test.txt")?; // attach to buffer, must already exist //! while let Ok(op) = buffer.blocking_recv(session.rt()) { // must pass runtime @@ -172,8 +156,8 @@ pub mod instance; pub mod prelude; /// underlying OperationalTransform library used, re-exported -#[cfg(feature = "ot")] -pub use operational_transform as ot; +#[cfg(feature = "woot")] +pub use woot; /// protocol types and services auto-generated by grpc #[cfg(feature = "proto")] diff --git a/src/prelude.rs b/src/prelude.rs index 2c3e5fa..44170b9 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -7,22 +7,21 @@ pub use crate::{ Result as CodempResult, }; -#[cfg(feature = "ot")] -pub use crate::ot::OperationSeq as CodempOperationSeq; +#[cfg(feature = "woot")] +pub use crate::woot::crdt::Op as CodempOp; #[cfg(feature = "api")] -pub use crate::{ - api::Controller as CodempController, - api::OperationFactory as CodempOperationFactory, +pub use crate::api::{ + Controller as CodempController, + TextChange as CodempTextChange, }; #[cfg(feature = "client")] pub use crate::{ + Instance as CodempInstance, client::Client as CodempClient, cursor::Controller as CodempCursorController, buffer::Controller as CodempBufferController, - buffer::TextChange as CodempTextChange, - Instance as CodempInstance, }; #[cfg(feature = "proto")]