feat: added error field to jobs

basically just for manual inspection, maybe we could drop this again
once upub is much more stable?
This commit is contained in:
əlemi 2024-07-06 06:03:26 +02:00
parent 90f483a0ba
commit 9adeff6fbf
Signed by: alemi
GPG key ID: A4895B84D311642C
11 changed files with 53 additions and 4 deletions

View file

@ -81,6 +81,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
payload: Set(Some(undo_activity)),
error: Set(None),
};
tracing::info!("undoing {}", activity.id);
@ -121,6 +122,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
payload: Set(Some(undo_activity)),
error: Set(None),
};
tracing::info!("deleting {}", object.id);

View file

@ -84,6 +84,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
attempt: Set(0),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
error: Set(None),
};
tracing::info!("following relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
@ -119,6 +120,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
attempt: Set(0),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
error: Set(None),
};
tracing::info!("accepting relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
@ -155,6 +157,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
attempt: Set(0),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
error: Set(None),
};
tracing::info!("unfollowing relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
@ -190,6 +193,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
attempt: Set(0),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
error: Set(None),
};
tracing::info!("unfollowing relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;

View file

@ -21,6 +21,7 @@ pub struct Model {
pub published: ChronoDateTimeUtc,
pub not_before: ChronoDateTimeUtc,
pub attempt: i16,
pub error: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -41,7 +42,7 @@ impl Model {
}
}
pub fn repeat(self) -> ActiveModel {
pub fn repeat(self, error: Option<String>) -> ActiveModel {
ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
job_type: sea_orm::ActiveValue::Set(self.job_type),
@ -52,6 +53,7 @@ impl Model {
activity: sea_orm::ActiveValue::Set(self.activity),
published: sea_orm::ActiveValue::Set(self.published),
attempt: sea_orm::ActiveValue::Set(self.attempt + 1),
error: sea_orm::ActiveValue::Set(error),
}
}
}

View file

@ -36,6 +36,7 @@ impl Addresser for crate::Context {
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
error: Set(None),
}
),
Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"),

View file

@ -16,6 +16,7 @@ mod m20240628_000001_add_followers_following_indexes;
mod m20240628_000002_add_credentials_activated;
mod m20240703_000001_add_audience_index;
mod m20240703_000002_add_image_to_objects;
mod m20240706_000001_add_error_to_jobs;
pub struct Migrator;
@ -39,6 +40,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240628_000002_add_credentials_activated::Migration),
Box::new(m20240703_000001_add_audience_index::Migration),
Box::new(m20240703_000002_add_image_to_objects::Migration),
Box::new(m20240706_000001_add_error_to_jobs::Migration),
]
}
}

View file

@ -14,6 +14,7 @@ pub enum Jobs {
Published,
NotBefore,
Attempt,
Error, // added after
}

View file

@ -0,0 +1,35 @@
use sea_orm_migration::prelude::*;
use crate::m20240605_000001_add_jobs_table::Jobs;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Jobs::Table)
.add_column(ColumnDef::new(Jobs::Error).string().null())
.to_owned()
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Jobs::Table)
.drop_column(Jobs::Error)
.to_owned()
)
.await?;
Ok(())
}
}

View file

@ -67,6 +67,7 @@ pub async fn post(
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
payload: Set(Some(activity)),
error: Set(None),
};
model::job::Entity::insert(job).exec(ctx.db()).await?;

View file

@ -79,7 +79,8 @@ pub async fn post(
payload: Set(Some(activity)),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0)
attempt: Set(0),
error: Set(None),
};
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;

View file

@ -65,7 +65,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
&job.actor, &key, ctx.domain()
).await {
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
model::job::Entity::insert(job.clone().repeat())
model::job::Entity::insert(job.clone().repeat(Some(e.to_string())))
.exec(ctx.db())
.await?;
}

View file

@ -156,7 +156,7 @@ impl JobDispatcher for Context {
}
}
tracing::error!("failed processing job '{}': {e}", job.activity);
let active = job.clone().repeat();
let active = job.clone().repeat(Some(e.to_string()));
let mut count = 0;
loop {
match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {