diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index 90e9ef4..df293f5 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -1,6 +1,6 @@ use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder}; -use upub::{model, Context}; +use upub::{model, traits::process::ProcessorError, Context}; #[derive(Debug, thiserror::Error)] pub enum JobError { @@ -123,23 +123,28 @@ impl JobDispatcher for Context { model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await, model::job::JobType::Delivery => crate::delivery::process(_ctx.clone(), &job).await, }; - - if let Err(e) = res { - tracing::error!("failed processing job '{}': {e}", job.activity); - let active = job.clone().repeat(); - let mut count = 0; - loop { - match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await { - Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity), - Ok(_) => break, + + match res { + Ok(()) => tracing::debug!("job {} completed", job.activity), + Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) => tracing::info!("dropping job already processed: {}", job.activity), + Err(e) => { + tracing::error!("failed processing job '{}': {e}", job.activity); + let active = job.clone().repeat(); + let mut count = 0; + loop { + match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await { + Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity), + Ok(_) => break, + } + count += 1; + if count > _ctx.cfg().security.reinsertion_attempt_limit { + tracing::error!("reached job reinsertion limit, dropping {job:#?}"); + break; + } + tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; } - count += 1; - if count > _ctx.cfg().security.reinsertion_attempt_limit { - tracing::error!("reached job reinsertion limit, dropping {job:#?}"); - break; - } - tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; } + } });