feat: track instance downtime

so we dont keep attempting to update users, ughhh. receiving or
successfully fetching from an instance clears the "down" status
This commit is contained in:
əlemi 2024-12-27 00:21:17 +01:00
parent a29e16633a
commit ddb12cb6ea
Signed by: alemi
GPG key ID: A4895B84D311642C
10 changed files with 163 additions and 2 deletions

View file

@ -16,6 +16,8 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) ->
if let Some(limit) = limit { if let Some(limit) = limit {
if count >= limit { break } if count >= limit { break }
} }
let server = upub::Context::server(&user.id);
if upub::downtime::get(ctx.db(), &server).await?.is_some() { continue }
match ctx.pull(&user.id).await.and_then(|x| x.actor()) { match ctx.pull(&user.id).await.and_then(|x| x.actor()) {
Err(upub::traits::fetch::RequestError::Fetch(status, msg)) => { Err(upub::traits::fetch::RequestError::Fetch(status, msg)) => {
if status.as_u16() == 410 { if status.as_u16() == 410 {
@ -27,11 +29,19 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) ->
user.delete(ctx.db()).await?; user.delete(ctx.db()).await?;
} }
else { else {
upub::downtime::set(ctx.db(), &server).await?;
tracing::warn!("could not fetch user {}: failed with status {status} -- {msg}", user.id); tracing::warn!("could not fetch user {}: failed with status {status} -- {msg}", user.id);
} }
}, },
Err(e) => tracing::warn!("could not fetch user {}: {e}", user.id), Err(e) => {
upub::downtime::set(ctx.db(), &server).await?;
tracing::warn!("could not fetch user {}: {e}", user.id)
},
Ok(doc) => match ctx.resolve_user(doc, ctx.db()).await { Ok(doc) => match ctx.resolve_user(doc, ctx.db()).await {
Err(e) => {
upub::downtime::set(ctx.db(), &server).await?;
tracing::warn!("failed deserializing user '{}': {e}", user.id)
},
Ok(mut u) => { Ok(mut u) => {
tracing::info!("updating user {}", user.id); tracing::info!("updating user {}", user.id);
u.internal = Unchanged(user.internal); u.internal = Unchanged(user.internal);
@ -39,7 +49,6 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) ->
u.update(ctx.db()).await?; u.update(ctx.db()).await?;
count += 1; count += 1;
}, },
Err(e) => tracing::warn!("failed deserializing user '{}': {e}", user.id),
}, },
} }
} }

45
upub/core/src/downtime.rs Normal file
View file

@ -0,0 +1,45 @@
use sea_orm::{ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter};
pub async fn get(db: &impl ConnectionTrait, domain: &str) -> Result<Option<chrono::DateTime<chrono::Utc>>, DbErr> {
Ok(
crate::model::downtime::Entity::find()
.filter(crate::model::downtime::Column::Domain.eq(domain))
.one(db)
.await?
.map(|x| x.published)
)
}
pub async fn set(db: &impl ConnectionTrait, domain: &str) -> Result<(), DbErr> {
match crate::model::downtime::Entity::find()
.filter(crate::model::downtime::Column::Domain.eq(domain))
.one(db)
.await?
{
Some(model) => {
let mut active = model.into_active_model();
active.published = sea_orm::ActiveValue::Set(chrono::Utc::now());
active.update(db).await?;
},
None => {
crate::model::downtime::ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
domain: sea_orm::ActiveValue::Set(domain.to_string()),
published: sea_orm::ActiveValue::Set(chrono::Utc::now()),
}
.insert(db)
.await?;
},
}
Ok(())
}
pub async fn unset(db: &impl ConnectionTrait, domain: &str) -> Result<(), DbErr> {
crate::model::downtime::Entity::delete_many()
.filter(crate::model::downtime::Column::Domain.eq(domain))
.exec(db)
.await?;
Ok(())
}

View file

@ -15,4 +15,6 @@ pub use selector::Query;
pub use traits::normalize::AP; pub use traits::normalize::AP;
pub mod downtime;
pub const VERSION: &str = env!("CARGO_PKG_VERSION"); pub const VERSION: &str = env!("CARGO_PKG_VERSION");

View file

@ -0,0 +1,29 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "downtimes")]
pub struct Model {
#[sea_orm(primary_key)]
pub internal: i64,
pub domain: String,
pub published: ChronoDateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::instance::Entity",
from = "Column::Domain",
to = "super::instance::Column::Domain",
on_update = "Cascade",
)]
Instances,
}
impl Related<super::instance::Entity> for Entity {
fn to() -> RelationDef {
Relation::Instances.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -25,6 +25,8 @@ pub enum Relation {
Actors, Actors,
#[sea_orm(has_many = "super::addressing::Entity")] #[sea_orm(has_many = "super::addressing::Entity")]
Addressing, Addressing,
#[sea_orm(has_many = "super::downtime::Entity")]
Downtime,
} }
impl Related<super::actor::Entity> for Entity { impl Related<super::actor::Entity> for Entity {
@ -39,6 +41,12 @@ impl Related<super::addressing::Entity> for Entity {
} }
} }
impl Related<super::downtime::Entity> for Entity {
fn to() -> RelationDef {
Relation::Downtime.def()
}
}
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}
impl Entity { impl Entity {

View file

@ -19,3 +19,5 @@ pub mod dislike;
pub mod hashtag; pub mod hashtag;
pub mod mention; pub mod mention;
pub mod attachment; pub mod attachment;
pub mod downtime;

View file

@ -187,6 +187,8 @@ impl Fetcher for crate::Context {
return self.pull(&doc_id).await; return self.pull(&doc_id).await;
} }
crate::downtime::unset(self.db(), &crate::Context::server(id)).await?;
match document.object_type()? { match document.object_type()? {
apb::ObjectType::Collection(x) => Err(RequestError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))), apb::ObjectType::Collection(x) => Err(RequestError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))),
apb::ObjectType::Tombstone => Err(RequestError::Tombstone), apb::ObjectType::Tombstone => Err(RequestError::Tombstone),

View file

@ -22,6 +22,7 @@ mod m20240715_000002_add_actors_fields_and_aliases;
mod m20240811_000001_add_full_text_index; mod m20240811_000001_add_full_text_index;
mod m20241226_000001_add_show_likes_collection; mod m20241226_000001_add_show_likes_collection;
mod m20241226_000002_add_like_activities; mod m20241226_000002_add_like_activities;
mod m20241226_000003_create_downtime_table;
pub struct Migrator; pub struct Migrator;
@ -51,6 +52,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240811_000001_add_full_text_index::Migration), Box::new(m20240811_000001_add_full_text_index::Migration),
Box::new(m20241226_000001_add_show_likes_collection::Migration), Box::new(m20241226_000001_add_show_likes_collection::Migration),
Box::new(m20241226_000002_add_like_activities::Migration), Box::new(m20241226_000002_add_like_activities::Migration),
Box::new(m20241226_000003_create_downtime_table::Migration),
] ]
} }
} }

View file

@ -0,0 +1,59 @@
use sea_orm_migration::prelude::*;
use crate::m20240524_000001_create_actor_activity_object_tables::Instances;
#[derive(DeriveIden)]
pub enum Downtimes {
Table,
Internal,
Domain,
Published,
}
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Downtimes::Table)
.comment("tracking remote instances downtimes")
.col(
ColumnDef::new(Downtimes::Internal)
.big_integer()
.not_null()
.auto_increment()
.primary_key()
)
.col(ColumnDef::new(Downtimes::Domain).string().not_null().unique_key())
.foreign_key(
ForeignKey::create()
.name("fkey-downtime-instances")
.from(Downtimes::Table, Downtimes::Domain)
.to(Instances::Table, Instances::Domain)
.on_update(ForeignKeyAction::Cascade)
)
.col(ColumnDef::new(Downtimes::Published).timestamp_with_time_zone().not_null().default(Expr::current_timestamp()))
.to_owned()
)
.await?;
manager
.create_index(Index::create().unique().name("index-downtimes-domain").table(Downtimes::Table).col(Downtimes::Domain).to_owned())
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Downtimes::Table).to_owned())
.await?;
Ok(())
}
}

View file

@ -63,6 +63,7 @@ pub async fn post(
}; };
let aid = activity.id()?.to_string(); let aid = activity.id()?.to_string();
let server = upub::Context::server(&aid);
if activity.actor().id()? != uid { if activity.actor().id()? != uid {
return Err(crate::ApiError::forbidden()); return Err(crate::ApiError::forbidden());
@ -87,5 +88,7 @@ pub async fn post(
upub::model::job::Entity::insert(job).exec(ctx.db()).await?; upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
upub::downtime::unset(ctx.db(), &server).await?;
Ok(StatusCode::ACCEPTED) Ok(StatusCode::ACCEPTED)
} }