mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-22 23:34:49 +01:00
wip: Some attempts at getting DT to work with buffer controller and buffer worker!
This attempt doesn't use a ref to the crdt! But uses an extra channel!
This commit is contained in:
parent
0dfba0064a
commit
9acb7b6007
5 changed files with 162 additions and 83 deletions
|
@ -14,6 +14,7 @@ thiserror = "1.0"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
# woot
|
# woot
|
||||||
codemp-woot = { git = "ssh://git@github.com/hexedtech/woot.git", features = ["serde"], tag = "v0.1.2" }
|
codemp-woot = { git = "ssh://git@github.com/hexedtech/woot.git", features = ["serde"], tag = "v0.1.2" }
|
||||||
|
diamond-types="1.0"
|
||||||
# proto
|
# proto
|
||||||
codemp-proto = { git = "ssh://git@github.com/hexedtech/codemp-proto.git", tag = "v0.6.1" }
|
codemp-proto = { git = "ssh://git@github.com/hexedtech/codemp-proto.git", tag = "v0.6.1" }
|
||||||
uuid = { version = "1.7", features = ["v4"] }
|
uuid = { version = "1.7", features = ["v4"] }
|
||||||
|
|
|
@ -3,17 +3,12 @@
|
||||||
//! an editor-friendly representation of a text change in a buffer
|
//! an editor-friendly representation of a text change in a buffer
|
||||||
//! to easily interface with codemp from various editors
|
//! to easily interface with codemp from various editors
|
||||||
|
|
||||||
use crate::woot::{
|
|
||||||
crdt::{TextEditor, CRDT},
|
|
||||||
woot::Woot,
|
|
||||||
WootResult,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// an atomic and orderable operation
|
/// an atomic and orderable operation
|
||||||
///
|
///
|
||||||
/// this under the hood thinly wraps our CRDT operation
|
/// this under the hood thinly wraps our CRDT operation
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Op(pub(crate) woot::crdt::Op);
|
pub struct Op(pub(crate) diamond_types::list::operation::Operation);
|
||||||
|
// Do we need this in the api? why not just have a TextChange, which already covers as operation.
|
||||||
|
|
||||||
/// an editor-friendly representation of a text change in a buffer
|
/// an editor-friendly representation of a text change in a buffer
|
||||||
///
|
///
|
||||||
|
@ -44,6 +39,28 @@ pub struct TextChange {
|
||||||
pub content: String,
|
pub content: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TextChange {
|
||||||
|
pub fn span(&self) -> std::ops::Range<usize> {
|
||||||
|
self.start as usize..self.end as usize
|
||||||
|
}
|
||||||
|
|
||||||
|
/// returns true if this TextChange deletes existing text
|
||||||
|
pub fn is_delete(&self) -> bool {
|
||||||
|
self.start < self.end
|
||||||
|
}
|
||||||
|
|
||||||
|
/// returns true if this TextChange adds new text
|
||||||
|
pub fn is_insert(&self) -> bool {
|
||||||
|
!self.content.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// returns true if this TextChange is effectively as no-op
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
!self.is_delete() && !self.is_insert()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
impl TextChange {
|
impl TextChange {
|
||||||
/// create a new TextChange from the difference of given strings
|
/// create a new TextChange from the difference of given strings
|
||||||
pub fn from_diff(before: &str, after: &str) -> TextChange {
|
pub fn from_diff(before: &str, after: &str) -> TextChange {
|
||||||
|
@ -241,4 +258,15 @@ mod tests {
|
||||||
let result = change.apply("some important text");
|
let result = change.apply("some important text");
|
||||||
assert_eq!(result, "some important text");
|
assert_eq!(result, "some important text");
|
||||||
}
|
}
|
||||||
|
}*/
|
||||||
|
|
||||||
|
// TODO: properly implement this for diamond types directly
|
||||||
|
impl From<Op> for TextChange {
|
||||||
|
fn from(value: Op) -> Self {
|
||||||
|
Self {
|
||||||
|
start: value.0.start() as u32,
|
||||||
|
end: value.0.end() as u32,
|
||||||
|
content: value.0.content_as_str().unwrap_or_default().to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use diamond_types::LocalVersion;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
|
@ -12,6 +13,8 @@ use crate::api::Controller;
|
||||||
|
|
||||||
use crate::api::TextChange;
|
use crate::api::TextChange;
|
||||||
|
|
||||||
|
use crate::api::Op;
|
||||||
|
|
||||||
use crate::ext::InternallyMutable;
|
use crate::ext::InternallyMutable;
|
||||||
|
|
||||||
/// the buffer controller implementation
|
/// the buffer controller implementation
|
||||||
|
@ -34,17 +37,20 @@ impl BufferController {
|
||||||
|
|
||||||
/// return buffer whole content, updating internal buffer previous state
|
/// return buffer whole content, updating internal buffer previous state
|
||||||
pub fn content(&self) -> String {
|
pub fn content(&self) -> String {
|
||||||
self.0.seen.set(self.0.content.borrow().clone());
|
// this function either needs a ref to the worker
|
||||||
self.0.content.borrow().clone()
|
// or needs to basically mirror all the operations that go through it.
|
||||||
|
// yikes.
|
||||||
|
todo!() // TODO ouch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct BufferControllerInner {
|
pub(crate) struct BufferControllerInner {
|
||||||
name: String,
|
name: String,
|
||||||
content: watch::Receiver<String>,
|
latest_version: watch::Receiver<diamond_types::LocalVersion>,
|
||||||
seen: InternallyMutable<String>, // internal buffer previous state
|
last_update: InternallyMutable<diamond_types::LocalVersion>,
|
||||||
operations: mpsc::UnboundedSender<TextChange>,
|
ops_in: mpsc::UnboundedSender<TextChange>,
|
||||||
|
ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option<Op>)>,
|
||||||
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
||||||
stopper: mpsc::UnboundedSender<()>, // just exist
|
stopper: mpsc::UnboundedSender<()>, // just exist
|
||||||
}
|
}
|
||||||
|
@ -52,18 +58,20 @@ pub(crate) struct BufferControllerInner {
|
||||||
impl BufferControllerInner {
|
impl BufferControllerInner {
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
name: String,
|
name: String,
|
||||||
content: watch::Receiver<String>,
|
latest_version: watch::Receiver<diamond_types::LocalVersion>,
|
||||||
operations: mpsc::UnboundedSender<TextChange>,
|
ops_in: mpsc::UnboundedSender<TextChange>,
|
||||||
|
ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option<Op>)>,
|
||||||
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
||||||
stop: mpsc::UnboundedSender<()>,
|
stopper: mpsc::UnboundedSender<()>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
name,
|
name,
|
||||||
content,
|
latest_version,
|
||||||
operations,
|
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
|
||||||
|
ops_in,
|
||||||
|
ops_out,
|
||||||
poller,
|
poller,
|
||||||
seen: InternallyMutable::default(),
|
stopper,
|
||||||
stopper: stop,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -73,9 +81,13 @@ impl Controller<TextChange> for BufferController {
|
||||||
/// block until a text change is available
|
/// block until a text change is available
|
||||||
/// this returns immediately if one is already available
|
/// this returns immediately if one is already available
|
||||||
async fn poll(&self) -> crate::Result<()> {
|
async fn poll(&self) -> crate::Result<()> {
|
||||||
if self.0.seen.get() != *self.0.content.borrow() {
|
// TODO there might be some extra logic we can play with using `seen` and `not seen` yet
|
||||||
return Ok(()); // short circuit: already available!
|
// mechanics, not just the comparison. nevermind, the `has_changed` etc stuff needs mut self, yuk.
|
||||||
|
|
||||||
|
if self.0.last_update.get() != *self.0.latest_version.borrow() {
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel::<()>();
|
let (tx, rx) = oneshot::channel::<()>();
|
||||||
self.0.poller.send(tx)?;
|
self.0.poller.send(tx)?;
|
||||||
rx.await
|
rx.await
|
||||||
|
@ -85,32 +97,40 @@ impl Controller<TextChange> for BufferController {
|
||||||
|
|
||||||
/// if a text change is available, return it immediately
|
/// if a text change is available, return it immediately
|
||||||
fn try_recv(&self) -> crate::Result<Option<TextChange>> {
|
fn try_recv(&self) -> crate::Result<Option<TextChange>> {
|
||||||
let seen = self.0.seen.get();
|
let last_update = self.0.last_update.get();
|
||||||
let actual = self.0.content.borrow().clone();
|
let latest_version = self.0.latest_version.borrow().clone();
|
||||||
if seen == actual {
|
|
||||||
|
if last_update == latest_version {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
let change = TextChange::from_diff(&seen, &actual);
|
|
||||||
self.0.seen.set(actual);
|
if let Ok((lv, Some(op))) = self.0.ops_out.try_recv() {
|
||||||
Ok(Some(change))
|
// if the current change has
|
||||||
|
self.0.last_update.set(lv);
|
||||||
|
return Ok(Some(TextChange::from(op)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Err(crate::Error::Channel { send: false });
|
||||||
}
|
}
|
||||||
|
|
||||||
/// block until a new text change is available, and return it
|
/// block until a new text change is available, and return it
|
||||||
async fn recv(&self) -> crate::Result<TextChange> {
|
async fn recv(&self) -> crate::Result<TextChange> {
|
||||||
self.poll().await?;
|
let last_update = self.0.last_update.get();
|
||||||
let seen = self.0.seen.get();
|
|
||||||
let actual = self.0.content.borrow().clone();
|
// no need to poll here? as soon as we have new changes we return them!
|
||||||
let change = TextChange::from_diff(&seen, &actual);
|
if let Some((lv, Some(op))) = self.0.ops_out.recv().await {
|
||||||
self.0.seen.set(actual);
|
self.0.last_update.set(lv);
|
||||||
Ok(change)
|
Ok(TextChange::from(op))
|
||||||
|
} else {
|
||||||
|
Err(crate::Error::Channel { send: false })
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// enqueue a text change for processing
|
/// enqueue a text change for processing
|
||||||
/// this also updates internal buffer previous state
|
/// this also updates internal buffer previous state
|
||||||
fn send(&self, op: TextChange) -> crate::Result<()> {
|
fn send(&self, op: TextChange) -> crate::Result<()> {
|
||||||
let before = self.0.seen.get();
|
// we let the worker do the updating to the last version and send it back.
|
||||||
self.0.seen.set(op.apply(&before));
|
Ok(self.0.ops_in.send(op)?)
|
||||||
Ok(self.0.operations.send(op)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self) -> bool {
|
fn stop(&self) -> bool {
|
||||||
|
|
|
@ -2,24 +2,26 @@ use std::collections::hash_map::DefaultHasher;
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::sync::{watch, mpsc, oneshot};
|
use diamond_types::LocalVersion;
|
||||||
|
use tokio::sync::{mpsc, oneshot, watch};
|
||||||
use tonic::{async_trait, Streaming};
|
use tonic::{async_trait, Streaming};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use woot::crdt::CRDT;
|
|
||||||
use woot::woot::Woot;
|
use crate::api::controller::ControllerWorker;
|
||||||
|
use crate::api::Op;
|
||||||
|
use crate::api::TextChange;
|
||||||
|
|
||||||
use crate::errors::IgnorableError;
|
use crate::errors::IgnorableError;
|
||||||
use crate::api::controller::ControllerWorker;
|
|
||||||
use crate::api::TextChange;
|
|
||||||
use codemp_proto::buffer::{BufferEvent, Operation};
|
use codemp_proto::buffer::{BufferEvent, Operation};
|
||||||
|
|
||||||
use super::controller::{BufferController, BufferControllerInner};
|
use super::controller::{BufferController, BufferControllerInner};
|
||||||
|
|
||||||
pub(crate) struct BufferWorker {
|
pub(crate) struct BufferWorker {
|
||||||
_user_id: Uuid,
|
user_id: Uuid,
|
||||||
buffer: Woot,
|
buffer: diamond_types::list::ListCRDT,
|
||||||
content: watch::Sender<String>,
|
latest_version: watch::Sender<diamond_types::LocalVersion>,
|
||||||
operations: mpsc::UnboundedReceiver<TextChange>,
|
ops_in: mpsc::UnboundedReceiver<TextChange>,
|
||||||
|
ops_out: mpsc::UnboundedSender<(LocalVersion, Option<Op>)>,
|
||||||
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
||||||
pollers: Vec<oneshot::Sender<()>>,
|
pollers: Vec<oneshot::Sender<()>>,
|
||||||
stop: mpsc::UnboundedReceiver<()>,
|
stop: mpsc::UnboundedReceiver<()>,
|
||||||
|
@ -28,25 +30,37 @@ pub(crate) struct BufferWorker {
|
||||||
|
|
||||||
impl BufferWorker {
|
impl BufferWorker {
|
||||||
pub fn new(user_id: Uuid, path: &str) -> Self {
|
pub fn new(user_id: Uuid, path: &str) -> Self {
|
||||||
let (txt_tx, txt_rx) = watch::channel("".to_string());
|
//let (txt_tx, txt_rx) = watch::channel("".to_string());
|
||||||
let (op_tx, op_rx) = mpsc::unbounded_channel();
|
let init = diamond_types::LocalVersion::default();
|
||||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
let buffer = diamond_types::list::ListCRDT::default();
|
||||||
|
|
||||||
|
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
|
||||||
|
let (opin_tx, opin_rx) = mpsc::unbounded_channel();
|
||||||
|
let (opout_tx, opout_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let mut hasher = DefaultHasher::new();
|
let mut hasher = DefaultHasher::new();
|
||||||
user_id.hash(&mut hasher);
|
user_id.hash(&mut hasher);
|
||||||
let site_id = hasher.finish() as usize;
|
let _site_id = hasher.finish() as usize;
|
||||||
|
|
||||||
|
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let controller = BufferControllerInner::new(
|
let controller = BufferControllerInner::new(
|
||||||
path.to_string(),
|
path.to_string(),
|
||||||
txt_rx,
|
latest_version_rx,
|
||||||
op_tx,
|
opin_tx,
|
||||||
|
opout_rx,
|
||||||
poller_tx,
|
poller_tx,
|
||||||
end_tx,
|
end_tx,
|
||||||
);
|
);
|
||||||
|
|
||||||
BufferWorker {
|
BufferWorker {
|
||||||
_user_id: user_id,
|
user_id,
|
||||||
buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging!
|
buffer,
|
||||||
content: txt_tx,
|
latest_version: latest_version_tx,
|
||||||
operations: op_rx,
|
ops_in: opin_rx,
|
||||||
|
ops_out: opout_tx,
|
||||||
poller: poller_rx,
|
poller: poller_rx,
|
||||||
pollers: Vec::new(),
|
pollers: Vec::new(),
|
||||||
stop: end_rx,
|
stop: end_rx,
|
||||||
|
@ -81,41 +95,56 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
},
|
},
|
||||||
|
|
||||||
// received a text change from editor
|
// received a text change from editor
|
||||||
res = self.operations.recv() => match res {
|
res = self.ops_in.recv() => match res {
|
||||||
None => break tracing::debug!("stopping: editor closed channel"),
|
None => break tracing::debug!("stopping: editor closed channel"),
|
||||||
Some(change) => match change.transform(&self.buffer) {
|
Some(change) => {
|
||||||
Err(e) => break tracing::error!("could not apply operation from client: {}", e),
|
|
||||||
Ok(ops) => {
|
let agent_id = self.buffer.get_or_create_agent_id(&self.user_id.to_string());
|
||||||
for op in ops {
|
let lastver = self.buffer.oplog.local_version_ref();
|
||||||
self.buffer.merge(op.0.clone());
|
|
||||||
let operation = Operation {
|
if change.is_insert() {
|
||||||
data: postcard::to_extend(&op.0, Vec::new()).unwrap(),
|
self.buffer.insert(agent_id, change.start as usize, &change.content) // TODO da vedere il cast
|
||||||
};
|
} else if change.is_delete() {
|
||||||
if let Err(e) = tx.send(operation).await {
|
self.buffer.delete_without_content(1, change.span())
|
||||||
tracing::error!("server refused to broadcast {}: {}", op.0, e);
|
} else { continue; };
|
||||||
}
|
|
||||||
}
|
tx.send(Operation { data: self.buffer.oplog.encode_from(Default::default(), lastver) });
|
||||||
self.content.send(self.buffer.view())
|
self.latest_version.send(self.buffer.oplog.local_version());
|
||||||
.unwrap_or_warn("could not send buffer update");
|
|
||||||
},
|
},
|
||||||
}
|
|
||||||
},
|
},
|
||||||
|
|
||||||
// received a message from server
|
// received a message from server
|
||||||
res = rx.message() => match res {
|
res = rx.message() => match res {
|
||||||
Err(_e) => break,
|
Err(_e) => break,
|
||||||
Ok(None) => break,
|
Ok(None) => break,
|
||||||
Ok(Some(change)) => match postcard::from_bytes::<woot::crdt::Op>(&change.op.data) {
|
Ok(Some(change)) => {
|
||||||
Ok(op) => { // TODO here in change we receive info about the author, maybe propagate?
|
let lastver = self.buffer.oplog.local_version_ref();
|
||||||
self.buffer.merge(op);
|
|
||||||
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
|
match self.buffer.merge_data_and_ff(&change.op.data) {
|
||||||
for tx in self.pollers.drain(..) {
|
Ok(local_version) => {
|
||||||
tx.send(()).unwrap_or_warn("could not wake up poller");
|
|
||||||
}
|
// give all the changes needed to the controller in a channel.
|
||||||
},
|
for (lv, Some(dtop)) in self.buffer.oplog.iter_xf_operations_from(lastver, &local_version) {
|
||||||
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
// x.0.start should always be after lastver!
|
||||||
|
// this step_ver will be the version after we apply the operation
|
||||||
|
// we give it to the controller so that he knows where it's at.
|
||||||
|
let step_ver = self.buffer.oplog.version_union(&[lv.start], lastver);
|
||||||
|
let opout = (step_ver, Some(Op(dtop)));
|
||||||
|
|
||||||
|
self.ops_out.send(opout).unwrap(); //TODO ERRORS
|
||||||
|
}
|
||||||
|
|
||||||
|
// finally we send the
|
||||||
|
self.latest_version.send(local_version);
|
||||||
|
for tx in self.pollers.drain(..) {
|
||||||
|
tx.send(()).unwrap_or_warn("could not wake up poller");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
||||||
|
}
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,10 +82,11 @@ impl<T: Clone> InternallyMutable<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
pub(crate) struct CallbackHandleWatch<T>(pub(crate) tokio::sync::watch::Sender<Option<T>>);
|
pub(crate) struct CallbackHandleWatch<T>(pub(crate) tokio::sync::watch::Sender<Option<T>>);
|
||||||
|
|
||||||
impl<T> crate::api::controller::CallbackHandle for CallbackHandleWatch<T> {
|
impl<T> crate::api::controller::CallbackHandle for CallbackHandleWatch<T> {
|
||||||
fn unregister(self) {
|
fn unregister(self) {
|
||||||
self.0.send_replace(None);
|
self.0.send_replace(None);
|
||||||
}
|
}
|
||||||
}
|
}*/
|
||||||
|
|
Loading…
Reference in a new issue