2024-09-04 20:03:34 +02:00
|
|
|
//! ### Extensions
|
|
|
|
//! Contains a number of utils used internally or that may be of general interest.
|
|
|
|
|
2024-09-28 03:33:13 +02:00
|
|
|
use crate::{api::controller::AsyncReceiver, errors::ControllerResult};
|
2023-11-30 03:30:50 +01:00
|
|
|
use tokio::sync::mpsc;
|
2023-11-28 02:22:21 +01:00
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Poll all given buffer controllers and wait, returning the first one ready.
|
2023-11-28 02:22:21 +01:00
|
|
|
///
|
2024-10-14 14:13:35 +02:00
|
|
|
/// It will spawn tasks blocked on [`AsyncReceiver::poll`] for each buffer controller.
|
2024-09-04 20:03:34 +02:00
|
|
|
/// As soon as one finishes, its controller is returned and all other tasks are canceled.
|
2023-11-28 02:22:21 +01:00
|
|
|
///
|
2024-09-04 20:03:34 +02:00
|
|
|
/// If a timeout is provided, the result may be `None` if it expires before any task is
|
|
|
|
/// complete.
|
2023-11-30 03:50:32 +01:00
|
|
|
///
|
2024-09-04 20:03:34 +02:00
|
|
|
/// It may return an error if all buffers returned errors while polling.
|
2023-11-30 03:30:50 +01:00
|
|
|
pub async fn select_buffer(
|
2024-08-07 10:22:01 +02:00
|
|
|
buffers: &[crate::buffer::Controller],
|
2023-11-30 03:30:50 +01:00
|
|
|
timeout: Option<std::time::Duration>,
|
2024-10-01 00:42:57 +02:00
|
|
|
runtime: &tokio::runtime::Runtime,
|
2024-09-01 02:46:03 +02:00
|
|
|
) -> ControllerResult<Option<crate::buffer::Controller>> {
|
2023-11-30 03:30:50 +01:00
|
|
|
let (tx, mut rx) = mpsc::unbounded_channel();
|
2023-11-28 02:22:21 +01:00
|
|
|
let mut tasks = Vec::new();
|
|
|
|
for buffer in buffers {
|
|
|
|
let _tx = tx.clone();
|
|
|
|
let _buffer = buffer.clone();
|
2024-08-07 10:22:01 +02:00
|
|
|
tasks.push(runtime.spawn(async move {
|
2023-11-28 02:22:21 +01:00
|
|
|
match _buffer.poll().await {
|
2023-11-30 03:30:50 +01:00
|
|
|
Ok(()) => _tx.send(Ok(Some(_buffer))),
|
2024-09-01 02:46:03 +02:00
|
|
|
Err(e) => _tx.send(Err(e)),
|
2023-11-28 02:22:21 +01:00
|
|
|
}
|
|
|
|
}))
|
|
|
|
}
|
2023-11-30 03:30:50 +01:00
|
|
|
if let Some(d) = timeout {
|
|
|
|
let _tx = tx.clone();
|
2024-08-07 10:22:01 +02:00
|
|
|
tasks.push(runtime.spawn(async move {
|
2023-11-30 03:30:50 +01:00
|
|
|
tokio::time::sleep(d).await;
|
|
|
|
_tx.send(Ok(None))
|
|
|
|
}));
|
|
|
|
}
|
2023-11-28 02:22:21 +01:00
|
|
|
loop {
|
|
|
|
match rx.recv().await {
|
2024-09-01 02:46:03 +02:00
|
|
|
None => return Err(crate::errors::ControllerError::Unfulfilled),
|
2023-11-28 02:22:21 +01:00
|
|
|
Some(Err(_)) => continue, // TODO log errors maybe?
|
|
|
|
Some(Ok(x)) => {
|
|
|
|
for t in tasks {
|
|
|
|
t.abort();
|
|
|
|
}
|
2024-08-15 03:42:48 +02:00
|
|
|
return Ok(x);
|
2024-10-01 00:42:57 +02:00
|
|
|
}
|
2023-11-28 02:22:21 +01:00
|
|
|
}
|
|
|
|
}
|
2024-02-01 17:54:56 +01:00
|
|
|
}
|
2024-08-08 00:27:24 +02:00
|
|
|
|
2024-09-05 01:45:48 +02:00
|
|
|
/// Hash a given byte array with the internally used algorithm.
|
2024-10-01 00:42:57 +02:00
|
|
|
///
|
2024-09-04 20:03:34 +02:00
|
|
|
/// Currently, it uses [`xxhash_rust::xxh3::xxh3_64`].
|
2024-08-14 17:12:36 +02:00
|
|
|
pub fn hash(data: impl AsRef<[u8]>) -> i64 {
|
|
|
|
let hash = xxhash_rust::xxh3::xxh3_64(data.as_ref());
|
|
|
|
i64::from_ne_bytes(hash.to_ne_bytes())
|
|
|
|
}
|
|
|
|
|
2024-09-04 20:03:34 +02:00
|
|
|
/// A field that can be *internally mutated* regardless of its external mutability.
|
|
|
|
///
|
|
|
|
/// Currently, it wraps the [`tokio::sync::watch`] channel couple to achieve this.
|
2024-08-08 00:27:24 +02:00
|
|
|
#[derive(Debug)]
|
2024-08-08 21:55:52 +02:00
|
|
|
pub struct InternallyMutable<T> {
|
2024-08-08 00:27:24 +02:00
|
|
|
getter: tokio::sync::watch::Receiver<T>,
|
|
|
|
setter: tokio::sync::watch::Sender<T>,
|
|
|
|
}
|
|
|
|
|
2024-08-08 21:55:52 +02:00
|
|
|
impl<T: Default> Default for InternallyMutable<T> {
|
2024-08-08 00:27:24 +02:00
|
|
|
fn default() -> Self {
|
2024-08-08 21:55:52 +02:00
|
|
|
Self::new(T::default())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> InternallyMutable<T> {
|
|
|
|
pub fn new(init: T) -> Self {
|
|
|
|
let (tx, rx) = tokio::sync::watch::channel(init);
|
2024-09-04 20:03:34 +02:00
|
|
|
Self {
|
2024-08-08 00:27:24 +02:00
|
|
|
getter: rx,
|
|
|
|
setter: tx,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set(&self, state: T) -> T {
|
|
|
|
self.setter.send_replace(state)
|
|
|
|
}
|
2024-08-22 00:57:24 +02:00
|
|
|
|
|
|
|
pub fn channel(&self) -> tokio::sync::watch::Receiver<T> {
|
|
|
|
self.getter.clone()
|
|
|
|
}
|
2024-08-08 21:55:52 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl<T: Clone> InternallyMutable<T> {
|
2024-08-08 00:27:24 +02:00
|
|
|
pub fn get(&self) -> T {
|
|
|
|
self.getter.borrow().clone()
|
|
|
|
}
|
|
|
|
}
|
2024-08-10 16:13:16 +02:00
|
|
|
|
2024-09-04 20:03:34 +02:00
|
|
|
/// Something that may carry an error which is safe to discard after emitting a warning.
pub trait IgnorableError {
    /// Consume `self`, reporting any contained error together with `msg` as context.
    fn unwrap_or_warn(self, msg: &str);
}
|
|
|
|
|
|
|
|
impl<T, E> IgnorableError for std::result::Result<T, E>
|
2024-10-01 00:42:57 +02:00
|
|
|
where
|
|
|
|
E: std::fmt::Debug,
|
|
|
|
{
|
2024-09-04 20:03:34 +02:00
|
|
|
/// Logs the error as a warning and returns a unit.
|
2024-09-01 02:46:03 +02:00
|
|
|
fn unwrap_or_warn(self, msg: &str) {
|
|
|
|
match self {
|
2024-10-01 00:42:57 +02:00
|
|
|
Ok(_) => {}
|
2024-09-01 02:46:03 +02:00
|
|
|
Err(e) => tracing::warn!("{}: {:?}", msg, e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|