feat: more capillar errors

so its clear that some functions wont return a transport error or an rpc
error
This commit is contained in:
əlemi 2024-09-01 02:46:03 +02:00 committed by zaaarf
parent 0b1a542ed5
commit b98be22a8b
No known key found for this signature in database
GPG key ID: 102E445F4C3F829B
10 changed files with 145 additions and 156 deletions

View file

@ -3,7 +3,7 @@
//! an bidirectional stream handler to easily manage async operations across local buffers and the //! an bidirectional stream handler to easily manage async operations across local buffers and the
//! server //! server
use crate::Result; use crate::errors::ControllerResult;
#[async_trait::async_trait] #[async_trait::async_trait]
pub(crate) trait ControllerWorker<T : Sized + Send + Sync> { pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
@ -15,6 +15,9 @@ pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
async fn work(self, tx: Self::Tx, rx: Self::Rx); async fn work(self, tx: Self::Tx, rx: Self::Rx);
} }
// note that we don't use thiserror's #[from] because we don't want the error structs to contain
// these foreign types, and also we want these to be easily constructable
/// async and threadsafe handle to a generic bidirectional stream /// async and threadsafe handle to a generic bidirectional stream
/// ///
/// this generic trait is implemented by actors managing stream procedures. /// this generic trait is implemented by actors managing stream procedures.
@ -29,14 +32,14 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// ///
/// success or failure of this function does not imply validity of sent operation, /// success or failure of this function does not imply validity of sent operation,
/// because it's integrated asynchronously on the background worker /// because it's integrated asynchronously on the background worker
async fn send(&self, x: T) -> Result<()>; async fn send(&self, x: T) -> ControllerResult<()>;
/// get next value from other users, blocking until one is available /// get next value from other users, blocking until one is available
/// ///
/// this is just an async trait function wrapped by `async_trait`: /// this is just an async trait function wrapped by `async_trait`:
/// ///
/// `async fn recv(&self) -> codemp::Result<T>;` /// `async fn recv(&self) -> codemp::ControllerResult<T>;`
async fn recv(&self) -> Result<T> { async fn recv(&self) -> ControllerResult<T> {
loop { loop {
self.poll().await?; self.poll().await?;
if let Some(x) = self.try_recv().await? { if let Some(x) = self.try_recv().await? {
@ -57,11 +60,11 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// ///
/// this is just an async trait function wrapped by `async_trait`: /// this is just an async trait function wrapped by `async_trait`:
/// ///
/// `async fn poll(&self) -> codemp::Result<()>;` /// `async fn poll(&self) -> codemp::ControllerResult<()>;`
async fn poll(&self) -> Result<()>; async fn poll(&self) -> ControllerResult<()>;
/// attempt to receive a value without blocking, return None if nothing is available /// attempt to receive a value without blocking, return None if nothing is available
async fn try_recv(&self) -> Result<Option<T>>; async fn try_recv(&self) -> ControllerResult<Option<T>>;
/// stop underlying worker /// stop underlying worker
/// ///
@ -75,7 +78,7 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// type wrapper for Boxed dyn callback /// type wrapper for Boxed dyn callback
pub struct ControllerCallback<T>(Box<dyn Sync + Send + Fn(T)>); pub struct ControllerCallback<T>(pub Box<dyn Sync + Send + Fn(T)>);
impl<T> ControllerCallback<T> { impl<T> ControllerCallback<T> {
pub fn call(&self, x: T) { pub fn call(&self, x: T) {

View file

@ -8,11 +8,9 @@ use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tonic::async_trait; use tonic::async_trait;
use crate::api::controller::ControllerCallback; use crate::api::controller::{Controller, ControllerCallback};
use crate::api::Controller;
use crate::api::TextChange; use crate::api::TextChange;
use crate::errors::ControllerResult;
use crate::ext::InternallyMutable; use crate::ext::InternallyMutable;
use super::worker::DeltaRequest; use super::worker::DeltaRequest;
@ -36,7 +34,7 @@ impl BufferController {
} }
/// return buffer whole content, updating internal buffer previous state /// return buffer whole content, updating internal buffer previous state
pub async fn content(&self) -> crate::Result<String> { pub async fn content(&self) -> ControllerResult<String> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.content_request.send(tx).await?; self.0.content_request.send(tx).await?;
let content = rx.await?; let content = rx.await?;
@ -64,20 +62,19 @@ pub(crate) struct BufferControllerInner {
impl Controller<TextChange> for BufferController { impl Controller<TextChange> for BufferController {
/// block until a text change is available /// block until a text change is available
/// this returns immediately if one is already available /// this returns immediately if one is already available
async fn poll(&self) -> crate::Result<()> { async fn poll(&self) -> ControllerResult<()> {
if self.0.last_update.get() != *self.0.latest_version.borrow() { if self.0.last_update.get() != *self.0.latest_version.borrow() {
return Ok(()); return Ok(());
} }
let (tx, rx) = oneshot::channel::<()>(); let (tx, rx) = oneshot::channel::<()>();
self.0.poller.send(tx)?; self.0.poller.send(tx)?;
rx.await rx.await?;
.map_err(|_| crate::Error::Channel { send: false })?;
Ok(()) Ok(())
} }
/// if a text change is available, return it immediately /// if a text change is available, return it immediately
async fn try_recv(&self) -> crate::Result<Option<TextChange>> { async fn try_recv(&self) -> ControllerResult<Option<TextChange>> {
let last_update = self.0.last_update.get(); let last_update = self.0.last_update.get();
let latest_version = self.0.latest_version.borrow().clone(); let latest_version = self.0.latest_version.borrow().clone();
@ -94,7 +91,7 @@ impl Controller<TextChange> for BufferController {
/// enqueue a text change for processing /// enqueue a text change for processing
/// this also updates internal buffer previous state /// this also updates internal buffer previous state
async fn send(&self, op: TextChange) -> crate::Result<()> { async fn send(&self, op: TextChange) -> ControllerResult<()> {
// we let the worker do the updating to the last version and send it back. // we let the worker do the updating to the last version and send it back.
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.ops_in.send((op, tx))?; self.0.ops_in.send((op, tx))?;

View file

@ -7,7 +7,7 @@ use std::sync::Arc;
use dashmap::DashMap; use dashmap::DashMap;
use tonic::{service::interceptor::InterceptedService, transport::{Channel, Endpoint}}; use tonic::{service::interceptor::InterceptedService, transport::{Channel, Endpoint}};
use crate::{api::User, ext::InternallyMutable, workspace::Workspace}; use crate::{api::User, errors::{ConnectionResult, ProcedureResult}, ext::InternallyMutable, workspace::Workspace};
use codemp_proto::{ use codemp_proto::{
auth::{auth_client::AuthClient, LoginRequest}, auth::{auth_client::AuthClient, LoginRequest},
common::{Empty, Token}, session::{session_client::SessionClient, InviteRequest, WorkspaceRequest}, common::{Empty, Token}, session::{session_client::SessionClient, InviteRequest, WorkspaceRequest},
@ -42,7 +42,7 @@ impl Client {
host: impl AsRef<str>, host: impl AsRef<str>,
username: impl AsRef<str>, username: impl AsRef<str>,
password: impl AsRef<str>, password: impl AsRef<str>,
) -> crate::Result<Self> { ) -> ConnectionResult<Self> {
let host = if host.as_ref().starts_with("http") { let host = if host.as_ref().starts_with("http") {
host.as_ref().to_string() host.as_ref().to_string()
} else { } else {
@ -75,7 +75,7 @@ impl Client {
} }
/// refresh session token /// refresh session token
pub async fn refresh(&self) -> crate::Result<()> { pub async fn refresh(&self) -> ProcedureResult<()> {
let new_token = self.0.auth.clone().refresh(self.0.claims.get()) let new_token = self.0.auth.clone().refresh(self.0.claims.get())
.await? .await?
.into_inner(); .into_inner();
@ -84,7 +84,7 @@ impl Client {
} }
/// attempts to create a new workspace with given name /// attempts to create a new workspace with given name
pub async fn create_workspace(&self, name: impl AsRef<str>) -> crate::Result<()> { pub async fn create_workspace(&self, name: impl AsRef<str>) -> ProcedureResult<()> {
self.0.session self.0.session
.clone() .clone()
.create_workspace(WorkspaceRequest { workspace: name.as_ref().to_string() }) .create_workspace(WorkspaceRequest { workspace: name.as_ref().to_string() })
@ -93,7 +93,7 @@ impl Client {
} }
/// delete an existing workspace if possible /// delete an existing workspace if possible
pub async fn delete_workspace(&self, name: impl AsRef<str>) -> crate::Result<()> { pub async fn delete_workspace(&self, name: impl AsRef<str>) -> ProcedureResult<()> {
self.0.session self.0.session
.clone() .clone()
.delete_workspace(WorkspaceRequest { workspace: name.as_ref().to_string() }) .delete_workspace(WorkspaceRequest { workspace: name.as_ref().to_string() })
@ -102,7 +102,7 @@ impl Client {
} }
/// invite user associated with username to workspace, if possible /// invite user associated with username to workspace, if possible
pub async fn invite_to_workspace(&self, workspace_name: impl AsRef<str>, user_name: impl AsRef<str>) -> crate::Result<()> { pub async fn invite_to_workspace(&self, workspace_name: impl AsRef<str>, user_name: impl AsRef<str>) -> ProcedureResult<()> {
self.0.session self.0.session
.clone() .clone()
.invite_to_workspace(InviteRequest { .invite_to_workspace(InviteRequest {
@ -114,7 +114,7 @@ impl Client {
} }
/// list all available workspaces, filtering between those owned and those invited to /// list all available workspaces, filtering between those owned and those invited to
pub async fn list_workspaces(&self, owned: bool, invited: bool) -> crate::Result<Vec<String>> { pub async fn list_workspaces(&self, owned: bool, invited: bool) -> ProcedureResult<Vec<String>> {
let mut workspaces = self.0.session let mut workspaces = self.0.session
.clone() .clone()
.list_workspaces(Empty {}) .list_workspaces(Empty {})
@ -130,7 +130,7 @@ impl Client {
} }
/// join a workspace, returns [Workspace] /// join a workspace, returns [Workspace]
pub async fn join_workspace(&self, workspace: impl AsRef<str>) -> crate::Result<Workspace> { pub async fn join_workspace(&self, workspace: impl AsRef<str>) -> ConnectionResult<Workspace> {
let token = self.0.session let token = self.0.session
.clone() .clone()
.access_workspace(WorkspaceRequest { workspace: workspace.as_ref().to_string() }) .access_workspace(WorkspaceRequest { workspace: workspace.as_ref().to_string() })
@ -184,7 +184,7 @@ impl tonic::service::Interceptor for SessionInterceptor {
fn call( fn call(
&mut self, &mut self,
mut request: tonic::Request<()>, mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> { ) -> tonic::Result<tonic::Request<()>> {
if let Ok(token) = self.0.borrow().token.parse() { if let Ok(token) = self.0.borrow().token.parse() {
request.metadata_mut().insert("session", token); request.metadata_mut().insert("session", token);
} }

View file

@ -9,8 +9,8 @@ use tokio::sync::{
}; };
use tonic::async_trait; use tonic::async_trait;
use crate::api::{controller::ControllerCallback, Controller, Cursor};
use codemp_proto::cursor::{CursorEvent, CursorPosition}; use codemp_proto::cursor::{CursorEvent, CursorPosition};
use crate::{api::{controller::ControllerCallback, Controller, Cursor}, errors::ControllerResult};
/// the cursor controller implementation /// the cursor controller implementation
/// ///
/// this contains /// this contains
@ -40,7 +40,7 @@ pub(crate) struct CursorControllerInner {
impl Controller<Cursor> for CursorController { impl Controller<Cursor> for CursorController {
/// enqueue a cursor event to be broadcast to current workspace /// enqueue a cursor event to be broadcast to current workspace
/// will automatically invert cursor start/end if they are inverted /// will automatically invert cursor start/end if they are inverted
async fn send(&self, mut cursor: Cursor) -> crate::Result<()> { async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> {
if cursor.start > cursor.end { if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
} }
@ -48,7 +48,6 @@ impl Controller<Cursor> for CursorController {
} }
/// try to receive without blocking, but will still block on stream mutex /// try to receive without blocking, but will still block on stream mutex
async fn try_recv(&self) -> crate::Result<Option<Cursor>> {
let mut stream = self.0.stream.lock().await; let mut stream = self.0.stream.lock().await;
match stream.try_recv() { match stream.try_recv() {
Ok(x) => Ok(Some(x.into())), Ok(x) => Ok(Some(x.into())),
@ -59,11 +58,12 @@ impl Controller<Cursor> for CursorController {
Ok(stream.try_recv().map(|x| x.into()).ok()) Ok(stream.try_recv().map(|x| x.into()).ok())
} }
} }
async fn try_recv(&self) -> ControllerResult<Option<Cursor>> {
} }
/// await for changed mutex and then next op change /// await for changed mutex and then next op change
async fn poll(&self) -> crate::Result<()> {
Ok(self.0.last_op.lock().await.changed().await?) Ok(self.0.last_op.lock().await.changed().await?)
async fn poll(&self) -> ControllerResult<()> {
} }
fn callback(&self, cb: impl Into<ControllerCallback<CursorController>>) { fn callback(&self, cb: impl Into<ControllerCallback<CursorController>>) {

View file

@ -1,120 +1,73 @@
//! ### Errors
//!
//! library error helpers and types
use std::result::Result as StdResult; #[deprecated = "use underlying errors to provide more context on what errors could really be thrown"]
#[allow(deprecated)]
pub type Result<T> = std::result::Result<T, Error>;
use tracing::warn; #[deprecated = "use underlying errors to provide more context on what errors could really be thrown"]
/// an error which can be ignored with just a warning entry
pub trait IgnorableError {
fn unwrap_or_warn(self, msg: &str);
}
impl<T, E> IgnorableError for StdResult<T, E>
where E : std::fmt::Debug {
fn unwrap_or_warn(self, msg: &str) {
match self {
Ok(_) => {},
Err(e) => warn!("{}: {:?}", msg, e),
}
}
}
/// an error which can be ignored with just a warning entry and returning the default value
pub trait IgnorableDefaultableError<T> {
fn unwrap_or_warn_default(self, msg: &str) -> T;
}
impl<T, E> IgnorableDefaultableError<T> for StdResult<T, E>
where E : std::fmt::Display, T: Default {
fn unwrap_or_warn_default(self, msg: &str) -> T {
match self {
Ok(x) => x,
Err(e) => {
warn!("{}: {}", msg, e);
T::default()
},
}
}
}
/// result type for codemp errors
pub type Result<T> = StdResult<T, Error>;
// TODO split this into specific errors for various parts of the library
/// codemp error type for library issues
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub enum Error {
/// errors caused by tonic http layer #[error("connection error: {0}")]
#[error("tonic error (status: {status}, message: {message})")] Connection(#[from] ConnectionError),
Transport {
status: String,
message: String,
},
/// errors caused by async channels #[error("procedure error: {0}")]
#[error("channel error, send: {send}")] Procedure(#[from] ProcedureError),
Channel {
send: bool
},
/// errors caused by wrong usage of library objects #[error("controller error: {0}")]
#[error("invalid state error: {msg}")] Controller(#[from] ControllerError),
InvalidState {
msg: String,
},
/// errors caused by wrong interlocking, safe to retry
#[error("deadlocked error")]
Deadlocked
} }
impl From<tonic::Status> for Error {
fn from(status: tonic::Status) -> Self {
Error::Transport { pub type ProcedureResult<T> = std::result::Result<T, ProcedureError>;
status: status.code().to_string(),
message: status.message().to_string() #[derive(Debug, thiserror::Error)]
pub enum ProcedureError {
#[error("server rejected procedure with error: {0}")]
Server(#[from] tonic::Status)
}
pub type ConnectionResult<T> = std::result::Result<T, ConnectionError>;
#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
#[error("network error: {0}")]
Transport(#[from] tonic::transport::Error),
#[error("server rejected connection attempt: {0}")]
Procedure(#[from] tonic::Status),
}
impl From<ProcedureError> for ConnectionError {
fn from(value: ProcedureError) -> Self {
match value {
ProcedureError::Server(x) => Self::Procedure(x)
} }
} }
} }
impl From<tonic::transport::Error> for Error {
fn from(err: tonic::transport::Error) -> Self {
Error::Transport { pub type ControllerResult<T> = std::result::Result<T, ControllerError>;
status: tonic::Code::Unknown.to_string(),
message: format!("underlying transport error: {:?}", err) #[derive(Debug, thiserror::Error)]
pub enum ControllerError {
#[error("worker is already stopped")]
Stopped,
#[error("worker stopped before completing requested operation")]
Unfulfilled,
} }
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for ControllerError {
fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::Stopped
} }
} }
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error { impl From<tokio::sync::oneshot::error::RecvError> for ControllerError {
fn from(_value: tokio::sync::mpsc::error::SendError<T>) -> Self { fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
Error::Channel { send: true } Self::Unfulfilled
}
}
impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {
fn from(_value: tokio::sync::watch::error::SendError<T>) -> Self {
Error::Channel { send: true }
}
}
impl From<tokio::sync::broadcast::error::RecvError> for Error {
fn from(_value: tokio::sync::broadcast::error::RecvError) -> Self {
Error::Channel { send: false }
}
}
impl From<tokio::sync::oneshot::error::RecvError> for Error {
fn from(_value: tokio::sync::oneshot::error::RecvError) -> Self {
Error::Channel { send: false }
}
}
impl From<tokio::sync::watch::error::RecvError> for Error {
fn from(_value: tokio::sync::watch::error::RecvError) -> Self {
Error::Channel { send: false }
} }
} }

View file

@ -1,4 +1,4 @@
use crate::{Error, api::Controller}; use crate::{api::Controller, errors::ControllerResult};
use tokio::sync::mpsc; use tokio::sync::mpsc;
/// invoke .poll() on all given buffer controllers and wait, returning the first one ready /// invoke .poll() on all given buffer controllers and wait, returning the first one ready
@ -15,7 +15,7 @@ pub async fn select_buffer(
buffers: &[crate::buffer::Controller], buffers: &[crate::buffer::Controller],
timeout: Option<std::time::Duration>, timeout: Option<std::time::Duration>,
runtime: &tokio::runtime::Runtime runtime: &tokio::runtime::Runtime
) -> crate::Result<Option<crate::buffer::Controller>> { ) -> ControllerResult<Option<crate::buffer::Controller>> {
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
let mut tasks = Vec::new(); let mut tasks = Vec::new();
for buffer in buffers { for buffer in buffers {
@ -24,7 +24,7 @@ pub async fn select_buffer(
tasks.push(runtime.spawn(async move { tasks.push(runtime.spawn(async move {
match _buffer.poll().await { match _buffer.poll().await {
Ok(()) => _tx.send(Ok(Some(_buffer))), Ok(()) => _tx.send(Ok(Some(_buffer))),
Err(_) => _tx.send(Err(Error::Channel { send: true })), Err(e) => _tx.send(Err(e)),
} }
})) }))
} }
@ -37,7 +37,7 @@ pub async fn select_buffer(
} }
loop { loop {
match rx.recv().await { match rx.recv().await {
None => return Err(Error::Channel { send: false }), None => return Err(crate::errors::ControllerError::Unfulfilled),
Some(Err(_)) => continue, // TODO log errors maybe? Some(Err(_)) => continue, // TODO log errors maybe?
Some(Ok(x)) => { Some(Ok(x)) => {
for t in tasks { for t in tasks {
@ -99,3 +99,37 @@ impl<T> crate::api::controller::CallbackHandle for CallbackHandleWatch<T> {
self.0.send_replace(None); self.0.send_replace(None);
} }
}*/ }*/
/// an error which can be ignored with just a warning entry
pub trait IgnorableError {
fn unwrap_or_warn(self, msg: &str);
}
impl<T, E> IgnorableError for std::result::Result<T, E>
where E : std::fmt::Debug {
fn unwrap_or_warn(self, msg: &str) {
match self {
Ok(_) => {},
Err(e) => tracing::warn!("{}: {:?}", msg, e),
}
}
}
/// an error which can be ignored with just a warning entry and returning the default value
pub trait IgnorableDefaultableError<T> {
fn unwrap_or_warn_default(self, msg: &str) -> T;
}
impl<T, E> IgnorableDefaultableError<T> for std::result::Result<T, E>
where E : std::fmt::Display, T: Default {
fn unwrap_or_warn_default(self, msg: &str) -> T {
match self {
Ok(x) => x,
Err(e) => {
tracing::warn!("{}: {}", msg, e);
T::default()
},
}
}
}

View file

@ -138,8 +138,7 @@ pub use client::Client;
/// crate error types and helpers /// crate error types and helpers
pub mod errors; pub mod errors;
pub use errors::Error; pub use errors::{Error, Result};
pub use errors::Result;
/// all-in-one imports : `use codemp::prelude::*;` /// all-in-one imports : `use codemp::prelude::*;`
pub mod prelude; pub mod prelude;

View file

@ -2,11 +2,6 @@
//! //!
//! all-in-one renamed imports with `use codemp::prelude::*` //! all-in-one renamed imports with `use codemp::prelude::*`
pub use crate::{
Error as CodempError,
Result as CodempResult,
};
pub use crate::api::{ pub use crate::api::{
Controller as CodempController, Controller as CodempController,
TextChange as CodempTextChange, TextChange as CodempTextChange,

View file

@ -7,6 +7,8 @@ use tonic::{
transport::{Channel, Endpoint}, transport::{Channel, Endpoint},
}; };
use crate::errors::ConnectionResult;
type AuthedService = InterceptedService<Channel, WorkspaceInterceptor>; type AuthedService = InterceptedService<Channel, WorkspaceInterceptor>;
#[derive(Debug)] #[derive(Debug)]
@ -21,7 +23,7 @@ impl Services {
dest: &str, dest: &str,
session: tokio::sync::watch::Receiver<codemp_proto::common::Token>, session: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
workspace: tokio::sync::watch::Receiver<codemp_proto::common::Token>, workspace: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
) -> crate::Result<Self> { ) -> ConnectionResult<Self> {
let channel = Endpoint::from_shared(dest.to_string())?.connect().await?; let channel = Endpoint::from_shared(dest.to_string())?.connect().await?;
let inter = WorkspaceInterceptor { session, workspace }; let inter = WorkspaceInterceptor { session, workspace };
Ok(Self { Ok(Self {

View file

@ -1,5 +1,10 @@
use crate::{ use crate::{
api::{controller::ControllerWorker, Controller, Event, User}, buffer::{self, worker::BufferWorker}, cursor::{self, worker::CursorWorker}, ext::InternallyMutable, workspace::service::Services api::{controller::ControllerWorker, Controller, Event, User},
buffer::{self, worker::BufferWorker},
cursor::{self, worker::CursorWorker},
errors::{ConnectionResult, ControllerResult, ProcedureResult},
ext::InternallyMutable,
workspace::service::Services
}; };
use codemp_proto::{ use codemp_proto::{
@ -47,7 +52,7 @@ impl Workspace {
dest: &str, dest: &str,
token: Token, token: Token,
claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>, // TODO ughh receiving this claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>, // TODO ughh receiving this
) -> crate::Result<Self> { ) -> ConnectionResult<Self> {
let workspace_claim = InternallyMutable::new(token); let workspace_claim = InternallyMutable::new(token);
let services = Services::try_new(dest, claims, workspace_claim.channel()).await?; let services = Services::try_new(dest, claims, workspace_claim.channel()).await?;
let ws_stream = services.ws().attach(Empty {}).await?.into_inner(); let ws_stream = services.ws().attach(Empty {}).await?.into_inner();
@ -141,7 +146,7 @@ impl Workspace {
impl Workspace { impl Workspace {
/// create a new buffer in current workspace /// create a new buffer in current workspace
pub async fn create(&self, path: &str) -> crate::Result<()> { pub async fn create(&self, path: &str) -> ProcedureResult<()> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
workspace_client workspace_client
.create_buffer(tonic::Request::new(BufferNode { .create_buffer(tonic::Request::new(BufferNode {
@ -162,7 +167,7 @@ impl Workspace {
/// ///
/// to interact with such buffer use [crate::api::Controller::send] or /// to interact with such buffer use [crate::api::Controller::send] or
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange] /// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
pub async fn attach(&self, path: &str) -> crate::Result<buffer::Controller> { pub async fn attach(&self, path: &str) -> ConnectionResult<buffer::Controller> {
let mut worskspace_client = self.0.services.ws(); let mut worskspace_client = self.0.services.ws();
let request = tonic::Request::new(BufferNode { let request = tonic::Request::new(BufferNode {
path: path.to_string(), path: path.to_string(),
@ -216,18 +221,19 @@ impl Workspace {
} }
/// await next workspace [crate::api::Event] and return it /// await next workspace [crate::api::Event] and return it
pub async fn event(&self) -> crate::Result<Event> { // TODO this method is weird and ugly, can we make it more standard?
pub async fn event(&self) -> ControllerResult<Event> {
self.0 self.0
.events .events
.lock() .lock()
.await .await
.recv() .recv()
.await .await
.ok_or(crate::Error::Channel { send: false }) .ok_or(crate::errors::ControllerError::Unfulfilled)
} }
/// fetch a list of all buffers in a workspace /// fetch a list of all buffers in a workspace
pub async fn fetch_buffers(&self) -> crate::Result<()> { pub async fn fetch_buffers(&self) -> ProcedureResult<()> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
let buffers = workspace_client let buffers = workspace_client
.list_buffers(tonic::Request::new(Empty {})) .list_buffers(tonic::Request::new(Empty {}))
@ -244,7 +250,7 @@ impl Workspace {
} }
/// fetch a list of all users in a workspace /// fetch a list of all users in a workspace
pub async fn fetch_users(&self) -> crate::Result<()> { pub async fn fetch_users(&self) -> ProcedureResult<()> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
let users = BTreeSet::from_iter( let users = BTreeSet::from_iter(
workspace_client workspace_client
@ -267,7 +273,7 @@ impl Workspace {
/// get a list of the users attached to a specific buffer /// get a list of the users attached to a specific buffer
/// ///
/// TODO: discuss implementation details /// TODO: discuss implementation details
pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<User>> { pub async fn list_buffer_users(&self, path: &str) -> ProcedureResult<Vec<User>> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
let buffer_users = workspace_client let buffer_users = workspace_client
.list_buffer_users(tonic::Request::new(BufferNode { .list_buffer_users(tonic::Request::new(BufferNode {
@ -284,7 +290,7 @@ impl Workspace {
} }
/// delete a buffer /// delete a buffer
pub async fn delete(&self, path: &str) -> crate::Result<()> { pub async fn delete(&self, path: &str) -> ProcedureResult<()> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
workspace_client workspace_client
.delete_buffer(tonic::Request::new(BufferNode { .delete_buffer(tonic::Request::new(BufferNode {