diff --git a/upub/core/src/config.rs b/upub/core/src/config.rs index c78a20f9..f129e922 100644 --- a/upub/core/src/config.rs +++ b/upub/core/src/config.rs @@ -82,6 +82,9 @@ pub struct SecurityConfig { #[serde_inline_default(30)] pub job_expiration_days: u32, + + #[serde_inline_default(100)] + pub reinsertion_attempt_limit: u32, } diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index 16d020fe..f073f3dd 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -115,11 +115,18 @@ impl JobDispatcher for Context { if let Err(e) = res { tracing::error!("failed processing job '{}': {e}", job.activity); let active = job.clone().repeat(); - if let Err(e) = model::job::Entity::insert(active) - .exec(_ctx.db()) - .await - { - tracing::error!("could not insert back job ({e}), dropping:\n{job:#?}") + let mut count = 0; + loop { + match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await { + Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity), + Ok(_) => break, + } + count += 1; + if count > _ctx.cfg().security.reinsertion_attempt_limit { + tracing::error!("reached job reinsertion limit, dropping {job:#?}"); + break; + } + tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; } } });