upub/src/server/dispatcher.rs

172 lines
5.9 KiB
Rust
Raw Normal View History

use base64::Engine;
use openssl::{hash::MessageDigest, pkey::{PKey, Private}, sign::Signer};
2024-03-26 03:41:17 +01:00
use reqwest::header::{CONTENT_TYPE, USER_AGENT};
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder};
use tokio::{sync::broadcast, task::JoinHandle};
use apb::{ActivityMut, Node};
use crate::{routes::activitypub::{activity::ap_activity, object::ap_object}, errors::UpubError, model, server::Context, VERSION};
pub struct Dispatcher {
waker: broadcast::Sender<()>,
}
impl Default for Dispatcher {
fn default() -> Self {
let (waker, _) = broadcast::channel(1);
Dispatcher { waker }
}
}
impl Dispatcher {
pub fn new() -> Self { Dispatcher::default() }
pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> {
let waker = self.waker.subscribe();
tokio::spawn(async move {
if let Err(e) = worker(db, domain, poll_interval, waker).await {
2024-03-26 03:11:59 +01:00
tracing::error!("delivery worker exited with error: {e}");
}
})
}
pub fn wakeup(&self) {
match self.waker.send(()) {
Err(_) => tracing::error!("no worker to wakeup"),
Ok(n) => tracing::debug!("woken {n} workers"),
}
}
2024-03-26 03:11:59 +01:00
}
async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut waker: broadcast::Receiver<()>) -> Result<(), UpubError> {
2024-03-26 03:11:59 +01:00
loop {
let Some(delivery) = model::delivery::Entity::find()
.filter(Condition::all().add(model::delivery::Column::NotBefore.lte(chrono::Utc::now())))
.order_by(model::delivery::Column::NotBefore, Order::Asc)
.one(&db)
.await?
2024-03-26 23:53:44 +01:00
else {
tokio::select! {
biased;
_ = waker.recv() => {},
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
}
2024-03-26 23:53:44 +01:00
continue
};
2024-03-26 03:11:59 +01:00
let del_row = model::delivery::ActiveModel {
id: sea_orm::ActiveValue::Set(delivery.id),
..Default::default()
};
let del = model::delivery::Entity::delete(del_row)
.exec(&db)
.await?;
2024-03-26 03:11:59 +01:00
if del.rows_affected == 0 {
// another worker claimed this delivery
continue; // go back to the top
}
if delivery.expired() {
// try polling for another one
continue; // go back to top
}
2024-03-26 03:11:59 +01:00
tracing::info!("delivering {} to {}", delivery.activity, delivery.target);
let payload = match model::activity::Entity::find_by_id(&delivery.activity)
2024-03-26 03:11:59 +01:00
.find_also_related(model::object::Entity)
.one(&db)
.await? // TODO probably should not fail here and at least re-insert the delivery
{
Some((activity, Some(object))) => ap_activity(activity).set_object(Node::object(ap_object(object))),
Some((activity, None)) => ap_activity(activity),
None => {
tracing::warn!("skipping dispatch for deleted object {}", delivery.activity);
continue;
},
};
2024-03-26 03:21:00 +01:00
let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor)
.one(&db).await?
2024-03-26 03:11:59 +01:00
else {
tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor);
continue;
};
2024-03-26 03:21:00 +01:00
let Ok(key) = PKey::private_key_from_pem(key.as_bytes())
2024-03-26 03:11:59 +01:00
else {
tracing::error!("failed parsing private key for user {}", delivery.actor);
continue;
};
if let Err(e) = deliver(&key, &delivery.target, &delivery.actor, payload, &domain).await {
2024-03-26 03:11:59 +01:00
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
let new_delivery = model::delivery::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()),
actor: sea_orm::ActiveValue::Set(delivery.actor),
target: sea_orm::ActiveValue::Set(delivery.target),
activity: sea_orm::ActiveValue::Set(delivery.activity),
created: sea_orm::ActiveValue::Set(delivery.created),
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
};
model::delivery::Entity::insert(new_delivery).exec(&db).await?;
}
}
}
async fn deliver(key: &PKey<Private>, to: &str, from: &str, payload: serde_json::Value, domain: &str) -> Result<(), UpubError> {
let payload = serde_json::to_string(&payload).unwrap();
2024-03-27 02:11:14 +01:00
let digest = format!("sha-256={}", base64::prelude::BASE64_STANDARD.encode(openssl::sha::sha256(payload.as_bytes())));
let host = Context::server(to);
2024-03-27 02:08:50 +01:00
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
let path = to.replace("https://", "").replace("http://", "").replace(&host, "");
// let headers : BTreeMap<String, String> = [
// ("Host".to_string(), host.clone()),
// ("Date".to_string(), date.clone()),
// ("Digest".to_string(), digest.clone()),
// ].into();
2024-03-26 21:24:10 +01:00
// let signature_header = Config::new()
// .dont_use_created_field()
// .require_header("host")
// .require_header("date")
// .require_header("digest")
// .begin_sign("POST", &path, headers)
// .unwrap()
// .sign(format!("{from}#main-key"), |to_sign| {
// tracing::info!("signing '{to_sign}'");
// let mut signer = Signer::new(MessageDigest::sha256(), key)?;
// signer.update(to_sign.as_bytes())?;
// let signature = base64::prelude::BASE64_URL_SAFE.encode(signer.sign_to_vec()?);
// Ok(signature) as Result<_, UpubError>
// })
// .unwrap()
// .signature_header();
let signature_header = {
let to_sign = format!("(request-target): post {path}\nhost: {host}\ndate: {date}\ndigest: {digest}");
2024-03-26 21:24:10 +01:00
let mut signer = Signer::new(MessageDigest::sha256(), key)?;
signer.update(to_sign.as_bytes())?;
2024-03-27 02:11:14 +01:00
let signature = base64::prelude::BASE64_STANDARD.encode(signer.sign_to_vec()?);
2024-03-26 23:42:22 +01:00
format!("keyId=\"{from}#main-key\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest\",signature=\"{signature}\"")
2024-03-26 21:24:10 +01:00
};
reqwest::Client::new()
.post(to)
.header("Host", host)
2024-03-26 02:50:58 +01:00
.header("Date", date)
.header("Digest", digest)
2024-03-26 02:50:58 +01:00
.header("Signature", signature_header)
2024-03-26 21:02:07 +01:00
.header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"")
2024-03-26 02:50:58 +01:00
.header(USER_AGENT, format!("upub+{VERSION} ({domain})")) // TODO put instance admin email
.body(payload)
2024-03-26 02:50:58 +01:00
.send()
.await?
.error_for_status()?;
Ok(())
2024-03-26 02:50:58 +01:00
}