From ddb12cb6eab31b0d3ec74c5e2d1108135c6c87e1 Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 27 Dec 2024 00:21:17 +0100 Subject: [PATCH] feat: track instance downtime so we dont keep attempting to update users, ughhh. receiving or successfully fetching from an instance clears the "down" status --- upub/cli/src/update.rs | 13 +++- upub/core/src/downtime.rs | 45 ++++++++++++++ upub/core/src/lib.rs | 2 + upub/core/src/model/downtime.rs | 29 +++++++++ upub/core/src/model/instance.rs | 8 +++ upub/core/src/model/mod.rs | 2 + upub/core/src/traits/fetch.rs | 2 + upub/migrations/src/lib.rs | 2 + .../m20241226_000003_create_downtime_table.rs | 59 +++++++++++++++++++ upub/routes/src/activitypub/inbox.rs | 3 + 10 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 upub/core/src/downtime.rs create mode 100644 upub/core/src/model/downtime.rs create mode 100644 upub/migrations/src/m20241226_000003_create_downtime_table.rs diff --git a/upub/cli/src/update.rs b/upub/cli/src/update.rs index b51b50b..68f082e 100644 --- a/upub/cli/src/update.rs +++ b/upub/cli/src/update.rs @@ -16,6 +16,8 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option) -> if let Some(limit) = limit { if count >= limit { break } } + let server = upub::Context::server(&user.id); + if upub::downtime::get(ctx.db(), &server).await?.is_some() { continue } match ctx.pull(&user.id).await.and_then(|x| x.actor()) { Err(upub::traits::fetch::RequestError::Fetch(status, msg)) => { if status.as_u16() == 410 { @@ -27,11 +29,19 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option) -> user.delete(ctx.db()).await?; } else { + upub::downtime::set(ctx.db(), &server).await?; tracing::warn!("could not fetch user {}: failed with status {status} -- {msg}", user.id); } }, - Err(e) => tracing::warn!("could not fetch user {}: {e}", user.id), + Err(e) => { + upub::downtime::set(ctx.db(), &server).await?; + tracing::warn!("could not fetch user {}: {e}", user.id) + }, Ok(doc) => match ctx.resolve_user(doc, ctx.db()).await { + Err(e) => { + upub::downtime::set(ctx.db(), &server).await?; + tracing::warn!("failed deserializing user '{}': {e}", user.id) + }, Ok(mut u) => { tracing::info!("updating user {}", user.id); u.internal = Unchanged(user.internal); @@ -39,7 +49,6 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option) -> u.update(ctx.db()).await?; count += 1; }, - Err(e) => tracing::warn!("failed deserializing user '{}': {e}", user.id), }, } } diff --git a/upub/core/src/downtime.rs b/upub/core/src/downtime.rs new file mode 100644 index 0000000..c698f75 --- /dev/null +++ b/upub/core/src/downtime.rs @@ -0,0 +1,45 @@ +use sea_orm::{ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; + + +pub async fn get(db: &impl ConnectionTrait, domain: &str) -> Result>, DbErr> { + Ok( + crate::model::downtime::Entity::find() + .filter(crate::model::downtime::Column::Domain.eq(domain)) + .one(db) + .await? + .map(|x| x.published) + ) +} + +pub async fn set(db: &impl ConnectionTrait, domain: &str) -> Result<(), DbErr> { + match crate::model::downtime::Entity::find() + .filter(crate::model::downtime::Column::Domain.eq(domain)) + .one(db) + .await? + { + Some(model) => { + let mut active = model.into_active_model(); + active.published = sea_orm::ActiveValue::Set(chrono::Utc::now()); + active.update(db).await?; + }, + None => { + crate::model::downtime::ActiveModel { + internal: sea_orm::ActiveValue::NotSet, + domain: sea_orm::ActiveValue::Set(domain.to_string()), + published: sea_orm::ActiveValue::Set(chrono::Utc::now()), + } + .insert(db) + .await?; + }, + } + + Ok(()) +} + +pub async fn unset(db: &impl ConnectionTrait, domain: &str) -> Result<(), DbErr> { + crate::model::downtime::Entity::delete_many() + .filter(crate::model::downtime::Column::Domain.eq(domain)) + .exec(db) + .await?; + Ok(()) +} diff --git a/upub/core/src/lib.rs b/upub/core/src/lib.rs index 476eee6..f55fc01 100644 --- a/upub/core/src/lib.rs +++ b/upub/core/src/lib.rs @@ -15,4 +15,6 @@ pub use selector::Query; pub use traits::normalize::AP; +pub mod downtime; + pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/upub/core/src/model/downtime.rs b/upub/core/src/model/downtime.rs new file mode 100644 index 0000000..af6bc41 --- /dev/null +++ b/upub/core/src/model/downtime.rs @@ -0,0 +1,29 @@ +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "downtimes")] +pub struct Model { + #[sea_orm(primary_key)] + pub internal: i64, + pub domain: String, + pub published: ChronoDateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::instance::Entity", + from = "Column::Domain", + to = "super::instance::Column::Domain", + on_update = "Cascade", + )] + Instances, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Instances.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/upub/core/src/model/instance.rs b/upub/core/src/model/instance.rs index 0ca6ef6..0a3c293 100644 --- a/upub/core/src/model/instance.rs +++ b/upub/core/src/model/instance.rs @@ -25,6 +25,8 @@ pub enum Relation { Actors, #[sea_orm(has_many = "super::addressing::Entity")] Addressing, + #[sea_orm(has_many = "super::downtime::Entity")] + Downtime, } impl Related for Entity { @@ -39,6 +41,12 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Downtime.def() + } +} + impl ActiveModelBehavior for ActiveModel {} impl Entity { diff --git a/upub/core/src/model/mod.rs b/upub/core/src/model/mod.rs index 8eabaa2..8b125e4 100644 --- a/upub/core/src/model/mod.rs +++ b/upub/core/src/model/mod.rs @@ -19,3 +19,5 @@ pub mod dislike; pub mod hashtag; pub mod mention; pub mod attachment; + +pub mod downtime; diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index 026b431..93a9a47 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -187,6 +187,8 @@ impl Fetcher for crate::Context { return self.pull(&doc_id).await; } + crate::downtime::unset(self.db(), &crate::Context::server(id)).await?; + match document.object_type()? { apb::ObjectType::Collection(x) => Err(RequestError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))), apb::ObjectType::Tombstone => Err(RequestError::Tombstone), diff --git a/upub/migrations/src/lib.rs b/upub/migrations/src/lib.rs index 3091cd6..17a6488 100644 --- a/upub/migrations/src/lib.rs +++ b/upub/migrations/src/lib.rs @@ -22,6 +22,7 @@ mod m20240715_000002_add_actors_fields_and_aliases; mod m20240811_000001_add_full_text_index; mod m20241226_000001_add_show_likes_collection; mod m20241226_000002_add_like_activities; +mod m20241226_000003_create_downtime_table; pub struct Migrator; @@ -51,6 +52,7 @@ impl MigratorTrait for Migrator { Box::new(m20240811_000001_add_full_text_index::Migration), Box::new(m20241226_000001_add_show_likes_collection::Migration), Box::new(m20241226_000002_add_like_activities::Migration), + Box::new(m20241226_000003_create_downtime_table::Migration), ] } } diff --git a/upub/migrations/src/m20241226_000003_create_downtime_table.rs b/upub/migrations/src/m20241226_000003_create_downtime_table.rs new file mode 100644 index 0000000..215c543 --- /dev/null +++ b/upub/migrations/src/m20241226_000003_create_downtime_table.rs @@ -0,0 +1,59 @@ +use sea_orm_migration::prelude::*; + +use crate::m20240524_000001_create_actor_activity_object_tables::Instances; + +#[derive(DeriveIden)] +pub enum Downtimes { + Table, + Internal, + Domain, + Published, +} + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + + manager + .create_table( + Table::create() + .table(Downtimes::Table) + .comment("tracking remote instances downtimes") + .col( + ColumnDef::new(Downtimes::Internal) + .big_integer() + .not_null() + .auto_increment() + .primary_key() + ) + .col(ColumnDef::new(Downtimes::Domain).string().not_null().unique_key()) + .foreign_key( + ForeignKey::create() + .name("fkey-downtime-instances") + .from(Downtimes::Table, Downtimes::Domain) + .to(Instances::Table, Instances::Domain) + .on_update(ForeignKeyAction::Cascade) + ) + .col(ColumnDef::new(Downtimes::Published).timestamp_with_time_zone().not_null().default(Expr::current_timestamp())) + .to_owned() + ) + .await?; + + manager + .create_index(Index::create().unique().name("index-downtimes-domain").table(Downtimes::Table).col(Downtimes::Domain).to_owned()) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Downtimes::Table).to_owned()) + .await?; + + Ok(()) + } +} diff --git a/upub/routes/src/activitypub/inbox.rs b/upub/routes/src/activitypub/inbox.rs index ae57d0b..6e08f06 100644 --- a/upub/routes/src/activitypub/inbox.rs +++ b/upub/routes/src/activitypub/inbox.rs @@ -63,6 +63,7 @@ pub async fn post( }; let aid = activity.id()?.to_string(); + let server = upub::Context::server(&aid); if activity.actor().id()? != uid { return Err(crate::ApiError::forbidden()); @@ -87,5 +88,7 @@ pub async fn post( upub::model::job::Entity::insert(job).exec(ctx.db()).await?; + upub::downtime::unset(ctx.db(), &server).await?; + Ok(StatusCode::ACCEPTED) }