From 87d0d7b6d2e5b072d4a7fd09e2a049cfee63eeff Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 7 Jun 2024 23:16:39 +0200 Subject: [PATCH] fix: delete duplicated jobs --- upub/worker/src/dispatcher.rs | 37 ++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index 90e9ef41..df293f5f 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -1,6 +1,6 @@ use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder}; -use upub::{model, Context}; +use upub::{model, traits::process::ProcessorError, Context}; #[derive(Debug, thiserror::Error)] pub enum JobError { @@ -123,23 +123,28 @@ impl JobDispatcher for Context { model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await, model::job::JobType::Delivery => crate::delivery::process(_ctx.clone(), &job).await, }; - - if let Err(e) = res { - tracing::error!("failed processing job '{}': {e}", job.activity); - let active = job.clone().repeat(); - let mut count = 0; - loop { - match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await { - Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity), - Ok(_) => break, + + match res { + Ok(()) => tracing::debug!("job {} completed", job.activity), + Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) => tracing::info!("dropping job already processed: {}", job.activity), + Err(e) => { + tracing::error!("failed processing job '{}': {e}", job.activity); + let active = job.clone().repeat(); + let mut count = 0; + loop { + match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await { + Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity), + Ok(_) => break, + } + count += 1; + if count > _ctx.cfg().security.reinsertion_attempt_limit { + tracing::error!("reached job reinsertion limit, dropping {job:#?}"); + break; + } + tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; } - count += 1; - if count > _ctx.cfg().security.reinsertion_attempt_limit { - tracing::error!("reached job reinsertion limit, dropping {job:#?}"); - break; - } - tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; } + } });