diff --git a/upub/core/src/model/job.rs b/upub/core/src/model/job.rs index afcf15ca..ff9d4cf2 100644 --- a/upub/core/src/model/job.rs +++ b/upub/core/src/model/job.rs @@ -16,7 +16,6 @@ pub struct Model { pub job_type: JobType, pub actor: String, pub target: Option, - #[sea_orm(unique)] pub activity: String, pub payload: Option, pub published: ChronoDateTimeUtc, diff --git a/upub/migrations/src/lib.rs b/upub/migrations/src/lib.rs index 0465e1b4..34c9e55b 100644 --- a/upub/migrations/src/lib.rs +++ b/upub/migrations/src/lib.rs @@ -7,6 +7,7 @@ mod m20240524_000004_create_addressing_deliveries; mod m20240524_000005_create_attachments_tags_mentions; mod m20240529_000001_add_relation_unique_index; mod m20240605_000001_add_jobs_table; +mod m20240606_000001_no_unique_index_on_job_activity; pub struct Migrator; @@ -21,6 +22,7 @@ impl MigratorTrait for Migrator { Box::new(m20240524_000005_create_attachments_tags_mentions::Migration), Box::new(m20240529_000001_add_relation_unique_index::Migration), Box::new(m20240605_000001_add_jobs_table::Migration), + Box::new(m20240606_000001_no_unique_index_on_job_activity::Migration), ] } } diff --git a/upub/migrations/src/m20240605_000001_add_jobs_table.rs b/upub/migrations/src/m20240605_000001_add_jobs_table.rs index 112e65f0..5a48e4c4 100644 --- a/upub/migrations/src/m20240605_000001_add_jobs_table.rs +++ b/upub/migrations/src/m20240605_000001_add_jobs_table.rs @@ -43,7 +43,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Jobs::JobType).small_integer().not_null()) .col(ColumnDef::new(Jobs::Actor).string().not_null()) .col(ColumnDef::new(Jobs::Target).string().null()) - .col(ColumnDef::new(Jobs::Activity).string().not_null().unique_key()) + .col(ColumnDef::new(Jobs::Activity).string().not_null()) .col(ColumnDef::new(Jobs::Payload).string().null()) .col(ColumnDef::new(Jobs::Published).date_time().not_null().default(Expr::current_timestamp())) .col(ColumnDef::new(Jobs::NotBefore).date_time().not_null().default(Expr::current_timestamp())) @@ -52,17 +52,6 @@ impl MigrationTrait for Migration { ) .await?; - manager - .create_index( - Index::create() - .unique() - .name("index-jobs-activity") - .table(Jobs::Table) - .col(Jobs::Activity) - .to_owned() - ) - .await?; - manager .create_index( Index::create() diff --git a/upub/migrations/src/m20240606_000001_no_unique_index_on_job_activity.rs b/upub/migrations/src/m20240606_000001_no_unique_index_on_job_activity.rs new file mode 100644 index 00000000..c26a0b9f --- /dev/null +++ b/upub/migrations/src/m20240606_000001_no_unique_index_on_job_activity.rs @@ -0,0 +1,37 @@ +use sea_orm_migration::prelude::*; + +use crate::m20240605_000001_add_jobs_table::Jobs; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_index( + Index::drop() + .name("index-jobs-activity") + .table(Jobs::Table) + .to_owned() + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_index( + Index::create() + .unique() + .name("index-jobs-activity") + .table(Jobs::Table) + .col(Jobs::Activity) + .to_owned() + ) + .await?; + + Ok(()) + } +} diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index f073f3dd..ae696db7 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -104,6 +104,14 @@ impl JobDispatcher for Context { restart!(now); } + if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity) + .one(self.db()) + .await + { + tracing::info!("dropping already processed job '{}'", job.activity); + restart!(now); + } + let _ctx = self.clone(); pool.spawn(async move { let res = match job.job_type {