fix: Worker list_users_returns a Vec<User> instead of Vec<Identity>

chore: formatter going to town
This commit is contained in:
cschen 2024-08-16 16:46:32 +02:00
parent 9d5aae461f
commit e7fa9f4a5b
3 changed files with 62 additions and 36 deletions

View file

@ -3,7 +3,10 @@
//! a controller implementation for cursor actions //! a controller implementation for cursor actions
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{broadcast::{self, error::TryRecvError}, mpsc, watch, Mutex}; use tokio::sync::{
broadcast::{self, error::TryRecvError},
mpsc, watch, Mutex,
};
use tonic::async_trait; use tonic::async_trait;
use crate::api::{controller::ControllerCallback, Controller, Cursor}; use crate::api::{controller::ControllerCallback, Controller, Cursor};

View file

@ -1,18 +1,26 @@
use codemp_proto::{auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient, workspace::workspace_client::WorkspaceClient}; use codemp_proto::{
use tonic::{service::{interceptor::InterceptedService, Interceptor}, transport::{Channel, Endpoint}}; auth::Token, buffer::buffer_client::BufferClient, cursor::cursor_client::CursorClient,
workspace::workspace_client::WorkspaceClient,
};
use tonic::{
service::{interceptor::InterceptedService, Interceptor},
transport::{Channel, Endpoint},
};
#[derive(Clone)] #[derive(Clone)]
pub struct WorkspaceInterceptor { pub struct WorkspaceInterceptor {
token: tokio::sync::watch::Receiver<Token> token: tokio::sync::watch::Receiver<Token>,
} }
impl Interceptor for WorkspaceInterceptor { impl Interceptor for WorkspaceInterceptor {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> { fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
if let Ok(token) = self.token.borrow().token.parse() { if let Ok(token) = self.token.borrow().token.parse() {
request.metadata_mut().insert("auth", token); request.metadata_mut().insert("auth", token);
} }
Ok(request) Ok(request)
} }
} }
@ -29,9 +37,7 @@ pub struct Services {
impl Services { impl Services {
pub async fn try_new(dest: &str, token: Token) -> crate::Result<Self> { pub async fn try_new(dest: &str, token: Token) -> crate::Result<Self> {
let channel = Endpoint::from_shared(dest.to_string())? let channel = Endpoint::from_shared(dest.to_string())?.connect().await?;
.connect()
.await?;
let (token_tx, token_rx) = tokio::sync::watch::channel(token); let (token_tx, token_rx) = tokio::sync::watch::channel(token);
let inter = WorkspaceInterceptor { token: token_rx }; let inter = WorkspaceInterceptor { token: token_rx };
Ok(Self { Ok(Self {
@ -61,5 +67,4 @@ impl Services {
pub fn cur(&self) -> CursorClient<AuthedService> { pub fn cur(&self) -> CursorClient<AuthedService> {
self.cursor.clone() self.cursor.clone()
} }
} }

View file

@ -1,14 +1,13 @@
use crate::{ use crate::{
api::{controller::ControllerWorker, Controller, User}, api::{controller::ControllerWorker, Controller, Event, User},
buffer::{self, worker::BufferWorker}, buffer::{self, worker::BufferWorker},
cursor::{self, worker::CursorWorker}, cursor::{self, worker::CursorWorker},
workspace::service::Services, workspace::service::Services,
}; };
use codemp_proto::{ use codemp_proto::{
common::Empty,
auth::Token, auth::Token,
common::Identity, common::Empty,
files::BufferNode, files::BufferNode,
workspace::{ workspace::{
workspace_event::{ workspace_event::{
@ -54,14 +53,12 @@ impl Workspace {
token: Token, token: Token,
) -> crate::Result<Self> { ) -> crate::Result<Self> {
let services = Services::try_new(dest, token).await?; let services = Services::try_new(dest, token).await?;
let ws_stream = services.ws() let ws_stream = services.ws().attach(Empty {}).await?.into_inner();
.attach(Empty{})
.await?
.into_inner();
let (tx, rx) = mpsc::channel(256); let (tx, rx) = mpsc::channel(256);
let (ev_tx, ev_rx) = mpsc::unbounded_channel(); let (ev_tx, ev_rx) = mpsc::unbounded_channel();
let cur_stream = services.cur() let cur_stream = services
.cur()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await? .await?
.into_inner(); .into_inner();
@ -92,7 +89,11 @@ impl Workspace {
Ok(ws) Ok(ws)
} }
pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>, tx: mpsc::UnboundedSender<crate::api::Event>) { pub(crate) fn run_actor(
&self,
mut stream: Streaming<WorkspaceEvent>,
tx: mpsc::UnboundedSender<crate::api::Event>,
) {
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
let inner = self.0.clone(); let inner = self.0.clone();
let name = self.id(); let name = self.id();
@ -109,7 +110,9 @@ impl Workspace {
match ev { match ev {
// user // user
WorkspaceEventInner::Join(UserJoin { user }) => { WorkspaceEventInner::Join(UserJoin { user }) => {
inner.users.insert(user.clone().into(), User { id: user.into() }); inner
.users
.insert(user.clone().into(), User { id: user.into() });
} }
WorkspaceEventInner::Leave(UserLeave { user }) => { WorkspaceEventInner::Leave(UserLeave { user }) => {
inner.users.remove(&user.into()); inner.users.remove(&user.into());
@ -132,7 +135,7 @@ impl Workspace {
if tx.send(update).is_err() { if tx.send(update).is_err() {
tracing::warn!("no active controller to receive workspace event"); tracing::warn!("no active controller to receive workspace event");
} }
}, }
} }
} }
}); });
@ -175,10 +178,7 @@ impl Workspace {
tonic::metadata::MetadataValue::try_from(credentials.id.id) tonic::metadata::MetadataValue::try_from(credentials.id.id)
.expect("could not represent path as byte sequence"), .expect("could not represent path as byte sequence"),
); );
let stream = self.0.services.buf() let stream = self.0.services.buf().attach(req).await?.into_inner();
.attach(req)
.await?
.into_inner();
let worker = BufferWorker::new(self.0.user_id, path); let worker = BufferWorker::new(self.0.user_id, path);
let controller = worker.controller(); let controller = worker.controller();
@ -206,17 +206,24 @@ impl Workspace {
pub fn detach(&self, path: &str) -> DetachResult { pub fn detach(&self, path: &str) -> DetachResult {
match self.0.buffers.remove(path) { match self.0.buffers.remove(path) {
None => DetachResult::NotAttached, None => DetachResult::NotAttached,
Some((_name, controller)) => if controller.stop() { Some((_name, controller)) => {
DetachResult::Detaching if controller.stop() {
} else { DetachResult::Detaching
DetachResult::AlreadyDetached } else {
DetachResult::AlreadyDetached
}
} }
} }
} }
/// await next workspace [crate::api::Event] and return it /// await next workspace [crate::api::Event] and return it
pub async fn event(&self) -> crate::Result<crate::api::Event> { pub async fn event(&self) -> crate::Result<Event> {
self.0.events.lock().await.recv().await self.0
.events
.lock()
.await
.recv()
.await
.ok_or(crate::Error::Channel { send: false }) .ok_or(crate::Error::Channel { send: false })
} }
@ -261,7 +268,7 @@ impl Workspace {
/// get a list of the users attached to a specific buffer /// get a list of the users attached to a specific buffer
/// ///
/// TODO: discuss implementation details /// TODO: discuss implementation details
pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> { pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<User>> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
let buffer_users = workspace_client let buffer_users = workspace_client
.list_buffer_users(tonic::Request::new(BufferNode { .list_buffer_users(tonic::Request::new(BufferNode {
@ -269,7 +276,10 @@ impl Workspace {
})) }))
.await? .await?
.into_inner() .into_inner()
.users; .users
.into_iter()
.map(|id| id.into())
.collect();
Ok(buffer_users) Ok(buffer_users)
} }
@ -313,7 +323,11 @@ impl Workspace {
/// get a list of all the currently attached to buffers /// get a list of all the currently attached to buffers
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
pub fn buffer_list(&self) -> Vec<String> { pub fn buffer_list(&self) -> Vec<String> {
self.0.buffers.iter().map(|elem| elem.key().clone()).collect() self.0
.buffers
.iter()
.map(|elem| elem.key().clone())
.collect()
} }
/// get the currently cached "filetree" /// get the currently cached "filetree"
@ -327,7 +341,11 @@ impl Drop for WorkspaceInner {
fn drop(&mut self) { fn drop(&mut self) {
for entry in self.buffers.iter() { for entry in self.buffers.iter() {
if !entry.value().stop() { if !entry.value().stop() {
tracing::warn!("could not stop buffer worker {} for workspace {}", entry.value().name(), self.id); tracing::warn!(
"could not stop buffer worker {} for workspace {}",
entry.value().name(),
self.id
);
} }
} }
if !self.cursor.stop() { if !self.cursor.stop() {