forked from alemi/upub
fix: no unique index on job activity, skip dupes
rather than preventing dupes from being inserted (which breaks processing local activities that require delivering) skip them as soon as they get acquired
This commit is contained in:
parent
f3d28c9371
commit
485724701a
5 changed files with 48 additions and 13 deletions
|
@ -16,7 +16,6 @@ pub struct Model {
|
||||||
pub job_type: JobType,
|
pub job_type: JobType,
|
||||||
pub actor: String,
|
pub actor: String,
|
||||||
pub target: Option<String>,
|
pub target: Option<String>,
|
||||||
#[sea_orm(unique)]
|
|
||||||
pub activity: String,
|
pub activity: String,
|
||||||
pub payload: Option<String>,
|
pub payload: Option<String>,
|
||||||
pub published: ChronoDateTimeUtc,
|
pub published: ChronoDateTimeUtc,
|
||||||
|
|
|
@ -7,6 +7,7 @@ mod m20240524_000004_create_addressing_deliveries;
|
||||||
mod m20240524_000005_create_attachments_tags_mentions;
|
mod m20240524_000005_create_attachments_tags_mentions;
|
||||||
mod m20240529_000001_add_relation_unique_index;
|
mod m20240529_000001_add_relation_unique_index;
|
||||||
mod m20240605_000001_add_jobs_table;
|
mod m20240605_000001_add_jobs_table;
|
||||||
|
mod m20240606_000001_no_unique_index_on_job_activity;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
|
@ -21,6 +22,7 @@ impl MigratorTrait for Migrator {
|
||||||
Box::new(m20240524_000005_create_attachments_tags_mentions::Migration),
|
Box::new(m20240524_000005_create_attachments_tags_mentions::Migration),
|
||||||
Box::new(m20240529_000001_add_relation_unique_index::Migration),
|
Box::new(m20240529_000001_add_relation_unique_index::Migration),
|
||||||
Box::new(m20240605_000001_add_jobs_table::Migration),
|
Box::new(m20240605_000001_add_jobs_table::Migration),
|
||||||
|
Box::new(m20240606_000001_no_unique_index_on_job_activity::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ impl MigrationTrait for Migration {
|
||||||
.col(ColumnDef::new(Jobs::JobType).small_integer().not_null())
|
.col(ColumnDef::new(Jobs::JobType).small_integer().not_null())
|
||||||
.col(ColumnDef::new(Jobs::Actor).string().not_null())
|
.col(ColumnDef::new(Jobs::Actor).string().not_null())
|
||||||
.col(ColumnDef::new(Jobs::Target).string().null())
|
.col(ColumnDef::new(Jobs::Target).string().null())
|
||||||
.col(ColumnDef::new(Jobs::Activity).string().not_null().unique_key())
|
.col(ColumnDef::new(Jobs::Activity).string().not_null())
|
||||||
.col(ColumnDef::new(Jobs::Payload).string().null())
|
.col(ColumnDef::new(Jobs::Payload).string().null())
|
||||||
.col(ColumnDef::new(Jobs::Published).date_time().not_null().default(Expr::current_timestamp()))
|
.col(ColumnDef::new(Jobs::Published).date_time().not_null().default(Expr::current_timestamp()))
|
||||||
.col(ColumnDef::new(Jobs::NotBefore).date_time().not_null().default(Expr::current_timestamp()))
|
.col(ColumnDef::new(Jobs::NotBefore).date_time().not_null().default(Expr::current_timestamp()))
|
||||||
|
@ -52,17 +52,6 @@ impl MigrationTrait for Migration {
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
manager
|
|
||||||
.create_index(
|
|
||||||
Index::create()
|
|
||||||
.unique()
|
|
||||||
.name("index-jobs-activity")
|
|
||||||
.table(Jobs::Table)
|
|
||||||
.col(Jobs::Activity)
|
|
||||||
.to_owned()
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
manager
|
manager
|
||||||
.create_index(
|
.create_index(
|
||||||
Index::create()
|
Index::create()
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
use crate::m20240605_000001_add_jobs_table::Jobs;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.drop_index(
|
||||||
|
Index::drop()
|
||||||
|
.name("index-jobs-activity")
|
||||||
|
.table(Jobs::Table)
|
||||||
|
.to_owned()
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.create_index(
|
||||||
|
Index::create()
|
||||||
|
.unique()
|
||||||
|
.name("index-jobs-activity")
|
||||||
|
.table(Jobs::Table)
|
||||||
|
.col(Jobs::Activity)
|
||||||
|
.to_owned()
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -104,6 +104,14 @@ impl JobDispatcher for Context {
|
||||||
restart!(now);
|
restart!(now);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity)
|
||||||
|
.one(self.db())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::info!("dropping already processed job '{}'", job.activity);
|
||||||
|
restart!(now);
|
||||||
|
}
|
||||||
|
|
||||||
let _ctx = self.clone();
|
let _ctx = self.clone();
|
||||||
pool.spawn(async move {
|
pool.spawn(async move {
|
||||||
let res = match job.job_type {
|
let res = match job.job_type {
|
||||||
|
|
Loading…
Reference in a new issue