diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 9917ad7..39efbe7 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -2,6 +2,7 @@ use operational_transform::OperationSeq; use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tonic::async_trait; +use crate::errors::IgnorableError; use crate::{Controller, Error}; use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; @@ -11,6 +12,7 @@ pub struct BufferController { content: watch::Receiver, operations: mpsc::Sender, stream: Mutex>, + stop: mpsc::UnboundedSender<()>, } impl BufferController { @@ -18,8 +20,15 @@ impl BufferController { content: watch::Receiver, operations: mpsc::Sender, stream: Mutex>, + stop: mpsc::UnboundedSender<()>, ) -> Self { - BufferController { content, operations, stream } + BufferController { content, operations, stream, stop } + } +} + +impl Drop for BufferController { + fn drop(&mut self) { + self.stop.send(()).unwrap_or_warn("could not send stop message to worker"); } } diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 922e217..adc53d0 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -5,6 +5,7 @@ pub mod controller; pub mod factory; +#[derive(Debug)] pub struct TextChange { pub span: Range, pub content: String, diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 99e673e..5f19863 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -5,6 +5,7 @@ use tokio::sync::{watch, mpsc, broadcast, Mutex}; use tonic::transport::Channel; use tonic::{async_trait, Streaming}; +use crate::errors::IgnorableError; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; use crate::ControllerWorker; @@ -23,6 +24,8 @@ pub(crate) struct BufferControllerWorker { sender: mpsc::Sender, buffer: String, path: String, + stop: mpsc::UnboundedReceiver<()>, + stop_control: mpsc::UnboundedSender<()>, } impl BufferControllerWorker { @@ -30,6 +33,7 @@ impl BufferControllerWorker { let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (op_tx, op_rx) = mpsc::channel(64); let (s_tx, _s_rx) = broadcast::channel(64); + let (end_tx, end_rx) = mpsc::unbounded_channel(); BufferControllerWorker { uid, content: txt_tx, @@ -40,6 +44,8 @@ impl BufferControllerWorker { queue: VecDeque::new(), buffer: buffer.to_string(), path: path.to_string(), + stop: end_rx, + stop_control: end_tx, } } } @@ -55,6 +61,7 @@ impl ControllerWorker for BufferControllerWorker { self.receiver.clone(), self.sender.clone(), Mutex::new(self.stream.subscribe()), + self.stop_control.clone(), ) } @@ -64,19 +71,31 @@ impl ControllerWorker for BufferControllerWorker { Some(operation) = recv_opseq(&mut rx) => { let mut out = operation; for op in self.queue.iter_mut() { - (*op, out) = op.transform(&out).unwrap(); + (*op, out) = match op.transform(&out) { + Ok((x, y)) => (x, y), + Err(e) => { + tracing::warn!("could not transform enqueued operation: {}", e); + break + }, + } } - self.stream.send(out.clone()).unwrap(); + self.stream.send(out.clone()).unwrap_or_warn("could not send operation to server"); out }, Some(op) = self.operations.recv() => { self.queue.push_back(op.clone()); op }, + Some(()) = self.stop.recv() => { + break; + } else => break }; - self.buffer = op.apply(&self.buffer).unwrap(); - self.content.send(self.buffer.clone()).unwrap(); + self.buffer = op.apply(&self.buffer).unwrap_or_else(|e| { + tracing::error!("could not update buffer string: {}", e); + self.buffer + }); + self.content.send(self.buffer.clone()).unwrap_or_warn("error showing updated buffer"); while let Some(op) = self.queue.get(0) { if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break } @@ -87,11 +106,17 @@ impl ControllerWorker for BufferControllerWorker { } async fn send_opseq(tx: &mut BufferClient, uid: String, path: String, op: OperationSeq) -> bool { + let opseq = match serde_json::to_string(&op) { + Ok(x) => x, + Err(e) => { + tracing::warn!("could not serialize opseq: {}", e); + return false; + } + }; let req = OperationRequest { hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - path, user: uid, + opseq, path, }; match tx.edit(req).await { Ok(_) => true, @@ -104,7 +129,13 @@ async fn send_opseq(tx: &mut BufferClient, uid: String, path: String, o async fn recv_opseq(rx: &mut Streaming) -> Option { match rx.message().await { - Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), + Ok(Some(op)) => match serde_json::from_str(&op.opseq) { + Ok(x) => Some(x), + Err(e) => { + tracing::warn!("could not deserialize opseq: {}", e); + None + } + }, Ok(None) => None, Err(e) => { tracing::error!("could not receive edit from server: {}", e); diff --git a/src/client.rs b/src/client.rs index 24508dd..903833d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -41,6 +41,17 @@ impl Client { Some(self.workspace.as_ref()?.cursor.clone()) } + pub fn leave_workspace(&mut self) { + self.workspace = None + } + + pub fn disconnect_buffer(&mut self, path: &str) -> bool { + match &mut self.workspace { + Some(w) => w.buffers.remove(path).is_some(), + None => false, + } + } + pub fn get_buffer(&self, path: &str) -> Option> { self.workspace.as_ref()?.buffers.get(path).cloned() } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 2c650e9..fe6bd85 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -1,21 +1,29 @@ use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; use tonic::async_trait; -use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller}; +use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::IgnorableError}; pub struct CursorController { uid: String, op: mpsc::Sender, stream: Mutex>, + stop: mpsc::UnboundedSender<()>, +} + +impl Drop for CursorController { + fn drop(&mut self) { + self.stop.send(()).unwrap_or_warn("could not stop cursor actor") + } } impl CursorController { pub(crate) fn new( uid: String, op: mpsc::Sender, - stream: Mutex> + stream: Mutex>, + stop: mpsc::UnboundedSender<()>, ) -> Self { - CursorController { uid, op, stream } + CursorController { uid, op, stream, stop } } } diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index c847fcd..c71f232 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -12,17 +12,22 @@ pub(crate) struct CursorControllerWorker { producer: mpsc::Sender, op: mpsc::Receiver, channel: Arc>, + stop: mpsc::UnboundedReceiver<()>, + stop_control: mpsc::UnboundedSender<()>, } impl CursorControllerWorker { pub(crate) fn new(uid: String) -> Self { let (op_tx, op_rx) = mpsc::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64); + let (end_tx, end_rx) = mpsc::unbounded_channel(); Self { uid, producer: op_tx, op: op_rx, channel: Arc::new(cur_tx), + stop: end_rx, + stop_control: end_tx, } } } @@ -37,7 +42,8 @@ impl ControllerWorker for CursorControllerWorker { CursorController::new( self.uid.clone(), self.producer.clone(), - Mutex::new(self.channel.subscribe()) + Mutex::new(self.channel.subscribe()), + self.stop_control.clone(), ) } @@ -46,6 +52,7 @@ impl ControllerWorker for CursorControllerWorker { tokio::select!{ Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + Some(()) = self.stop.recv() => { break; }, else => break, } } diff --git a/src/instance.rs b/src/instance.rs index 54c49ee..a31eb13 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -4,7 +4,7 @@ use tokio::sync::Mutex; use crate::{ buffer::controller::BufferController, - errors::Error, client::Client, cursor::controller::CursorController, Controller, + errors::Error, client::Client, cursor::controller::CursorController, }; @@ -82,4 +82,25 @@ impl Instance { .get_buffer(path) .ok_or(Error::InvalidState { msg: "join a workspace or create requested buffer first".into() }) } + + pub async fn leave_workspace(&self) -> Result<(), Error> { + self.client + .lock() + .await + .as_mut() + .ok_or(Error::InvalidState { msg: "connect first".into() })? + .leave_workspace(); + Ok(()) + } + + pub async fn disconnect_buffer(&self, path: &str) -> Result { + Ok( + self.client + .lock() + .await + .as_mut() + .ok_or(Error::InvalidState { msg: "connect first".into() })? + .disconnect_buffer(path) + ) + } } diff --git a/src/lib.rs b/src/lib.rs index ccbc229..e48eb28 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,13 +41,22 @@ pub trait Controller : Sized + Send + Sync { async fn send(&self, x: Self::Input) -> Result<(), Error>; async fn recv(&self) -> Result; - fn callback(self: Arc, rt: &tokio::runtime::Runtime, mut cb: F) - where Self : 'static, F : FnMut(T) + Sync + Send + 'static + fn callback( + self: Arc, + rt: &tokio::runtime::Runtime, + mut stop: tokio::sync::mpsc::UnboundedReceiver<()>, + mut cb: F + ) where + Self : 'static, + F : FnMut(T) + Sync + Send + 'static { - let x = Arc::new(self); rt.spawn(async move { - while let Ok(data) = x.recv().await { - cb(data) + loop { + tokio::select! { + Ok(data) = self.recv() => cb(data), + Some(()) = stop.recv() => break, + else => break, + } } }); }