diff --git a/src/buffer/client.rs b/src/buffer/client.rs deleted file mode 100644 index aff3254..0000000 --- a/src/buffer/client.rs +++ /dev/null @@ -1,143 +0,0 @@ -use operational_transform::OperationSeq; -use tonic::{transport::Channel, Status, Streaming, async_trait}; -use uuid::Uuid; - -use crate::{ - ControllerWorker, - cursor::controller::{CursorControllerHandle, CursorControllerWorker, CursorEditor}, - buffer::controller::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker}, - proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest, Cursor}, -}; - -#[derive(Clone)] -pub struct CodempClient { - id: String, - client: BufferClient, -} - -impl From::> for CodempClient { - fn from(value: BufferClient) -> Self { - CodempClient { id: Uuid::new_v4().to_string(), client: value } - } -} - -impl CodempClient { - pub async fn new(dest: &str) -> Result { - Ok(BufferClient::connect(dest.to_string()).await?.into()) - } - - pub fn id(&self) -> &str { &self.id } - - pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result { - let req = BufferPayload { - path: path.to_string(), - content: content.map(|x| x.to_string()), - user: self.id.clone(), - }; - - let res = self.client.create(req).await?; - - Ok(res.into_inner().accepted) - } - - pub async fn listen(&mut self) -> Result { - let req = BufferPayload { - path: "".into(), - content: None, - user: self.id.clone(), - }; - - let stream = self.client.listen(req).await?.into_inner(); - - let controller = CursorControllerWorker::new(self.id().to_string(), (self.clone(), stream)); - let handle = controller.subscribe(); - - tokio::spawn(async move { - tracing::debug!("cursor worker started"); - controller.work().await; - tracing::debug!("cursor worker stopped"); - }); - - Ok(handle) - } - - pub async fn attach(&mut self, path: &str) -> Result { - let req = BufferPayload { - path: path.to_string(), - content: None, - user: self.id.clone(), - }; - - let content = self.client.sync(req.clone()) - .await? - .into_inner() - .content - .unwrap_or("".into()); - - let stream = self.client.attach(req).await?.into_inner(); - - let controller = OperationControllerWorker::new((self.clone(), stream), &content, path); - let factory = controller.subscribe(); - - tokio::spawn(async move { - tracing::debug!("buffer worker started"); - controller.work().await; - tracing::debug!("buffer worker stopped"); - }); - - Ok(factory) - } -} - -#[async_trait] -impl OperationControllerEditor for (CodempClient, Streaming) { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool { - let req = OperationRequest { - hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - path, - user: self.0.id().to_string(), - }; - match self.0.client.edit(req).await { - Ok(res) => res.into_inner().accepted, - Err(e) => { - tracing::error!("error sending edit: {}", e); - false - } - } - } - - async fn recv(&mut self) -> Option { - match self.1.message().await { - Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), - Ok(None) => None, - Err(e) => { - tracing::error!("could not receive edit from server: {}", e); - None - } - } - } -} - -#[async_trait] -impl CursorEditor for (CodempClient, Streaming) { - async fn moved(&mut self, cursor: Cursor) -> bool { - match self.0.client.moved(cursor).await { - Ok(res) => res.into_inner().accepted, - Err(e) => { - tracing::error!("could not send cursor movement: {}", e); - false - } - } - } - - async fn recv(&mut self) -> Option { - match self.1.message().await { - Ok(cursor) => cursor, - Err(e) => { - tracing::error!("could not receive cursor update: {}", e); - None - } - } - } -} diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 39cecb1..0e1de5f 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -1,151 +1,121 @@ -use std::{sync::Arc, collections::VecDeque, ops::Range}; - use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, broadcast, Mutex}; -use tonic::async_trait; +use tonic::{transport::Channel, Status, Streaming, async_trait}; +use uuid::Uuid; -use crate::ControllerWorker; -use crate::errors::IgnorableError; -use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; +use crate::{ + ControllerWorker, + cursor::tracker::{CursorTracker, CursorTrackerWorker}, + buffer::controller::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker}, + proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, +}; -pub struct TextChange { - pub span: Range, - pub content: String, +#[derive(Clone)] +pub struct BufferController { + id: String, + client: BufferClient, } -#[async_trait] -pub trait OperationControllerSubscriber : OperationFactory { - async fn poll(&self) -> Option; - async fn apply(&self, op: OperationSeq); +impl From::> for BufferController { + fn from(value: BufferClient) -> Self { + BufferController { id: Uuid::new_v4().to_string(), client: value } + } } +impl BufferController { + pub async fn new(dest: &str) -> Result { + Ok(BufferClient::connect(dest.to_string()).await?.into()) + } -pub struct OperationControllerHandle { - content: watch::Receiver, - operations: mpsc::Sender, - original: Arc>, - stream: Mutex>, -} + pub fn id(&self) -> &str { &self.id } -impl Clone for OperationControllerHandle { - fn clone(&self) -> Self { - OperationControllerHandle { - content: self.content.clone(), - operations: self.operations.clone(), - original: self.original.clone(), - stream: Mutex::new(self.original.subscribe()), - } + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result { + let req = BufferPayload { + path: path.to_string(), + content: content.map(|x| x.to_string()), + user: self.id.clone(), + }; + + let res = self.client.create(req).await?; + + Ok(res.into_inner().accepted) + } + + pub async fn listen(&mut self) -> Result { + let req = BufferPayload { + path: "".into(), + content: None, + user: self.id.clone(), + }; + + let stream = self.client.listen(req).await?.into_inner(); + + let controller = CursorTrackerWorker::new(self.id().to_string()); + let handle = controller.subscribe(); + let client = self.client.clone(); + + tokio::spawn(async move { + tracing::debug!("cursor worker started"); + controller.work(stream, client).await; + tracing::debug!("cursor worker stopped"); + }); + + Ok(handle) + } + + pub async fn attach(&mut self, path: &str) -> Result { + let req = BufferPayload { + path: path.to_string(), + content: None, + user: self.id.clone(), + }; + + let content = self.client.sync(req.clone()) + .await? + .into_inner() + .content + .unwrap_or("".into()); + + let stream = self.client.attach(req).await?.into_inner(); + + let controller = OperationControllerWorker::new((self.clone(), stream), &content, path); + let factory = controller.subscribe(); + + tokio::spawn(async move { + tracing::debug!("buffer worker started"); + controller.work().await; + tracing::debug!("buffer worker stopped"); + }); + + Ok(factory) } } #[async_trait] -impl OperationFactory for OperationControllerHandle { - fn content(&self) -> String { - self.content.borrow().clone() - } -} - -#[async_trait] -impl OperationControllerSubscriber for OperationControllerHandle { - async fn poll(&self) -> Option { - let op = self.stream.lock().await.recv().await.ok()?; - let after = self.content.borrow().clone(); - let skip = leading_noop(op.ops()) as usize; - let before_len = op.base_len(); - let tail = tailing_noop(op.ops()) as usize; - let span = skip..before_len-tail; - let content = after[skip..after.len()-tail].to_string(); - Some(TextChange { span, content }) - } - - async fn apply(&self, op: OperationSeq) { - self.operations.send(op).await - .unwrap_or_warn("could not apply+send operation") - } - - // fn subscribe(&self) -> Self { - // OperationControllerHandle { - // content: self.content.clone(), - // operations: self.operations.clone(), - // original: self.original.clone(), - // stream: Arc::new(Mutex::new(self.original.subscribe())), - // } - // } -} - -#[async_trait] -pub(crate) trait OperationControllerEditor { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool; - async fn recv(&mut self) -> Option; -} - -pub(crate) struct OperationControllerWorker { - pub(crate) content: watch::Sender, - pub(crate) operations: mpsc::Receiver, - pub(crate) stream: Arc>, - pub(crate) queue: VecDeque, - receiver: watch::Receiver, - sender: mpsc::Sender, - client: C, - buffer: String, - path: String, -} - -#[async_trait] -impl ControllerWorker for OperationControllerWorker { - fn subscribe(&self) -> OperationControllerHandle { - OperationControllerHandle { - content: self.receiver.clone(), - operations: self.sender.clone(), - original: self.stream.clone(), - stream: Mutex::new(self.stream.subscribe()), - } - } - - async fn work(mut self) { - loop { - let op = tokio::select! { - Some(operation) = self.client.recv() => { - let mut out = operation; - for op in self.queue.iter_mut() { - (*op, out) = op.transform(&out).unwrap(); - } - self.stream.send(out.clone()).unwrap(); - out - }, - Some(op) = self.operations.recv() => { - self.queue.push_back(op.clone()); - op - }, - else => break - }; - self.buffer = op.apply(&self.buffer).unwrap(); - self.content.send(self.buffer.clone()).unwrap(); - - while let Some(op) = self.queue.get(0) { - if !self.client.edit(self.path.clone(), op.clone()).await { break } - self.queue.pop_front(); +impl OperationControllerEditor for (BufferController, Streaming) { + async fn edit(&mut self, path: String, op: OperationSeq) -> bool { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path, + user: self.0.id().to_string(), + }; + match self.0.client.edit(req).await { + Ok(res) => res.into_inner().accepted, + Err(e) => { + tracing::error!("error sending edit: {}", e); + false } } } -} - -impl OperationControllerWorker { - pub fn new(client: C, buffer: &str, path: &str) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); - let (op_tx, op_rx) = mpsc::channel(64); - let (s_tx, _s_rx) = broadcast::channel(64); - OperationControllerWorker { - content: txt_tx, - operations: op_rx, - stream: Arc::new(s_tx), - receiver: txt_rx, - sender: op_tx, - queue: VecDeque::new(), - buffer: buffer.to_string(), - path: path.to_string(), - client, + async fn recv(&mut self) -> Option { + match self.1.message().await { + Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), + Ok(None) => None, + Err(e) => { + tracing::error!("could not receive edit from server: {}", e); + None + } } } } diff --git a/src/buffer/handle.rs b/src/buffer/handle.rs new file mode 100644 index 0000000..ecd1a12 --- /dev/null +++ b/src/buffer/handle.rs @@ -0,0 +1,130 @@ +use std::{sync::Arc, collections::VecDeque, ops::Range}; + +use operational_transform::OperationSeq; +use tokio::sync::{watch, mpsc, broadcast, Mutex}; +use tonic::async_trait; + +use crate::ControllerWorker; +use crate::errors::IgnorableError; +use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; + +pub struct TextChange { + pub span: Range, + pub content: String, +} + +pub struct BufferHandle { + content: watch::Receiver, + operations: mpsc::Sender, + stream: Mutex>, +} + +#[async_trait] +impl OperationFactory for BufferHandle { + fn content(&self) -> String { + self.content.borrow().clone() + } +} + +impl BufferHandle { + async fn poll(&self) -> Option { + let op = self.stream.lock().await.recv().await.ok()?; + let after = self.content.borrow().clone(); + let skip = leading_noop(op.ops()) as usize; + let before_len = op.base_len(); + let tail = tailing_noop(op.ops()) as usize; + let span = skip..before_len-tail; + let content = after[skip..after.len()-tail].to_string(); + Some(TextChange { span, content }) + } + + async fn apply(&self, op: OperationSeq) { + self.operations.send(op).await + .unwrap_or_warn("could not apply+send operation") + } + + // fn subscribe(&self) -> Self { + // OperationControllerHandle { + // content: self.content.clone(), + // operations: self.operations.clone(), + // original: self.original.clone(), + // stream: Arc::new(Mutex::new(self.original.subscribe())), + // } + // } +} + +#[async_trait] +pub(crate) trait OperationControllerEditor { + async fn edit(&mut self, path: String, op: OperationSeq) -> bool; + async fn recv(&mut self) -> Option; +} + +pub(crate) struct OperationControllerWorker { + pub(crate) content: watch::Sender, + pub(crate) operations: mpsc::Receiver, + pub(crate) stream: Arc>, + pub(crate) queue: VecDeque, + receiver: watch::Receiver, + sender: mpsc::Sender, + client: C, + buffer: String, + path: String, +} + +#[async_trait] +impl ControllerWorker for OperationControllerWorker { + fn subscribe(&self) -> BufferHandle { + BufferHandle { + content: self.receiver.clone(), + operations: self.sender.clone(), + stream: Mutex::new(self.stream.subscribe()), + } + } + + async fn work(mut self) { + loop { + let op = tokio::select! { + Some(operation) = self.client.recv() => { + let mut out = operation; + for op in self.queue.iter_mut() { + (*op, out) = op.transform(&out).unwrap(); + } + self.stream.send(out.clone()).unwrap(); + out + }, + Some(op) = self.operations.recv() => { + self.queue.push_back(op.clone()); + op + }, + else => break + }; + self.buffer = op.apply(&self.buffer).unwrap(); + self.content.send(self.buffer.clone()).unwrap(); + + while let Some(op) = self.queue.get(0) { + if !self.client.edit(self.path.clone(), op.clone()).await { break } + self.queue.pop_front(); + } + } + } + +} + +impl OperationControllerWorker { + pub fn new(client: C, buffer: &str, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); + let (op_tx, op_rx) = mpsc::channel(64); + let (s_tx, _s_rx) = broadcast::channel(64); + OperationControllerWorker { + content: txt_tx, + operations: op_rx, + stream: Arc::new(s_tx), + receiver: txt_rx, + sender: op_tx, + queue: VecDeque::new(), + buffer: buffer.to_string(), + path: path.to_string(), + client, + } + } +} diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index ca7ad3c..7fe80e1 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -1,3 +1,3 @@ pub mod factory; -pub mod client; pub mod controller; +pub mod handle; diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs deleted file mode 100644 index 54985e4..0000000 --- a/src/cursor/controller.rs +++ /dev/null @@ -1,122 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex}; -use tonic::async_trait; - -use crate::{proto::{Position, Cursor}, errors::IgnorableError, ControllerWorker}; - -#[async_trait] -pub trait CursorSubscriber { - async fn send(&self, path: &str, start: Position, end: Position); - async fn poll(&self) -> Option; - fn try_poll(&self) -> Option>; // TODO fuck this fuck neovim -} - -pub struct CursorControllerHandle { - uid: String, - op: mpsc::Sender, - original: Arc>, - stream: Mutex>, -} - -impl Clone for CursorControllerHandle { - fn clone(&self) -> Self { - CursorControllerHandle { - uid: self.uid.clone(), - op: self.op.clone(), - original: self.original.clone(), - stream: Mutex::new(self.original.subscribe()), - } - } -} - -#[async_trait] -impl CursorSubscriber for CursorControllerHandle { - async fn send(&self, path: &str, start: Position, end: Position) { - self.op.send(Cursor { - user: self.uid.clone(), - buffer: path.to_string(), - start: Some(start), - end: Some(end), - }).await.unwrap_or_warn("could not send cursor op") - } - - // TODO is this cancelable? so it can be used in tokio::select! - async fn poll(&self) -> Option { - let mut stream = self.stream.lock().await; - match stream.recv().await { - Ok(x) => Some(x), - Err(RecvError::Closed) => None, - Err(RecvError::Lagged(n)) => { - tracing::error!("cursor channel lagged behind, skipping {} events", n); - Some(stream.recv().await.expect("could not receive after lagging")) - } - } - } - - fn try_poll(&self) -> Option> { - match self.stream.try_lock() { - Err(_) => None, - Ok(mut x) => match x.try_recv() { - Ok(x) => Some(Some(x)), - Err(TryRecvError::Empty) => None, - Err(TryRecvError::Closed) => Some(None), - Err(TryRecvError::Lagged(n)) => { - tracing::error!("cursor channel lagged behind, skipping {} events", n); - Some(Some(x.try_recv().expect("could not receive after lagging"))) - } - } - } - } -} - -#[async_trait] -pub(crate) trait CursorEditor { - async fn moved(&mut self, cursor: Cursor) -> bool; - async fn recv(&mut self) -> Option; -} - -pub(crate) struct CursorControllerWorker { - uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, - channel: Arc>, - client: C, -} - -impl CursorControllerWorker { - pub(crate) fn new(uid: String, client: C) -> Self { - let (op_tx, op_rx) = mpsc::channel(64); - let (cur_tx, _cur_rx) = broadcast::channel(64); - CursorControllerWorker { - uid, client, - producer: op_tx, - op: op_rx, - channel: Arc::new(cur_tx), - } - } -} - -#[async_trait] -impl ControllerWorker for CursorControllerWorker { - fn subscribe(&self) -> CursorControllerHandle { - CursorControllerHandle { - uid: self.uid.clone(), - op: self.producer.clone(), - original: self.channel.clone(), - stream: Mutex::new(self.channel.subscribe()), - } - } - - async fn work(mut self) { - loop { - tokio::select!{ - Some(cur) = self.client.recv() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { self.client.moved(op).await; }, - else => break, - } - } - } -} - - diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index b4cfefc..134f2a7 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,4 +1,4 @@ -pub mod controller; +pub mod tracker; use crate::proto::{Position, Cursor}; diff --git a/src/cursor/tracker.rs b/src/cursor/tracker.rs new file mode 100644 index 0000000..67149fb --- /dev/null +++ b/src/cursor/tracker.rs @@ -0,0 +1,92 @@ +use std::sync::Arc; + +use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; +use tonic::{Streaming, transport::Channel}; + +use crate::{proto::{Position, Cursor, buffer_client::BufferClient}, errors::IgnorableError, CodempError}; + +pub struct CursorTracker { + uid: String, + op: mpsc::Sender, + stream: Mutex>, +} + +impl CursorTracker { + pub async fn moved(&self, path: &str, start: Position, end: Position) -> Result<(), CodempError> { + Ok(self.op.send(Cursor { + user: self.uid.clone(), + buffer: path.to_string(), + start: start.into(), + end: end.into(), + }).await?) + } + + // TODO is this cancelable? so it can be used in tokio::select! + // TODO is the result type overkill? should be an option? + pub async fn recv(&self) -> Result { + let mut stream = self.stream.lock().await; + match stream.recv().await { + Ok(x) => Ok(x), + Err(RecvError::Closed) => Err(CodempError::Channel { send: false }), + Err(RecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Ok(stream.recv().await.expect("could not receive after lagging")) + } + } + } + + // fn try_poll(&self) -> Option> { + // match self.stream.try_lock() { + // Err(_) => None, + // Ok(mut x) => match x.try_recv() { + // Ok(x) => Some(Some(x)), + // Err(TryRecvError::Empty) => None, + // Err(TryRecvError::Closed) => Some(None), + // Err(TryRecvError::Lagged(n)) => { + // tracing::error!("cursor channel lagged behind, skipping {} events", n); + // Some(Some(x.try_recv().expect("could not receive after lagging"))) + // } + // } + // } + // } +} + +pub(crate) struct CursorTrackerWorker { + uid: String, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, +} + +impl CursorTrackerWorker { + pub(crate) fn new(uid: String) -> Self { + let (op_tx, op_rx) = mpsc::channel(64); + let (cur_tx, _cur_rx) = broadcast::channel(64); + Self { + uid, + producer: op_tx, + op: op_rx, + channel: Arc::new(cur_tx), + } + } + + pub(crate) fn subscribe(&self) -> CursorTracker { + CursorTracker { + uid: self.uid.clone(), + op: self.producer.clone(), + stream: Mutex::new(self.channel.subscribe()), + } + } + + // TODO is it possible to avoid passing directly tonic Streaming and proto BufferClient ? + pub(crate) async fn work(mut self, mut rx: Streaming, mut tx: BufferClient) { + loop { + tokio::select!{ + Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), + Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + else => break, + } + } + } +} +