From 0934cdaad41ec2387258e0e4959852cc10318750 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 19 Sep 2024 16:50:13 +0200 Subject: [PATCH] feat: waker implementation basically posting should now be instant? very ugly implementation tho: i wanted to keep tokio out of core but this is awful and also im realistically never swapping out tokio, so might as well move these tokens down in core... --- main.rs | 23 +++++++++++++++++++---- upub/core/src/context.rs | 15 +++++++++++++-- upub/routes/src/lib.rs | 5 +++-- upub/worker/src/dispatcher.rs | 9 ++++++--- upub/worker/src/lib.rs | 8 +++++++- upub/worker/src/outbound.rs | 2 ++ 6 files changed, 50 insertions(+), 12 deletions(-) diff --git a/main.rs b/main.rs index 82f155f..75200c2 100644 --- a/main.rs +++ b/main.rs @@ -4,8 +4,8 @@ use sea_orm::{ConnectOptions, Database}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use futures::stream::StreamExt; +use upub::{context, ext::LoggableError}; -use upub::ext::LoggableError; #[cfg(feature = "cli")] use upub_cli as cli; @@ -167,8 +167,10 @@ async fn init(args: Args, config: upub::Config) { return; } + let (tx_wake, rx_wake) = tokio::sync::watch::channel(false); + let wake = CancellationToken(rx_wake); - let ctx = upub::Context::new(db, domain, config.clone()) + let ctx = upub::Context::new(db, domain, config.clone(), Some(Box::new(WakerToken(tx_wake)))) .await.expect("failed creating server context"); #[cfg(feature = "cli")] @@ -195,12 +197,12 @@ async fn init(args: Args, config: upub::Config) { #[cfg(feature = "worker")] Mode::Work { filter, tasks, poll } => - worker::spawn(ctx, tasks, poll, filter.into(), stop) + worker::spawn(ctx, tasks, poll, filter.into(), stop, wake) .await.expect("failed running worker"), #[cfg(all(feature = "serve", feature = "worker"))] Mode::Monolith { bind, tasks, poll } => { - worker::spawn(ctx.clone(), tasks, poll, None, stop.clone()); + worker::spawn(ctx.clone(), tasks, poll, None, stop.clone(), wake); routes::serve(ctx, bind, stop) .await.expect("failed serving api routes"); @@ -217,6 +219,13 @@ async fn init(args: Args, config: upub::Config) { signals_task.await.expect("failed joining signal handler task"); } +struct WakerToken(tokio::sync::watch::Sender); +impl context::WakerToken for WakerToken { + fn wake(&self) { + self.0.send_replace(true); + } +} + #[derive(Clone)] struct CancellationToken(tokio::sync::watch::Receiver); @@ -226,6 +235,12 @@ impl worker::StopToken for CancellationToken { } } +impl worker::WakeToken for CancellationToken { + async fn wait(&mut self) { + self.0.changed().await.err_failed("error waiting for waker token"); + } +} + impl routes::ShutdownToken for CancellationToken { async fn event(mut self) { self.0.changed().await.warn_failed("cancellation token channel closed, stopping..."); diff --git a/upub/core/src/context.rs b/upub/core/src/context.rs index f589e0c..1e76f5a 100644 --- a/upub/core/src/context.rs +++ b/upub/core/src/context.rs @@ -17,6 +17,7 @@ struct ContextInner { actor: model::actor::Model, instance: model::instance::Model, pkey: String, + waker: Option>, #[allow(unused)] relay: Relays, } @@ -33,10 +34,14 @@ macro_rules! url { }; } +pub trait WakerToken: Sync + Send { + fn wake(&self); +} + impl Context { // TODO slim constructor down, maybe make a builder? - pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result { + pub async fn new(db: DatabaseConnection, mut domain: String, config: Config, waker: Option>) -> Result { let protocol = if domain.starts_with("http://") { "http://" } else { "https://" }.to_string(); if domain.ends_with('/') { @@ -72,7 +77,7 @@ impl Context { }; Ok(Context(Arc::new(ContextInner { - base_url, db, domain, protocol, actor, instance, config, pkey, relay, + base_url, db, domain, protocol, actor, instance, config, pkey, relay, waker, }))) } @@ -167,6 +172,12 @@ impl Context { Ok(None) } + pub fn wake_workers(&self) { + if let Some(ref waker) = self.0.waker { + waker.wake(); + } + } + #[allow(unused)] pub fn is_relay(&self, id: &str) -> bool { self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id) diff --git a/upub/routes/src/lib.rs b/upub/routes/src/lib.rs index 674192b..9940060 100644 --- a/upub/routes/src/lib.rs +++ b/upub/routes/src/lib.rs @@ -53,12 +53,13 @@ pub async fn serve(ctx: upub::Context, bind: String, shutdown: impl ShutdownToke let listener = tokio::net::TcpListener::bind(bind).await?; axum::serve(listener, router) - .with_graceful_shutdown(async move { shutdown.event().await }) + .with_graceful_shutdown(shutdown.event()) .await?; Ok(()) } pub trait ShutdownToken: Sync + Send + 'static { - async fn event(self); + // TODO this is bs... + fn event(self) -> impl std::future::Future + std::marker::Send; } diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index 92a8391..493a50b 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -32,7 +32,7 @@ pub type JobResult = Result; pub trait JobDispatcher : Sized { async fn poll(&self, filter: Option) -> JobResult>; async fn lock(&self, job_internal: i64) -> JobResult; - async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option, stop: impl crate::StopToken); + async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option, stop: impl crate::StopToken, wake: impl crate::WakeToken); } impl JobDispatcher for Context { @@ -69,12 +69,15 @@ impl JobDispatcher for Context { Ok(true) } - async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option, stop: impl crate::StopToken) { + async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option, stop: impl crate::StopToken, mut wake: impl crate::WakeToken) { macro_rules! restart { (now) => { continue }; () => { { - tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {}, + _ = wake.wait() => {}, + } continue; } } diff --git a/upub/worker/src/lib.rs b/upub/worker/src/lib.rs index 72d596b..438ad56 100644 --- a/upub/worker/src/lib.rs +++ b/upub/worker/src/lib.rs @@ -11,14 +11,20 @@ pub fn spawn( poll: u64, filter: Option, stop: impl StopToken, + wake: impl WakeToken, ) -> tokio::task::JoinHandle<()> { use dispatcher::JobDispatcher; tokio::spawn(async move { tracing::info!("starting worker task"); - ctx.run(concurrency, poll, filter, stop).await + ctx.run(concurrency, poll, filter, stop, wake).await }) } pub trait StopToken: Sync + Send + 'static { fn stop(&self) -> bool; } + +pub trait WakeToken: Sync + Send + 'static { + // TODO this is bs... + fn wait(&mut self) -> impl std::future::Future + std::marker::Send; +} diff --git a/upub/worker/src/outbound.rs b/upub/worker/src/outbound.rs index 4bd9339..dc477c5 100644 --- a/upub/worker/src/outbound.rs +++ b/upub/worker/src/outbound.rs @@ -148,5 +148,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult< tx.commit().await?; + ctx.wake_workers(); + Ok(()) }