codemp/src/buffer/tools.rs

78 lines
2 KiB
Rust
Raw Normal View History

use crate::{Error, api::Controller};
2023-11-30 03:30:50 +01:00
use tokio::sync::mpsc;
2023-11-30 03:50:32 +01:00
/// invoke .poll() on all given buffer controllers and wait, returning the first one ready
///
2023-11-30 03:50:32 +01:00
/// this will spawn tasks blocked on .poll() for each buffer controller. as soon as
/// one finishes, all other tasks will be canceled and the ready controller will be
/// returned
///
2023-11-30 03:50:32 +01:00
/// if timeout is None, result will never be None, otherwise returns None if no buffer
/// is ready before timeout expires
///
/// returns an error if all buffers returned errors while polling.
2023-11-30 03:30:50 +01:00
pub async fn select_buffer(
buffers: &[crate::buffer::Controller],
2023-11-30 03:30:50 +01:00
timeout: Option<std::time::Duration>,
runtime: &tokio::runtime::Runtime
) -> crate::Result<Option<crate::buffer::Controller>> {
2023-11-30 03:30:50 +01:00
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tasks = Vec::new();
for buffer in buffers {
let _tx = tx.clone();
let _buffer = buffer.clone();
tasks.push(runtime.spawn(async move {
match _buffer.poll().await {
2023-11-30 03:30:50 +01:00
Ok(()) => _tx.send(Ok(Some(_buffer))),
Err(_) => _tx.send(Err(Error::Channel { send: true })),
}
}))
}
2023-11-30 03:30:50 +01:00
if let Some(d) = timeout {
let _tx = tx.clone();
tasks.push(runtime.spawn(async move {
2023-11-30 03:30:50 +01:00
tokio::time::sleep(d).await;
_tx.send(Ok(None))
}));
}
loop {
match rx.recv().await {
None => return Err(Error::Channel { send: false }),
Some(Err(_)) => continue, // TODO log errors maybe?
Some(Ok(x)) => {
for t in tasks {
t.abort();
}
return Ok(x.clone());
},
}
}
}
/// wraps sender and receiver to allow mutable field with immutable ref
#[derive(Debug)]
pub struct InternallyMutable<T: Clone> {
getter: tokio::sync::watch::Receiver<T>,
setter: tokio::sync::watch::Sender<T>,
}
impl<T: Clone + Default> Default for InternallyMutable<T> {
fn default() -> Self {
let (tx, rx) = tokio::sync::watch::channel(T::default());
InternallyMutable {
getter: rx,
setter: tx,
}
}
}
impl<T: Clone> InternallyMutable<T> {
pub fn set(&self, state: T) -> T {
self.setter.send_replace(state)
}
pub fn get(&self) -> T {
self.getter.borrow().clone()
}
}