diff --git a/Cargo.toml b/Cargo.toml index 4efbeb8..7afbca1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ serde_json = { version = "1", optional = true } tokio-stream = { version = "0.1", optional = true } # global lazy_static = { version = "1.4", optional = true } +futures = "0.3" [build-dependencies] tonic-build = "0.9" diff --git a/src/client.rs b/src/client.rs index 3adb1c8..a56d22f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,10 +3,13 @@ //! codemp client manager, containing grpc services use std::{sync::Arc, collections::BTreeMap}; +use futures::stream::FuturesUnordered; +use tokio_stream::StreamExt; use tonic::transport::Channel; use crate::{ + api::Controller, cursor::{worker::CursorControllerWorker, controller::CursorController}, proto::{ buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, @@ -76,7 +79,7 @@ impl Client { /// /// to interact with such workspace [crate::api::Controller::send] cursor events or /// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller]. - pub async fn join(&mut self, _session: &str) -> Result, Error> { + pub async fn join(&mut self, _session: &str) -> crate::Result> { // TODO there is no real workspace handling in codemp server so it behaves like one big global // session. I'm still creating this to start laying out the proper use flow let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner(); @@ -103,7 +106,7 @@ impl Client { } /// create a new buffer in current workspace, with optional given content - pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), Error> { + pub async fn create(&mut self, path: &str, content: Option<&str>) -> crate::Result<()> { if let Some(_workspace) = &self.workspace { self.client.buffer .create(BufferPayload { @@ -122,7 +125,7 @@ impl Client { /// /// to interact with such buffer use [crate::api::Controller::send] or /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] - pub async fn attach(&mut self, path: &str) -> Result, Error> { + pub async fn attach(&mut self, path: &str) -> crate::Result> { if let Some(workspace) = &mut self.workspace { let mut client = self.client.buffer.clone(); let req = BufferPayload { @@ -148,4 +151,24 @@ impl Client { Err(Error::InvalidState { msg: "join a workspace first".into() }) } } + + + pub async fn select_buffer(&self) -> crate::Result { + let mut futures = FuturesUnordered::new(); + match &self.workspace { + None => Err(Error::InvalidState { msg: "join workspace first".into() }), + Some(workspace) => { + for (id, buffer) in workspace.buffers.iter() { + futures.push(async move { + buffer.poll().await?; + Ok::<&String, Error>(id) + }) + } + match futures.next().await { + None => Err(Error::Deadlocked), // TODO shouldn't really happen??? + Some(x) => Ok(x?.clone()), + } + } + } + } } diff --git a/src/instance.rs b/src/instance.rs index d89bc5e..410038c 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -46,13 +46,13 @@ pub mod a_sync { impl Instance { /// connect to remote address instantiating a new client [crate::client::Client::new] - pub async fn connect(&self, addr: &str) -> Result<(), Error> { + pub async fn connect(&self, addr: &str) -> crate::Result<()> { *self.client.lock().await = Some(Client::new(addr).await?); Ok(()) } /// threadsafe version of [crate::client::Client::join] - pub async fn join(&self, session: &str) -> Result, Error> { + pub async fn join(&self, session: &str) -> crate::Result> { self.client .lock().await .as_mut() @@ -62,7 +62,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::create] - pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> { + pub async fn create(&self, path: &str, content: Option<&str>) -> crate::Result<()> { self.client .lock().await .as_mut() @@ -72,7 +72,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::attach] - pub async fn attach(&self, path: &str) -> Result, Error> { + pub async fn attach(&self, path: &str) -> crate::Result> { self.client .lock().await .as_mut() @@ -82,7 +82,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::get_cursor] - pub async fn get_cursor(&self) -> Result, Error> { + pub async fn get_cursor(&self) -> crate::Result> { self.client .lock().await .as_mut() @@ -92,7 +92,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::get_buffer] - pub async fn get_buffer(&self, path: &str) -> Result, Error> { + pub async fn get_buffer(&self, path: &str) -> crate::Result> { self.client .lock().await .as_mut() @@ -102,7 +102,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::leave_workspace] - pub async fn leave_workspace(&self) -> Result<(), Error> { + pub async fn leave_workspace(&self) -> crate::Result<()> { self.client .lock().await .as_mut() @@ -112,7 +112,7 @@ pub mod a_sync { } /// threadsafe version of [crate::client::Client::disconnect_buffer] - pub async fn disconnect_buffer(&self, path: &str) -> Result { + pub async fn disconnect_buffer(&self, path: &str) -> crate::Result { let res = self.client .lock().await .as_mut() @@ -120,6 +120,16 @@ pub mod a_sync { .disconnect_buffer(path); Ok(res) } + + pub async fn select_buffer(&self) -> crate::Result { + let res = self.client + .lock().await + .as_ref() + .ok_or(Error::InvalidState { msg: "connect first".into() })? + .select_buffer() + .await?; + Ok(res) + } } } @@ -135,7 +145,7 @@ pub mod sync { buffer::controller::BufferController }; - /// persistant session manager for codemp client + /// persistent session manager for codemp client /// /// will hold a std mutex over an optional client, and drop its reference when disconnecting. /// also contains a tokio runtime to execute async futures on @@ -157,7 +167,7 @@ pub mod sync { } impl Instance { - fn if_client(&self, op: impl FnOnce(&mut Client) -> T) -> Result { + fn if_client(&self, op: impl FnOnce(&mut Client) -> T) -> crate::Result { if let Some(c) = self.client.lock().expect("client mutex poisoned").as_mut() { Ok(op(c)) } else { @@ -175,38 +185,42 @@ pub mod sync { } /// threadsafe and sync version of [crate::client::Client::join] - pub fn join(&self, session: &str) -> Result, Error> { + pub fn join(&self, session: &str) -> crate::Result> { self.if_client(|c| self.rt().block_on(c.join(session)))? } /// threadsafe and sync version of [crate::client::Client::create] - pub fn create(&self, path: &str, content: Option<&str>) -> Result<(), Error> { + pub fn create(&self, path: &str, content: Option<&str>) -> crate::Result<()> { self.if_client(|c| self.rt().block_on(c.create(path, content)))? } /// threadsafe and sync version of [crate::client::Client::attach] - pub fn attach(&self, path: &str) -> Result, Error> { + pub fn attach(&self, path: &str) -> crate::Result> { self.if_client(|c| self.rt().block_on(c.attach(path)))? } /// threadsafe and sync version of [crate::client::Client::get_cursor] - pub fn get_cursor(&self) -> Result, Error> { + pub fn get_cursor(&self) -> crate::Result> { self.if_client(|c| c.get_cursor().ok_or(Error::InvalidState { msg: "join workspace first".into() }))? } /// threadsafe and sync version of [crate::client::Client::get_buffer] - pub fn get_buffer(&self, path: &str) -> Result, Error> { + pub fn get_buffer(&self, path: &str) -> crate::Result> { self.if_client(|c| c.get_buffer(path).ok_or(Error::InvalidState { msg: "join workspace or create requested buffer first".into() }))? } /// threadsafe and sync version of [crate::client::Client::leave_workspace] - pub fn leave_workspace(&self) -> Result<(), Error> { + pub fn leave_workspace(&self) -> crate::Result<()> { self.if_client(|c| c.leave_workspace()) } /// threadsafe and sync version of [crate::client::Client::disconnect_buffer] - pub fn disconnect_buffer(&self, path: &str) -> Result { + pub fn disconnect_buffer(&self, path: &str) -> crate::Result { self.if_client(|c| c.disconnect_buffer(path)) } + + pub fn select_buffer(&self) -> crate::Result { + self.if_client(|c| self.rt().block_on(c.select_buffer()))? + } } }