From 5d7ce0e3c4557f9ff2623a6c211fa41811ee7f54 Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 25 Mar 2024 05:07:58 +0100 Subject: [PATCH] feat: simple delivery system + http signatures --- Cargo.toml | 2 + src/dispatcher.rs | 114 ++++++++++++++++++ src/main.rs | 1 + .../m20240325_000001_add_deliveries.rs | 66 ++++++++++ src/migrations/mod.rs | 2 + src/model/activity.rs | 3 + src/model/delivery.rs | 50 ++++++++ src/model/mod.rs | 1 + 8 files changed, 239 insertions(+) create mode 100644 src/dispatcher.rs create mode 100644 src/migrations/m20240325_000001_add_deliveries.rs create mode 100644 src/model/delivery.rs diff --git a/Cargo.toml b/Cargo.toml index fa3cfd70..35e47dac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,3 +25,5 @@ jrd = "0.1" nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "e865094804" } rand = "0.8.5" sha256 = "1.5.0" +openssl = "0.10.64" +base64 = "0.22.0" diff --git a/src/dispatcher.rs b/src/dispatcher.rs new file mode 100644 index 00000000..8c2495ea --- /dev/null +++ b/src/dispatcher.rs @@ -0,0 +1,114 @@ +use base64::Engine; +use openssl::{hash::MessageDigest, pkey::PKey, sign::Signer}; +use reqwest::header::USER_AGENT; +use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect, SelectColumns}; +use tokio::task::JoinHandle; + +use crate::{VERSION, activitypub::{activity::ap_activity, object::ap_object}, activitystream::{object::activity::ActivityMut, Node}, model}; + +pub struct Dispatcher; + +impl Dispatcher { + pub fn spawn(db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle> { + tokio::spawn(async move { + let mut nosleep = true; + loop { + if nosleep { nosleep = false } else { + tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; + } + let Some(delivery) = model::delivery::Entity::find() + .filter(Condition::all().add(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))) + .order_by(model::delivery::Column::NotBefore, Order::Asc) + .one(&db) + .await? + else { continue }; + + let del_row = model::delivery::ActiveModel { + id: sea_orm::ActiveValue::Set(delivery.id), + ..Default::default() + }; + let del = model::delivery::Entity::delete(del_row) + .exec(&db) + .await?; + + if del.rows_affected == 0 { + // another worker claimed this delivery + nosleep = true; + continue; // go back to the top + } + if delivery.expired() { + // try polling for another one + nosleep = true; + continue; // go back to top + } + + let payload = match model::activity::Entity::find_by_id(&delivery.activity) + .find_also_related(model::object::Entity) + .one(&db) + .await? // TODO probably should not fail here and at least re-insert the delivery + { + Some((activity, Some(object))) => ap_activity(activity).set_object(Node::object(ap_object(object))), + Some((activity, None)) => ap_activity(activity), + None => { + tracing::warn!("skipping dispatch for deleted object {}", delivery.activity); + continue; + }, + }; + + let Some(key_pem) = model::user::Entity::find_by_id(&delivery.from) + .select_only() + .select_column(model::user::Column::PrivateKey) + .into_tuple::() + .one(&db) + .await? + else { + tracing::error!("can not dispatch activity for user without private key: {}", delivery.from); + continue; + }; + + let Ok(key) = PKey::private_key_from_pem(key_pem.as_bytes()) + else { + tracing::error!("failed parsing private key for user {}", delivery.from); + continue; + }; + + let mut signer = Signer::new(MessageDigest::sha256(), &key).unwrap(); + + let without_protocol = delivery.target.replace("https://", "").replace("http://", ""); + let host = without_protocol.replace('/', ""); + let request_target = without_protocol.replace(&host, ""); + let date = chrono::Utc::now().to_rfc2822(); + let signed_string = format!("(request-target): post {request_target}\nhost: {host}\ndate: {date}"); + signer.update(signed_string.as_bytes()).unwrap(); + let signature = base64::prelude::BASE64_URL_SAFE.encode(signer.sign_to_vec().unwrap()); + let signature_header = format!("keyId=\"{}\",headers=\"(request-target) host date\",signature=\"{signature}\"", delivery.from); + + if let Err(e) = reqwest::Client::new() + .post(&delivery.target) + .json(&payload) + .header("Host", host) + .header("Date", date) + .header("Signature", signature_header) + .header(USER_AGENT, format!("upub+{VERSION} ({domain})")) // TODO put instance admin email + .send() + .await + { + tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target); + let new_delivery = model::delivery::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()), + from: sea_orm::ActiveValue::Set(delivery.from), + target: sea_orm::ActiveValue::Set(delivery.target), + activity: sea_orm::ActiveValue::Set(delivery.activity), + created: sea_orm::ActiveValue::Set(delivery.created), + attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1), + }; + model::delivery::Entity::insert(new_delivery) + .exec(&db) + .await?; + } + } + }) + } +} + diff --git a/src/main.rs b/src/main.rs index 478ebb43..805b1457 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ pub mod server; pub mod router; pub mod errors; pub mod auth; +mod dispatcher; use clap::{Parser, Subcommand}; use sea_orm::{ConnectOptions, Database, EntityTrait, IntoActiveModel}; diff --git a/src/migrations/m20240325_000001_add_deliveries.rs b/src/migrations/m20240325_000001_add_deliveries.rs new file mode 100644 index 00000000..a2b9cc18 --- /dev/null +++ b/src/migrations/m20240325_000001_add_deliveries.rs @@ -0,0 +1,66 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Deliveries::Table) + .col( + ColumnDef::new(Deliveries::Id) + .integer() + .not_null() + .auto_increment() + .primary_key() + ) + .col(ColumnDef::new(Deliveries::Actor).string().not_null()) + .col(ColumnDef::new(Deliveries::Target).string().not_null()) + .col(ColumnDef::new(Deliveries::Activity).string().not_null()) + .col(ColumnDef::new(Deliveries::Created).date_time().not_null()) + .col(ColumnDef::new(Deliveries::NotBefore).date_time().not_null()) + .col(ColumnDef::new(Deliveries::Attempt).integer().not_null()) + .to_owned() + ) + .await?; + + manager + .create_index( + Index::create() + .name("deliveries-notbefore-index") + .table(Deliveries::Table) + .col((Deliveries::NotBefore, IndexOrder::Asc)) + .to_owned() + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Deliveries::Table).to_owned()) + .await?; + + manager + .drop_index(Index::drop().name("deliveries-notbefore-index").to_owned()) + .await?; + + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Deliveries { + Table, + Id, + Actor, + Target, + Activity, + Created, + NotBefore, + Attempt, +} diff --git a/src/migrations/mod.rs b/src/migrations/mod.rs index 4bec1c0b..5afb5337 100644 --- a/src/migrations/mod.rs +++ b/src/migrations/mod.rs @@ -7,6 +7,7 @@ mod m20240322_000003_add_indexes; mod m20240323_000001_add_user_configs; mod m20240323_000002_add_simple_credentials; mod m20240324_000001_add_addressing; +mod m20240325_000001_add_deliveries; pub struct Migrator; @@ -21,6 +22,7 @@ impl MigratorTrait for Migrator { Box::new(m20240323_000001_add_user_configs::Migration), Box::new(m20240323_000002_add_simple_credentials::Migration), Box::new(m20240324_000001_add_addressing::Migration), + Box::new(m20240325_000001_add_deliveries::Migration), ] } } diff --git a/src/model/activity.rs b/src/model/activity.rs index 7a4a2caa..cd778b16 100644 --- a/src/model/activity.rs +++ b/src/model/activity.rs @@ -59,6 +59,9 @@ pub enum Relation { #[sea_orm(has_many = "super::addressing::Entity")] Addressing, + + #[sea_orm(has_many = "super::delivery::Entity")] + Delivery, } impl Related for Entity { diff --git a/src/model/delivery.rs b/src/model/delivery.rs new file mode 100644 index 00000000..ef712619 --- /dev/null +++ b/src/model/delivery.rs @@ -0,0 +1,50 @@ +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "deliveries")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i64, + pub actor: String, + pub target: String, + pub activity: String, + pub created: ChronoDateTimeUtc, + pub not_before: ChronoDateTimeUtc, + pub attempt: i32, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::activity::Entity", + from = "Column::Activity", + to = "super::activity::Column::Id" + )] + Activity, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Activity.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} + +impl Model { + pub fn next_delivery(&self) -> ChronoDateTimeUtc { + match self.attempt { + 0 => chrono::Utc::now() + std::time::Duration::from_secs(10), + 1 => chrono::Utc::now() + std::time::Duration::from_secs(60), + 2 => chrono::Utc::now() + std::time::Duration::from_secs(5 * 60), + 3 => chrono::Utc::now() + std::time::Duration::from_secs(20 * 60), + 4 => chrono::Utc::now() + std::time::Duration::from_secs(60 * 60), + 5 => chrono::Utc::now() + std::time::Duration::from_secs(12 * 60 * 60), + _ => chrono::Utc::now() + std::time::Duration::from_secs(24 * 60 * 60), + } + } + + pub fn expired(&self) -> bool { + chrono::Utc::now() - self.created > chrono::Duration::days(7) + } +} diff --git a/src/model/mod.rs b/src/model/mod.rs index 2313225f..7dad667d 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -9,6 +9,7 @@ pub mod share; pub mod like; pub mod credential; pub mod session; +pub mod delivery; pub mod faker;