diff --git a/upub/core/src/model/activity.rs b/upub/core/src/model/activity.rs index 440df53..5d773b4 100644 --- a/upub/core/src/model/activity.rs +++ b/upub/core/src/model/activity.rs @@ -33,8 +33,6 @@ pub enum Relation { Actors, #[sea_orm(has_many = "super::addressing::Entity")] Addressing, - #[sea_orm(has_many = "super::delivery::Entity")] - Deliveries, #[sea_orm( belongs_to = "super::object::Entity", from = "Column::Object", @@ -57,12 +55,6 @@ impl Related for Entity { } } -impl Related for Entity { - fn to() -> RelationDef { - Relation::Deliveries.def() - } -} - impl Related for Entity { fn to() -> RelationDef { Relation::Objects.def() diff --git a/upub/core/src/model/actor.rs b/upub/core/src/model/actor.rs index 33816ff..bf7692d 100644 --- a/upub/core/src/model/actor.rs +++ b/upub/core/src/model/actor.rs @@ -42,8 +42,6 @@ pub enum Relation { Configs, #[sea_orm(has_many = "super::credential::Entity")] Credentials, - #[sea_orm(has_many = "super::delivery::Entity")] - Deliveries, #[sea_orm( belongs_to = "super::instance::Entity", from = "Column::Domain", @@ -94,12 +92,6 @@ impl Related for Entity { } } -impl Related for Entity { - fn to() -> RelationDef { - Relation::Deliveries.def() - } -} - impl Related for Entity { fn to() -> RelationDef { Relation::Instances.def() diff --git a/upub/core/src/model/delivery.rs b/upub/core/src/model/job.rs similarity index 52% rename from upub/core/src/model/delivery.rs rename to upub/core/src/model/job.rs index b767f67..ac54d9b 100644 --- a/upub/core/src/model/delivery.rs +++ b/upub/core/src/model/job.rs @@ -1,54 +1,36 @@ use sea_orm::entity::prelude::*; +#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "i32", db_type = "Integer")] +pub enum JobType { + Inbound = 1, + Outbound = 2, + Local = 3, +} + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] -#[sea_orm(table_name = "deliveries")] +#[sea_orm(table_name = "jobs")] pub struct Model { #[sea_orm(primary_key)] pub internal: i64, + pub job_type: JobType, pub actor: String, - pub target: String, + pub target: Option, + #[sea_orm(unique)] pub activity: String, + pub payload: Option, pub published: ChronoDateTimeUtc, pub not_before: ChronoDateTimeUtc, pub attempt: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation { - #[sea_orm( - belongs_to = "super::activity::Entity", - from = "Column::Activity", - to = "super::activity::Column::Id", - on_update = "Cascade", - on_delete = "Cascade" - )] - Activities, - #[sea_orm( - belongs_to = "super::actor::Entity", - from = "Column::Actor", - to = "super::actor::Column::Id", - on_update = "Cascade", - on_delete = "Cascade" - )] - Actors, -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::Activities.def() - } -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::Actors.def() - } -} +pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} impl Model { - pub fn next_delivery(&self) -> ChronoDateTimeUtc { + pub fn next_attempt(&self) -> ChronoDateTimeUtc { match self.attempt { 0 => chrono::Utc::now() + std::time::Duration::from_secs(10), 1 => chrono::Utc::now() + std::time::Duration::from_secs(60), @@ -63,4 +45,18 @@ impl Model { pub fn expired(&self) -> bool { chrono::Utc::now() - self.published > chrono::Duration::days(7) } + + pub fn repeat(self) -> ActiveModel { + ActiveModel { + internal: sea_orm::ActiveValue::NotSet, + job_type: sea_orm::ActiveValue::Set(self.job_type), + not_before: sea_orm::ActiveValue::Set(self.next_attempt()), + actor: sea_orm::ActiveValue::Set(self.actor), + target: sea_orm::ActiveValue::Set(self.target), + payload: sea_orm::ActiveValue::Set(self.payload), + activity: sea_orm::ActiveValue::Set(self.activity), + published: sea_orm::ActiveValue::Set(self.published), + attempt: sea_orm::ActiveValue::Set(self.attempt + 1), + } + } } diff --git a/upub/core/src/model/mod.rs b/upub/core/src/model/mod.rs index 2315231..1f97087 100644 --- a/upub/core/src/model/mod.rs +++ b/upub/core/src/model/mod.rs @@ -6,9 +6,8 @@ pub mod config; pub mod credential; pub mod session; -pub mod addressing; pub mod instance; -pub mod delivery; +pub mod addressing; pub mod job; pub mod relation; diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index 30ac338..68dd801 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -87,12 +87,14 @@ impl Addresser for crate::Context { // TODO fetch concurrently match self.fetch_user(target).await { Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( - crate::model::delivery::ActiveModel { + crate::model::job::ActiveModel { internal: sea_orm::ActiveValue::NotSet, actor: Set(from.to_string()), + job_type: Set(crate::model::job::JobType::Outbound), + payload: Set(None), // TODO we should resolve each user by id and check its inbox because we can't assume // it's /actors/{id}/inbox for every software, but oh well it's waaaaay easier now - target: Set(inbox), + target: Set(Some(inbox)), activity: Set(aid.to_string()), published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), @@ -105,7 +107,7 @@ impl Addresser for crate::Context { } if !deliveries.is_empty() { - crate::model::delivery::Entity::insert_many(deliveries) + crate::model::job::Entity::insert_many(deliveries) .exec(self.db()) .await?; } diff --git a/upub/migrations/src/README.md b/upub/migrations/src/README.md deleted file mode 100644 index a76d0fe..0000000 --- a/upub/migrations/src/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# migrations - -there are sea_orm migrations to apply to your database diff --git a/upub/migrations/src/lib.rs b/upub/migrations/src/lib.rs index 698b3e0..0465e1b 100644 --- a/upub/migrations/src/lib.rs +++ b/upub/migrations/src/lib.rs @@ -6,6 +6,7 @@ mod m20240524_000003_create_users_auth_and_config; mod m20240524_000004_create_addressing_deliveries; mod m20240524_000005_create_attachments_tags_mentions; mod m20240529_000001_add_relation_unique_index; +mod m20240605_000001_add_jobs_table; pub struct Migrator; @@ -19,6 +20,7 @@ impl MigratorTrait for Migrator { Box::new(m20240524_000004_create_addressing_deliveries::Migration), Box::new(m20240524_000005_create_attachments_tags_mentions::Migration), Box::new(m20240529_000001_add_relation_unique_index::Migration), + Box::new(m20240605_000001_add_jobs_table::Migration), ] } } diff --git a/upub/migrations/src/m20240605_000001_add_jobs_table.rs b/upub/migrations/src/m20240605_000001_add_jobs_table.rs new file mode 100644 index 0000000..c5d7c7f --- /dev/null +++ b/upub/migrations/src/m20240605_000001_add_jobs_table.rs @@ -0,0 +1,146 @@ +use sea_orm_migration::prelude::*; + +use crate::{m20240524_000001_create_actor_activity_object_tables::{Activities, Actors}, m20240524_000004_create_addressing_deliveries::Deliveries}; + +#[derive(DeriveIden)] +pub enum Jobs { + Table, + Internal, + JobType, + Actor, + Target, + Activity, + Payload, + Published, + NotBefore, + Attempt, +} + + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + + manager + .drop_table(Table::drop().table(Deliveries::Table).to_owned()) + .await?; + + manager + .drop_index(Index::drop().name("index-deliveries-not-before").table(Deliveries::Table).to_owned()) + .await?; + + manager + .create_table( + Table::create() + .table(Jobs::Table) + .comment("background job queue: delivery, fetch and processing tasks") + .col( + ColumnDef::new(Jobs::Internal) + .big_integer() + .not_null() + .auto_increment() + .primary_key() + ) + .col(ColumnDef::new(Jobs::JobType).small_integer().not_null()) + .col(ColumnDef::new(Jobs::Actor).string().not_null()) + .col(ColumnDef::new(Jobs::Target).string().null()) + .col(ColumnDef::new(Jobs::Activity).string().not_null().unique_key()) + .col(ColumnDef::new(Jobs::Payload).string().null()) + .col(ColumnDef::new(Jobs::Published).date_time().not_null().default(Expr::current_timestamp())) + .col(ColumnDef::new(Jobs::NotBefore).date_time().not_null().default(Expr::current_timestamp())) + .col(ColumnDef::new(Jobs::Attempt).small_integer().not_null().default(0)) + .to_owned() + ) + .await?; + + manager + .create_index( + Index::create() + .unique() + .name("index-jobs-activity") + .table(Jobs::Table) + .col(Jobs::Activity) + .to_owned() + ) + .await?; + + manager + .create_index( + Index::create() + .name("index-jobs-not-before") + .table(Jobs::Table) + .col((Jobs::NotBefore, IndexOrder::Asc)) + .to_owned() + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Jobs::Table).to_owned()) + .await?; + + manager + .drop_index(Index::drop().name("index-jobs-activity").table(Jobs::Table).to_owned()) + .await?; + + manager + .drop_index(Index::drop().name("index-jobs-not-before").table(Jobs::Table).to_owned()) + .await?; + + manager + .create_table( + Table::create() + .table(Deliveries::Table) + .comment("this table contains all enqueued outgoing delivery jobs") + .col( + ColumnDef::new(Deliveries::Internal) + .big_integer() + .not_null() + .auto_increment() + .primary_key() + ) + .col(ColumnDef::new(Deliveries::Actor).string().not_null()) + .foreign_key( + ForeignKey::create() + .name("fkey-deliveries-actor") + .from(Deliveries::Table, Deliveries::Actor) + .to(Actors::Table, Actors::Id) + .on_update(ForeignKeyAction::Cascade) + .on_delete(ForeignKeyAction::Cascade) + ) + .col(ColumnDef::new(Deliveries::Target).string().not_null()) + .col(ColumnDef::new(Deliveries::Activity).string().not_null()) + .foreign_key( + ForeignKey::create() + .name("fkey-deliveries-activity") + .from(Deliveries::Table, Deliveries::Activity) + .to(Activities::Table, Activities::Id) + .on_update(ForeignKeyAction::Cascade) + .on_delete(ForeignKeyAction::Cascade) + ) + .col(ColumnDef::new(Deliveries::Published).date_time().not_null().default(Expr::current_timestamp())) + .col(ColumnDef::new(Deliveries::NotBefore).date_time().not_null().default(Expr::current_timestamp())) + .col(ColumnDef::new(Deliveries::Attempt).integer().not_null().default(0)) + .to_owned() + ) + .await?; + + manager + .create_index( + Index::create() + .name("index-deliveries-not-before") + .table(Deliveries::Table) + .col((Deliveries::NotBefore, IndexOrder::Asc)) + .to_owned() + ) + .await?; + + Ok(()) + } +}