feat: simple delivery system + http signatures
This commit is contained in:
parent
76c0bd5218
commit
5d7ce0e3c4
8 changed files with 239 additions and 0 deletions
|
@ -25,3 +25,5 @@ jrd = "0.1"
|
||||||
nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "e865094804" }
|
nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "e865094804" }
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
sha256 = "1.5.0"
|
sha256 = "1.5.0"
|
||||||
|
openssl = "0.10.64"
|
||||||
|
base64 = "0.22.0"
|
||||||
|
|
114
src/dispatcher.rs
Normal file
114
src/dispatcher.rs
Normal file
|
@ -0,0 +1,114 @@
|
||||||
|
use base64::Engine;
|
||||||
|
use openssl::{hash::MessageDigest, pkey::PKey, sign::Signer};
|
||||||
|
use reqwest::header::USER_AGENT;
|
||||||
|
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect, SelectColumns};
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
|
use crate::{VERSION, activitypub::{activity::ap_activity, object::ap_object}, activitystream::{object::activity::ActivityMut, Node}, model};
|
||||||
|
|
||||||
|
pub struct Dispatcher;
|
||||||
|
|
||||||
|
impl Dispatcher {
|
||||||
|
pub fn spawn(db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<Result<(), DbErr>> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut nosleep = true;
|
||||||
|
loop {
|
||||||
|
if nosleep { nosleep = false } else {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
|
||||||
|
}
|
||||||
|
let Some(delivery) = model::delivery::Entity::find()
|
||||||
|
.filter(Condition::all().add(model::delivery::Column::NotBefore.lte(chrono::Utc::now())))
|
||||||
|
.order_by(model::delivery::Column::NotBefore, Order::Asc)
|
||||||
|
.one(&db)
|
||||||
|
.await?
|
||||||
|
else { continue };
|
||||||
|
|
||||||
|
let del_row = model::delivery::ActiveModel {
|
||||||
|
id: sea_orm::ActiveValue::Set(delivery.id),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let del = model::delivery::Entity::delete(del_row)
|
||||||
|
.exec(&db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if del.rows_affected == 0 {
|
||||||
|
// another worker claimed this delivery
|
||||||
|
nosleep = true;
|
||||||
|
continue; // go back to the top
|
||||||
|
}
|
||||||
|
if delivery.expired() {
|
||||||
|
// try polling for another one
|
||||||
|
nosleep = true;
|
||||||
|
continue; // go back to top
|
||||||
|
}
|
||||||
|
|
||||||
|
let payload = match model::activity::Entity::find_by_id(&delivery.activity)
|
||||||
|
.find_also_related(model::object::Entity)
|
||||||
|
.one(&db)
|
||||||
|
.await? // TODO probably should not fail here and at least re-insert the delivery
|
||||||
|
{
|
||||||
|
Some((activity, Some(object))) => ap_activity(activity).set_object(Node::object(ap_object(object))),
|
||||||
|
Some((activity, None)) => ap_activity(activity),
|
||||||
|
None => {
|
||||||
|
tracing::warn!("skipping dispatch for deleted object {}", delivery.activity);
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(key_pem) = model::user::Entity::find_by_id(&delivery.from)
|
||||||
|
.select_only()
|
||||||
|
.select_column(model::user::Column::PrivateKey)
|
||||||
|
.into_tuple::<String>()
|
||||||
|
.one(&db)
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
tracing::error!("can not dispatch activity for user without private key: {}", delivery.from);
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let Ok(key) = PKey::private_key_from_pem(key_pem.as_bytes())
|
||||||
|
else {
|
||||||
|
tracing::error!("failed parsing private key for user {}", delivery.from);
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut signer = Signer::new(MessageDigest::sha256(), &key).unwrap();
|
||||||
|
|
||||||
|
let without_protocol = delivery.target.replace("https://", "").replace("http://", "");
|
||||||
|
let host = without_protocol.replace('/', "");
|
||||||
|
let request_target = without_protocol.replace(&host, "");
|
||||||
|
let date = chrono::Utc::now().to_rfc2822();
|
||||||
|
let signed_string = format!("(request-target): post {request_target}\nhost: {host}\ndate: {date}");
|
||||||
|
signer.update(signed_string.as_bytes()).unwrap();
|
||||||
|
let signature = base64::prelude::BASE64_URL_SAFE.encode(signer.sign_to_vec().unwrap());
|
||||||
|
let signature_header = format!("keyId=\"{}\",headers=\"(request-target) host date\",signature=\"{signature}\"", delivery.from);
|
||||||
|
|
||||||
|
if let Err(e) = reqwest::Client::new()
|
||||||
|
.post(&delivery.target)
|
||||||
|
.json(&payload)
|
||||||
|
.header("Host", host)
|
||||||
|
.header("Date", date)
|
||||||
|
.header("Signature", signature_header)
|
||||||
|
.header(USER_AGENT, format!("upub+{VERSION} ({domain})")) // TODO put instance admin email
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
|
||||||
|
let new_delivery = model::delivery::ActiveModel {
|
||||||
|
id: sea_orm::ActiveValue::NotSet,
|
||||||
|
not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()),
|
||||||
|
from: sea_orm::ActiveValue::Set(delivery.from),
|
||||||
|
target: sea_orm::ActiveValue::Set(delivery.target),
|
||||||
|
activity: sea_orm::ActiveValue::Set(delivery.activity),
|
||||||
|
created: sea_orm::ActiveValue::Set(delivery.created),
|
||||||
|
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
|
||||||
|
};
|
||||||
|
model::delivery::Entity::insert(new_delivery)
|
||||||
|
.exec(&db)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ pub mod server;
|
||||||
pub mod router;
|
pub mod router;
|
||||||
pub mod errors;
|
pub mod errors;
|
||||||
pub mod auth;
|
pub mod auth;
|
||||||
|
mod dispatcher;
|
||||||
|
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
use sea_orm::{ConnectOptions, Database, EntityTrait, IntoActiveModel};
|
use sea_orm::{ConnectOptions, Database, EntityTrait, IntoActiveModel};
|
||||||
|
|
66
src/migrations/m20240325_000001_add_deliveries.rs
Normal file
66
src/migrations/m20240325_000001_add_deliveries.rs
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.create_table(
|
||||||
|
Table::create()
|
||||||
|
.table(Deliveries::Table)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Deliveries::Id)
|
||||||
|
.integer()
|
||||||
|
.not_null()
|
||||||
|
.auto_increment()
|
||||||
|
.primary_key()
|
||||||
|
)
|
||||||
|
.col(ColumnDef::new(Deliveries::Actor).string().not_null())
|
||||||
|
.col(ColumnDef::new(Deliveries::Target).string().not_null())
|
||||||
|
.col(ColumnDef::new(Deliveries::Activity).string().not_null())
|
||||||
|
.col(ColumnDef::new(Deliveries::Created).date_time().not_null())
|
||||||
|
.col(ColumnDef::new(Deliveries::NotBefore).date_time().not_null())
|
||||||
|
.col(ColumnDef::new(Deliveries::Attempt).integer().not_null())
|
||||||
|
.to_owned()
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
manager
|
||||||
|
.create_index(
|
||||||
|
Index::create()
|
||||||
|
.name("deliveries-notbefore-index")
|
||||||
|
.table(Deliveries::Table)
|
||||||
|
.col((Deliveries::NotBefore, IndexOrder::Asc))
|
||||||
|
.to_owned()
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.drop_table(Table::drop().table(Deliveries::Table).to_owned())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
manager
|
||||||
|
.drop_index(Index::drop().name("deliveries-notbefore-index").to_owned())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DeriveIden)]
|
||||||
|
enum Deliveries {
|
||||||
|
Table,
|
||||||
|
Id,
|
||||||
|
Actor,
|
||||||
|
Target,
|
||||||
|
Activity,
|
||||||
|
Created,
|
||||||
|
NotBefore,
|
||||||
|
Attempt,
|
||||||
|
}
|
|
@ -7,6 +7,7 @@ mod m20240322_000003_add_indexes;
|
||||||
mod m20240323_000001_add_user_configs;
|
mod m20240323_000001_add_user_configs;
|
||||||
mod m20240323_000002_add_simple_credentials;
|
mod m20240323_000002_add_simple_credentials;
|
||||||
mod m20240324_000001_add_addressing;
|
mod m20240324_000001_add_addressing;
|
||||||
|
mod m20240325_000001_add_deliveries;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
|
@ -21,6 +22,7 @@ impl MigratorTrait for Migrator {
|
||||||
Box::new(m20240323_000001_add_user_configs::Migration),
|
Box::new(m20240323_000001_add_user_configs::Migration),
|
||||||
Box::new(m20240323_000002_add_simple_credentials::Migration),
|
Box::new(m20240323_000002_add_simple_credentials::Migration),
|
||||||
Box::new(m20240324_000001_add_addressing::Migration),
|
Box::new(m20240324_000001_add_addressing::Migration),
|
||||||
|
Box::new(m20240325_000001_add_deliveries::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,9 @@ pub enum Relation {
|
||||||
|
|
||||||
#[sea_orm(has_many = "super::addressing::Entity")]
|
#[sea_orm(has_many = "super::addressing::Entity")]
|
||||||
Addressing,
|
Addressing,
|
||||||
|
|
||||||
|
#[sea_orm(has_many = "super::delivery::Entity")]
|
||||||
|
Delivery,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Related<super::user::Entity> for Entity {
|
impl Related<super::user::Entity> for Entity {
|
||||||
|
|
50
src/model/delivery.rs
Normal file
50
src/model/delivery.rs
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
use sea_orm::entity::prelude::*;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
|
||||||
|
#[sea_orm(table_name = "deliveries")]
|
||||||
|
pub struct Model {
|
||||||
|
#[sea_orm(primary_key)]
|
||||||
|
pub id: i64,
|
||||||
|
pub actor: String,
|
||||||
|
pub target: String,
|
||||||
|
pub activity: String,
|
||||||
|
pub created: ChronoDateTimeUtc,
|
||||||
|
pub not_before: ChronoDateTimeUtc,
|
||||||
|
pub attempt: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
pub enum Relation {
|
||||||
|
#[sea_orm(
|
||||||
|
belongs_to = "super::activity::Entity",
|
||||||
|
from = "Column::Activity",
|
||||||
|
to = "super::activity::Column::Id"
|
||||||
|
)]
|
||||||
|
Activity,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Related<super::activity::Entity> for Entity {
|
||||||
|
fn to() -> RelationDef {
|
||||||
|
Relation::Activity.def()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ActiveModelBehavior for ActiveModel {}
|
||||||
|
|
||||||
|
impl Model {
|
||||||
|
pub fn next_delivery(&self) -> ChronoDateTimeUtc {
|
||||||
|
match self.attempt {
|
||||||
|
0 => chrono::Utc::now() + std::time::Duration::from_secs(10),
|
||||||
|
1 => chrono::Utc::now() + std::time::Duration::from_secs(60),
|
||||||
|
2 => chrono::Utc::now() + std::time::Duration::from_secs(5 * 60),
|
||||||
|
3 => chrono::Utc::now() + std::time::Duration::from_secs(20 * 60),
|
||||||
|
4 => chrono::Utc::now() + std::time::Duration::from_secs(60 * 60),
|
||||||
|
5 => chrono::Utc::now() + std::time::Duration::from_secs(12 * 60 * 60),
|
||||||
|
_ => chrono::Utc::now() + std::time::Duration::from_secs(24 * 60 * 60),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn expired(&self) -> bool {
|
||||||
|
chrono::Utc::now() - self.created > chrono::Duration::days(7)
|
||||||
|
}
|
||||||
|
}
|
|
@ -9,6 +9,7 @@ pub mod share;
|
||||||
pub mod like;
|
pub mod like;
|
||||||
pub mod credential;
|
pub mod credential;
|
||||||
pub mod session;
|
pub mod session;
|
||||||
|
pub mod delivery;
|
||||||
|
|
||||||
pub mod faker;
|
pub mod faker;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue