mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-22 15:24:48 +01:00
fix: add .content() back
except now its async and can fail and basically requests the worker to generate the content for us on demand Co-authored-by: zaaarf <me@zaaarf.foo> Co-authored-by: cschen <cschen@codemp.dev>
This commit is contained in:
parent
8874f31eeb
commit
2a016a6619
2 changed files with 64 additions and 48 deletions
|
@ -5,7 +5,7 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use diamond_types::LocalVersion;
|
use diamond_types::LocalVersion;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::{oneshot, Mutex};
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
|
|
||||||
|
@ -36,11 +36,10 @@ impl BufferController {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// return buffer whole content, updating internal buffer previous state
|
/// return buffer whole content, updating internal buffer previous state
|
||||||
pub fn content(&self) -> String {
|
pub async fn content(&self) -> crate::Result<String> {
|
||||||
// this function either needs a ref to the worker
|
let (tx, rx) = oneshot::channel();
|
||||||
// or needs to basically mirror all the operations that go through it.
|
self.0.content_request.send(tx).await?;
|
||||||
// yikes.
|
Ok(rx.await?)
|
||||||
todo!() // TODO ouch
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,9 +49,10 @@ pub(crate) struct BufferControllerInner {
|
||||||
latest_version: watch::Receiver<diamond_types::LocalVersion>,
|
latest_version: watch::Receiver<diamond_types::LocalVersion>,
|
||||||
last_update: InternallyMutable<diamond_types::LocalVersion>,
|
last_update: InternallyMutable<diamond_types::LocalVersion>,
|
||||||
ops_in: mpsc::UnboundedSender<TextChange>,
|
ops_in: mpsc::UnboundedSender<TextChange>,
|
||||||
ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option<Op>)>,
|
ops_out: Mutex<mpsc::UnboundedReceiver<(LocalVersion, Option<Op>)>>,
|
||||||
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
||||||
stopper: mpsc::UnboundedSender<()>, // just exist
|
stopper: mpsc::UnboundedSender<()>, // just exist
|
||||||
|
content_request: mpsc::Sender<oneshot::Sender<String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufferControllerInner {
|
impl BufferControllerInner {
|
||||||
|
@ -63,15 +63,19 @@ impl BufferControllerInner {
|
||||||
ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option<Op>)>,
|
ops_out: mpsc::UnboundedReceiver<(LocalVersion, Option<Op>)>,
|
||||||
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
||||||
stopper: mpsc::UnboundedSender<()>,
|
stopper: mpsc::UnboundedSender<()>,
|
||||||
|
content_request: mpsc::Sender<oneshot::Sender<String>>,
|
||||||
|
// TODO we're getting too much stuff via constructor, maybe make everything pub(crate)
|
||||||
|
// instead?? or maybe builder, or maybe defaults
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
name,
|
name,
|
||||||
latest_version,
|
latest_version,
|
||||||
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
|
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
|
||||||
ops_in,
|
ops_in,
|
||||||
ops_out,
|
ops_out: Mutex::new(ops_out),
|
||||||
poller,
|
poller,
|
||||||
stopper,
|
stopper,
|
||||||
|
content_request,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -104,21 +108,24 @@ impl Controller<TextChange> for BufferController {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Ok((lv, Some(op))) = self.0.ops_out.try_recv() {
|
match self.0.ops_out.try_lock() {
|
||||||
// if the current change has
|
Err(_) => Ok(None),
|
||||||
self.0.last_update.set(lv);
|
Ok(mut ops) => match ops.try_recv() {
|
||||||
return Ok(Some(TextChange::from(op)));
|
Ok((lv, Some(op))) => {
|
||||||
|
self.0.last_update.set(lv);
|
||||||
|
Ok(Some(TextChange::from(op)))
|
||||||
|
},
|
||||||
|
Ok((_lv, None)) => Ok(None), // TODO what is going on here?
|
||||||
|
Err(mpsc::error::TryRecvError::Empty) => Ok(None),
|
||||||
|
Err(mpsc::error::TryRecvError::Disconnected) =>
|
||||||
|
Err(crate::Error::Channel { send: false }),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return Err(crate::Error::Channel { send: false });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// block until a new text change is available, and return it
|
/// block until a new text change is available, and return it
|
||||||
async fn recv(&self) -> crate::Result<TextChange> {
|
async fn recv(&self) -> crate::Result<TextChange> {
|
||||||
let last_update = self.0.last_update.get();
|
if let Some((lv, Some(op))) = self.0.ops_out.lock().await.recv().await {
|
||||||
|
|
||||||
// no need to poll here? as soon as we have new changes we return them!
|
|
||||||
if let Some((lv, Some(op))) = self.0.ops_out.recv().await {
|
|
||||||
self.0.last_update.set(lv);
|
self.0.last_update.set(lv);
|
||||||
Ok(TextChange::from(op))
|
Ok(TextChange::from(op))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
use std::collections::hash_map::DefaultHasher;
|
|
||||||
use std::hash::{Hash, Hasher};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use diamond_types::LocalVersion;
|
use diamond_types::LocalVersion;
|
||||||
|
@ -18,31 +16,27 @@ use super::controller::{BufferController, BufferControllerInner};
|
||||||
|
|
||||||
pub(crate) struct BufferWorker {
|
pub(crate) struct BufferWorker {
|
||||||
user_id: Uuid,
|
user_id: Uuid,
|
||||||
buffer: diamond_types::list::ListCRDT,
|
|
||||||
latest_version: watch::Sender<diamond_types::LocalVersion>,
|
latest_version: watch::Sender<diamond_types::LocalVersion>,
|
||||||
ops_in: mpsc::UnboundedReceiver<TextChange>,
|
ops_in: mpsc::UnboundedReceiver<TextChange>,
|
||||||
ops_out: mpsc::UnboundedSender<(LocalVersion, Option<Op>)>,
|
ops_out: mpsc::UnboundedSender<(LocalVersion, Option<Op>)>,
|
||||||
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
||||||
pollers: Vec<oneshot::Sender<()>>,
|
pollers: Vec<oneshot::Sender<()>>,
|
||||||
|
content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
|
||||||
stop: mpsc::UnboundedReceiver<()>,
|
stop: mpsc::UnboundedReceiver<()>,
|
||||||
controller: BufferController,
|
controller: BufferController,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufferWorker {
|
impl BufferWorker {
|
||||||
pub fn new(user_id: Uuid, path: &str) -> Self {
|
pub fn new(user_id: Uuid, path: &str) -> Self {
|
||||||
//let (txt_tx, txt_rx) = watch::channel("".to_string());
|
|
||||||
let init = diamond_types::LocalVersion::default();
|
let init = diamond_types::LocalVersion::default();
|
||||||
let buffer = diamond_types::list::ListCRDT::default();
|
|
||||||
|
|
||||||
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
|
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
|
||||||
let (opin_tx, opin_rx) = mpsc::unbounded_channel();
|
let (opin_tx, opin_rx) = mpsc::unbounded_channel();
|
||||||
let (opout_tx, opout_rx) = mpsc::unbounded_channel();
|
let (opout_tx, opout_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
let (req_tx, req_rx) = mpsc::channel(1);
|
||||||
|
|
||||||
let mut hasher = DefaultHasher::new();
|
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
||||||
user_id.hash(&mut hasher);
|
|
||||||
let _site_id = hasher.finish() as usize;
|
|
||||||
|
|
||||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
@ -53,11 +47,11 @@ impl BufferWorker {
|
||||||
opout_rx,
|
opout_rx,
|
||||||
poller_tx,
|
poller_tx,
|
||||||
end_tx,
|
end_tx,
|
||||||
|
req_tx,
|
||||||
);
|
);
|
||||||
|
|
||||||
BufferWorker {
|
BufferWorker {
|
||||||
user_id,
|
user_id,
|
||||||
buffer,
|
|
||||||
latest_version: latest_version_tx,
|
latest_version: latest_version_tx,
|
||||||
ops_in: opin_rx,
|
ops_in: opin_rx,
|
||||||
ops_out: opout_tx,
|
ops_out: opout_tx,
|
||||||
|
@ -65,6 +59,7 @@ impl BufferWorker {
|
||||||
pollers: Vec::new(),
|
pollers: Vec::new(),
|
||||||
stop: end_rx,
|
stop: end_rx,
|
||||||
controller: BufferController(Arc::new(controller)),
|
controller: BufferController(Arc::new(controller)),
|
||||||
|
content_checkout: req_rx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,6 +75,8 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
|
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
|
||||||
|
let mut branch = diamond_types::list::Branch::new();
|
||||||
|
let mut oplog = diamond_types::list::OpLog::new();
|
||||||
loop {
|
loop {
|
||||||
// block until one of these is ready
|
// block until one of these is ready
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
@ -98,18 +95,19 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
res = self.ops_in.recv() => match res {
|
res = self.ops_in.recv() => match res {
|
||||||
None => break tracing::debug!("stopping: editor closed channel"),
|
None => break tracing::debug!("stopping: editor closed channel"),
|
||||||
Some(change) => {
|
Some(change) => {
|
||||||
|
let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string());
|
||||||
let agent_id = self.buffer.get_or_create_agent_id(&self.user_id.to_string());
|
let last_ver = oplog.local_version();
|
||||||
let lastver = self.buffer.oplog.local_version_ref();
|
|
||||||
|
|
||||||
if change.is_insert() {
|
if change.is_insert() {
|
||||||
self.buffer.insert(agent_id, change.start as usize, &change.content) // TODO da vedere il cast
|
oplog.add_insert(agent_id, change.start as usize, &change.content)
|
||||||
} else if change.is_delete() {
|
} else if change.is_delete() {
|
||||||
self.buffer.delete_without_content(1, change.span())
|
oplog.add_delete_without_content(1, change.span())
|
||||||
} else { continue; };
|
} else { continue; };
|
||||||
|
|
||||||
tx.send(Operation { data: self.buffer.oplog.encode_from(Default::default(), lastver) });
|
tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await
|
||||||
self.latest_version.send(self.buffer.oplog.local_version());
|
.unwrap_or_warn("failed to send change!");
|
||||||
|
self.latest_version.send(oplog.local_version())
|
||||||
|
.unwrap_or_warn("failed to update latest version!");
|
||||||
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -119,24 +117,26 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
Err(_e) => break,
|
Err(_e) => break,
|
||||||
Ok(None) => break,
|
Ok(None) => break,
|
||||||
Ok(Some(change)) => {
|
Ok(Some(change)) => {
|
||||||
let lastver = self.buffer.oplog.local_version_ref();
|
let last_ver = oplog.local_version();
|
||||||
|
match oplog.decode_and_add(&change.op.data) {
|
||||||
match self.buffer.merge_data_and_ff(&change.op.data) {
|
|
||||||
Ok(local_version) => {
|
Ok(local_version) => {
|
||||||
|
|
||||||
// give all the changes needed to the controller in a channel.
|
// give all the changes needed to the controller in a channel.
|
||||||
for (lv, Some(dtop)) in self.buffer.oplog.iter_xf_operations_from(lastver, &local_version) {
|
for (lv, dtop) in oplog.iter_xf_operations_from(&last_ver, &local_version) {
|
||||||
// x.0.start should always be after lastver!
|
if let Some(dtop) = dtop {
|
||||||
// this step_ver will be the version after we apply the operation
|
// x.0.start should always be after lastver!
|
||||||
// we give it to the controller so that he knows where it's at.
|
// this step_ver will be the version after we apply the operation
|
||||||
let step_ver = self.buffer.oplog.version_union(&[lv.start], lastver);
|
// we give it to the controller so that he knows where it's at.
|
||||||
let opout = (step_ver, Some(Op(dtop)));
|
let step_ver = oplog.version_union(&[lv.start], &last_ver);
|
||||||
|
let opout = (step_ver, Some(Op(dtop)));
|
||||||
|
|
||||||
self.ops_out.send(opout).unwrap(); //TODO ERRORS
|
self.ops_out.send(opout).unwrap_or_warn("could not update ops channel -- is controller dead?");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// finally we send the
|
// finally we send the
|
||||||
self.latest_version.send(local_version);
|
self.latest_version.send(local_version)
|
||||||
|
.unwrap_or_warn("failed to update latest version!");
|
||||||
|
|
||||||
for tx in self.pollers.drain(..) {
|
for tx in self.pollers.drain(..) {
|
||||||
tx.send(()).unwrap_or_warn("could not wake up poller");
|
tx.send(()).unwrap_or_warn("could not wake up poller");
|
||||||
}
|
}
|
||||||
|
@ -144,6 +144,15 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
res = self.content_checkout.recv() => match res {
|
||||||
|
None => break tracing::error!("no more active controllers"),
|
||||||
|
Some(tx) => {
|
||||||
|
branch.merge(&oplog, oplog.local_version_ref());
|
||||||
|
let content = branch.content().to_string();
|
||||||
|
tx.send(content).unwrap_or_warn("checkout request dropped");
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue