1
0
Fork 0
forked from alemi/upub

feat: added jobs table

replaces deliveries
This commit is contained in:
əlemi 2024-06-06 02:20:43 +02:00
parent 52f1238052
commit acb9a9add5
Signed by: alemi
GPG key ID: A4895B84D311642C
8 changed files with 183 additions and 57 deletions

View file

@ -33,8 +33,6 @@ pub enum Relation {
Actors, Actors,
#[sea_orm(has_many = "super::addressing::Entity")] #[sea_orm(has_many = "super::addressing::Entity")]
Addressing, Addressing,
#[sea_orm(has_many = "super::delivery::Entity")]
Deliveries,
#[sea_orm( #[sea_orm(
belongs_to = "super::object::Entity", belongs_to = "super::object::Entity",
from = "Column::Object", from = "Column::Object",
@ -57,12 +55,6 @@ impl Related<super::addressing::Entity> for Entity {
} }
} }
impl Related<super::delivery::Entity> for Entity {
fn to() -> RelationDef {
Relation::Deliveries.def()
}
}
impl Related<super::object::Entity> for Entity { impl Related<super::object::Entity> for Entity {
fn to() -> RelationDef { fn to() -> RelationDef {
Relation::Objects.def() Relation::Objects.def()

View file

@ -42,8 +42,6 @@ pub enum Relation {
Configs, Configs,
#[sea_orm(has_many = "super::credential::Entity")] #[sea_orm(has_many = "super::credential::Entity")]
Credentials, Credentials,
#[sea_orm(has_many = "super::delivery::Entity")]
Deliveries,
#[sea_orm( #[sea_orm(
belongs_to = "super::instance::Entity", belongs_to = "super::instance::Entity",
from = "Column::Domain", from = "Column::Domain",
@ -94,12 +92,6 @@ impl Related<super::credential::Entity> for Entity {
} }
} }
impl Related<super::delivery::Entity> for Entity {
fn to() -> RelationDef {
Relation::Deliveries.def()
}
}
impl Related<super::instance::Entity> for Entity { impl Related<super::instance::Entity> for Entity {
fn to() -> RelationDef { fn to() -> RelationDef {
Relation::Instances.def() Relation::Instances.def()

View file

@ -1,54 +1,36 @@
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
pub enum JobType {
Inbound = 1,
Outbound = 2,
Local = 3,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "deliveries")] #[sea_orm(table_name = "jobs")]
pub struct Model { pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub internal: i64, pub internal: i64,
pub job_type: JobType,
pub actor: String, pub actor: String,
pub target: String, pub target: Option<String>,
#[sea_orm(unique)]
pub activity: String, pub activity: String,
pub payload: Option<String>,
pub published: ChronoDateTimeUtc, pub published: ChronoDateTimeUtc,
pub not_before: ChronoDateTimeUtc, pub not_before: ChronoDateTimeUtc,
pub attempt: i32, pub attempt: i32,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation { pub enum Relation {}
#[sea_orm(
belongs_to = "super::activity::Entity",
from = "Column::Activity",
to = "super::activity::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Activities,
#[sea_orm(
belongs_to = "super::actor::Entity",
from = "Column::Actor",
to = "super::actor::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Actors,
}
impl Related<super::activity::Entity> for Entity {
fn to() -> RelationDef {
Relation::Activities.def()
}
}
impl Related<super::actor::Entity> for Entity {
fn to() -> RelationDef {
Relation::Actors.def()
}
}
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}
impl Model { impl Model {
pub fn next_delivery(&self) -> ChronoDateTimeUtc { pub fn next_attempt(&self) -> ChronoDateTimeUtc {
match self.attempt { match self.attempt {
0 => chrono::Utc::now() + std::time::Duration::from_secs(10), 0 => chrono::Utc::now() + std::time::Duration::from_secs(10),
1 => chrono::Utc::now() + std::time::Duration::from_secs(60), 1 => chrono::Utc::now() + std::time::Duration::from_secs(60),
@ -63,4 +45,18 @@ impl Model {
pub fn expired(&self) -> bool { pub fn expired(&self) -> bool {
chrono::Utc::now() - self.published > chrono::Duration::days(7) chrono::Utc::now() - self.published > chrono::Duration::days(7)
} }
pub fn repeat(self) -> ActiveModel {
ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
job_type: sea_orm::ActiveValue::Set(self.job_type),
not_before: sea_orm::ActiveValue::Set(self.next_attempt()),
actor: sea_orm::ActiveValue::Set(self.actor),
target: sea_orm::ActiveValue::Set(self.target),
payload: sea_orm::ActiveValue::Set(self.payload),
activity: sea_orm::ActiveValue::Set(self.activity),
published: sea_orm::ActiveValue::Set(self.published),
attempt: sea_orm::ActiveValue::Set(self.attempt + 1),
}
}
} }

View file

@ -6,9 +6,8 @@ pub mod config;
pub mod credential; pub mod credential;
pub mod session; pub mod session;
pub mod addressing;
pub mod instance; pub mod instance;
pub mod delivery; pub mod addressing;
pub mod job; pub mod job;
pub mod relation; pub mod relation;

View file

@ -87,12 +87,14 @@ impl Addresser for crate::Context {
// TODO fetch concurrently // TODO fetch concurrently
match self.fetch_user(target).await { match self.fetch_user(target).await {
Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
crate::model::delivery::ActiveModel { crate::model::job::ActiveModel {
internal: sea_orm::ActiveValue::NotSet, internal: sea_orm::ActiveValue::NotSet,
actor: Set(from.to_string()), actor: Set(from.to_string()),
job_type: Set(crate::model::job::JobType::Outbound),
payload: Set(None),
// TODO we should resolve each user by id and check its inbox because we can't assume // TODO we should resolve each user by id and check its inbox because we can't assume
// it's /actors/{id}/inbox for every software, but oh well it's waaaaay easier now // it's /actors/{id}/inbox for every software, but oh well it's waaaaay easier now
target: Set(inbox), target: Set(Some(inbox)),
activity: Set(aid.to_string()), activity: Set(aid.to_string()),
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
@ -105,7 +107,7 @@ impl Addresser for crate::Context {
} }
if !deliveries.is_empty() { if !deliveries.is_empty() {
crate::model::delivery::Entity::insert_many(deliveries) crate::model::job::Entity::insert_many(deliveries)
.exec(self.db()) .exec(self.db())
.await?; .await?;
} }

View file

@ -1,3 +0,0 @@
# migrations
there are sea_orm migrations to apply to your database

View file

@ -6,6 +6,7 @@ mod m20240524_000003_create_users_auth_and_config;
mod m20240524_000004_create_addressing_deliveries; mod m20240524_000004_create_addressing_deliveries;
mod m20240524_000005_create_attachments_tags_mentions; mod m20240524_000005_create_attachments_tags_mentions;
mod m20240529_000001_add_relation_unique_index; mod m20240529_000001_add_relation_unique_index;
mod m20240605_000001_add_jobs_table;
pub struct Migrator; pub struct Migrator;
@ -19,6 +20,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240524_000004_create_addressing_deliveries::Migration), Box::new(m20240524_000004_create_addressing_deliveries::Migration),
Box::new(m20240524_000005_create_attachments_tags_mentions::Migration), Box::new(m20240524_000005_create_attachments_tags_mentions::Migration),
Box::new(m20240529_000001_add_relation_unique_index::Migration), Box::new(m20240529_000001_add_relation_unique_index::Migration),
Box::new(m20240605_000001_add_jobs_table::Migration),
] ]
} }
} }

View file

@ -0,0 +1,146 @@
use sea_orm_migration::prelude::*;
use crate::{m20240524_000001_create_actor_activity_object_tables::{Activities, Actors}, m20240524_000004_create_addressing_deliveries::Deliveries};
#[derive(DeriveIden)]
pub enum Jobs {
Table,
Internal,
JobType,
Actor,
Target,
Activity,
Payload,
Published,
NotBefore,
Attempt,
}
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Deliveries::Table).to_owned())
.await?;
manager
.drop_index(Index::drop().name("index-deliveries-not-before").table(Deliveries::Table).to_owned())
.await?;
manager
.create_table(
Table::create()
.table(Jobs::Table)
.comment("background job queue: delivery, fetch and processing tasks")
.col(
ColumnDef::new(Jobs::Internal)
.big_integer()
.not_null()
.auto_increment()
.primary_key()
)
.col(ColumnDef::new(Jobs::JobType).small_integer().not_null())
.col(ColumnDef::new(Jobs::Actor).string().not_null())
.col(ColumnDef::new(Jobs::Target).string().null())
.col(ColumnDef::new(Jobs::Activity).string().not_null().unique_key())
.col(ColumnDef::new(Jobs::Payload).string().null())
.col(ColumnDef::new(Jobs::Published).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Jobs::NotBefore).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Jobs::Attempt).small_integer().not_null().default(0))
.to_owned()
)
.await?;
manager
.create_index(
Index::create()
.unique()
.name("index-jobs-activity")
.table(Jobs::Table)
.col(Jobs::Activity)
.to_owned()
)
.await?;
manager
.create_index(
Index::create()
.name("index-jobs-not-before")
.table(Jobs::Table)
.col((Jobs::NotBefore, IndexOrder::Asc))
.to_owned()
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Jobs::Table).to_owned())
.await?;
manager
.drop_index(Index::drop().name("index-jobs-activity").table(Jobs::Table).to_owned())
.await?;
manager
.drop_index(Index::drop().name("index-jobs-not-before").table(Jobs::Table).to_owned())
.await?;
manager
.create_table(
Table::create()
.table(Deliveries::Table)
.comment("this table contains all enqueued outgoing delivery jobs")
.col(
ColumnDef::new(Deliveries::Internal)
.big_integer()
.not_null()
.auto_increment()
.primary_key()
)
.col(ColumnDef::new(Deliveries::Actor).string().not_null())
.foreign_key(
ForeignKey::create()
.name("fkey-deliveries-actor")
.from(Deliveries::Table, Deliveries::Actor)
.to(Actors::Table, Actors::Id)
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::Cascade)
)
.col(ColumnDef::new(Deliveries::Target).string().not_null())
.col(ColumnDef::new(Deliveries::Activity).string().not_null())
.foreign_key(
ForeignKey::create()
.name("fkey-deliveries-activity")
.from(Deliveries::Table, Deliveries::Activity)
.to(Activities::Table, Activities::Id)
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::Cascade)
)
.col(ColumnDef::new(Deliveries::Published).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Deliveries::NotBefore).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Deliveries::Attempt).integer().not_null().default(0))
.to_owned()
)
.await?;
manager
.create_index(
Index::create()
.name("index-deliveries-not-before")
.table(Deliveries::Table)
.col((Deliveries::NotBefore, IndexOrder::Asc))
.to_owned()
)
.await?;
Ok(())
}
}