diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index da47efa..8dd9085 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -22,7 +22,10 @@ use crate::api::TextChange; /// /// upon dropping this handle will stop the associated worker #[derive(Debug, Clone)] -pub struct BufferController { +pub struct BufferController(Arc); + +#[derive(Debug)] +struct BufferControllerInner { name: String, content: watch::Receiver, seen: StatusCheck, // internal buffer previous state @@ -39,23 +42,25 @@ impl BufferController { poller: mpsc::UnboundedSender>, stop: mpsc::UnboundedSender<()>, ) -> Self { - BufferController { - name, - content, operations, poller, - seen: StatusCheck::default(), - _stop: Arc::new(StopOnDrop(stop)), - } + Self(Arc::new( + BufferControllerInner { + name, + content, operations, poller, + seen: StatusCheck::default(), + _stop: Arc::new(StopOnDrop(stop)), + } + )) } /// unique identifier of buffer pub fn name(&self) -> &str { - &self.name + &self.0.name } /// return buffer whole content, updating internal buffer previous state pub fn content(&self) -> String { - self.seen.update(self.content.borrow().clone()); - self.content.borrow().clone() + self.0.seen.update(self.0.content.borrow().clone()); + self.0.content.borrow().clone() } } @@ -75,43 +80,43 @@ impl Controller for BufferController { /// block until a text change is available /// this returns immediately if one is already available async fn poll(&self) -> crate::Result<()> { - if self.seen.check() != *self.content.borrow() { + if self.0.seen.check() != *self.0.content.borrow() { return Ok(()); // short circuit: already available! } let (tx, rx) = oneshot::channel::<()>(); - self.poller.send(tx)?; + self.0.poller.send(tx)?; rx.await.map_err(|_| crate::Error::Channel { send: false })?; Ok(()) } /// if a text change is available, return it immediately fn try_recv(&self) -> crate::Result> { - let seen = self.seen.check(); - let actual = self.content.borrow().clone(); + let seen = self.0.seen.check(); + let actual = self.0.content.borrow().clone(); if seen == actual { return Ok(None); } let change = TextChange::from_diff(&seen, &actual); - self.seen.update(actual); + self.0.seen.update(actual); Ok(Some(change)) } /// block until a new text change is available, and return it async fn recv(&self) -> crate::Result { self.poll().await?; - let seen = self.seen.check(); - let actual = self.content.borrow().clone(); + let seen = self.0.seen.check(); + let actual = self.0.content.borrow().clone(); let change = TextChange::from_diff(&seen, &actual); - self.seen.update(actual); + self.0.seen.update(actual); Ok(change) } /// enqueue a text change for processing /// this also updates internal buffer previous state fn send(&self, op: TextChange) -> crate::Result<()> { - let before = self.seen.check(); - self.seen.update(op.apply(&before)); - Ok(self.operations.send(op)?) + let before = self.0.seen.check(); + self.0.seen.update(op.apply(&before)); + Ok(self.0.operations.send(op)?) } } diff --git a/src/client.rs b/src/client.rs index 1f6c361..5db7600 100644 --- a/src/client.rs +++ b/src/client.rs @@ -34,7 +34,7 @@ use crate::{ pub struct Client { user_id: Uuid, token_tx: Arc>, - workspaces: Arc>>, + workspaces: Arc>, services: Arc } @@ -109,7 +109,7 @@ impl Client { } /// join a workspace, returns an [tokio::sync::RwLock] to interact with it - pub async fn join_workspace(&self, workspace: &str) -> crate::Result> { + pub async fn join_workspace(&self, workspace: &str) -> crate::Result { let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner(); let (tx, rx) = mpsc::channel(256); @@ -119,20 +119,20 @@ impl Client { .into_inner(); let worker = CursorWorker::default(); - let controller = Arc::new(worker.subscribe()); + let controller = worker.subscribe(); tokio::spawn(async move { tracing::debug!("controller worker started"); worker.work(tx, cur_stream).await; tracing::debug!("controller worker stopped"); }); - let ws = Arc::new(Workspace::new( + let ws = Workspace::new( workspace.to_string(), self.user_id, self.token_tx.clone(), controller, self.services.clone() - )); + ); ws.fetch_users().await?; ws.fetch_buffers().await?; @@ -144,7 +144,7 @@ impl Client { Ok(ws) } - pub fn get_workspace(&self, id: &str) -> Option> { + pub fn get_workspace(&self, id: &str) -> Option { self.workspaces.get(id).map(|x| x.clone()) } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 5cc583d..651a758 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -2,10 +2,12 @@ //! //! a controller implementation for cursor actions +use std::sync::Arc; + use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; use tonic::async_trait; -use crate::{api::Controller, errors::IgnorableError}; +use crate::{api::{Cursor, Controller}, errors::IgnorableError}; use codemp_proto::cursor::{CursorEvent, CursorPosition}; /// the cursor controller implementation /// @@ -18,8 +20,11 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition}; /// for each controller a worker exists, managing outgoing and inbound event queues /// /// upon dropping this handle will stop the associated worker +#[derive(Debug, Clone)] +pub struct CursorController(Arc); + #[derive(Debug)] -pub struct CursorController { +struct CursorControllerInner { op: mpsc::UnboundedSender, last_op: Mutex>, stream: Mutex>, @@ -28,7 +33,7 @@ pub struct CursorController { impl Drop for CursorController { fn drop(&mut self) { - self.stop.send(()).unwrap_or_warn("could not stop cursor actor") + self.0.stop.send(()).unwrap_or_warn("could not stop cursor actor") } } @@ -39,33 +44,35 @@ impl CursorController { stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { - CursorController { op, last_op, stream, stop } + Self(Arc::new( + CursorControllerInner { op, last_op, stream, stop } + )) } } #[async_trait] -impl Controller for CursorController { - type Input = CursorPosition; +impl Controller for CursorController { + type Input = Cursor; /// enqueue a cursor event to be broadcast to current workspace /// will automatically invert cursor start/end if they are inverted - fn send(&self, mut cursor: CursorPosition) -> crate::Result<()> { + fn send(&self, mut cursor: Cursor) -> crate::Result<()> { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } - Ok(self.op.send(cursor)?) + Ok(self.0.op.send(cursor.into())?) } /// try to receive without blocking, but will still block on stream mutex - fn try_recv(&self) -> crate::Result> { - let mut stream = self.stream.blocking_lock(); + fn try_recv(&self) -> crate::Result> { + let mut stream = self.0.stream.blocking_lock(); match stream.try_recv() { - Ok(x) => Ok(Some(x)), + Ok(x) => Ok(Some(x.into())), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(TryRecvError::Lagged(n)) => { tracing::warn!("cursor channel lagged, skipping {} events", n); - Ok(stream.try_recv().ok()) + Ok(stream.try_recv().map(|x| x.into()).ok()) }, } } @@ -73,21 +80,21 @@ impl Controller for CursorController { // TODO is this cancelable? so it can be used in tokio::select! // TODO is the result type overkill? should be an option? /// get next cursor event from current workspace, or block until one is available - async fn recv(&self) -> crate::Result { - let mut stream = self.stream.lock().await; + async fn recv(&self) -> crate::Result { + let mut stream = self.0.stream.lock().await; match stream.recv().await { - Ok(x) => Ok(x), + Ok(x) => Ok(x.into()), Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(RecvError::Lagged(n)) => { tracing::error!("cursor channel lagged behind, skipping {} events", n); - Ok(stream.recv().await.expect("could not receive after lagging")) + Ok(stream.recv().await.expect("could not receive after lagging").into()) } } } /// await for changed mutex and then next op change async fn poll(&self) -> crate::Result<()> { - Ok(self.last_op.lock().await.changed().await?) + Ok(self.0.last_op.lock().await.changed().await?) } } diff --git a/src/workspace.rs b/src/workspace.rs index 7f141b7..3a8892c 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -14,12 +14,16 @@ pub struct UserInfo { pub uuid: Uuid } -pub struct Workspace { +#[derive(Debug, Clone)] +pub struct Workspace(Arc); + +#[derive(Debug)] +struct WorkspaceInner { id: String, user_id: Uuid, // reference to global user id token: Arc>, - cursor: Arc, - buffers: Arc>>, + cursor: cursor::Controller, + buffers: Arc>, pub(crate) filetree: Arc>, pub(crate) users: Arc>, services: Arc @@ -31,10 +35,10 @@ impl Workspace { id: String, user_id: Uuid, token: Arc>, - cursor: Arc, + cursor: cursor::Controller, services: Arc ) -> Self { - Workspace { + Self(Arc::new(WorkspaceInner { id, user_id, token, @@ -43,12 +47,12 @@ impl Workspace { filetree: Arc::new(DashSet::default()), users: Arc::new(DashMap::default()), services - } + })) } pub(crate) fn run_actor(&self, mut stream: Streaming) { - let users = self.users.clone(); - let filetree = self.filetree.clone(); + let users = self.0.users.clone(); + let filetree = self.0.filetree.clone(); let name = self.id(); tokio::spawn(async move { loop { @@ -70,13 +74,13 @@ impl Workspace { /// create a new buffer in current workspace pub async fn create(&self, path: &str) -> crate::Result<()> { - let mut workspace_client = self.services.workspace.clone(); + let mut workspace_client = self.0.services.workspace.clone(); workspace_client.create_buffer( tonic::Request::new(BufferNode { path: path.to_string() }) ).await?; // add to filetree - self.filetree.insert(path.to_string()); + self.0.filetree.insert(path.to_string()); // fetch buffers self.fetch_buffers().await?; @@ -88,40 +92,40 @@ impl Workspace { /// /// to interact with such buffer use [crate::api::Controller::send] or /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] - pub async fn attach(&self, path: &str) -> crate::Result> { - let mut worskspace_client = self.services.workspace.clone(); + pub async fn attach(&self, path: &str) -> crate::Result { + let mut worskspace_client = self.0.services.workspace.clone(); let request = tonic::Request::new(BufferNode { path: path.to_string() }); let credentials = worskspace_client.access_buffer(request).await?.into_inner(); - self.token.send(credentials.token)?; + self.0.token.send(credentials.token)?; let (tx, rx) = mpsc::channel(256); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(credentials.id.id).expect("could not represent path as byte sequence")); - let stream = self.services.buffer.clone().attach(req).await?.into_inner(); + let stream = self.0.services.buffer.clone().attach(req).await?.into_inner(); - let worker = BufferWorker::new(self.user_id, path); - let controller = Arc::new(worker.subscribe()); + let worker = BufferWorker::new(self.0.user_id, path); + let controller = worker.subscribe(); tokio::spawn(async move { tracing::debug!("controller worker started"); worker.work(tx, stream).await; tracing::debug!("controller worker stopped"); }); - self.buffers.insert(path.to_string(), controller.clone()); + self.0.buffers.insert(path.to_string(), controller.clone()); Ok(controller) } /// fetch a list of all buffers in a workspace pub async fn fetch_buffers(&self) -> crate::Result<()> { - let mut workspace_client = self.services.workspace.clone(); + let mut workspace_client = self.0.services.workspace.clone(); let buffers = workspace_client.list_buffers( tonic::Request::new(Empty {}) ).await?.into_inner().buffers; - self.filetree.clear(); + self.0.filetree.clear(); for b in buffers { - self.filetree.insert(b.path); + self.0.filetree.insert(b.path); } Ok(()) @@ -129,14 +133,14 @@ impl Workspace { /// fetch a list of all users in a workspace pub async fn fetch_users(&self) -> crate::Result<()> { - let mut workspace_client = self.services.workspace.clone(); + let mut workspace_client = self.0.services.workspace.clone(); let users = BTreeSet::from_iter(workspace_client.list_users( tonic::Request::new(Empty {}) ).await?.into_inner().users.into_iter().map(Uuid::from)); - self.users.clear(); + self.0.users.clear(); for u in users { - self.users.insert(u, UserInfo { uuid: u }); + self.0.users.insert(u, UserInfo { uuid: u }); } Ok(()) @@ -146,7 +150,7 @@ impl Workspace { /// /// TODO: discuss implementation details pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { - let mut workspace_client = self.services.workspace.clone(); + let mut workspace_client = self.0.services.workspace.clone(); let buffer_users = workspace_client.list_buffer_users( tonic::Request::new(BufferNode { path: path.to_string() }) ).await?.into_inner().users; @@ -156,29 +160,29 @@ impl Workspace { /// delete a buffer pub async fn delete(&self, path: &str) -> crate::Result<()> { - let mut workspace_client = self.services.workspace.clone(); + let mut workspace_client = self.0.services.workspace.clone(); workspace_client.delete_buffer( tonic::Request::new(BufferNode { path: path.to_string() }) ).await?; - self.filetree.remove(path); + self.0.filetree.remove(path); Ok(()) } /// get the id of the workspace - pub fn id(&self) -> String { self.id.clone() } + pub fn id(&self) -> String { self.0.id.clone() } /// return a reference to current cursor controller, if currently in a workspace - pub fn cursor(&self) -> Arc { self.cursor.clone() } + pub fn cursor(&self) -> cursor::Controller { self.0.cursor.clone() } /// get a new reference to a buffer controller, if any is active to given path - pub fn buffer_by_name(&self, path: &str) -> Option> { - self.buffers.get(path).map(|x| x.clone()) + pub fn buffer_by_name(&self, path: &str) -> Option { + self.0.buffers.get(path).map(|x| x.clone()) } /// get the currently cached "filetree" pub fn filetree(&self) -> Vec { - self.filetree.iter().map(|f| f.clone()).collect() + self.0.filetree.iter().map(|f| f.clone()).collect() } }