fix: new approach for select_buffer

this is definitely bloatier but should work and not deadlock?
This commit is contained in:
əlemi 2023-11-17 18:28:32 +01:00
parent 152679669b
commit 175b9c945a
2 changed files with 23 additions and 10 deletions

View file

@ -25,7 +25,6 @@ serde_json = { version = "1", optional = true }
tokio-stream = { version = "0.1", optional = true } tokio-stream = { version = "0.1", optional = true }
# global # global
lazy_static = { version = "1.4", optional = true } lazy_static = { version = "1.4", optional = true }
futures = "0.3"
[build-dependencies] [build-dependencies]
tonic-build = "0.9" tonic-build = "0.9"

View file

@ -3,8 +3,8 @@
//! codemp client manager, containing grpc services //! codemp client manager, containing grpc services
use std::{sync::Arc, collections::BTreeMap}; use std::{sync::Arc, collections::BTreeMap};
use futures::stream::FuturesUnordered;
use tokio::sync::mpsc;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tonic::transport::Channel; use tonic::transport::Channel;
@ -154,19 +154,33 @@ impl Client {
pub async fn select_buffer(&self) -> crate::Result<String> { pub async fn select_buffer(&self) -> crate::Result<String> {
let mut futures = FuturesUnordered::new();
match &self.workspace { match &self.workspace {
None => Err(Error::InvalidState { msg: "join workspace first".into() }), None => Err(Error::InvalidState { msg: "join workspace first".into() }),
Some(workspace) => { Some(workspace) => {
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tasks = Vec::new();
for (id, buffer) in workspace.buffers.iter() { for (id, buffer) in workspace.buffers.iter() {
futures.push(async move { let _tx = tx.clone();
buffer.poll().await?; let _id = id.clone();
Ok::<&String, Error>(id) let _buffer = buffer.clone();
}) tasks.push(tokio::spawn(async move {
match _buffer.poll().await {
Ok(()) => _tx.send(Ok(_id)),
Err(_) => _tx.send(Err(Error::Channel { send: true })),
}
}))
}
loop {
match rx.recv().await {
None => return Err(Error::Channel { send: false }),
Some(Err(_)) => continue, // TODO log errors
Some(Ok(x)) => {
for t in tasks {
t.abort();
}
return Ok(x.clone());
},
} }
match futures.next().await {
None => Err(Error::Deadlocked), // TODO shouldn't really happen???
Some(x) => Ok(x?.clone()),
} }
} }
} }