fix: update local version on send

also keep a branch on which we merge editor's changes, which kind of
solves our race condition thing

Co-authored-by: cschen <cschen@codemp.dev>
This commit is contained in:
əlemi 2024-08-14 15:55:18 +02:00
parent 3346f1c526
commit 413247f9b4
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 17 additions and 13 deletions

View file

@ -36,7 +36,9 @@ impl BufferController {
pub async fn content(&self) -> crate::Result<String> { pub async fn content(&self) -> crate::Result<String> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.content_request.send(tx).await?; self.0.content_request.send(tx).await?;
Ok(rx.await?) let content = rx.await?;
self.0.last_update.set(self.0.latest_version.borrow().clone());
Ok(content)
} }
} }
@ -45,7 +47,7 @@ pub(crate) struct BufferControllerInner {
pub(crate) name: String, pub(crate) name: String,
pub(crate) latest_version: watch::Receiver<diamond_types::LocalVersion>, pub(crate) latest_version: watch::Receiver<diamond_types::LocalVersion>,
pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>, pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>,
pub(crate) ops_in: mpsc::UnboundedSender<TextChange>, pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender<LocalVersion>)>,
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>, pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
@ -86,9 +88,12 @@ impl Controller<TextChange> for BufferController {
/// enqueue a text change for processing /// enqueue a text change for processing
/// this also updates internal buffer previous state /// this also updates internal buffer previous state
fn send(&self, op: TextChange) -> crate::Result<()> { async fn send(&self, op: TextChange) -> crate::Result<()> {
// we let the worker do the updating to the last version and send it back. // we let the worker do the updating to the last version and send it back.
Ok(self.0.ops_in.send(op)?) let (tx, rx) = oneshot::channel();
self.0.ops_in.send((op, tx))?;
self.0.last_update.set(rx.await?);
Ok(())
} }
fn stop(&self) -> bool { fn stop(&self) -> bool {

View file

@ -17,7 +17,7 @@ use super::controller::{BufferController, BufferControllerInner};
pub(crate) struct BufferWorker { pub(crate) struct BufferWorker {
user_id: Uuid, user_id: Uuid,
latest_version: watch::Sender<diamond_types::LocalVersion>, latest_version: watch::Sender<diamond_types::LocalVersion>,
ops_in: mpsc::UnboundedReceiver<TextChange>, ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>,
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
content_checkout: mpsc::Receiver<oneshot::Sender<String>>, content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
@ -96,21 +96,21 @@ impl ControllerWorker<TextChange> for BufferWorker {
// received a text change from editor // received a text change from editor
res = self.ops_in.recv() => match res { res = self.ops_in.recv() => match res {
None => break tracing::debug!("stopping: editor closed channel"), None => break tracing::debug!("stopping: editor closed channel"),
Some(change) => { Some((change, ack)) => {
let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string()); let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string());
let last_ver = oplog.local_version(); let last_ver = oplog.local_version();
if change.is_insert() { if change.is_insert() {
oplog.add_insert(agent_id, change.start as usize, &change.content) branch.insert(&mut oplog, agent_id, change.start as usize, &change.content)
} else if change.is_delete() { } else if change.is_delete() {
oplog.add_delete_without_content(1, change.span()) branch.delete_without_content(&mut oplog, 1, change.span())
} else { continue; }; } else { continue; };
tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await
.unwrap_or_warn("failed to send change!"); .unwrap_or_warn("failed to send change!");
self.latest_version.send(oplog.local_version()) self.latest_version.send(oplog.local_version())
.unwrap_or_warn("failed to update latest version!"); .unwrap_or_warn("failed to update latest version!");
ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack");
}, },
}, },
@ -141,11 +141,10 @@ impl ControllerWorker<TextChange> for BufferWorker {
// this step_ver will be the version after we apply the operation // this step_ver will be the version after we apply the operation
// we give it to the controller so that he knows where it's at. // we give it to the controller so that he knows where it's at.
let step_ver = oplog.version_union(&[lv.start], &last_ver); let step_ver = oplog.version_union(&[lv.start], &last_ver);
// moved the merging inside as we only need
// an up to date content when we hash the content.
let hash = if timer.step() {
branch.merge(&oplog, &step_ver); branch.merge(&oplog, &step_ver);
let new_local_v = branch.local_version();
let hash = if timer.step() {
let hash = xxhash_rust::xxh3::xxh3_64(branch.content().to_string().as_bytes()); let hash = xxhash_rust::xxh3::xxh3_64(branch.content().to_string().as_bytes());
Some(i64::from_ne_bytes(hash.to_ne_bytes())) Some(i64::from_ne_bytes(hash.to_ne_bytes()))
} else { None }; } else { None };
@ -156,7 +155,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
content: dtop.content_as_str().unwrap_or_default().to_string(), content: dtop.content_as_str().unwrap_or_default().to_string(),
hash hash
}; };
tx.send((step_ver, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?"); tx.send((new_local_v, tc)).unwrap_or_warn("could not update ops channel -- is controller dead?");
} }
}, },
}, },