feat: waker implementation

basically posting should now be instant? very ugly implementation tho: i
wanted to keep tokio out of core but this is awful and also im
realistically never swapping out tokio, so might as well move these
tokens down in core...
This commit is contained in:
əlemi 2024-09-19 16:50:13 +02:00
parent 52b93ba539
commit 0934cdaad4
Signed by: alemi
GPG key ID: A4895B84D311642C
6 changed files with 50 additions and 12 deletions

23
main.rs
View file

@ -4,8 +4,8 @@ use sea_orm::{ConnectOptions, Database};
use signal_hook::consts::signal::*; use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals; use signal_hook_tokio::Signals;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use upub::{context, ext::LoggableError};
use upub::ext::LoggableError;
#[cfg(feature = "cli")] #[cfg(feature = "cli")]
use upub_cli as cli; use upub_cli as cli;
@ -167,8 +167,10 @@ async fn init(args: Args, config: upub::Config) {
return; return;
} }
let (tx_wake, rx_wake) = tokio::sync::watch::channel(false);
let wake = CancellationToken(rx_wake);
let ctx = upub::Context::new(db, domain, config.clone()) let ctx = upub::Context::new(db, domain, config.clone(), Some(Box::new(WakerToken(tx_wake))))
.await.expect("failed creating server context"); .await.expect("failed creating server context");
#[cfg(feature = "cli")] #[cfg(feature = "cli")]
@ -195,12 +197,12 @@ async fn init(args: Args, config: upub::Config) {
#[cfg(feature = "worker")] #[cfg(feature = "worker")]
Mode::Work { filter, tasks, poll } => Mode::Work { filter, tasks, poll } =>
worker::spawn(ctx, tasks, poll, filter.into(), stop) worker::spawn(ctx, tasks, poll, filter.into(), stop, wake)
.await.expect("failed running worker"), .await.expect("failed running worker"),
#[cfg(all(feature = "serve", feature = "worker"))] #[cfg(all(feature = "serve", feature = "worker"))]
Mode::Monolith { bind, tasks, poll } => { Mode::Monolith { bind, tasks, poll } => {
worker::spawn(ctx.clone(), tasks, poll, None, stop.clone()); worker::spawn(ctx.clone(), tasks, poll, None, stop.clone(), wake);
routes::serve(ctx, bind, stop) routes::serve(ctx, bind, stop)
.await.expect("failed serving api routes"); .await.expect("failed serving api routes");
@ -217,6 +219,13 @@ async fn init(args: Args, config: upub::Config) {
signals_task.await.expect("failed joining signal handler task"); signals_task.await.expect("failed joining signal handler task");
} }
struct WakerToken(tokio::sync::watch::Sender<bool>);
impl context::WakerToken for WakerToken {
fn wake(&self) {
self.0.send_replace(true);
}
}
#[derive(Clone)] #[derive(Clone)]
struct CancellationToken(tokio::sync::watch::Receiver<bool>); struct CancellationToken(tokio::sync::watch::Receiver<bool>);
@ -226,6 +235,12 @@ impl worker::StopToken for CancellationToken {
} }
} }
impl worker::WakeToken for CancellationToken {
async fn wait(&mut self) {
self.0.changed().await.err_failed("error waiting for waker token");
}
}
impl routes::ShutdownToken for CancellationToken { impl routes::ShutdownToken for CancellationToken {
async fn event(mut self) { async fn event(mut self) {
self.0.changed().await.warn_failed("cancellation token channel closed, stopping..."); self.0.changed().await.warn_failed("cancellation token channel closed, stopping...");

View file

@ -17,6 +17,7 @@ struct ContextInner {
actor: model::actor::Model, actor: model::actor::Model,
instance: model::instance::Model, instance: model::instance::Model,
pkey: String, pkey: String,
waker: Option<Box<dyn WakerToken>>,
#[allow(unused)] relay: Relays, #[allow(unused)] relay: Relays,
} }
@ -33,10 +34,14 @@ macro_rules! url {
}; };
} }
pub trait WakerToken: Sync + Send {
fn wake(&self);
}
impl Context { impl Context {
// TODO slim constructor down, maybe make a builder? // TODO slim constructor down, maybe make a builder?
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result<Self, crate::init::InitError> { pub async fn new(db: DatabaseConnection, mut domain: String, config: Config, waker: Option<Box<dyn WakerToken>>) -> Result<Self, crate::init::InitError> {
let protocol = if domain.starts_with("http://") let protocol = if domain.starts_with("http://")
{ "http://" } else { "https://" }.to_string(); { "http://" } else { "https://" }.to_string();
if domain.ends_with('/') { if domain.ends_with('/') {
@ -72,7 +77,7 @@ impl Context {
}; };
Ok(Context(Arc::new(ContextInner { Ok(Context(Arc::new(ContextInner {
base_url, db, domain, protocol, actor, instance, config, pkey, relay, base_url, db, domain, protocol, actor, instance, config, pkey, relay, waker,
}))) })))
} }
@ -167,6 +172,12 @@ impl Context {
Ok(None) Ok(None)
} }
pub fn wake_workers(&self) {
if let Some(ref waker) = self.0.waker {
waker.wake();
}
}
#[allow(unused)] #[allow(unused)]
pub fn is_relay(&self, id: &str) -> bool { pub fn is_relay(&self, id: &str) -> bool {
self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id) self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id)

View file

@ -53,12 +53,13 @@ pub async fn serve(ctx: upub::Context, bind: String, shutdown: impl ShutdownToke
let listener = tokio::net::TcpListener::bind(bind).await?; let listener = tokio::net::TcpListener::bind(bind).await?;
axum::serve(listener, router) axum::serve(listener, router)
.with_graceful_shutdown(async move { shutdown.event().await }) .with_graceful_shutdown(shutdown.event())
.await?; .await?;
Ok(()) Ok(())
} }
pub trait ShutdownToken: Sync + Send + 'static { pub trait ShutdownToken: Sync + Send + 'static {
async fn event(self); // TODO this is bs...
fn event(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
} }

View file

@ -32,7 +32,7 @@ pub type JobResult<T> = Result<T, JobError>;
pub trait JobDispatcher : Sized { pub trait JobDispatcher : Sized {
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>; async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
async fn lock(&self, job_internal: i64) -> JobResult<bool>; async fn lock(&self, job_internal: i64) -> JobResult<bool>;
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken); async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, wake: impl crate::WakeToken);
} }
impl JobDispatcher for Context { impl JobDispatcher for Context {
@ -69,12 +69,15 @@ impl JobDispatcher for Context {
Ok(true) Ok(true)
} }
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken) { async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, mut wake: impl crate::WakeToken) {
macro_rules! restart { macro_rules! restart {
(now) => { continue }; (now) => { continue };
() => { () => {
{ {
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
_ = wake.wait() => {},
}
continue; continue;
} }
} }

View file

@ -11,14 +11,20 @@ pub fn spawn(
poll: u64, poll: u64,
filter: Option<upub::model::job::JobType>, filter: Option<upub::model::job::JobType>,
stop: impl StopToken, stop: impl StopToken,
wake: impl WakeToken,
) -> tokio::task::JoinHandle<()> { ) -> tokio::task::JoinHandle<()> {
use dispatcher::JobDispatcher; use dispatcher::JobDispatcher;
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("starting worker task"); tracing::info!("starting worker task");
ctx.run(concurrency, poll, filter, stop).await ctx.run(concurrency, poll, filter, stop, wake).await
}) })
} }
pub trait StopToken: Sync + Send + 'static { pub trait StopToken: Sync + Send + 'static {
fn stop(&self) -> bool; fn stop(&self) -> bool;
} }
pub trait WakeToken: Sync + Send + 'static {
// TODO this is bs...
fn wait(&mut self) -> impl std::future::Future<Output = ()> + std::marker::Send;
}

View file

@ -148,5 +148,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
tx.commit().await?; tx.commit().await?;
ctx.wake_workers();
Ok(()) Ok(())
} }