From ddbad59ae2fa70982bd1f115c17aa1807b5cd2da Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 28 Sep 2024 03:56:57 +0200 Subject: [PATCH] feat: implemented AsyncReceiver for Workspace... ... its very bad tho, very very bad Co-authored-by: zaaarf --- src/workspace.rs | 50 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/src/workspace.rs b/src/workspace.rs index c4c1d9e..87d2f83 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -4,7 +4,7 @@ //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. use crate::{ - api::{Event, User}, + api::{controller::{AsyncReceiver, ControllerCallback}, Event, User}, buffer, cursor, errors::{ConnectionResult, ControllerResult, RemoteResult}, ext::InternallyMutable, @@ -24,7 +24,7 @@ use codemp_proto::{ use dashmap::{DashMap, DashSet}; use std::{collections::BTreeSet, sync::Arc}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc::error::TryRecvError, mpsc}; use tonic::Streaming; use uuid::Uuid; @@ -55,6 +55,39 @@ struct WorkspaceInner { users: Arc>, // TODO can we drop the mutex? events: tokio::sync::Mutex>, + callback: std::sync::Mutex>>, // TODO lmao another one +} + +impl AsyncReceiver for Workspace { + async fn try_recv(&self) -> ControllerResult> { + match self.0 + .events + .lock() + .await + .try_recv() + { + Ok(x) => Ok(Some(x)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(crate::errors::ControllerError::Stopped), + } + } + + async fn poll(&self) -> ControllerResult<()> { + loop { + if !self.0.events.lock().await.is_empty() { break Ok(()) } + // TODO disgusting, please send help + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + } + } + + // TODO please send HELP ASAP this is hurting me emotionally + fn clear_callback(&self) { + *self.0.callback.lock().expect("mutex poisoned") = None; + } + + fn callback(&self, cb: impl Into>) { + *self.0.callback.lock().expect("mutex poisoned") = Some(cb.into()); + } } impl Workspace { @@ -91,6 +124,7 @@ impl Workspace { users, events: tokio::sync::Mutex::new(ev_rx), services, + callback: std::sync::Mutex::new(None), })); ws.fetch_users().await?; @@ -161,18 +195,6 @@ impl Workspace { } } - /// Await next workspace [Event] and return it when it arrives. - // TODO this method is weird and ugly, can we make it more standard? - pub async fn event(&self) -> ControllerResult { - self.0 - .events - .lock() - .await - .recv() - .await - .ok_or(crate::errors::ControllerError::Unfulfilled) - } - /// Re-fetch the list of available buffers in the workspace. pub async fn fetch_buffers(&self) -> RemoteResult<()> { let mut workspace_client = self.0.services.ws();