feat: automatically map cursor uuids to names

not a fan of passing an Arc<DashMap> down to the cursor worker, but it
needs to access a mapping managed by the workspace so not sure if it can
be better. Into and From protocol types and Cursor are gone: do things
manually (since user is now a different thing, it can't be auto). Also
api::Cursor got changed: user field is Option<String> now
This commit is contained in:
əlemi 2024-09-05 23:59:05 +02:00
parent fd109d6c39
commit 917a6b96c2
Signed by: alemi
GPG key ID: A4895B84D311642C
4 changed files with 30 additions and 76 deletions

View file

@ -1,9 +1,6 @@
//! ### Cursor //! ### Cursor
//! Represents the position of a remote user's cursor. //! Represents the position of a remote user's cursor.
use codemp_proto as proto;
use uuid::Uuid;
#[cfg(feature = "python")] #[cfg(feature = "python")]
use pyo3::prelude::*; use pyo3::prelude::*;
@ -19,62 +16,5 @@ pub struct Cursor {
/// Path of buffer this cursor is on. /// Path of buffer this cursor is on.
pub buffer: String, pub buffer: String,
/// User display name, if provided. /// User display name, if provided.
pub user: Option<Uuid>, // TODO this should be a string, not the UUID! pub user: Option<String>,
}
impl From<proto::cursor::CursorPosition> for Cursor {
fn from(value: proto::cursor::CursorPosition) -> Self {
Self {
start: (value.start.row, value.start.col),
end: (value.end.row, value.end.col),
buffer: value.buffer.path,
user: None,
}
}
}
impl From<Cursor> for proto::cursor::CursorPosition {
fn from(value: Cursor) -> Self {
Self {
buffer: proto::files::BufferNode { path: value.buffer },
start: proto::cursor::RowCol {
row: value.start.0,
col: value.start.1,
},
end: proto::cursor::RowCol {
row: value.end.0,
col: value.end.1,
},
}
}
}
impl From<proto::cursor::CursorEvent> for Cursor {
fn from(value: proto::cursor::CursorEvent) -> Self {
Self {
start: (value.position.start.row, value.position.start.col),
end: (value.position.end.row, value.position.end.col),
buffer: value.position.buffer.path,
user: Some(value.user.uuid()),
}
}
}
impl From<Cursor> for proto::cursor::CursorEvent {
fn from(value: Cursor) -> Self {
Self {
user: value.user.unwrap_or_default().into(),
position: proto::cursor::CursorPosition {
buffer: proto::files::BufferNode { path: value.buffer },
start: proto::cursor::RowCol {
row: value.start.0,
col: value.start.1,
},
end: proto::cursor::RowCol {
row: value.end.0,
col: value.end.1,
},
},
}
}
} }

View file

@ -6,7 +6,7 @@ use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use crate::{api::{controller::ControllerCallback, Controller, Cursor}, errors::ControllerResult}; use crate::{api::{controller::ControllerCallback, Controller, Cursor}, errors::ControllerResult};
use codemp_proto::cursor::CursorPosition; use codemp_proto::{cursor::{CursorPosition, RowCol}, files::BufferNode};
/// A [Controller] for asynchronously sending and receiving [Cursor] event. /// A [Controller] for asynchronously sending and receiving [Cursor] event.
/// ///
@ -31,7 +31,11 @@ impl Controller<Cursor> for CursorController {
if cursor.start > cursor.end { if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
} }
Ok(self.0.op.send(cursor.into()).await?) Ok(self.0.op.send(CursorPosition {
buffer: BufferNode { path: cursor.buffer },
start: RowCol { row: cursor.start.0, col: cursor.start.1 },
end: RowCol { row: cursor.end.0, col: cursor.end.1 },
}).await?)
} }
async fn try_recv(&self) -> ControllerResult<Option<Cursor>> { async fn try_recv(&self) -> ControllerResult<Option<Cursor>> {

View file

@ -2,14 +2,16 @@ use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tonic::Streaming; use tonic::Streaming;
use uuid::Uuid;
use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, ext::IgnorableError}; use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor, User}, ext::IgnorableError};
use codemp_proto::cursor::{CursorPosition, CursorEvent}; use codemp_proto::cursor::{CursorPosition, CursorEvent};
use super::controller::{CursorController, CursorControllerInner}; use super::controller::{CursorController, CursorControllerInner};
pub(crate) struct CursorWorker { pub(crate) struct CursorWorker {
op: mpsc::Receiver<CursorPosition>, op: mpsc::Receiver<CursorPosition>,
map: Arc<dashmap::DashMap<Uuid, User>>,
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>, stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
@ -19,15 +21,10 @@ pub(crate) struct CursorWorker {
callback: watch::Receiver<Option<ControllerCallback<CursorController>>>, callback: watch::Receiver<Option<ControllerCallback<CursorController>>>,
} }
impl Default for CursorWorker {
fn default() -> Self {
Self::new(64)
}
}
impl CursorWorker { impl CursorWorker {
fn new(buffer_size: usize) -> Self { pub fn new(user_map: Arc<dashmap::DashMap<Uuid, User>>) -> Self {
let (op_tx, op_rx) = mpsc::channel(buffer_size); // TODO we should tweak the channel buffer size to better propagate backpressure
let (op_tx, op_rx) = mpsc::channel(64);
let (stream_tx, stream_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1);
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (cb_tx, cb_rx) = watch::channel(None); let (cb_tx, cb_rx) = watch::channel(None);
@ -41,6 +38,7 @@ impl CursorWorker {
}; };
Self { Self {
op: op_rx, op: op_rx,
map: user_map,
stream: stream_rx, stream: stream_rx,
store: std::collections::VecDeque::default(), store: std::collections::VecDeque::default(),
stop: end_rx, stop: end_rx,
@ -82,7 +80,17 @@ impl ControllerWorker<Cursor> for CursorWorker {
// server sents us a cursor // server sents us a cursor
Ok(Some(cur)) = rx.message() => { Ok(Some(cur)) = rx.message() => {
tracing::debug!("received cursor from server"); tracing::debug!("received cursor from server");
self.store.push_back(cur.into()); let mut cursor = Cursor {
buffer: cur.position.buffer.path,
start: (cur.position.start.row, cur.position.start.col),
end: (cur.position.end.row, cur.position.end.col),
user: None,
};
let user_id = Uuid::from(cur.user);
if let Some(user) = self.map.get(&user_id) {
cursor.user = Some(user.name.clone());
}
self.store.push_back(cursor);
for tx in self.pollers.drain(..) { for tx in self.pollers.drain(..) {
tx.send(()).unwrap_or_warn("poller dropped before unblocking"); tx.send(()).unwrap_or_warn("poller dropped before unblocking");
} }

View file

@ -44,7 +44,7 @@ struct WorkspaceInner {
cursor: cursor::Controller, cursor: cursor::Controller,
buffers: DashMap<String, buffer::Controller>, buffers: DashMap<String, buffer::Controller>,
filetree: DashSet<String>, filetree: DashSet<String>,
users: DashMap<Uuid, User>, users: Arc<DashMap<Uuid, User>>,
services: Services, services: Services,
// TODO can we drop the mutex? // TODO can we drop the mutex?
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>, events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
@ -70,7 +70,9 @@ impl Workspace {
.await? .await?
.into_inner(); .into_inner();
let worker = CursorWorker::default(); let users = Arc::new(DashMap::default());
let worker = CursorWorker::new(users.clone());
let controller = worker.controller(); let controller = worker.controller();
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!("controller worker started"); tracing::debug!("controller worker started");
@ -84,7 +86,7 @@ impl Workspace {
cursor: controller, cursor: controller,
buffers: DashMap::default(), buffers: DashMap::default(),
filetree: DashSet::default(), filetree: DashSet::default(),
users: DashMap::default(), users,
events: tokio::sync::Mutex::new(ev_rx), events: tokio::sync::Mutex::new(ev_rx),
services, services,
})); }));