use crate::{Error, api::Controller}; use tokio::sync::mpsc; /// invoke .poll() on all given buffer controllers and wait, returning the first one ready /// /// this will spawn tasks blocked on .poll() for each buffer controller. as soon as /// one finishes, all other tasks will be canceled and the ready controller will be /// returned /// /// if timeout is None, result will never be None, otherwise returns None if no buffer /// is ready before timeout expires /// /// returns an error if all buffers returned errors while polling. pub async fn select_buffer( buffers: &[crate::buffer::Controller], timeout: Option, runtime: &tokio::runtime::Runtime ) -> crate::Result> { let (tx, mut rx) = mpsc::unbounded_channel(); let mut tasks = Vec::new(); for buffer in buffers { let _tx = tx.clone(); let _buffer = buffer.clone(); tasks.push(runtime.spawn(async move { match _buffer.poll().await { Ok(()) => _tx.send(Ok(Some(_buffer))), Err(_) => _tx.send(Err(Error::Channel { send: true })), } })) } if let Some(d) = timeout { let _tx = tx.clone(); tasks.push(runtime.spawn(async move { tokio::time::sleep(d).await; _tx.send(Ok(None)) })); } loop { match rx.recv().await { None => return Err(Error::Channel { send: false }), Some(Err(_)) => continue, // TODO log errors maybe? Some(Ok(x)) => { for t in tasks { t.abort(); } return Ok(x); }, } } } pub fn hash(data: impl AsRef<[u8]>) -> i64 { let hash = xxhash_rust::xxh3::xxh3_64(data.as_ref()); i64::from_ne_bytes(hash.to_ne_bytes()) } /// wraps sender and receiver to allow mutable field with immutable ref #[derive(Debug)] pub struct InternallyMutable { getter: tokio::sync::watch::Receiver, setter: tokio::sync::watch::Sender, } impl Default for InternallyMutable { fn default() -> Self { Self::new(T::default()) } } impl InternallyMutable { pub fn new(init: T) -> Self { let (tx, rx) = tokio::sync::watch::channel(init); InternallyMutable { getter: rx, setter: tx, } } pub fn set(&self, state: T) -> T { self.setter.send_replace(state) } } impl InternallyMutable { pub fn get(&self) -> T { self.getter.borrow().clone() } } /* pub(crate) struct CallbackHandleWatch(pub(crate) tokio::sync::watch::Sender>); impl crate::api::controller::CallbackHandle for CallbackHandleWatch { fn unregister(self) { self.0.send_replace(None); } }*/