From e7fa9f4a5ba0800196387a5c595ced4b13c81998 Mon Sep 17 00:00:00 2001 From: cschen Date: Fri, 16 Aug 2024 16:46:32 +0200 Subject: [PATCH] fix: Worker list_users_returns a Vec instead of Vec chore: formatter going to town --- src/cursor/controller.rs | 5 ++- src/workspace/service.rs | 25 +++++++++------ src/workspace/worker.rs | 68 +++++++++++++++++++++++++--------------- 3 files changed, 62 insertions(+), 36 deletions(-) diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 281d412..c6edc38 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -3,7 +3,10 @@ //! a controller implementation for cursor actions use std::sync::Arc; -use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex}; +use tokio::sync::{ + broadcast::{self, error::TryRecvError}, + mpsc, watch, Mutex, +}; use tonic::async_trait; use crate::api::{controller::ControllerCallback, Controller, Cursor}; diff --git a/src/workspace/service.rs b/src/workspace/service.rs index 04f7ffa..7a69b01 100644 --- a/src/workspace/service.rs +++ b/src/workspace/service.rs @@ -1,18 +1,26 @@ -use codemp_proto::{auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, workspace::workspace_client::WorkspaceClient}; -use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}}; - +use codemp_proto::{ + auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, + workspace::workspace_client::WorkspaceClient, +}; +use tonic::{ + service::{interceptor::InterceptedService, Interceptor}, + transport::{Channel, Endpoint}, +}; #[derive(Clone)] pub struct WorkspaceInterceptor { - token: tokio::sync::watch::Receiver + token: tokio::sync::watch::Receiver, } impl Interceptor for WorkspaceInterceptor { - fn call(&mut self, mut request: tonic::Request<()>) -> Result, tonic::Status> { + fn call( + &mut self, + mut request: tonic::Request<()>, + ) -> Result, tonic::Status> { if let Ok(token) = self.token.borrow().token.parse() { request.metadata_mut().insert("auth", token); } - + Ok(request) } } @@ -29,9 +37,7 @@ pub struct Services { impl Services { pub async fn try_new(dest: &str, token: Token) -> crate::Result { - let channel = Endpoint::from_shared(dest.to_string())? - .connect() - .await?; + let channel = Endpoint::from_shared(dest.to_string())?.connect().await?; let (token_tx, token_rx) = tokio::sync::watch::channel(token); let inter = WorkspaceInterceptor { token: token_rx }; Ok(Self { @@ -61,5 +67,4 @@ impl Services { pub fn cur(&self) -> CursorClient { self.cursor.clone() } - } diff --git a/src/workspace/worker.rs b/src/workspace/worker.rs index 67e1739..7e83dd7 100644 --- a/src/workspace/worker.rs +++ b/src/workspace/worker.rs @@ -1,14 +1,13 @@ use crate::{ - api::{controller::ControllerWorker, Controller, User}, + api::{controller::ControllerWorker, Controller, Event, User}, buffer::{self, worker::BufferWorker}, cursor::{self, worker::CursorWorker}, workspace::service::Services, }; use codemp_proto::{ - common::Empty, auth::Token, - common::Identity, + common::Empty, files::BufferNode, workspace::{ workspace_event::{ @@ -54,14 +53,12 @@ impl Workspace { token: Token, ) -> crate::Result { let services = Services::try_new(dest, token).await?; - let ws_stream = services.ws() - .attach(Empty{}) - .await? - .into_inner(); + let ws_stream = services.ws().attach(Empty {}).await?.into_inner(); let (tx, rx) = mpsc::channel(256); let (ev_tx, ev_rx) = mpsc::unbounded_channel(); - let cur_stream = services.cur() + let cur_stream = services + .cur() .attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .await? .into_inner(); @@ -92,7 +89,11 @@ impl Workspace { Ok(ws) } - pub(crate) fn run_actor(&self, mut stream: Streaming, tx: mpsc::UnboundedSender) { + pub(crate) fn run_actor( + &self, + mut stream: Streaming, + tx: mpsc::UnboundedSender, + ) { // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? let inner = self.0.clone(); let name = self.id(); @@ -109,7 +110,9 @@ impl Workspace { match ev { // user WorkspaceEventInner::Join(UserJoin { user }) => { - inner.users.insert(user.clone().into(), User { id: user.into() }); + inner + .users + .insert(user.clone().into(), User { id: user.into() }); } WorkspaceEventInner::Leave(UserLeave { user }) => { inner.users.remove(&user.into()); @@ -132,7 +135,7 @@ impl Workspace { if tx.send(update).is_err() { tracing::warn!("no active controller to receive workspace event"); } - }, + } } } }); @@ -175,10 +178,7 @@ impl Workspace { tonic::metadata::MetadataValue::try_from(credentials.id.id) .expect("could not represent path as byte sequence"), ); - let stream = self.0.services.buf() - .attach(req) - .await? - .into_inner(); + let stream = self.0.services.buf().attach(req).await?.into_inner(); let worker = BufferWorker::new(self.0.user_id, path); let controller = worker.controller(); @@ -206,17 +206,24 @@ impl Workspace { pub fn detach(&self, path: &str) -> DetachResult { match self.0.buffers.remove(path) { None => DetachResult::NotAttached, - Some((_name, controller)) => if controller.stop() { - DetachResult::Detaching - } else { - DetachResult::AlreadyDetached + Some((_name, controller)) => { + if controller.stop() { + DetachResult::Detaching + } else { + DetachResult::AlreadyDetached + } } } } /// await next workspace [crate::api::Event] and return it - pub async fn event(&self) -> crate::Result { - self.0.events.lock().await.recv().await + pub async fn event(&self) -> crate::Result { + self.0 + .events + .lock() + .await + .recv() + .await .ok_or(crate::Error::Channel { send: false }) } @@ -261,7 +268,7 @@ impl Workspace { /// get a list of the users attached to a specific buffer /// /// TODO: discuss implementation details - pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { + pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { let mut workspace_client = self.0.services.ws(); let buffer_users = workspace_client .list_buffer_users(tonic::Request::new(BufferNode { @@ -269,7 +276,10 @@ impl Workspace { })) .await? .into_inner() - .users; + .users + .into_iter() + .map(|id| id.into()) + .collect(); Ok(buffer_users) } @@ -313,7 +323,11 @@ impl Workspace { /// get a list of all the currently attached to buffers // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 pub fn buffer_list(&self) -> Vec { - self.0.buffers.iter().map(|elem| elem.key().clone()).collect() + self.0 + .buffers + .iter() + .map(|elem| elem.key().clone()) + .collect() } /// get the currently cached "filetree" @@ -327,7 +341,11 @@ impl Drop for WorkspaceInner { fn drop(&mut self) { for entry in self.buffers.iter() { if !entry.value().stop() { - tracing::warn!("could not stop buffer worker {} for workspace {}", entry.value().name(), self.id); + tracing::warn!( + "could not stop buffer worker {} for workspace {}", + entry.value().name(), + self.id + ); } } if !self.cursor.stop() {