fix: delete duplicated jobs

This commit is contained in:
əlemi 2024-06-07 23:16:39 +02:00
parent 746ba4bbee
commit 87d0d7b6d2
Signed by: alemi
GPG key ID: A4895B84D311642C

View file

@ -1,6 +1,6 @@
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder}; use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
use upub::{model, Context}; use upub::{model, traits::process::ProcessorError, Context};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum JobError { pub enum JobError {
@ -123,23 +123,28 @@ impl JobDispatcher for Context {
model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await, model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await,
model::job::JobType::Delivery => crate::delivery::process(_ctx.clone(), &job).await, model::job::JobType::Delivery => crate::delivery::process(_ctx.clone(), &job).await,
}; };
if let Err(e) = res { match res {
tracing::error!("failed processing job '{}': {e}", job.activity); Ok(()) => tracing::debug!("job {} completed", job.activity),
let active = job.clone().repeat(); Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) => tracing::info!("dropping job already processed: {}", job.activity),
let mut count = 0; Err(e) => {
loop { tracing::error!("failed processing job '{}': {e}", job.activity);
match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await { let active = job.clone().repeat();
Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity), let mut count = 0;
Ok(_) => break, loop {
match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {
Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity),
Ok(_) => break,
}
count += 1;
if count > _ctx.cfg().security.reinsertion_attempt_limit {
tracing::error!("reached job reinsertion limit, dropping {job:#?}");
break;
}
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
} }
count += 1;
if count > _ctx.cfg().security.reinsertion_attempt_limit {
tracing::error!("reached job reinsertion limit, dropping {job:#?}");
break;
}
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
} }
} }
}); });