feat: async try_recv, delta_request

Co-authored-by: alemi <me@alemi.dev>
This commit is contained in:
zaaarf 2024-08-14 00:24:32 +02:00
parent ada6ed49c1
commit a9d713fd75
No known key found for this signature in database
GPG key ID: 102E445F4C3F829B
10 changed files with 100 additions and 301 deletions

View file

@ -12,17 +12,15 @@ crate-type = ["cdylib"]
tracing = "0.1" tracing = "0.1"
thiserror = "1.0" thiserror = "1.0"
async-trait = "0.1" async-trait = "0.1"
# woot # crdt
codemp-woot = { git = "ssh://git@github.com/hexedtech/woot.git", features = ["serde"], tag = "v0.1.2" } diamond-types = "1.0"
diamond-types="1.0"
# proto # proto
codemp-proto = { git = "ssh://git@github.com/hexedtech/codemp-proto.git", tag = "v0.6.1" } codemp-proto = { git = "ssh://git@github.com/hexedtech/codemp-proto.git", tag = "v0.6.1" }
uuid = { version = "1.7", features = ["v4"] } uuid = { version = "1.7", features = ["v4"] }
tonic = { version = "0.11", features = ["tls", "tls-roots"] } tonic = { version = "0.11", features = ["tls", "tls-roots"] }
postcard = "1.0"
# api # api
similar = { version = "2.2", features = ["inline"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync"] } tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
# client # client
tokio-stream = "0.1" tokio-stream = "0.1"
dashmap = "5.5" dashmap = "5.5"

View file

@ -3,13 +3,6 @@
//! an editor-friendly representation of a text change in a buffer //! an editor-friendly representation of a text change in a buffer
//! to easily interface with codemp from various editors //! to easily interface with codemp from various editors
/// an atomic and orderable operation
///
/// this under the hood thinly wraps our CRDT operation
#[derive(Debug, Clone)]
pub struct Op(pub(crate) diamond_types::list::operation::Operation);
// Do we need this in the api? why not just have a TextChange, which already covers as operation.
/// an editor-friendly representation of a text change in a buffer /// an editor-friendly representation of a text change in a buffer
/// ///
/// this represent a range in the previous state of the string and a new content which should be /// this represent a range in the previous state of the string and a new content which should be
@ -25,7 +18,6 @@ pub struct Op(pub(crate) diamond_types::list::operation::Operation);
/// to delete a the fourth character we should send a /// to delete a the fourth character we should send a
/// `TextChange { span: 3..4, content: "".into() }` /// `TextChange { span: 3..4, content: "".into() }`
/// ///
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "python", pyo3::pyclass)]
@ -37,6 +29,8 @@ pub struct TextChange {
pub end: u32, pub end: u32,
/// new content of text inside span /// new content of text inside span
pub content: String, pub content: String,
/// optional content hash after applying this change
pub hash: Option<i64>,
} }
impl TextChange { impl TextChange {
@ -58,92 +52,6 @@ impl TextChange {
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
!self.is_delete() && !self.is_insert() !self.is_delete() && !self.is_insert()
} }
}
/*
impl TextChange {
/// create a new TextChange from the difference of given strings
pub fn from_diff(before: &str, after: &str) -> TextChange {
let diff = similar::TextDiff::from_chars(before, after);
let mut start = 0;
let mut end = 0;
let mut from_beginning = true;
for op in diff.ops() {
match op {
similar::DiffOp::Equal { len, .. } => {
if from_beginning {
start += len
} else {
end += len
}
}
_ => {
end = 0;
from_beginning = false;
}
}
}
let end_before = before.len() - end;
let end_after = after.len() - end;
TextChange {
start: start as u32,
end: end_before as u32,
content: after[start..end_after].to_string(),
}
}
pub fn span(&self) -> std::ops::Range<usize> {
self.start as usize..self.end as usize
}
/// consume the [TextChange], transforming it into a Vec of [Op]
pub fn transform(&self, woot: &Woot) -> WootResult<Vec<Op>> {
let mut out = Vec::new();
if self.is_empty() {
return Ok(out);
} // no-op
let view = woot.view();
let Some(span) = view.get(self.span()) else {
return Err(crate::woot::WootError::OutOfBounds);
};
let diff = similar::TextDiff::from_chars(span, &self.content);
for (i, diff) in diff.iter_all_changes().enumerate() {
match diff.tag() {
similar::ChangeTag::Equal => {}
similar::ChangeTag::Delete => match woot.delete_one(self.span().start + i) {
Err(e) => tracing::error!("could not create deletion: {}", e),
Ok(op) => out.push(Op(op)),
},
similar::ChangeTag::Insert => {
match woot.insert(self.span().start + i, diff.value()) {
Ok(ops) => {
for op in ops {
out.push(Op(op))
}
}
Err(e) => tracing::error!("could not create insertion: {}", e),
}
}
}
}
Ok(out)
}
/// returns true if this TextChange deletes existing text
pub fn is_deletion(&self) -> bool {
!self.span().is_empty()
}
/// returns true if this TextChange adds new text
pub fn is_addition(&self) -> bool {
!self.content.is_empty()
}
/// returns true if this TextChange is effectively as no-op
pub fn is_empty(&self) -> bool {
!self.is_deletion() && !self.is_addition()
}
/// applies this text change to given text, returning a new string /// applies this text change to given text, returning a new string
pub fn apply(&self, txt: &str) -> String { pub fn apply(&self, txt: &str) -> String {
@ -152,55 +60,17 @@ impl TextChange {
let post = txt.get(self.span().end..).unwrap_or("").to_string(); let post = txt.get(self.span().end..).unwrap_or("").to_string();
format!("{}{}{}", pre, self.content, post) format!("{}{}{}", pre, self.content, post)
} }
/// convert from byte index to row and column
/// txt must be the whole content of the buffer, in order to count lines
pub fn index_to_rowcol(txt: &str, index: usize) -> codemp_proto::cursor::RowCol {
// FIXME might panic, use .get()
let row = txt[..index].matches('\n').count() as i32;
let col = txt[..index].split('\n').last().unwrap_or("").len() as i32;
codemp_proto::cursor::RowCol { row, col }
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[test]
fn textchange_diff_works_for_deletions() {
let change = super::TextChange::from_diff(
"sphinx of black quartz, judge my vow",
"sphinx of quartz, judge my vow",
);
assert_eq!(change.span(), 10..16);
assert_eq!(change.content, "");
}
#[test]
fn textchange_diff_works_for_insertions() {
let change = super::TextChange::from_diff(
"sphinx of quartz, judge my vow",
"sphinx of black quartz, judge my vow",
);
assert_eq!(change.span(), 10..10);
assert_eq!(change.content, "black ");
}
#[test]
fn textchange_diff_works_for_changes() {
let change = super::TextChange::from_diff(
"sphinx of black quartz, judge my vow",
"sphinx who watches the desert, judge my vow",
);
assert_eq!(change.span(), 7..22);
assert_eq!(change.content, "who watches the desert");
}
#[test] #[test]
fn textchange_apply_works_for_insertions() { fn textchange_apply_works_for_insertions() {
let change = super::TextChange { let change = super::TextChange {
start: 5, start: 5,
end: 5, end: 5,
content: " cruel".to_string(), content: " cruel".to_string(),
hash: None
}; };
let result = change.apply("hello world!"); let result = change.apply("hello world!");
assert_eq!(result, "hello cruel world!"); assert_eq!(result, "hello cruel world!");
@ -212,6 +82,7 @@ mod tests {
start: 5, start: 5,
end: 11, end: 11,
content: "".to_string(), content: "".to_string(),
hash: None
}; };
let result = change.apply("hello cruel world!"); let result = change.apply("hello cruel world!");
assert_eq!(result, "hello world!"); assert_eq!(result, "hello world!");
@ -223,6 +94,7 @@ mod tests {
start: 5, start: 5,
end: 11, end: 11,
content: " not very pleasant".to_string(), content: " not very pleasant".to_string(),
hash: None
}; };
let result = change.apply("hello cruel world!"); let result = change.apply("hello cruel world!");
assert_eq!(result, "hello not very pleasant world!"); assert_eq!(result, "hello not very pleasant world!");
@ -234,6 +106,7 @@ mod tests {
start: 100, start: 100,
end: 110, end: 110,
content: "a very long string \n which totally matters".to_string(), content: "a very long string \n which totally matters".to_string(),
hash: None
}; };
let result = change.apply("a short text"); let result = change.apply("a short text");
assert_eq!( assert_eq!(
@ -242,31 +115,15 @@ mod tests {
); );
} }
#[test]
fn empty_diff_produces_empty_textchange() {
let change = super::TextChange::from_diff("same \n\n text", "same \n\n text");
assert!(change.is_empty());
}
#[test] #[test]
fn empty_textchange_doesnt_alter_buffer() { fn empty_textchange_doesnt_alter_buffer() {
let change = super::TextChange { let change = super::TextChange {
start: 42, start: 42,
end: 42, end: 42,
content: "".to_string(), content: "".to_string(),
hash: None
}; };
let result = change.apply("some important text"); let result = change.apply("some important text");
assert_eq!(result, "some important text"); assert_eq!(result, "some important text");
} }
}*/
// TODO: properly implement this for diamond types directly
impl From<Op> for TextChange {
fn from(value: Op) -> Self {
Self {
start: value.0.start() as u32,
end: value.0.end() as u32,
content: value.0.content_as_str().unwrap_or_default().to_string(),
}
}
} }

View file

@ -37,12 +37,12 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// ///
/// `async fn recv(&self) -> codemp::Result<T>;` /// `async fn recv(&self) -> codemp::Result<T>;`
async fn recv(&self) -> Result<T> { async fn recv(&self) -> Result<T> {
if let Some(x) = self.try_recv()? { loop {
return Ok(x); // short circuit if already available self.poll().await?;
if let Some(x) = self.try_recv().await? {
break Ok(x);
}
} }
self.poll().await?;
Ok(self.try_recv()?.expect("no message available after polling"))
} }
/// block until next value is available without consuming it /// block until next value is available without consuming it
@ -53,7 +53,7 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
async fn poll(&self) -> Result<()>; async fn poll(&self) -> Result<()>;
/// attempt to receive a value without blocking, return None if nothing is available /// attempt to receive a value without blocking, return None if nothing is available
fn try_recv(&self) -> Result<Option<T>>; async fn try_recv(&self) -> Result<Option<T>>;
/// stop underlying worker /// stop underlying worker
/// ///

View file

@ -21,7 +21,6 @@ pub mod user;
pub use controller::Controller; pub use controller::Controller;
pub use change::TextChange; pub use change::TextChange;
pub use change::Op;
pub use cursor::Cursor; pub use cursor::Cursor;
pub use event::Event; pub use event::Event;
pub use user::User; pub use user::User;

View file

@ -5,16 +5,13 @@
use std::sync::Arc; use std::sync::Arc;
use diamond_types::LocalVersion; use diamond_types::LocalVersion;
use tokio::sync::{oneshot, Mutex}; use tokio::sync::{oneshot, mpsc, watch};
use tokio::sync::{mpsc, watch};
use tonic::async_trait; use tonic::async_trait;
use crate::api::Controller; use crate::api::Controller;
use crate::api::TextChange; use crate::api::TextChange;
use crate::api::Op;
use crate::ext::InternallyMutable; use crate::ext::InternallyMutable;
/// the buffer controller implementation /// the buffer controller implementation
@ -45,39 +42,14 @@ impl BufferController {
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct BufferControllerInner { pub(crate) struct BufferControllerInner {
name: String, pub(crate) name: String,
latest_version: watch::Receiver<diamond_types::LocalVersion>, pub(crate) latest_version: watch::Receiver<diamond_types::LocalVersion>,
last_update: InternallyMutable<diamond_types::LocalVersion>, pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>,
ops_in: mpsc::UnboundedSender<TextChange>, pub(crate) ops_in: mpsc::UnboundedSender<TextChange>,
ops_out: Mutex<mpsc::UnboundedReceiver<(LocalVersion, Option<Op>)>>, pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
poller: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
content_request: mpsc::Sender<oneshot::Sender<String>>, pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
}
impl BufferControllerInner {
pub(crate) fn new(
name: String,
latest_version: watch::Receiver<diamond_types::LocalVersion>,
ops_in: mpsc::UnboundedSender<TextChange>,
ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option<Op>)>,
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stopper: mpsc::UnboundedSender<()>,
content_request: mpsc::Sender<oneshot::Sender<String>>,
// TODO we're getting too much stuff via constructor, maybe make everything pub(crate)
// instead?? or maybe builder, or maybe defaults
) -> Self {
Self {
name,
latest_version,
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
ops_in,
ops_out: Mutex::new(ops_out),
poller,
stopper,
content_request,
}
}
} }
#[async_trait] #[async_trait]
@ -85,9 +57,6 @@ impl Controller<TextChange> for BufferController {
/// block until a text change is available /// block until a text change is available
/// this returns immediately if one is already available /// this returns immediately if one is already available
async fn poll(&self) -> crate::Result<()> { async fn poll(&self) -> crate::Result<()> {
// TODO there might be some extra logic we can play with using `seen` and `not seen` yet
// mechanics, not just the comparison. nevermind, the `has_changed` etc stuff needs mut self, yuk.
if self.0.last_update.get() != *self.0.latest_version.borrow() { if self.0.last_update.get() != *self.0.latest_version.borrow() {
return Ok(()); return Ok(());
} }
@ -100,7 +69,7 @@ impl Controller<TextChange> for BufferController {
} }
/// if a text change is available, return it immediately /// if a text change is available, return it immediately
fn try_recv(&self) -> crate::Result<Option<TextChange>> { async fn try_recv(&self) -> crate::Result<Option<TextChange>> {
let last_update = self.0.last_update.get(); let last_update = self.0.last_update.get();
let latest_version = self.0.latest_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone();
@ -108,29 +77,11 @@ impl Controller<TextChange> for BufferController {
return Ok(None); return Ok(None);
} }
match self.0.ops_out.try_lock() { let (tx, rx) = oneshot::channel();
Err(_) => Ok(None), self.0.delta_request.send((last_update, tx)).await?;
Ok(mut ops) => match ops.try_recv() { let (v, change) = rx.await?;
Ok((lv, Some(op))) => { self.0.last_update.set(v);
self.0.last_update.set(lv); Ok(Some(change))
Ok(Some(TextChange::from(op)))
},
Ok((_lv, None)) => Ok(None), // TODO what is going on here?
Err(mpsc::error::TryRecvError::Empty) => Ok(None),
Err(mpsc::error::TryRecvError::Disconnected) =>
Err(crate::Error::Channel { send: false }),
},
}
}
/// block until a new text change is available, and return it
async fn recv(&self) -> crate::Result<TextChange> {
if let Some((lv, Some(op))) = self.0.ops_out.lock().await.recv().await {
self.0.last_update.set(lv);
Ok(TextChange::from(op))
} else {
Err(crate::Error::Channel { send: false })
}
} }
/// enqueue a text change for processing /// enqueue a text change for processing

View file

@ -6,10 +6,10 @@ use tonic::{async_trait, Streaming};
use uuid::Uuid; use uuid::Uuid;
use crate::api::controller::ControllerWorker; use crate::api::controller::ControllerWorker;
use crate::api::Op;
use crate::api::TextChange; use crate::api::TextChange;
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
use crate::ext::InternallyMutable;
use codemp_proto::buffer::{BufferEvent, Operation}; use codemp_proto::buffer::{BufferEvent, Operation};
use super::controller::{BufferController, BufferControllerInner}; use super::controller::{BufferController, BufferControllerInner};
@ -18,10 +18,10 @@ pub(crate) struct BufferWorker {
user_id: Uuid, user_id: Uuid,
latest_version: watch::Sender<diamond_types::LocalVersion>, latest_version: watch::Sender<diamond_types::LocalVersion>,
ops_in: mpsc::UnboundedReceiver<TextChange>, ops_in: mpsc::UnboundedReceiver<TextChange>,
ops_out: mpsc::UnboundedSender<(LocalVersion, Option<Op>)>,
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
content_checkout: mpsc::Receiver<oneshot::Sender<String>>, content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
controller: BufferController, controller: BufferController,
} }
@ -32,34 +32,35 @@ impl BufferWorker {
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone()); let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
let (opin_tx, opin_rx) = mpsc::unbounded_channel(); let (opin_tx, opin_rx) = mpsc::unbounded_channel();
let (opout_tx, opout_rx) = mpsc::unbounded_channel();
let (req_tx, req_rx) = mpsc::channel(1); let (req_tx, req_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);
let (poller_tx, poller_rx) = mpsc::unbounded_channel(); let (poller_tx, poller_rx) = mpsc::unbounded_channel();
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let controller = BufferControllerInner::new( let controller = BufferControllerInner {
path.to_string(), name: path.to_string(),
latest_version_rx, latest_version: latest_version_rx,
opin_tx, last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
opout_rx, ops_in: opin_tx,
poller_tx, poller: poller_tx,
end_tx, stopper: end_tx,
req_tx, content_request: req_tx,
); delta_request: recv_tx,
};
BufferWorker { BufferWorker {
user_id, user_id,
latest_version: latest_version_tx, latest_version: latest_version_tx,
ops_in: opin_rx, ops_in: opin_rx,
ops_out: opout_tx,
poller: poller_rx, poller: poller_rx,
pollers: Vec::new(), pollers: Vec::new(),
stop: end_rx, stop: end_rx,
controller: BufferController(Arc::new(controller)), controller: BufferController(Arc::new(controller)),
content_checkout: req_rx, content_checkout: req_rx,
delta_req: recv_rx,
} }
} }
} }
@ -77,6 +78,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
let mut branch = diamond_types::list::Branch::new(); let mut branch = diamond_types::list::Branch::new();
let mut oplog = diamond_types::list::OpLog::new(); let mut oplog = diamond_types::list::OpLog::new();
let mut timer = Timer::new(10); // TODO configurable!!
loop { loop {
// block until one of these is ready // block until one of these is ready
tokio::select! { tokio::select! {
@ -112,31 +114,15 @@ impl ControllerWorker<TextChange> for BufferWorker {
}, },
}, },
// received a message from server // received a message from server: add to oplog and update latest version (+unlock pollers)
res = rx.message() => match res { res = rx.message() => match res {
Err(_e) => break, Err(_e) => break,
Ok(None) => break, Ok(None) => break,
Ok(Some(change)) => { Ok(Some(change)) => {
let last_ver = oplog.local_version();
match oplog.decode_and_add(&change.op.data) { match oplog.decode_and_add(&change.op.data) {
Ok(local_version) => { Ok(local_version) => {
// give all the changes needed to the controller in a channel.
for (lv, dtop) in oplog.iter_xf_operations_from(&last_ver, &local_version) {
if let Some(dtop) = dtop {
// x.0.start should always be after lastver!
// this step_ver will be the version after we apply the operation
// we give it to the controller so that he knows where it's at.
let step_ver = oplog.version_union(&[lv.start], &last_ver);
let opout = (step_ver, Some(Op(dtop)));
self.ops_out.send(opout).unwrap_or_warn("could not update ops channel -- is controller dead?");
}
}
// finally we send the
self.latest_version.send(local_version) self.latest_version.send(local_version)
.unwrap_or_warn("failed to update latest version!"); .unwrap_or_warn("failed to update latest version!");
for tx in self.pollers.drain(..) { for tx in self.pollers.drain(..) {
tx.send(()).unwrap_or_warn("could not wake up poller"); tx.send(()).unwrap_or_warn("could not wake up poller");
} }
@ -146,8 +132,35 @@ impl ControllerWorker<TextChange> for BufferWorker {
}, },
}, },
// controller is ready to apply change and recv(), calculate it and send it back
res = self.delta_req.recv() => match res {
None => break tracing::error!("no more active controllers: can't send changes"),
Some((last_ver, tx)) => {
if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() {
// x.0.start should always be after lastver!
// this step_ver will be the version after we apply the operation
// we give it to the controller so that he knows where it's at.
let step_ver = oplog.version_union(&[lv.start], &last_ver);
branch.merge(&oplog, oplog.local_version_ref());
let hash = if timer.step() {
let hash = xxhash_rust::xxh3::xxh3_64(branch.content().to_string().as_bytes());
Some(i64::from_ne_bytes(hash.to_ne_bytes()))
} else { None };
let tc = crate::api::change::TextChange {
start: dtop.start() as u32,
end: dtop.end() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
hash
};
tx.send((step_ver, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?");
}
},
},
// received a request for full CRDT content
res = self.content_checkout.recv() => match res { res = self.content_checkout.recv() => match res {
None => break tracing::error!("no more active controllers"), None => break tracing::error!("no more active controllers: can't update content"),
Some(tx) => { Some(tx) => {
branch.merge(&oplog, oplog.local_version_ref()); branch.merge(&oplog, oplog.local_version_ref());
let content = branch.content().to_string(); let content = branch.content().to_string();
@ -158,3 +171,17 @@ impl ControllerWorker<TextChange> for BufferWorker {
} }
} }
} }
struct Timer(u32, u32);
impl Timer {
fn new(period: u32) -> Self { Timer(0, period) }
fn step(&mut self) -> bool {
self.0 += 1;
if self.0 >= self.1 {
self.0 = 0;
true
} else {
false
}
}
}

View file

@ -3,13 +3,7 @@
//! a controller implementation for cursor actions //! a controller implementation for cursor actions
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{ use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex};
broadcast::{
self,
error::{RecvError, TryRecvError},
},
mpsc, watch, Mutex,
};
use tonic::async_trait; use tonic::async_trait;
use crate::api::{Controller, Cursor}; use crate::api::{Controller, Cursor};
@ -66,36 +60,15 @@ impl Controller<Cursor> for CursorController {
} }
/// try to receive without blocking, but will still block on stream mutex /// try to receive without blocking, but will still block on stream mutex
fn try_recv(&self) -> crate::Result<Option<Cursor>> { async fn try_recv(&self) -> crate::Result<Option<Cursor>> {
match self.0.stream.try_lock() {
Err(_) => Ok(None),
Ok(mut stream) => match stream.try_recv() {
Ok(x) => Ok(Some(x.into())),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }),
Err(TryRecvError::Lagged(n)) => {
tracing::warn!("cursor channel lagged, skipping {} events", n);
Ok(stream.try_recv().map(|x| x.into()).ok())
}
}
}
}
// TODO is this cancelable? so it can be used in tokio::select!
// TODO is the result type overkill? should be an option?
/// get next cursor event from current workspace, or block until one is available
async fn recv(&self) -> crate::Result<Cursor> {
let mut stream = self.0.stream.lock().await; let mut stream = self.0.stream.lock().await;
match stream.recv().await { match stream.try_recv() {
Ok(x) => Ok(x.into()), Ok(x) => Ok(Some(x.into())),
Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(TryRecvError::Empty) => Ok(None),
Err(RecvError::Lagged(n)) => { Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }),
tracing::error!("cursor channel lagged behind, skipping {} events", n); Err(TryRecvError::Lagged(n)) => {
Ok(stream tracing::warn!("cursor channel lagged, skipping {} events", n);
.recv() Ok(stream.try_recv().map(|x| x.into()).ok())
.await
.expect("could not receive after lagging")
.into())
} }
} }
} }

View file

@ -1,5 +1,3 @@
#![deny(clippy::all)]
pub mod client; pub mod client;
pub mod workspace; pub mod workspace;
pub mod cursor; pub mod cursor;

View file

@ -143,9 +143,6 @@ pub mod ffi;
/// common utils used in this library and re-exposed /// common utils used in this library and re-exposed
pub mod ext; pub mod ext;
/// underlying OperationalTransform library used, re-exported
pub use woot;
pub use errors::Error; pub use errors::Error;
pub use errors::Result; pub use errors::Result;

View file

@ -11,7 +11,6 @@ pub use crate::api::{
Controller as CodempController, Controller as CodempController,
TextChange as CodempTextChange, TextChange as CodempTextChange,
Cursor as CodempCursor, Cursor as CodempCursor,
Op as CodempOp,
User as CodempUser, User as CodempUser,
Event as CodempEvent, Event as CodempEvent,
}; };