From 9adeff6fbf2425be84c824457ad3bda4f4d06a9d Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 6 Jul 2024 06:03:26 +0200 Subject: [PATCH] feat: added error field to jobs basically just for manual inspection, maybe we could drop this again once upub is much more stable? --- upub/cli/src/nuke.rs | 2 ++ upub/cli/src/relay.rs | 4 +++ upub/core/src/model/job.rs | 4 ++- upub/core/src/traits/address.rs | 1 + upub/migrations/src/lib.rs | 2 ++ .../src/m20240605_000001_add_jobs_table.rs | 1 + .../src/m20240706_000001_add_error_to_jobs.rs | 35 +++++++++++++++++++ upub/routes/src/activitypub/actor/outbox.rs | 1 + upub/routes/src/activitypub/inbox.rs | 3 +- upub/worker/src/delivery.rs | 2 +- upub/worker/src/dispatcher.rs | 2 +- 11 files changed, 53 insertions(+), 4 deletions(-) create mode 100644 upub/migrations/src/m20240706_000001_add_error_to_jobs.rs diff --git a/upub/cli/src/nuke.rs b/upub/cli/src/nuke.rs index 1c70f7d..b01ff13 100644 --- a/upub/cli/src/nuke.rs +++ b/upub/cli/src/nuke.rs @@ -81,6 +81,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res not_before: Set(chrono::Utc::now()), attempt: Set(0), payload: Set(Some(undo_activity)), + error: Set(None), }; tracing::info!("undoing {}", activity.id); @@ -121,6 +122,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res not_before: Set(chrono::Utc::now()), attempt: Set(0), payload: Set(Some(undo_activity)), + error: Set(None), }; tracing::info!("deleting {}", object.id); diff --git a/upub/cli/src/relay.rs b/upub/cli/src/relay.rs index b86e2a3..6dd4e40 100644 --- a/upub/cli/src/relay.rs +++ b/upub/cli/src/relay.rs @@ -84,6 +84,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE attempt: Set(0), published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), + error: Set(None), }; tracing::info!("following relay {actor}"); upub::model::job::Entity::insert(job).exec(ctx.db()).await?; @@ -119,6 +120,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE attempt: Set(0), published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), + error: Set(None), }; tracing::info!("accepting relay {actor}"); upub::model::job::Entity::insert(job).exec(ctx.db()).await?; @@ -155,6 +157,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE attempt: Set(0), published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), + error: Set(None), }; tracing::info!("unfollowing relay {actor}"); upub::model::job::Entity::insert(job).exec(ctx.db()).await?; @@ -190,6 +193,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE attempt: Set(0), published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), + error: Set(None), }; tracing::info!("unfollowing relay {actor}"); upub::model::job::Entity::insert(job).exec(ctx.db()).await?; diff --git a/upub/core/src/model/job.rs b/upub/core/src/model/job.rs index 1cc0c6e..f108dfa 100644 --- a/upub/core/src/model/job.rs +++ b/upub/core/src/model/job.rs @@ -21,6 +21,7 @@ pub struct Model { pub published: ChronoDateTimeUtc, pub not_before: ChronoDateTimeUtc, pub attempt: i16, + pub error: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -41,7 +42,7 @@ impl Model { } } - pub fn repeat(self) -> ActiveModel { + pub fn repeat(self, error: Option) -> ActiveModel { ActiveModel { internal: sea_orm::ActiveValue::NotSet, job_type: sea_orm::ActiveValue::Set(self.job_type), @@ -52,6 +53,7 @@ impl Model { activity: sea_orm::ActiveValue::Set(self.activity), published: sea_orm::ActiveValue::Set(self.published), attempt: sea_orm::ActiveValue::Set(self.attempt + 1), + error: sea_orm::ActiveValue::Set(error), } } } diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index f4bef2c..1213b4e 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -36,6 +36,7 @@ impl Addresser for crate::Context { published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), attempt: Set(0), + error: Set(None), } ), Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"), diff --git a/upub/migrations/src/lib.rs b/upub/migrations/src/lib.rs index 57dffe4..c4bb8db 100644 --- a/upub/migrations/src/lib.rs +++ b/upub/migrations/src/lib.rs @@ -16,6 +16,7 @@ mod m20240628_000001_add_followers_following_indexes; mod m20240628_000002_add_credentials_activated; mod m20240703_000001_add_audience_index; mod m20240703_000002_add_image_to_objects; +mod m20240706_000001_add_error_to_jobs; pub struct Migrator; @@ -39,6 +40,7 @@ impl MigratorTrait for Migrator { Box::new(m20240628_000002_add_credentials_activated::Migration), Box::new(m20240703_000001_add_audience_index::Migration), Box::new(m20240703_000002_add_image_to_objects::Migration), + Box::new(m20240706_000001_add_error_to_jobs::Migration), ] } } diff --git a/upub/migrations/src/m20240605_000001_add_jobs_table.rs b/upub/migrations/src/m20240605_000001_add_jobs_table.rs index cdbf4b0..b11163b 100644 --- a/upub/migrations/src/m20240605_000001_add_jobs_table.rs +++ b/upub/migrations/src/m20240605_000001_add_jobs_table.rs @@ -14,6 +14,7 @@ pub enum Jobs { Published, NotBefore, Attempt, + Error, // added after } diff --git a/upub/migrations/src/m20240706_000001_add_error_to_jobs.rs b/upub/migrations/src/m20240706_000001_add_error_to_jobs.rs new file mode 100644 index 0000000..8d2f2e9 --- /dev/null +++ b/upub/migrations/src/m20240706_000001_add_error_to_jobs.rs @@ -0,0 +1,35 @@ +use sea_orm_migration::prelude::*; + +use crate::m20240605_000001_add_jobs_table::Jobs; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Jobs::Table) + .add_column(ColumnDef::new(Jobs::Error).string().null()) + .to_owned() + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Jobs::Table) + .drop_column(Jobs::Error) + .to_owned() + ) + .await?; + + Ok(()) + } +} diff --git a/upub/routes/src/activitypub/actor/outbox.rs b/upub/routes/src/activitypub/actor/outbox.rs index 2a7c46c..9e9e46b 100644 --- a/upub/routes/src/activitypub/actor/outbox.rs +++ b/upub/routes/src/activitypub/actor/outbox.rs @@ -67,6 +67,7 @@ pub async fn post( not_before: Set(chrono::Utc::now()), attempt: Set(0), payload: Set(Some(activity)), + error: Set(None), }; model::job::Entity::insert(job).exec(ctx.db()).await?; diff --git a/upub/routes/src/activitypub/inbox.rs b/upub/routes/src/activitypub/inbox.rs index bcd8f7b..5db3b63 100644 --- a/upub/routes/src/activitypub/inbox.rs +++ b/upub/routes/src/activitypub/inbox.rs @@ -79,7 +79,8 @@ pub async fn post( payload: Set(Some(activity)), published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), - attempt: Set(0) + attempt: Set(0), + error: Set(None), }; upub::model::job::Entity::insert(job).exec(ctx.db()).await?; diff --git a/upub/worker/src/delivery.rs b/upub/worker/src/delivery.rs index 4d5d401..897461b 100644 --- a/upub/worker/src/delivery.rs +++ b/upub/worker/src/delivery.rs @@ -65,7 +65,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult< &job.actor, &key, ctx.domain() ).await { tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target); - model::job::Entity::insert(job.clone().repeat()) + model::job::Entity::insert(job.clone().repeat(Some(e.to_string()))) .exec(ctx.db()) .await?; } diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index 94c95a0..ff01f69 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -156,7 +156,7 @@ impl JobDispatcher for Context { } } tracing::error!("failed processing job '{}': {e}", job.activity); - let active = job.clone().repeat(); + let active = job.clone().repeat(Some(e.to_string())); let mut count = 0; loop { match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {