From 053414824aeb1b716bac905c1ee57c6f1764245b Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 6 Jun 2024 04:36:16 +0200 Subject: [PATCH] fix: retry some times before dropping acquired job --- upub/core/src/config.rs | 3 +++ upub/worker/src/dispatcher.rs | 17 ++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/upub/core/src/config.rs b/upub/core/src/config.rs index c78a20f9..f129e922 100644 --- a/upub/core/src/config.rs +++ b/upub/core/src/config.rs @@ -82,6 +82,9 @@ pub struct SecurityConfig { #[serde_inline_default(30)] pub job_expiration_days: u32, + + #[serde_inline_default(100)] + pub reinsertion_attempt_limit: u32, } diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index 16d020fe..f073f3dd 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -115,11 +115,18 @@ impl JobDispatcher for Context { if let Err(e) = res { tracing::error!("failed processing job '{}': {e}", job.activity); let active = job.clone().repeat(); - if let Err(e) = model::job::Entity::insert(active) - .exec(_ctx.db()) - .await - { - tracing::error!("could not insert back job ({e}), dropping:\n{job:#?}") + let mut count = 0; + loop { + match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await { + Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity), + Ok(_) => break, + } + count += 1; + if count > _ctx.cfg().security.reinsertion_attempt_limit { + tracing::error!("reached job reinsertion limit, dropping {job:#?}"); + break; + } + tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; } } });