diff --git a/src/client.rs b/src/client.rs index e39a792..69fae5d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,14 +1,13 @@ -use std::sync::Arc; - use operational_transform::OperationSeq; -use tonic::{transport::Channel, Status}; -use tracing::{error, warn, debug}; +use tonic::{transport::Channel, Status, Streaming, async_trait}; use uuid::Uuid; use crate::{ - cursor::{CursorControllerHandle, CursorControllerWorker, CursorProvider}, - operation::{OperationProcessor, OperationController}, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest}, errors::IgnorableError, + controller::{ControllerWorker, + cursor::{CursorControllerHandle, CursorControllerWorker, CursorEditor}, + buffer::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker} + }, + proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest, Cursor}, }; #[derive(Clone)] @@ -24,6 +23,10 @@ impl From::> for CodempClient { } impl CodempClient { + pub async fn new(dest: &str) -> Result { + Ok(BufferClient::connect(dest.to_string()).await?.into()) + } + pub fn id(&self) -> &str { &self.id } pub async fn create(&mut self, path: String, content: Option) -> Result { @@ -44,35 +47,21 @@ impl CodempClient { user: self.id.clone(), }; - let mut stream = self.client.listen(req).await?.into_inner(); + let stream = self.client.listen(req).await?.into_inner(); - let mut controller = CursorControllerWorker::new(self.id().to_string()); + let controller = CursorControllerWorker::new(self.id().to_string(), (self.clone(), stream)); let handle = controller.subscribe(); - let mut _client = self.client.clone(); tokio::spawn(async move { - loop { - tokio::select!{ - res = stream.message() => { - match res { - Err(e) => break error!("error receiving cursor: {}", e), - Ok(None) => break debug!("cursor worker clean exit"), - Ok(Some(x)) => { controller.broadcast(x); }, - } - }, - Some(op) = controller.wait() => { - _client.moved(op).await - .unwrap_or_warn("could not send cursor update") - } - - } - } + tracing::debug!("cursor worker started"); + controller.work().await; + tracing::debug!("cursor worker stopped"); }); Ok(handle) } - pub async fn attach(&mut self, path: String) -> Result, Status> { + pub async fn attach(&mut self, path: String) -> Result { let req = BufferPayload { path: path.clone(), content: None, @@ -82,59 +71,73 @@ impl CodempClient { let content = self.client.sync(req.clone()) .await? .into_inner() - .content; + .content + .unwrap_or("".into()); - let mut stream = self.client.attach(req).await?.into_inner(); + let stream = self.client.attach(req).await?.into_inner(); - let factory = Arc::new(OperationController::new(content.unwrap_or("".into()))); - - let _factory = factory.clone(); - let _path = path.clone(); + let controller = OperationControllerWorker::new((self.clone(), stream), content, path); + let factory = controller.subscribe(); tokio::spawn(async move { - loop { - if !_factory.run() { break debug!("downstream worker clean exit") } - match stream.message().await { - Err(e) => break error!("error receiving update: {}", e), - Ok(None) => break warn!("stream closed for buffer {}", _path), - Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { - Err(e) => error!("error deserializing opseq: {}", e), - Ok(v) => match _factory.process(v) { - Err(e) => break error!("could not apply operation from server: {}", e), - Ok(_range) => { } // range is obtained awaiting wait(), need to pass the OpSeq itself - } - }, - } - } - }); - - let mut _client = self.client.clone(); - let _uid = self.id.clone(); - let _factory = factory.clone(); - let _path = path.clone(); - tokio::spawn(async move { - while let Some(op) = _factory.poll().await { - if !_factory.run() { break } - let req = OperationRequest { - hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - path: _path.clone(), - user: _uid.clone(), - }; - match _client.edit(req).await { - Ok(res) => match res.into_inner().accepted { - true => { _factory.ack().await; }, - false => { - warn!("server rejected operation, retrying in 1s"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }, - Err(e) => error!("could not send edit: {}", e), - } - } - debug!("upstream worker clean exit"); + tracing::debug!("buffer worker started"); + controller.work().await; + tracing::debug!("buffer worker stopped"); }); Ok(factory) } } + +#[async_trait] +impl OperationControllerEditor for (CodempClient, Streaming) { + async fn edit(&mut self, path: String, op: OperationSeq) -> bool { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path, + user: self.0.id().to_string(), + }; + match self.0.client.edit(req).await { + Ok(res) => res.into_inner().accepted, + Err(e) => { + tracing::error!("error sending edit: {}", e); + false + } + } + } + + async fn recv(&mut self) -> Option { + match self.1.message().await { + Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), + Ok(None) => None, + Err(e) => { + tracing::error!("could not receive edit from server: {}", e); + None + } + } + } +} + +#[async_trait] +impl CursorEditor for (CodempClient, Streaming) { + async fn moved(&mut self, cursor: Cursor) -> bool { + match self.0.client.moved(cursor).await { + Ok(res) => res.into_inner().accepted, + Err(e) => { + tracing::error!("could not send cursor movement: {}", e); + false + } + } + } + + async fn recv(&mut self) -> Option { + match self.1.message().await { + Ok(cursor) => cursor, + Err(e) => { + tracing::error!("could not receive cursor update: {}", e); + None + } + } + } +} diff --git a/src/controller/buffer.rs b/src/controller/buffer.rs new file mode 100644 index 0000000..11ca642 --- /dev/null +++ b/src/controller/buffer.rs @@ -0,0 +1,139 @@ +use std::{sync::Arc, collections::VecDeque, ops::Range}; + +use operational_transform::OperationSeq; +use tokio::sync::{watch, mpsc, broadcast}; +use tonic::async_trait; + +use super::{leading_noop, tailing_noop, ControllerWorker}; +use crate::errors::IgnorableError; +use crate::factory::OperationFactory; + +pub struct TextChange { + pub span: Range, + pub content: String, +} + +#[async_trait] +pub trait OperationControllerSubscriber { + async fn poll(&mut self) -> Option; + async fn apply(&self, op: OperationSeq); +} + +pub struct OperationControllerHandle { + content: watch::Receiver, + operations: mpsc::Sender, + original: Arc>, + stream: broadcast::Receiver, +} + +impl Clone for OperationControllerHandle { + fn clone(&self) -> Self { + OperationControllerHandle { + content: self.content.clone(), + operations: self.operations.clone(), + original: self.original.clone(), + stream: self.original.subscribe(), + } + } +} + +#[async_trait] +impl OperationFactory for OperationControllerHandle { + fn content(&self) -> String { + self.content.borrow().clone() + } +} + +#[async_trait] +impl OperationControllerSubscriber for OperationControllerHandle { + async fn poll(&mut self) -> Option { + let op = self.stream.recv().await.ok()?; + let after = self.content.borrow().clone(); + let skip = leading_noop(op.ops()) as usize; + let before_len = op.base_len(); + let tail = tailing_noop(op.ops()) as usize; + let span = skip..before_len-tail; + let content = after[skip..after.len()-tail].to_string(); + Some(TextChange { span, content }) + } + + async fn apply(&self, op: OperationSeq) { + self.operations.send(op).await + .unwrap_or_warn("could not apply+send operation") + } +} + +#[async_trait] +pub(crate) trait OperationControllerEditor { + async fn edit(&mut self, path: String, op: OperationSeq) -> bool; + async fn recv(&mut self) -> Option; +} + +pub(crate) struct OperationControllerWorker { + pub(crate) content: watch::Sender, + pub(crate) operations: mpsc::Receiver, + pub(crate) stream: Arc>, + pub(crate) queue: VecDeque, + receiver: watch::Receiver, + sender: mpsc::Sender, + client: C, + buffer: String, + path: String, +} + +#[async_trait] +impl ControllerWorker for OperationControllerWorker { + fn subscribe(&self) -> OperationControllerHandle { + OperationControllerHandle { + content: self.receiver.clone(), + operations: self.sender.clone(), + original: self.stream.clone(), + stream: self.stream.subscribe(), + } + } + + async fn work(mut self) { + loop { + let op = tokio::select! { + Some(operation) = self.client.recv() => { + let mut out = operation; + for op in self.queue.iter_mut() { + (*op, out) = op.transform(&out).unwrap(); + } + self.stream.send(out.clone()).unwrap(); + out + }, + Some(op) = self.operations.recv() => { + self.queue.push_back(op.clone()); + op + }, + else => break + }; + self.buffer = op.apply(&self.buffer).unwrap(); + self.content.send(self.buffer.clone()).unwrap(); + + while let Some(op) = self.queue.get(0) { + if !self.client.edit(self.path.clone(), op.clone()).await { break } + self.queue.pop_front(); + } + } + } + +} + +impl OperationControllerWorker { + pub fn new(client: C, buffer: String, path: String) -> Self { + let (txt_tx, txt_rx) = watch::channel(buffer.clone()); + let (op_tx, op_rx) = mpsc::channel(64); + let (s_tx, _s_rx) = broadcast::channel(64); + OperationControllerWorker { + content: txt_tx, + operations: op_rx, + stream: Arc::new(s_tx), + receiver: txt_rx, + sender: op_tx, + queue: VecDeque::new(), + client, buffer, path + } + } +} diff --git a/src/controller/cursor.rs b/src/controller/cursor.rs new file mode 100644 index 0000000..267e358 --- /dev/null +++ b/src/controller/cursor.rs @@ -0,0 +1,104 @@ +use std::sync::Arc; + +use tokio::sync::{mpsc, broadcast}; +use tonic::async_trait; + +use crate::{proto::{Position, Cursor}, errors::IgnorableError, controller::ControllerWorker}; + +#[async_trait] +pub trait CursorSubscriber { + async fn send(&self, path: &str, start: Position, end: Position); + async fn poll(&mut self) -> Option; +} + +pub struct CursorControllerHandle { + uid: String, + op: mpsc::Sender, + stream: broadcast::Receiver, + original: Arc>, +} + +impl Clone for CursorControllerHandle { + fn clone(&self) -> Self { + Self { + uid: self.uid.clone(), + op: self.op.clone(), + stream: self.original.subscribe(), + original: self.original.clone() + } + } +} + +#[async_trait] +impl CursorSubscriber for CursorControllerHandle { + async fn send(&self, path: &str, start: Position, end: Position) { + self.op.send(Cursor { + user: self.uid.clone(), + buffer: path.to_string(), + start: Some(start), + end: Some(end), + }).await.unwrap_or_warn("could not send cursor op") + } + + async fn poll(&mut self) -> Option { + match self.stream.recv().await { + Ok(x) => Some(x), + Err(e) => { + tracing::warn!("could not poll for cursor: {}", e); + None + } + } + } +} + +#[async_trait] +pub(crate) trait CursorEditor { + async fn moved(&mut self, cursor: Cursor) -> bool; + async fn recv(&mut self) -> Option; +} + +pub(crate) struct CursorControllerWorker { + uid: String, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, + client: C, +} + +impl CursorControllerWorker { + pub(crate) fn new(uid: String, client: C) -> Self { + let (op_tx, op_rx) = mpsc::channel(64); + let (cur_tx, _cur_rx) = broadcast::channel(64); + CursorControllerWorker { + uid, client, + producer: op_tx, + op: op_rx, + channel: Arc::new(cur_tx), + } + } +} + +#[async_trait] +impl ControllerWorker for CursorControllerWorker { + fn subscribe(&self) -> CursorControllerHandle { + CursorControllerHandle { + uid: self.uid.clone(), + op: self.producer.clone(), + stream: self.channel.subscribe(), + original: self.channel.clone(), + } + } + + async fn work(mut self) { + loop { + tokio::select!{ + Some(cur) = self.client.recv() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), + Some(op) = self.op.recv() => { self.client.moved(op).await; }, + else => break, + } + } + } + +} + + diff --git a/src/operation/mod.rs b/src/controller/mod.rs similarity index 76% rename from src/operation/mod.rs rename to src/controller/mod.rs index 5890e23..86ca589 100644 --- a/src/operation/mod.rs +++ b/src/controller/mod.rs @@ -1,13 +1,16 @@ -pub mod factory; -pub mod processor; -pub mod controller; +pub mod buffer; +pub mod cursor; use std::ops::Range; use operational_transform::{Operation, OperationSeq}; -pub use processor::OperationProcessor; -pub use controller::OperationController; -pub use factory::OperationFactory; +use tonic::async_trait; + +#[async_trait] +pub trait ControllerWorker { + fn subscribe(&self) -> T; + async fn work(self); +} pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } diff --git a/src/cursor.rs b/src/cursor.rs index e1dad9c..4ee265e 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -1,9 +1,4 @@ -use std::sync::Arc; - -use tokio::sync::{mpsc, broadcast}; -use tonic::async_trait; - -use crate::{proto::{Position, Cursor}, errors::IgnorableError}; +use crate::proto::{Position, Cursor}; impl From:: for (i32, i32) { fn from(pos: Position) -> (i32, i32) { @@ -26,99 +21,3 @@ impl Cursor { self.end.clone().unwrap_or((0, 0).into()) } } - -#[async_trait] -pub trait CursorSubscriber { - async fn send(&self, path: &str, start: Position, end: Position); - async fn poll(&mut self) -> Option; -} - -pub struct CursorControllerHandle { - uid: String, - op: mpsc::Sender, - stream: broadcast::Receiver, - original: Arc>, -} - -impl Clone for CursorControllerHandle { - fn clone(&self) -> Self { - Self { - uid: self.uid.clone(), - op: self.op.clone(), - stream: self.original.subscribe(), - original: self.original.clone() - } - } -} - -#[async_trait] -impl CursorSubscriber for CursorControllerHandle { - async fn send(&self, path: &str, start: Position, end: Position) { - self.op.send(Cursor { - user: self.uid.clone(), - buffer: path.to_string(), - start: Some(start), - end: Some(end), - }).await.unwrap_or_warn("could not send cursor op") - } - - async fn poll(&mut self) -> Option { - match self.stream.recv().await { - Ok(x) => Some(x), - Err(e) => { - tracing::warn!("could not poll for cursor: {}", e); - None - } - } - } -} - -#[async_trait] -pub(crate) trait CursorProvider -where T : CursorSubscriber { - fn subscribe(&self) -> T; - fn broadcast(&self, op: Cursor); - async fn wait(&mut self) -> Option; -} - -pub(crate) struct CursorControllerWorker { - uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, - channel: Arc>, -} - -impl CursorControllerWorker { - pub(crate) fn new(uid: String) -> Self { - let (op_tx, op_rx) = mpsc::channel(64); - let (cur_tx, _cur_rx) = broadcast::channel(64); - CursorControllerWorker { - uid, - producer: op_tx, - op: op_rx, - channel: Arc::new(cur_tx), - } - } -} - -#[async_trait] -impl CursorProvider for CursorControllerWorker { - fn broadcast(&self, op: Cursor) { - self.channel.send(op).unwrap_or_warn("could not broadcast cursor event") - } - - async fn wait(&mut self) -> Option { - self.op.recv().await - } - - fn subscribe(&self) -> CursorControllerHandle { - CursorControllerHandle { - uid: self.uid.clone(), - op: self.producer.clone(), - stream: self.channel.subscribe(), - original: self.channel.clone(), - } - } - -} - diff --git a/src/operation/factory.rs b/src/factory.rs similarity index 85% rename from src/operation/factory.rs rename to src/factory.rs index e6e6f4f..bd3fce6 100644 --- a/src/operation/factory.rs +++ b/src/factory.rs @@ -4,18 +4,19 @@ use similar::{TextDiff, ChangeTag}; pub trait OperationFactory { fn content(&self) -> String; - fn replace(&self, txt: &str) -> OperationSeq { + fn replace(&self, txt: &str) -> Option { self.delta(0, txt, self.content().len()) } - fn delta(&self, skip: usize, txt: &str, tail: usize) -> OperationSeq { + fn delta(&self, skip: usize, txt: &str, tail: usize) -> Option { let mut out = OperationSeq::default(); let content = self.content(); let tail_index = content.len() - tail; let content_slice = &content[skip..tail]; if content_slice == txt { - return out; // TODO this won't work, should we return a noop instead? + // if slice equals given text, no operation should be taken + return None; } out.retain(skip as u64); @@ -32,7 +33,7 @@ pub trait OperationFactory { out.retain(tail_index as u64); - out + Some(out) } fn insert(&self, txt: &str, pos: u64) -> OperationSeq { diff --git a/src/lib.rs b/src/lib.rs index bc248e2..3931ea4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,9 @@ pub mod proto; pub mod client; -pub mod operation; +pub mod controller; pub mod cursor; pub mod errors; +pub mod factory; pub use tonic; pub use tokio; diff --git a/src/operation/controller.rs b/src/operation/controller.rs deleted file mode 100644 index c6f11e5..0000000 --- a/src/operation/controller.rs +++ /dev/null @@ -1,118 +0,0 @@ -use std::{sync::Mutex, collections::VecDeque, ops::Range}; - -use operational_transform::{OperationSeq, OTError}; -use tokio::sync::watch; -use tracing::error; - -use super::{OperationFactory, OperationProcessor, op_effective_range}; -use crate::errors::IgnorableError; - - -#[derive(Debug)] -pub struct OperationController { - text: Mutex, - queue: Mutex>, - last: Mutex>, - notifier: watch::Sender, - changed: Mutex>>, - changed_notifier: watch::Sender>, - run: watch::Receiver, - stop: watch::Sender, -} - -impl OperationController { - pub fn new(content: String) -> Self { - let (tx, rx) = watch::channel(OperationSeq::default()); - let (done, wait) = watch::channel(0..0); - let (stop, run) = watch::channel(true); - OperationController { - text: Mutex::new(content), - queue: Mutex::new(VecDeque::new()), - last: Mutex::new(rx), - notifier: tx, - changed: Mutex::new(wait), - changed_notifier: done, - run, stop, - } - } - - pub async fn wait(&self) -> Range { - let mut blocker = self.changed.lock().unwrap().clone(); - // TODO less jank way - blocker.changed().await.unwrap_or_warn("waiting for changed content #1"); - blocker.changed().await.unwrap_or_warn("waiting for changed content #2"); - let span = blocker.borrow().clone(); - span - } - - pub async fn poll(&self) -> Option { - let len = self.queue.lock().unwrap().len(); - if len == 0 { - let mut recv = self.last.lock().unwrap().clone(); - // TODO less jank way - recv.changed().await.unwrap_or_warn("wairing for op changes #1"); // acknowledge current state - recv.changed().await.unwrap_or_warn("wairing for op changes #2"); // wait for a change in state - } - Some(self.queue.lock().unwrap().get(0)?.clone()) - } - - pub async fn ack(&self) -> Option { - self.queue.lock().unwrap().pop_front() - } - - pub fn stop(&self) -> bool { - match self.stop.send(false) { - Ok(()) => { - self.changed_notifier.send(0..0).unwrap_or_warn("unlocking downstream for stop"); - self.notifier.send(OperationSeq::default()).unwrap_or_warn("unlocking upstream for stop"); - true - }, - Err(e) => { - error!("could not send stop signal to workers: {}", e); - false - } - } - } - - pub fn run(&self) -> bool { - *self.run.borrow() - } - - fn operation(&self, op: &OperationSeq) -> Result, OTError> { - let txt = self.content(); - let res = op.apply(&txt)?; - *self.text.lock().unwrap() = res; - Ok(op_effective_range(op)) - } - - fn transform(&self, mut op: OperationSeq) -> Result { - let mut queue = self.queue.lock().unwrap(); - for el in queue.iter_mut() { - (op, *el) = op.transform(el)?; - } - Ok(op) - } -} - -impl OperationFactory for OperationController { - fn content(&self) -> String { - self.text.lock().unwrap().clone() - } -} - -impl OperationProcessor for OperationController { - fn apply(&self, op: OperationSeq) -> Result, OTError> { - let span = self.operation(&op)?; - self.queue.lock().unwrap().push_back(op.clone()); - self.notifier.send(op).unwrap_or_warn("notifying of applied change"); - Ok(span) - } - - - fn process(&self, mut op: OperationSeq) -> Result, OTError> { - op = self.transform(op)?; - let span = self.operation(&op)?; - self.changed_notifier.send(span.clone()).unwrap_or_warn("notifying of changed content"); - Ok(span) - } -} diff --git a/src/operation/processor.rs b/src/operation/processor.rs deleted file mode 100644 index cda36b9..0000000 --- a/src/operation/processor.rs +++ /dev/null @@ -1,10 +0,0 @@ -use std::ops::Range; - -use operational_transform::{OperationSeq, OTError}; - -use crate::operation::factory::OperationFactory; - -pub trait OperationProcessor : OperationFactory { - fn apply(&self, op: OperationSeq) -> Result, OTError>; - fn process(&self, op: OperationSeq) -> Result, OTError>; -}