diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 1f02299..17d356c 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -5,7 +5,7 @@ use std::sync::Arc; -use tokio::sync::{watch, mpsc}; +use tokio::sync::{watch, mpsc, Mutex, RwLock, TryLockError}; use tonic::async_trait; use crate::errors::IgnorableError; @@ -32,6 +32,7 @@ use super::TextChange; #[derive(Debug, Clone)] pub struct BufferController { content: watch::Receiver, + seen: Arc>, operations: mpsc::UnboundedSender, _stop: Arc, // just exist } @@ -42,7 +43,11 @@ impl BufferController { operations: mpsc::UnboundedSender, stop: mpsc::UnboundedSender<()>, ) -> Self { - BufferController { content, operations, _stop: Arc::new(StopOnDrop(stop)) } + BufferController { + content, operations, + _stop: Arc::new(StopOnDrop(stop)), + seen: Arc::new(RwLock::new("".into())), + } } } @@ -56,19 +61,56 @@ impl Drop for StopOnDrop { } #[async_trait] -impl Controller for BufferController { +impl Controller for BufferController { type Input = TextChange; async fn poll(&self) -> Result<(), Error> { - Ok(self.content.clone().changed().await?) + let mut poller = self.content.clone(); + loop { + poller.changed().await?; + let seen = self.seen.read().await.clone(); + if *poller.borrow() != seen { + break + } + } + Ok(()) } - fn try_recv(&self) -> Result, Error> { - Ok(Some(self.content.borrow().clone())) + fn try_recv(&self) -> Result, Error> { + let cur = match self.seen.try_read() { + Err(e) => { + tracing::error!("try_recv invoked while being mutated: {}", e); + return Ok(None); + }, + Ok(x) => x.clone(), + }; + if *self.content.borrow() != cur { + match self.seen.try_write() { + Err(e) => { + tracing::error!("try_recv mutating while being mutated: {}", e); + return Ok(None); + }, + Ok(mut w) => { + *w = self.content.borrow().clone(); + // TODO it's not the whole buffer that changed + return Ok(Some(TextChange { + span: 0..cur.len(), + content: self.content.borrow().clone(), + after: "".to_string(), + })); + } + + } + } + return Ok(None); } - async fn recv(&self) -> Result { - Ok(self.content.borrow().clone()) + async fn recv(&self) -> Result { + self.poll().await?; + match self.try_recv()? { + Some(x) => Ok(x), + None => Err(crate::Error::Filler { message: "wtfff".into() }), + } } /// enqueue an opseq for processing diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 85ac0a2..67b802e 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -8,6 +8,7 @@ use tonic::{async_trait, Streaming}; use woot::crdt::{Op, CRDT, TextEditor}; use woot::woot::Woot; +use crate::errors::IgnorableError; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; use crate::api::controller::ControllerWorker; @@ -29,8 +30,8 @@ pub(crate) struct BufferControllerWorker { } impl BufferControllerWorker { - pub fn new(uid: String, buffer: &str, path: &str) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); + pub fn new(uid: String, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel("".to_string()); let (op_tx, op_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel(); let mut hasher = DefaultHasher::new(); @@ -42,7 +43,7 @@ impl BufferControllerWorker { operations: op_rx, receiver: txt_rx, sender: op_tx, - buffer: Woot::new(site_id, buffer), // TODO initialize with buffer! + buffer: Woot::new(site_id, ""), // TODO initialize with buffer! path: path.to_string(), stop: end_rx, stop_control: end_tx, @@ -64,7 +65,7 @@ impl BufferControllerWorker { } #[async_trait] -impl ControllerWorker for BufferControllerWorker { +impl ControllerWorker for BufferControllerWorker { type Controller = BufferController; type Tx = BufferClient; type Rx = Streaming; @@ -90,29 +91,43 @@ impl ControllerWorker for BufferControllerWorker { res = self.operations.recv() => match res { None => break, Some(change) => { - let span = &self.buffer.view()[change.span.clone()]; - let diff = TextDiff::from_chars(span, &change.content); + match self.buffer.view().get(change.span.clone()) { + None => tracing::error!("received illegal span from client"), + Some(span) => { + let diff = TextDiff::from_chars(span, &change.content); - let mut i = 0; - let mut ops = Vec::new(); - for diff in diff.iter_all_changes() { - match diff.tag() { - ChangeTag::Equal => i += 1, - ChangeTag::Delete => ops.push(self.buffer.delete(change.span.start + i).unwrap()), - ChangeTag::Insert => { - for c in diff.value().chars() { - ops.push(self.buffer.insert(change.span.start + i, c).unwrap()); - i += 1; + let mut i = 0; + let mut ops = Vec::new(); + for diff in diff.iter_all_changes() { + match diff.tag() { + ChangeTag::Equal => i += 1, + ChangeTag::Delete => match self.buffer.delete(change.span.start + i) { + Ok(op) => ops.push(op), + Err(e) => tracing::error!("could not apply deletion: {}", e), + }, + ChangeTag::Insert => { + for c in diff.value().chars() { + match self.buffer.insert(change.span.start + i, c) { + Ok(op) => { + ops.push(op); + i += 1; + }, + Err(e) => tracing::error!("could not apply insertion: {}", e), + } + } + }, } - }, - } - } + } - for op in ops { - match self.send_op(&mut tx, &op).await { - Ok(()) => self.buffer.merge(op), - Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e), - } + for op in ops { + match self.send_op(&mut tx, &op).await { + Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e), + Ok(()) => { + self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); + }, + } + } + }, } } }, @@ -121,10 +136,12 @@ impl ControllerWorker for BufferControllerWorker { res = rx.message() => match res { Err(_e) => break, Ok(None) => break, - Ok(Some(change)) => { - let op : Op = serde_json::from_str(&change.opseq).unwrap(); - self.buffer.merge(op); - self.content.send(self.buffer.view()).unwrap(); + Ok(Some(change)) => match serde_json::from_str::(&change.opseq) { + Ok(op) => { + self.buffer.merge(op); + self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); + }, + Err(e) => tracing::error!("could not deserialize operation from server: {}", e), }, }, } diff --git a/src/client.rs b/src/client.rs index 7038dde..5a729f2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -132,11 +132,9 @@ impl Client { path: path.to_string(), user: self.id.clone(), content: None }; - let content = client.sync(req.clone()).await?.into_inner().content; - let stream = client.attach(req).await?.into_inner(); - let controller = BufferControllerWorker::new(self.id.clone(), &content, path); + let controller = BufferControllerWorker::new(self.id.clone(), path); let handler = Arc::new(controller.subscribe()); let _path = path.to_string();