diff --git a/Cargo.toml b/Cargo.toml index 7afbca1..4efbeb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ serde_json = { version = "1", optional = true } tokio-stream = { version = "0.1", optional = true } # global lazy_static = { version = "1.4", optional = true } -futures = "0.3" [build-dependencies] tonic-build = "0.9" diff --git a/src/client.rs b/src/client.rs index a56d22f..9b1aca2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,8 +3,8 @@ //! codemp client manager, containing grpc services use std::{sync::Arc, collections::BTreeMap}; -use futures::stream::FuturesUnordered; +use tokio::sync::mpsc; use tokio_stream::StreamExt; use tonic::transport::Channel; @@ -154,19 +154,33 @@ impl Client { pub async fn select_buffer(&self) -> crate::Result { - let mut futures = FuturesUnordered::new(); match &self.workspace { None => Err(Error::InvalidState { msg: "join workspace first".into() }), Some(workspace) => { + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut tasks = Vec::new(); for (id, buffer) in workspace.buffers.iter() { - futures.push(async move { - buffer.poll().await?; - Ok::<&String, Error>(id) - }) + let _tx = tx.clone(); + let _id = id.clone(); + let _buffer = buffer.clone(); + tasks.push(tokio::spawn(async move { + match _buffer.poll().await { + Ok(()) => _tx.send(Ok(_id)), + Err(_) => _tx.send(Err(Error::Channel { send: true })), + } + })) } - match futures.next().await { - None => Err(Error::Deadlocked), // TODO shouldn't really happen??? - Some(x) => Ok(x?.clone()), + loop { + match rx.recv().await { + None => return Err(Error::Channel { send: false }), + Some(Err(_)) => continue, // TODO log errors + Some(Ok(x)) => { + for t in tasks { + t.abort(); + } + return Ok(x.clone()); + }, + } } } }