feat: inner Arc<T> on controllers

so we can wrap them directly in our glue
This commit is contained in:
əlemi 2024-08-05 19:13:48 +02:00
parent 25e56f9894
commit 84c77eaca3
Signed by: alemi
GPG key ID: A4895B84D311642C
4 changed files with 91 additions and 75 deletions

View file

@ -22,7 +22,10 @@ use crate::api::TextChange;
/// ///
/// upon dropping this handle will stop the associated worker /// upon dropping this handle will stop the associated worker
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct BufferController { pub struct BufferController(Arc<BufferControllerInner>);
#[derive(Debug)]
struct BufferControllerInner {
name: String, name: String,
content: watch::Receiver<String>, content: watch::Receiver<String>,
seen: StatusCheck<String>, // internal buffer previous state seen: StatusCheck<String>, // internal buffer previous state
@ -39,23 +42,25 @@ impl BufferController {
poller: mpsc::UnboundedSender<oneshot::Sender<()>>, poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
BufferController { Self(Arc::new(
BufferControllerInner {
name, name,
content, operations, poller, content, operations, poller,
seen: StatusCheck::default(), seen: StatusCheck::default(),
_stop: Arc::new(StopOnDrop(stop)), _stop: Arc::new(StopOnDrop(stop)),
} }
))
} }
/// unique identifier of buffer /// unique identifier of buffer
pub fn name(&self) -> &str { pub fn name(&self) -> &str {
&self.name &self.0.name
} }
/// return buffer whole content, updating internal buffer previous state /// return buffer whole content, updating internal buffer previous state
pub fn content(&self) -> String { pub fn content(&self) -> String {
self.seen.update(self.content.borrow().clone()); self.0.seen.update(self.0.content.borrow().clone());
self.content.borrow().clone() self.0.content.borrow().clone()
} }
} }
@ -75,43 +80,43 @@ impl Controller<TextChange> for BufferController {
/// block until a text change is available /// block until a text change is available
/// this returns immediately if one is already available /// this returns immediately if one is already available
async fn poll(&self) -> crate::Result<()> { async fn poll(&self) -> crate::Result<()> {
if self.seen.check() != *self.content.borrow() { if self.0.seen.check() != *self.0.content.borrow() {
return Ok(()); // short circuit: already available! return Ok(()); // short circuit: already available!
} }
let (tx, rx) = oneshot::channel::<()>(); let (tx, rx) = oneshot::channel::<()>();
self.poller.send(tx)?; self.0.poller.send(tx)?;
rx.await.map_err(|_| crate::Error::Channel { send: false })?; rx.await.map_err(|_| crate::Error::Channel { send: false })?;
Ok(()) Ok(())
} }
/// if a text change is available, return it immediately /// if a text change is available, return it immediately
fn try_recv(&self) -> crate::Result<Option<TextChange>> { fn try_recv(&self) -> crate::Result<Option<TextChange>> {
let seen = self.seen.check(); let seen = self.0.seen.check();
let actual = self.content.borrow().clone(); let actual = self.0.content.borrow().clone();
if seen == actual { if seen == actual {
return Ok(None); return Ok(None);
} }
let change = TextChange::from_diff(&seen, &actual); let change = TextChange::from_diff(&seen, &actual);
self.seen.update(actual); self.0.seen.update(actual);
Ok(Some(change)) Ok(Some(change))
} }
/// block until a new text change is available, and return it /// block until a new text change is available, and return it
async fn recv(&self) -> crate::Result<TextChange> { async fn recv(&self) -> crate::Result<TextChange> {
self.poll().await?; self.poll().await?;
let seen = self.seen.check(); let seen = self.0.seen.check();
let actual = self.content.borrow().clone(); let actual = self.0.content.borrow().clone();
let change = TextChange::from_diff(&seen, &actual); let change = TextChange::from_diff(&seen, &actual);
self.seen.update(actual); self.0.seen.update(actual);
Ok(change) Ok(change)
} }
/// enqueue a text change for processing /// enqueue a text change for processing
/// this also updates internal buffer previous state /// this also updates internal buffer previous state
fn send(&self, op: TextChange) -> crate::Result<()> { fn send(&self, op: TextChange) -> crate::Result<()> {
let before = self.seen.check(); let before = self.0.seen.check();
self.seen.update(op.apply(&before)); self.0.seen.update(op.apply(&before));
Ok(self.operations.send(op)?) Ok(self.0.operations.send(op)?)
} }
} }

View file

@ -34,7 +34,7 @@ use crate::{
pub struct Client { pub struct Client {
user_id: Uuid, user_id: Uuid,
token_tx: Arc<tokio::sync::watch::Sender<Token>>, token_tx: Arc<tokio::sync::watch::Sender<Token>>,
workspaces: Arc<DashMap<String, Arc<Workspace>>>, workspaces: Arc<DashMap<String, Workspace>>,
services: Arc<Services> services: Arc<Services>
} }
@ -109,7 +109,7 @@ impl Client {
} }
/// join a workspace, returns an [tokio::sync::RwLock] to interact with it /// join a workspace, returns an [tokio::sync::RwLock] to interact with it
pub async fn join_workspace(&self, workspace: &str) -> crate::Result<Arc<Workspace>> { pub async fn join_workspace(&self, workspace: &str) -> crate::Result<Workspace> {
let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner(); let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner();
let (tx, rx) = mpsc::channel(256); let (tx, rx) = mpsc::channel(256);
@ -119,20 +119,20 @@ impl Client {
.into_inner(); .into_inner();
let worker = CursorWorker::default(); let worker = CursorWorker::default();
let controller = Arc::new(worker.subscribe()); let controller = worker.subscribe();
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!("controller worker started"); tracing::debug!("controller worker started");
worker.work(tx, cur_stream).await; worker.work(tx, cur_stream).await;
tracing::debug!("controller worker stopped"); tracing::debug!("controller worker stopped");
}); });
let ws = Arc::new(Workspace::new( let ws = Workspace::new(
workspace.to_string(), workspace.to_string(),
self.user_id, self.user_id,
self.token_tx.clone(), self.token_tx.clone(),
controller, controller,
self.services.clone() self.services.clone()
)); );
ws.fetch_users().await?; ws.fetch_users().await?;
ws.fetch_buffers().await?; ws.fetch_buffers().await?;
@ -144,7 +144,7 @@ impl Client {
Ok(ws) Ok(ws)
} }
pub fn get_workspace(&self, id: &str) -> Option<Arc<Workspace>> { pub fn get_workspace(&self, id: &str) -> Option<Workspace> {
self.workspaces.get(id).map(|x| x.clone()) self.workspaces.get(id).map(|x| x.clone())
} }

View file

@ -2,10 +2,12 @@
//! //!
//! a controller implementation for cursor actions //! a controller implementation for cursor actions
use std::sync::Arc;
use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch};
use tonic::async_trait; use tonic::async_trait;
use crate::{api::Controller, errors::IgnorableError}; use crate::{api::{Cursor, Controller}, errors::IgnorableError};
use codemp_proto::cursor::{CursorEvent, CursorPosition}; use codemp_proto::cursor::{CursorEvent, CursorPosition};
/// the cursor controller implementation /// the cursor controller implementation
/// ///
@ -18,8 +20,11 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition};
/// for each controller a worker exists, managing outgoing and inbound event queues /// for each controller a worker exists, managing outgoing and inbound event queues
/// ///
/// upon dropping this handle will stop the associated worker /// upon dropping this handle will stop the associated worker
#[derive(Debug, Clone)]
pub struct CursorController(Arc<CursorControllerInner>);
#[derive(Debug)] #[derive(Debug)]
pub struct CursorController { struct CursorControllerInner {
op: mpsc::UnboundedSender<CursorPosition>, op: mpsc::UnboundedSender<CursorPosition>,
last_op: Mutex<watch::Receiver<CursorEvent>>, last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
@ -28,7 +33,7 @@ pub struct CursorController {
impl Drop for CursorController { impl Drop for CursorController {
fn drop(&mut self) { fn drop(&mut self) {
self.stop.send(()).unwrap_or_warn("could not stop cursor actor") self.0.stop.send(()).unwrap_or_warn("could not stop cursor actor")
} }
} }
@ -39,33 +44,35 @@ impl CursorController {
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
CursorController { op, last_op, stream, stop } Self(Arc::new(
CursorControllerInner { op, last_op, stream, stop }
))
} }
} }
#[async_trait] #[async_trait]
impl Controller<CursorEvent> for CursorController { impl Controller<Cursor> for CursorController {
type Input = CursorPosition; type Input = Cursor;
/// enqueue a cursor event to be broadcast to current workspace /// enqueue a cursor event to be broadcast to current workspace
/// will automatically invert cursor start/end if they are inverted /// will automatically invert cursor start/end if they are inverted
fn send(&self, mut cursor: CursorPosition) -> crate::Result<()> { fn send(&self, mut cursor: Cursor) -> crate::Result<()> {
if cursor.start > cursor.end { if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
} }
Ok(self.op.send(cursor)?) Ok(self.0.op.send(cursor.into())?)
} }
/// try to receive without blocking, but will still block on stream mutex /// try to receive without blocking, but will still block on stream mutex
fn try_recv(&self) -> crate::Result<Option<CursorEvent>> { fn try_recv(&self) -> crate::Result<Option<Cursor>> {
let mut stream = self.stream.blocking_lock(); let mut stream = self.0.stream.blocking_lock();
match stream.try_recv() { match stream.try_recv() {
Ok(x) => Ok(Some(x)), Ok(x) => Ok(Some(x.into())),
Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }),
Err(TryRecvError::Lagged(n)) => { Err(TryRecvError::Lagged(n)) => {
tracing::warn!("cursor channel lagged, skipping {} events", n); tracing::warn!("cursor channel lagged, skipping {} events", n);
Ok(stream.try_recv().ok()) Ok(stream.try_recv().map(|x| x.into()).ok())
}, },
} }
} }
@ -73,21 +80,21 @@ impl Controller<CursorEvent> for CursorController {
// TODO is this cancelable? so it can be used in tokio::select! // TODO is this cancelable? so it can be used in tokio::select!
// TODO is the result type overkill? should be an option? // TODO is the result type overkill? should be an option?
/// get next cursor event from current workspace, or block until one is available /// get next cursor event from current workspace, or block until one is available
async fn recv(&self) -> crate::Result<CursorEvent> { async fn recv(&self) -> crate::Result<Cursor> {
let mut stream = self.stream.lock().await; let mut stream = self.0.stream.lock().await;
match stream.recv().await { match stream.recv().await {
Ok(x) => Ok(x), Ok(x) => Ok(x.into()),
Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }),
Err(RecvError::Lagged(n)) => { Err(RecvError::Lagged(n)) => {
tracing::error!("cursor channel lagged behind, skipping {} events", n); tracing::error!("cursor channel lagged behind, skipping {} events", n);
Ok(stream.recv().await.expect("could not receive after lagging")) Ok(stream.recv().await.expect("could not receive after lagging").into())
} }
} }
} }
/// await for changed mutex and then next op change /// await for changed mutex and then next op change
async fn poll(&self) -> crate::Result<()> { async fn poll(&self) -> crate::Result<()> {
Ok(self.last_op.lock().await.changed().await?) Ok(self.0.last_op.lock().await.changed().await?)
} }
} }

View file

@ -14,12 +14,16 @@ pub struct UserInfo {
pub uuid: Uuid pub uuid: Uuid
} }
pub struct Workspace { #[derive(Debug, Clone)]
pub struct Workspace(Arc<WorkspaceInner>);
#[derive(Debug)]
struct WorkspaceInner {
id: String, id: String,
user_id: Uuid, // reference to global user id user_id: Uuid, // reference to global user id
token: Arc<tokio::sync::watch::Sender<Token>>, token: Arc<tokio::sync::watch::Sender<Token>>,
cursor: Arc<cursor::Controller>, cursor: cursor::Controller,
buffers: Arc<DashMap<String, Arc<buffer::Controller>>>, buffers: Arc<DashMap<String, buffer::Controller>>,
pub(crate) filetree: Arc<DashSet<String>>, pub(crate) filetree: Arc<DashSet<String>>,
pub(crate) users: Arc<DashMap<Uuid, UserInfo>>, pub(crate) users: Arc<DashMap<Uuid, UserInfo>>,
services: Arc<Services> services: Arc<Services>
@ -31,10 +35,10 @@ impl Workspace {
id: String, id: String,
user_id: Uuid, user_id: Uuid,
token: Arc<tokio::sync::watch::Sender<Token>>, token: Arc<tokio::sync::watch::Sender<Token>>,
cursor: Arc<cursor::Controller>, cursor: cursor::Controller,
services: Arc<Services> services: Arc<Services>
) -> Self { ) -> Self {
Workspace { Self(Arc::new(WorkspaceInner {
id, id,
user_id, user_id,
token, token,
@ -43,12 +47,12 @@ impl Workspace {
filetree: Arc::new(DashSet::default()), filetree: Arc::new(DashSet::default()),
users: Arc::new(DashMap::default()), users: Arc::new(DashMap::default()),
services services
} }))
} }
pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>) { pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>) {
let users = self.users.clone(); let users = self.0.users.clone();
let filetree = self.filetree.clone(); let filetree = self.0.filetree.clone();
let name = self.id(); let name = self.id();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@ -70,13 +74,13 @@ impl Workspace {
/// create a new buffer in current workspace /// create a new buffer in current workspace
pub async fn create(&self, path: &str) -> crate::Result<()> { pub async fn create(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.0.services.workspace.clone();
workspace_client.create_buffer( workspace_client.create_buffer(
tonic::Request::new(BufferNode { path: path.to_string() }) tonic::Request::new(BufferNode { path: path.to_string() })
).await?; ).await?;
// add to filetree // add to filetree
self.filetree.insert(path.to_string()); self.0.filetree.insert(path.to_string());
// fetch buffers // fetch buffers
self.fetch_buffers().await?; self.fetch_buffers().await?;
@ -88,40 +92,40 @@ impl Workspace {
/// ///
/// to interact with such buffer use [crate::api::Controller::send] or /// to interact with such buffer use [crate::api::Controller::send] or
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange] /// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
pub async fn attach(&self, path: &str) -> crate::Result<Arc<buffer::Controller>> { pub async fn attach(&self, path: &str) -> crate::Result<buffer::Controller> {
let mut worskspace_client = self.services.workspace.clone(); let mut worskspace_client = self.0.services.workspace.clone();
let request = tonic::Request::new(BufferNode { path: path.to_string() }); let request = tonic::Request::new(BufferNode { path: path.to_string() });
let credentials = worskspace_client.access_buffer(request).await?.into_inner(); let credentials = worskspace_client.access_buffer(request).await?.into_inner();
self.token.send(credentials.token)?; self.0.token.send(credentials.token)?;
let (tx, rx) = mpsc::channel(256); let (tx, rx) = mpsc::channel(256);
let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx));
req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(credentials.id.id).expect("could not represent path as byte sequence")); req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(credentials.id.id).expect("could not represent path as byte sequence"));
let stream = self.services.buffer.clone().attach(req).await?.into_inner(); let stream = self.0.services.buffer.clone().attach(req).await?.into_inner();
let worker = BufferWorker::new(self.user_id, path); let worker = BufferWorker::new(self.0.user_id, path);
let controller = Arc::new(worker.subscribe()); let controller = worker.subscribe();
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!("controller worker started"); tracing::debug!("controller worker started");
worker.work(tx, stream).await; worker.work(tx, stream).await;
tracing::debug!("controller worker stopped"); tracing::debug!("controller worker stopped");
}); });
self.buffers.insert(path.to_string(), controller.clone()); self.0.buffers.insert(path.to_string(), controller.clone());
Ok(controller) Ok(controller)
} }
/// fetch a list of all buffers in a workspace /// fetch a list of all buffers in a workspace
pub async fn fetch_buffers(&self) -> crate::Result<()> { pub async fn fetch_buffers(&self) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.0.services.workspace.clone();
let buffers = workspace_client.list_buffers( let buffers = workspace_client.list_buffers(
tonic::Request::new(Empty {}) tonic::Request::new(Empty {})
).await?.into_inner().buffers; ).await?.into_inner().buffers;
self.filetree.clear(); self.0.filetree.clear();
for b in buffers { for b in buffers {
self.filetree.insert(b.path); self.0.filetree.insert(b.path);
} }
Ok(()) Ok(())
@ -129,14 +133,14 @@ impl Workspace {
/// fetch a list of all users in a workspace /// fetch a list of all users in a workspace
pub async fn fetch_users(&self) -> crate::Result<()> { pub async fn fetch_users(&self) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.0.services.workspace.clone();
let users = BTreeSet::from_iter(workspace_client.list_users( let users = BTreeSet::from_iter(workspace_client.list_users(
tonic::Request::new(Empty {}) tonic::Request::new(Empty {})
).await?.into_inner().users.into_iter().map(Uuid::from)); ).await?.into_inner().users.into_iter().map(Uuid::from));
self.users.clear(); self.0.users.clear();
for u in users { for u in users {
self.users.insert(u, UserInfo { uuid: u }); self.0.users.insert(u, UserInfo { uuid: u });
} }
Ok(()) Ok(())
@ -146,7 +150,7 @@ impl Workspace {
/// ///
/// TODO: discuss implementation details /// TODO: discuss implementation details
pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> { pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.0.services.workspace.clone();
let buffer_users = workspace_client.list_buffer_users( let buffer_users = workspace_client.list_buffer_users(
tonic::Request::new(BufferNode { path: path.to_string() }) tonic::Request::new(BufferNode { path: path.to_string() })
).await?.into_inner().users; ).await?.into_inner().users;
@ -156,29 +160,29 @@ impl Workspace {
/// delete a buffer /// delete a buffer
pub async fn delete(&self, path: &str) -> crate::Result<()> { pub async fn delete(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone(); let mut workspace_client = self.0.services.workspace.clone();
workspace_client.delete_buffer( workspace_client.delete_buffer(
tonic::Request::new(BufferNode { path: path.to_string() }) tonic::Request::new(BufferNode { path: path.to_string() })
).await?; ).await?;
self.filetree.remove(path); self.0.filetree.remove(path);
Ok(()) Ok(())
} }
/// get the id of the workspace /// get the id of the workspace
pub fn id(&self) -> String { self.id.clone() } pub fn id(&self) -> String { self.0.id.clone() }
/// return a reference to current cursor controller, if currently in a workspace /// return a reference to current cursor controller, if currently in a workspace
pub fn cursor(&self) -> Arc<cursor::Controller> { self.cursor.clone() } pub fn cursor(&self) -> cursor::Controller { self.0.cursor.clone() }
/// get a new reference to a buffer controller, if any is active to given path /// get a new reference to a buffer controller, if any is active to given path
pub fn buffer_by_name(&self, path: &str) -> Option<Arc<buffer::Controller>> { pub fn buffer_by_name(&self, path: &str) -> Option<buffer::Controller> {
self.buffers.get(path).map(|x| x.clone()) self.0.buffers.get(path).map(|x| x.clone())
} }
/// get the currently cached "filetree" /// get the currently cached "filetree"
pub fn filetree(&self) -> Vec<String> { pub fn filetree(&self) -> Vec<String> {
self.filetree.iter().map(|f| f.clone()).collect() self.0.filetree.iter().map(|f| f.clone()).collect()
} }
} }