From 485724701a97bbdb49c0c7e14805ea661d117c32 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 6 Jun 2024 18:36:30 +0200 Subject: [PATCH] fix: no unique index on job activity, skip dupes rather than preventing dupes from being inserted (which breaks processing local activities that require delivering) skip them as soon as they get acquired --- upub/core/src/model/job.rs | 1 - upub/migrations/src/lib.rs | 2 + .../src/m20240605_000001_add_jobs_table.rs | 13 +------ ..._000001_no_unique_index_on_job_activity.rs | 37 +++++++++++++++++++ upub/worker/src/dispatcher.rs | 8 ++++ 5 files changed, 48 insertions(+), 13 deletions(-) create mode 100644 upub/migrations/src/m20240606_000001_no_unique_index_on_job_activity.rs diff --git a/upub/core/src/model/job.rs b/upub/core/src/model/job.rs index afcf15c..ff9d4cf 100644 --- a/upub/core/src/model/job.rs +++ b/upub/core/src/model/job.rs @@ -16,7 +16,6 @@ pub struct Model { pub job_type: JobType, pub actor: String, pub target: Option, - #[sea_orm(unique)] pub activity: String, pub payload: Option, pub published: ChronoDateTimeUtc, diff --git a/upub/migrations/src/lib.rs b/upub/migrations/src/lib.rs index 0465e1b..34c9e55 100644 --- a/upub/migrations/src/lib.rs +++ b/upub/migrations/src/lib.rs @@ -7,6 +7,7 @@ mod m20240524_000004_create_addressing_deliveries; mod m20240524_000005_create_attachments_tags_mentions; mod m20240529_000001_add_relation_unique_index; mod m20240605_000001_add_jobs_table; +mod m20240606_000001_no_unique_index_on_job_activity; pub struct Migrator; @@ -21,6 +22,7 @@ impl MigratorTrait for Migrator { Box::new(m20240524_000005_create_attachments_tags_mentions::Migration), Box::new(m20240529_000001_add_relation_unique_index::Migration), Box::new(m20240605_000001_add_jobs_table::Migration), + Box::new(m20240606_000001_no_unique_index_on_job_activity::Migration), ] } } diff --git a/upub/migrations/src/m20240605_000001_add_jobs_table.rs b/upub/migrations/src/m20240605_000001_add_jobs_table.rs index 112e65f..5a48e4c 100644 --- a/upub/migrations/src/m20240605_000001_add_jobs_table.rs +++ b/upub/migrations/src/m20240605_000001_add_jobs_table.rs @@ -43,7 +43,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Jobs::JobType).small_integer().not_null()) .col(ColumnDef::new(Jobs::Actor).string().not_null()) .col(ColumnDef::new(Jobs::Target).string().null()) - .col(ColumnDef::new(Jobs::Activity).string().not_null().unique_key()) + .col(ColumnDef::new(Jobs::Activity).string().not_null()) .col(ColumnDef::new(Jobs::Payload).string().null()) .col(ColumnDef::new(Jobs::Published).date_time().not_null().default(Expr::current_timestamp())) .col(ColumnDef::new(Jobs::NotBefore).date_time().not_null().default(Expr::current_timestamp())) @@ -52,17 +52,6 @@ impl MigrationTrait for Migration { ) .await?; - manager - .create_index( - Index::create() - .unique() - .name("index-jobs-activity") - .table(Jobs::Table) - .col(Jobs::Activity) - .to_owned() - ) - .await?; - manager .create_index( Index::create() diff --git a/upub/migrations/src/m20240606_000001_no_unique_index_on_job_activity.rs b/upub/migrations/src/m20240606_000001_no_unique_index_on_job_activity.rs new file mode 100644 index 0000000..c26a0b9 --- /dev/null +++ b/upub/migrations/src/m20240606_000001_no_unique_index_on_job_activity.rs @@ -0,0 +1,37 @@ +use sea_orm_migration::prelude::*; + +use crate::m20240605_000001_add_jobs_table::Jobs; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_index( + Index::drop() + .name("index-jobs-activity") + .table(Jobs::Table) + .to_owned() + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_index( + Index::create() + .unique() + .name("index-jobs-activity") + .table(Jobs::Table) + .col(Jobs::Activity) + .to_owned() + ) + .await?; + + Ok(()) + } +} diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index f073f3d..ae696db 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -104,6 +104,14 @@ impl JobDispatcher for Context { restart!(now); } + if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity) + .one(self.db()) + .await + { + tracing::info!("dropping already processed job '{}'", job.activity); + restart!(now); + } + let _ctx = self.clone(); pool.spawn(async move { let res = match job.job_type {