2023-11-28 02:22:21 +01:00
|
|
|
use crate::{Error, api::Controller};
|
2023-11-30 03:30:50 +01:00
|
|
|
use tokio::sync::mpsc;
|
2023-11-28 02:22:21 +01:00
|
|
|
|
2023-11-30 03:50:32 +01:00
|
|
|
/// invoke .poll() on all given buffer controllers and wait, returning the first one ready
|
2023-11-28 02:22:21 +01:00
|
|
|
///
|
2023-11-30 03:50:32 +01:00
|
|
|
/// this will spawn tasks blocked on .poll() for each buffer controller. as soon as
|
|
|
|
/// one finishes, all other tasks will be canceled and the ready controller will be
|
|
|
|
/// returned
|
2023-11-28 02:22:21 +01:00
|
|
|
///
|
2023-11-30 03:50:32 +01:00
|
|
|
/// if timeout is None, result will never be None, otherwise returns None if no buffer
|
|
|
|
/// is ready before timeout expires
|
|
|
|
///
|
|
|
|
/// returns an error if all buffers returned errors while polling.
|
2023-11-30 03:30:50 +01:00
|
|
|
pub async fn select_buffer(
|
2024-08-07 10:22:01 +02:00
|
|
|
buffers: &[crate::buffer::Controller],
|
2023-11-30 03:30:50 +01:00
|
|
|
timeout: Option<std::time::Duration>,
|
2024-08-07 10:22:01 +02:00
|
|
|
runtime: &tokio::runtime::Runtime
|
|
|
|
) -> crate::Result<Option<crate::buffer::Controller>> {
|
2023-11-30 03:30:50 +01:00
|
|
|
let (tx, mut rx) = mpsc::unbounded_channel();
|
2023-11-28 02:22:21 +01:00
|
|
|
let mut tasks = Vec::new();
|
|
|
|
for buffer in buffers {
|
|
|
|
let _tx = tx.clone();
|
|
|
|
let _buffer = buffer.clone();
|
2024-08-07 10:22:01 +02:00
|
|
|
tasks.push(runtime.spawn(async move {
|
2023-11-28 02:22:21 +01:00
|
|
|
match _buffer.poll().await {
|
2023-11-30 03:30:50 +01:00
|
|
|
Ok(()) => _tx.send(Ok(Some(_buffer))),
|
2023-11-28 02:22:21 +01:00
|
|
|
Err(_) => _tx.send(Err(Error::Channel { send: true })),
|
|
|
|
}
|
|
|
|
}))
|
|
|
|
}
|
2023-11-30 03:30:50 +01:00
|
|
|
if let Some(d) = timeout {
|
|
|
|
let _tx = tx.clone();
|
2024-08-07 10:22:01 +02:00
|
|
|
tasks.push(runtime.spawn(async move {
|
2023-11-30 03:30:50 +01:00
|
|
|
tokio::time::sleep(d).await;
|
|
|
|
_tx.send(Ok(None))
|
|
|
|
}));
|
|
|
|
}
|
2023-11-28 02:22:21 +01:00
|
|
|
loop {
|
|
|
|
match rx.recv().await {
|
|
|
|
None => return Err(Error::Channel { send: false }),
|
|
|
|
Some(Err(_)) => continue, // TODO log errors maybe?
|
|
|
|
Some(Ok(x)) => {
|
|
|
|
for t in tasks {
|
|
|
|
t.abort();
|
|
|
|
}
|
|
|
|
return Ok(x.clone());
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
2024-02-01 17:54:56 +01:00
|
|
|
}
|
2024-08-08 00:27:24 +02:00
|
|
|
|
|
|
|
/// wraps sender and receiver to allow mutable field with immutable ref
|
|
|
|
#[derive(Debug)]
|
2024-08-08 21:55:52 +02:00
|
|
|
pub struct InternallyMutable<T> {
|
2024-08-08 00:27:24 +02:00
|
|
|
getter: tokio::sync::watch::Receiver<T>,
|
|
|
|
setter: tokio::sync::watch::Sender<T>,
|
|
|
|
}
|
|
|
|
|
2024-08-08 21:55:52 +02:00
|
|
|
impl<T: Default> Default for InternallyMutable<T> {
|
2024-08-08 00:27:24 +02:00
|
|
|
fn default() -> Self {
|
2024-08-08 21:55:52 +02:00
|
|
|
Self::new(T::default())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> InternallyMutable<T> {
|
|
|
|
pub fn new(init: T) -> Self {
|
|
|
|
let (tx, rx) = tokio::sync::watch::channel(init);
|
2024-08-08 00:27:24 +02:00
|
|
|
InternallyMutable {
|
|
|
|
getter: rx,
|
|
|
|
setter: tx,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set(&self, state: T) -> T {
|
|
|
|
self.setter.send_replace(state)
|
|
|
|
}
|
2024-08-08 21:55:52 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl<T: Clone> InternallyMutable<T> {
|
2024-08-08 00:27:24 +02:00
|
|
|
pub fn get(&self) -> T {
|
|
|
|
self.getter.borrow().clone()
|
|
|
|
}
|
|
|
|
}
|
2024-08-10 16:13:16 +02:00
|
|
|
|
2024-08-13 00:36:09 +02:00
|
|
|
/*
|
2024-08-10 16:13:16 +02:00
|
|
|
pub(crate) struct CallbackHandleWatch<T>(pub(crate) tokio::sync::watch::Sender<Option<T>>);
|
|
|
|
|
|
|
|
impl<T> crate::api::controller::CallbackHandle for CallbackHandleWatch<T> {
|
|
|
|
fn unregister(self) {
|
|
|
|
self.0.send_replace(None);
|
|
|
|
}
|
2024-08-13 00:36:09 +02:00
|
|
|
}*/
|