2024-09-04 20:03:34 +02:00
|
|
|
//! ### Workspace
|
|
|
|
//! A workspace represents a development environment. It contains any number of buffers and
|
|
|
|
//! tracks cursor movements across them.
|
|
|
|
//! Buffers are typically organized in a filetree-like structure reminiscent of POSIX filesystems.
|
|
|
|
|
|
|
|
use crate::{
|
2024-09-28 03:56:57 +02:00
|
|
|
api::{controller::{AsyncReceiver, ControllerCallback}, Event, User},
|
2024-09-26 02:29:13 +02:00
|
|
|
buffer, cursor,
|
2024-09-04 20:03:34 +02:00
|
|
|
errors::{ConnectionResult, ControllerResult, RemoteResult},
|
|
|
|
ext::InternallyMutable,
|
2024-09-19 21:32:46 +02:00
|
|
|
network::Services,
|
2024-09-04 20:03:34 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
use codemp_proto::{
|
|
|
|
common::{Empty, Token},
|
|
|
|
files::BufferNode,
|
|
|
|
workspace::{
|
|
|
|
workspace_event::{
|
|
|
|
Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave,
|
|
|
|
},
|
|
|
|
WorkspaceEvent,
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
use dashmap::{DashMap, DashSet};
|
|
|
|
use std::{collections::BTreeSet, sync::Arc};
|
2024-09-28 03:56:57 +02:00
|
|
|
use tokio::sync::{mpsc::error::TryRecvError, mpsc};
|
2024-09-04 20:03:34 +02:00
|
|
|
use tonic::Streaming;
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
|
#[cfg(feature = "js")]
|
|
|
|
use napi_derive::napi;
|
|
|
|
|
2024-09-21 11:52:46 +02:00
|
|
|
/// A currently active shared development environment
|
|
|
|
///
|
|
|
|
/// Workspaces encapsulate a working environment: cursor positions, filetree, user list
|
|
|
|
/// and more. Each holds a [cursor::Controller] and a map of [buffer::Controller]s.
|
|
|
|
/// Using a workspace handle, it's possible to receive events (user join/leave, filetree updates)
|
|
|
|
/// and create/delete/attach to new buffers.
|
2024-09-04 20:03:34 +02:00
|
|
|
#[derive(Debug, Clone)]
|
2024-09-19 21:32:46 +02:00
|
|
|
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass)]
|
2024-09-04 20:03:34 +02:00
|
|
|
#[cfg_attr(feature = "js", napi)]
|
|
|
|
pub struct Workspace(Arc<WorkspaceInner>);
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct WorkspaceInner {
|
|
|
|
name: String,
|
|
|
|
user: User, // TODO back-reference to global user id... needed for buffer controllers
|
|
|
|
cursor: cursor::Controller,
|
|
|
|
buffers: DashMap<String, buffer::Controller>,
|
2024-09-26 02:29:13 +02:00
|
|
|
services: Services,
|
|
|
|
// TODO these two are Arced so that the inner worker can hold them without holding the
|
|
|
|
// WorkspaceInner itself, otherwise its impossible to drop Workspace
|
2024-09-04 20:03:34 +02:00
|
|
|
filetree: DashSet<String>,
|
2024-09-05 23:59:05 +02:00
|
|
|
users: Arc<DashMap<Uuid, User>>,
|
2024-09-04 20:03:34 +02:00
|
|
|
// TODO can we drop the mutex?
|
|
|
|
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
|
2024-09-28 03:56:57 +02:00
|
|
|
callback: std::sync::Mutex<Option<ControllerCallback<Workspace>>>, // TODO lmao another one
|
|
|
|
}
|
|
|
|
|
|
|
|
impl AsyncReceiver<Event> for Workspace {
|
|
|
|
async fn try_recv(&self) -> ControllerResult<Option<Event>> {
|
|
|
|
match self.0
|
|
|
|
.events
|
|
|
|
.lock()
|
|
|
|
.await
|
|
|
|
.try_recv()
|
|
|
|
{
|
|
|
|
Ok(x) => Ok(Some(x)),
|
|
|
|
Err(TryRecvError::Empty) => Ok(None),
|
|
|
|
Err(TryRecvError::Disconnected) => Err(crate::errors::ControllerError::Stopped),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn poll(&self) -> ControllerResult<()> {
|
|
|
|
loop {
|
|
|
|
if !self.0.events.lock().await.is_empty() { break Ok(()) }
|
|
|
|
// TODO disgusting, please send help
|
|
|
|
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO please send HELP ASAP this is hurting me emotionally
|
|
|
|
fn clear_callback(&self) {
|
|
|
|
*self.0.callback.lock().expect("mutex poisoned") = None;
|
|
|
|
}
|
|
|
|
|
|
|
|
fn callback(&self, cb: impl Into<ControllerCallback<Self>>) {
|
|
|
|
*self.0.callback.lock().expect("mutex poisoned") = Some(cb.into());
|
|
|
|
}
|
2024-09-04 20:03:34 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Workspace {
|
2024-09-16 21:11:42 +02:00
|
|
|
pub(crate) async fn connect(
|
2024-09-04 20:03:34 +02:00
|
|
|
name: String,
|
|
|
|
user: User,
|
2024-09-11 15:12:31 +02:00
|
|
|
config: crate::api::Config,
|
2024-09-04 20:03:34 +02:00
|
|
|
token: Token,
|
|
|
|
claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>, // TODO ughh receiving this
|
|
|
|
) -> ConnectionResult<Self> {
|
|
|
|
let workspace_claim = InternallyMutable::new(token);
|
2024-09-19 21:32:46 +02:00
|
|
|
let services =
|
|
|
|
Services::try_new(&config.endpoint(), claims, workspace_claim.channel()).await?;
|
2024-09-04 20:03:34 +02:00
|
|
|
let ws_stream = services.ws().attach(Empty {}).await?.into_inner();
|
|
|
|
|
|
|
|
let (tx, rx) = mpsc::channel(128);
|
|
|
|
let (ev_tx, ev_rx) = mpsc::unbounded_channel();
|
|
|
|
let cur_stream = services
|
|
|
|
.cur()
|
|
|
|
.attach(tokio_stream::wrappers::ReceiverStream::new(rx))
|
|
|
|
.await?
|
|
|
|
.into_inner();
|
|
|
|
|
2024-09-05 23:59:05 +02:00
|
|
|
let users = Arc::new(DashMap::default());
|
|
|
|
|
2024-09-26 02:29:13 +02:00
|
|
|
let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream);
|
2024-09-04 20:03:34 +02:00
|
|
|
|
|
|
|
let ws = Self(Arc::new(WorkspaceInner {
|
|
|
|
name,
|
|
|
|
user,
|
|
|
|
cursor: controller,
|
|
|
|
buffers: DashMap::default(),
|
|
|
|
filetree: DashSet::default(),
|
2024-09-05 23:59:05 +02:00
|
|
|
users,
|
2024-09-04 20:03:34 +02:00
|
|
|
events: tokio::sync::Mutex::new(ev_rx),
|
|
|
|
services,
|
2024-09-28 03:56:57 +02:00
|
|
|
callback: std::sync::Mutex::new(None),
|
2024-09-04 20:03:34 +02:00
|
|
|
}));
|
|
|
|
|
|
|
|
ws.fetch_users().await?;
|
|
|
|
ws.fetch_buffers().await?;
|
|
|
|
ws.run_actor(ws_stream, ev_tx);
|
|
|
|
|
|
|
|
Ok(ws)
|
|
|
|
}
|
|
|
|
|
2024-09-26 05:11:44 +02:00
|
|
|
/// drop arc, return true if was last
|
|
|
|
pub(crate) fn consume(self) -> bool {
|
|
|
|
Arc::into_inner(self.0).is_some()
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Create a new buffer in the current workspace.
|
2024-09-04 20:03:34 +02:00
|
|
|
pub async fn create(&self, path: &str) -> RemoteResult<()> {
|
|
|
|
let mut workspace_client = self.0.services.ws();
|
|
|
|
workspace_client
|
|
|
|
.create_buffer(tonic::Request::new(BufferNode {
|
|
|
|
path: path.to_string(),
|
|
|
|
}))
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
// add to filetree
|
|
|
|
self.0.filetree.insert(path.to_string());
|
|
|
|
|
|
|
|
// fetch buffers
|
|
|
|
self.fetch_buffers().await?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Attach to a buffer and return a handle to it.
|
2024-09-04 20:03:34 +02:00
|
|
|
pub async fn attach(&self, path: &str) -> ConnectionResult<buffer::Controller> {
|
|
|
|
let mut worskspace_client = self.0.services.ws();
|
|
|
|
let request = tonic::Request::new(BufferNode {
|
|
|
|
path: path.to_string(),
|
|
|
|
});
|
|
|
|
let credentials = worskspace_client.access_buffer(request).await?.into_inner();
|
|
|
|
|
|
|
|
let (tx, rx) = mpsc::channel(256);
|
|
|
|
let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx));
|
2024-09-19 21:32:46 +02:00
|
|
|
req.metadata_mut().insert(
|
|
|
|
"buffer",
|
|
|
|
tonic::metadata::MetadataValue::try_from(credentials.token).map_err(|e| {
|
|
|
|
tonic::Status::internal(format!("failed representing token to string: {e}"))
|
|
|
|
})?,
|
|
|
|
);
|
2024-09-04 20:03:34 +02:00
|
|
|
let stream = self.0.services.buf().attach(req).await?.into_inner();
|
|
|
|
|
2024-09-26 02:29:13 +02:00
|
|
|
let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream);
|
2024-09-04 20:03:34 +02:00
|
|
|
self.0.buffers.insert(path.to_string(), controller.clone());
|
|
|
|
|
|
|
|
Ok(controller)
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Detach from an active buffer.
|
2024-09-04 20:03:34 +02:00
|
|
|
///
|
2024-09-26 02:29:13 +02:00
|
|
|
/// This will stop and drop its [`buffer::Controller`].
|
|
|
|
///
|
|
|
|
/// Returns `true` if connectly dropped or wasn't present, `false` if dropped but wasn't last ref
|
|
|
|
///
|
|
|
|
/// If this method returns `false` you have a dangling ref, maybe just waiting for garbage
|
|
|
|
/// collection or maybe preventing the controller from being dropped completely
|
|
|
|
#[allow(clippy::redundant_pattern_matching)] // all cases are clearer this way
|
|
|
|
pub fn detach(&self, path: &str) -> bool {
|
2024-09-04 20:03:34 +02:00
|
|
|
match self.0.buffers.remove(path) {
|
2024-09-26 02:29:13 +02:00
|
|
|
None => true, // noop: we werent attached in the first place
|
|
|
|
Some((_name, controller)) => match Arc::into_inner(controller.0) {
|
2024-10-01 00:42:57 +02:00
|
|
|
None => false, // dangling ref! we can't drop this
|
2024-09-26 02:29:13 +02:00
|
|
|
Some(_) => true, // dropping it now
|
2024-10-01 00:42:57 +02:00
|
|
|
},
|
2024-09-04 20:03:34 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Re-fetch the list of available buffers in the workspace.
|
2024-09-04 20:03:34 +02:00
|
|
|
pub async fn fetch_buffers(&self) -> RemoteResult<()> {
|
|
|
|
let mut workspace_client = self.0.services.ws();
|
|
|
|
let buffers = workspace_client
|
|
|
|
.list_buffers(tonic::Request::new(Empty {}))
|
|
|
|
.await?
|
|
|
|
.into_inner()
|
|
|
|
.buffers;
|
|
|
|
|
|
|
|
self.0.filetree.clear();
|
|
|
|
for b in buffers {
|
|
|
|
self.0.filetree.insert(b.path);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Re-fetch the list of all users in the workspace.
|
2024-09-04 20:03:34 +02:00
|
|
|
pub async fn fetch_users(&self) -> RemoteResult<()> {
|
|
|
|
let mut workspace_client = self.0.services.ws();
|
|
|
|
let users = BTreeSet::from_iter(
|
|
|
|
workspace_client
|
|
|
|
.list_users(tonic::Request::new(Empty {}))
|
|
|
|
.await?
|
|
|
|
.into_inner()
|
|
|
|
.users
|
|
|
|
.into_iter()
|
|
|
|
.map(User::from),
|
|
|
|
);
|
|
|
|
|
|
|
|
self.0.users.clear();
|
|
|
|
for u in users {
|
|
|
|
self.0.users.insert(u.id, u);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Get a list of the [User]s attached to a specific buffer.
|
2024-09-04 20:03:34 +02:00
|
|
|
pub async fn list_buffer_users(&self, path: &str) -> RemoteResult<Vec<User>> {
|
|
|
|
let mut workspace_client = self.0.services.ws();
|
|
|
|
let buffer_users = workspace_client
|
|
|
|
.list_buffer_users(tonic::Request::new(BufferNode {
|
|
|
|
path: path.to_string(),
|
|
|
|
}))
|
|
|
|
.await?
|
|
|
|
.into_inner()
|
|
|
|
.users
|
|
|
|
.into_iter()
|
|
|
|
.map(|id| id.into())
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
Ok(buffer_users)
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Delete a buffer.
|
2024-09-04 20:03:34 +02:00
|
|
|
pub async fn delete(&self, path: &str) -> RemoteResult<()> {
|
2024-09-26 02:29:13 +02:00
|
|
|
self.detach(path); // just in case
|
|
|
|
|
2024-09-04 20:03:34 +02:00
|
|
|
let mut workspace_client = self.0.services.ws();
|
|
|
|
workspace_client
|
|
|
|
.delete_buffer(tonic::Request::new(BufferNode {
|
|
|
|
path: path.to_string(),
|
|
|
|
}))
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
self.0.filetree.remove(path);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Get the workspace unique id.
|
2024-09-04 20:03:34 +02:00
|
|
|
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
|
|
|
|
pub fn id(&self) -> String {
|
|
|
|
self.0.name.clone()
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Return a handle to the [`cursor::Controller`].
|
2024-09-04 20:03:34 +02:00
|
|
|
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
|
|
|
|
pub fn cursor(&self) -> cursor::Controller {
|
|
|
|
self.0.cursor.clone()
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Return a handle to the [buffer::Controller] with the given path, if present.
|
2024-09-04 20:03:34 +02:00
|
|
|
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
|
|
|
|
pub fn buffer_by_name(&self, path: &str) -> Option<buffer::Controller> {
|
|
|
|
self.0.buffers.get(path).map(|x| x.clone())
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Get a list of all the currently attached buffers.
|
2024-09-04 20:03:34 +02:00
|
|
|
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
|
|
|
|
pub fn buffer_list(&self) -> Vec<String> {
|
|
|
|
self.0
|
|
|
|
.buffers
|
|
|
|
.iter()
|
|
|
|
.map(|elem| elem.key().clone())
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
2024-09-21 12:05:09 +02:00
|
|
|
/// Get all names of users currently in this workspace
|
|
|
|
pub fn user_list(&self) -> Vec<String> {
|
|
|
|
self.0
|
|
|
|
.users
|
|
|
|
.iter()
|
|
|
|
.map(|elem| elem.value().name.clone())
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Get the filetree as it is currently cached.
|
2024-09-13 20:02:42 +02:00
|
|
|
/// A filter may be applied, and it may be strict (equality check) or not (starts_with check).
|
2024-09-04 20:03:34 +02:00
|
|
|
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
|
2024-09-13 20:02:42 +02:00
|
|
|
pub fn filetree(&self, filter: Option<&str>, strict: bool) -> Vec<String> {
|
2024-09-30 20:30:55 +02:00
|
|
|
let mut tree = self.0
|
2024-09-19 21:32:46 +02:00
|
|
|
.filetree
|
|
|
|
.iter()
|
|
|
|
.filter(|f| {
|
|
|
|
filter.map_or(true, |flt| {
|
|
|
|
if strict {
|
|
|
|
f.as_str() == flt
|
|
|
|
} else {
|
|
|
|
f.starts_with(flt)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})
|
2024-09-04 20:03:34 +02:00
|
|
|
.map(|f| f.clone())
|
2024-09-30 20:30:55 +02:00
|
|
|
.collect::<Vec<String>>();
|
|
|
|
tree.sort();
|
|
|
|
tree
|
2024-09-04 20:03:34 +02:00
|
|
|
}
|
2024-09-04 21:37:35 +02:00
|
|
|
|
|
|
|
pub(crate) fn run_actor(
|
|
|
|
&self,
|
|
|
|
mut stream: Streaming<WorkspaceEvent>,
|
|
|
|
tx: mpsc::UnboundedSender<crate::api::Event>,
|
|
|
|
) {
|
|
|
|
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
|
2024-09-26 02:29:13 +02:00
|
|
|
let weak = Arc::downgrade(&self.0);
|
2024-09-04 21:37:35 +02:00
|
|
|
let name = self.id();
|
|
|
|
tokio::spawn(async move {
|
2024-09-26 05:11:44 +02:00
|
|
|
tracing::debug!("workspace worker starting");
|
2024-09-04 21:37:35 +02:00
|
|
|
loop {
|
2024-09-26 02:29:13 +02:00
|
|
|
// TODO can we stop responsively rather than poll for Arc being dropped?
|
2024-10-01 00:42:57 +02:00
|
|
|
if weak.upgrade().is_none() {
|
|
|
|
break;
|
|
|
|
};
|
2024-09-26 02:29:13 +02:00
|
|
|
let Some(res) = tokio::select!(
|
|
|
|
x = stream.message() => Some(x),
|
|
|
|
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None,
|
2024-10-01 00:42:57 +02:00
|
|
|
) else {
|
|
|
|
continue;
|
|
|
|
};
|
2024-09-26 02:29:13 +02:00
|
|
|
match res {
|
2024-09-04 21:37:35 +02:00
|
|
|
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
|
|
|
|
Ok(None) => break tracing::info!("leaving workspace {}", name),
|
|
|
|
Ok(Some(WorkspaceEvent { event: None })) => {
|
|
|
|
tracing::warn!("workspace {} received empty event", name)
|
|
|
|
}
|
|
|
|
Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
|
2024-09-26 02:29:13 +02:00
|
|
|
let Some(inner) = weak.upgrade() else { break };
|
2024-09-04 21:37:35 +02:00
|
|
|
let update = crate::api::Event::from(&ev);
|
|
|
|
match ev {
|
|
|
|
// user
|
|
|
|
WorkspaceEventInner::Join(UserJoin { user }) => {
|
2024-09-19 21:32:46 +02:00
|
|
|
inner.users.insert(user.id.uuid(), user.into());
|
2024-09-04 21:37:35 +02:00
|
|
|
}
|
|
|
|
WorkspaceEventInner::Leave(UserLeave { user }) => {
|
|
|
|
inner.users.remove(&user.id.uuid());
|
|
|
|
}
|
|
|
|
// buffer
|
|
|
|
WorkspaceEventInner::Create(FileCreate { path }) => {
|
|
|
|
inner.filetree.insert(path);
|
|
|
|
}
|
|
|
|
WorkspaceEventInner::Rename(FileRename { before, after }) => {
|
|
|
|
inner.filetree.remove(&before);
|
|
|
|
inner.filetree.insert(after);
|
|
|
|
}
|
|
|
|
WorkspaceEventInner::Delete(FileDelete { path }) => {
|
|
|
|
inner.filetree.remove(&path);
|
2024-09-26 02:29:13 +02:00
|
|
|
let _ = inner.buffers.remove(&path);
|
2024-09-04 21:37:35 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if tx.send(update).is_err() {
|
|
|
|
tracing::warn!("no active controller to receive workspace event");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2024-09-26 05:11:44 +02:00
|
|
|
tracing::debug!("workspace worker stopping");
|
2024-09-04 21:37:35 +02:00
|
|
|
});
|
|
|
|
}
|
2024-09-04 20:03:34 +02:00
|
|
|
}
|