From 5863bdf04e9281cdd3c99e8ce8c664324ce8880a Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 13 Apr 2024 00:44:53 +0200 Subject: [PATCH] feat: signed fetches, improved signatures code should make sure that it still works --- src/server/context.rs | 2 +- src/server/dispatcher.rs | 68 +++------------------- src/server/fetcher.rs | 119 ++++++++++++++++++++++++++++++++++----- 3 files changed, 114 insertions(+), 75 deletions(-) diff --git a/src/server/context.rs b/src/server/context.rs index fba35269..6b0f59c7 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -40,7 +40,7 @@ impl Context { if domain.starts_with("http") { domain = domain.replace("https://", "").replace("http://", ""); } - let dispatcher = Dispatcher::new(); + let dispatcher = Dispatcher::default(); for _ in 0..1 { // TODO customize delivery workers amount dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! } diff --git a/src/server/dispatcher.rs b/src/server/dispatcher.rs index a926f945..29241f59 100644 --- a/src/server/dispatcher.rs +++ b/src/server/dispatcher.rs @@ -1,11 +1,10 @@ -use base64::Engine; -use openssl::{hash::MessageDigest, pkey::{PKey, Private}, sign::Signer}; -use reqwest::header::{CONTENT_TYPE, USER_AGENT}; +use openssl::pkey::PKey; +use reqwest::Method; use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder}; use tokio::{sync::broadcast, task::JoinHandle}; use apb::{ActivityMut, Node}; -use crate::{routes::activitypub::{activity::ap_activity, object::ap_object}, errors::UpubError, model, server::Context, VERSION}; +use crate::{errors::UpubError, model, routes::activitypub::{activity::ap_activity, object::ap_object}, server::fetcher::Fetcher}; pub struct Dispatcher { waker: broadcast::Sender<()>, @@ -99,7 +98,11 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut continue; }; - if let Err(e) = deliver(&key, &delivery.target, &delivery.actor, payload, &domain).await { + if let Err(e) = Fetcher::request::<()>( + Method::POST, &delivery.target, + Some(&serde_json::to_string(&payload).unwrap()), + &delivery.actor, &key, &domain + ).await { tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target); let new_delivery = model::delivery::ActiveModel { id: sea_orm::ActiveValue::NotSet, @@ -114,58 +117,3 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut } } } - -async fn deliver(key: &PKey, to: &str, from: &str, payload: serde_json::Value, domain: &str) -> Result<(), UpubError> { - let payload = serde_json::to_string(&payload).unwrap(); - let digest = format!("sha-256={}", base64::prelude::BASE64_STANDARD.encode(openssl::sha::sha256(payload.as_bytes()))); - let host = Context::server(to); - let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT" - let path = to.replace("https://", "").replace("http://", "").replace(&host, ""); - - // let headers : BTreeMap = [ - // ("Host".to_string(), host.clone()), - // ("Date".to_string(), date.clone()), - // ("Digest".to_string(), digest.clone()), - // ].into(); - - // let signature_header = Config::new() - // .dont_use_created_field() - // .require_header("host") - // .require_header("date") - // .require_header("digest") - // .begin_sign("POST", &path, headers) - // .unwrap() - // .sign(format!("{from}#main-key"), |to_sign| { - // tracing::info!("signing '{to_sign}'"); - // let mut signer = Signer::new(MessageDigest::sha256(), key)?; - // signer.update(to_sign.as_bytes())?; - // let signature = base64::prelude::BASE64_URL_SAFE.encode(signer.sign_to_vec()?); - // Ok(signature) as Result<_, UpubError> - // }) - // .unwrap() - // .signature_header(); - - let signature_header = { - let to_sign = format!("(request-target): post {path}\nhost: {host}\ndate: {date}\ndigest: {digest}"); - let mut signer = Signer::new(MessageDigest::sha256(), key)?; - signer.update(to_sign.as_bytes())?; - let signature = base64::prelude::BASE64_STANDARD.encode(signer.sign_to_vec()?); - format!("keyId=\"{from}#main-key\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest\",signature=\"{signature}\"") - }; - - reqwest::Client::new() - .post(to) - .header("Host", host) - .header("Date", date) - .header("Digest", digest) - .header("Signature", signature_header) - .header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"") - .header(USER_AGENT, format!("upub+{VERSION} ({domain})")) // TODO put instance admin email - .body(payload) - .send() - .await? - .error_for_status()?; - - Ok(()) -} - diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 2ab6fe5e..67deb4ce 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -1,9 +1,15 @@ -use openssl::pkey::{PKey, Private}; -use reqwest::header::USER_AGENT; +use std::collections::BTreeMap; + +use base64::Engine; +use http_signature_normalization::Config; +use openssl::{hash::MessageDigest, pkey::{PKey, Private}, sign::Signer}; +use reqwest::{header::{CONTENT_TYPE, USER_AGENT}, Method}; use sea_orm::{DatabaseConnection, EntityTrait, IntoActiveModel}; use crate::{VERSION, model}; +use super::Context; + #[derive(Debug, thiserror::Error)] pub enum FetchError { @@ -19,13 +25,73 @@ pub enum FetchError { pub struct Fetcher { db: DatabaseConnection, - _key: PKey, // TODO store pre-parsed + key: PKey, // TODO store pre-parsed domain: String, // TODO merge directly with Context so we don't need to copy this } impl Fetcher { pub fn new(db: DatabaseConnection, domain: String, key: String) -> Self { - Fetcher { db, domain, _key: PKey::private_key_from_pem(key.as_bytes()).unwrap() } + Fetcher { db, domain, key: PKey::private_key_from_pem(key.as_bytes()).unwrap() } + } + + pub async fn request( + method: reqwest::Method, + url: &str, + payload: Option<&str>, + from: &str, + key: &PKey, + domain: &str, + ) -> reqwest::Result { + let host = Context::server(url); + let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT" + let path = url.replace("https://", "").replace("http://", "").replace(&host, ""); + let mut headers : BTreeMap = [ + ("Host".to_string(), host.clone()), + ("Date".to_string(), date.clone()), + ].into(); + + let mut client = + reqwest::Client::new() + .request(method, url) + .header("Host", host) + .header("Date", date); + + let mut signature_cfg = Config::new(); + + if let Some(payload) = payload { + let digest = format!("sha-256={}", base64::prelude::BASE64_STANDARD.encode(openssl::sha::sha256(payload.as_bytes()))); + headers.insert("Digest".to_string(), digest.clone()); + signature_cfg = signature_cfg.require_header("digest"); + client = client + .header("Digest", digest) + .body(payload.to_string()); + } + + let signature_header = signature_cfg + .dont_use_created_field() + .require_header("host") + .require_header("date") + .begin_sign("POST", &path, headers) + .unwrap() + .sign(format!("{from}#main-key"), |to_sign| { + tracing::info!("signing '{to_sign}'"); + let mut signer = Signer::new(MessageDigest::sha256(), key)?; + signer.update(to_sign.as_bytes())?; + let signature = base64::prelude::BASE64_URL_SAFE.encode(signer.sign_to_vec()?); + Ok(signature) as crate::Result<_> + }) + .unwrap() + .signature_header(); + + client + .header("Signature", signature_header) + .header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"") + .header(USER_AGENT, format!("upub+{VERSION} ({domain})")) // TODO put instance admin email + .send() + .await? + .error_for_status()? + .json() + .await } pub async fn user(&self, id: &str) -> Result { @@ -33,16 +99,9 @@ impl Fetcher { return Ok(x); // already in db, easy } - // TODO sign http fetches, we got the app key and db to get user keys just in case - tracing::info!("fetching {id}"); - let user = reqwest::Client::new() - .get(id) - .header(USER_AGENT, format!("upub+{VERSION} ({})", self.domain)) // TODO put instance admin email - .send() - .await? - .json::() - .await?; - + let user = Self::request::( + Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, + ).await?; let user_model = model::user::Model::new(&user)?; model::user::Entity::insert(user_model.clone().into_active_model()) @@ -50,4 +109,36 @@ impl Fetcher { Ok(user_model) } + + pub async fn activity(&self, id: &str) -> Result { + if let Some(x) = model::activity::Entity::find_by_id(id).one(&self.db).await? { + return Ok(x); // already in db, easy + } + + let activity = Self::request::( + Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, + ).await?; + let activity_model = model::activity::Model::new(&activity)?; + + model::activity::Entity::insert(activity_model.clone().into_active_model()) + .exec(&self.db).await?; + + Ok(activity_model) + } + + pub async fn object(&self, id: &str) -> Result { + if let Some(x) = model::object::Entity::find_by_id(id).one(&self.db).await? { + return Ok(x); // already in db, easy + } + + let object = Self::request::( + Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, + ).await?; + let object_model = model::object::Model::new(&object)?; + + model::object::Entity::insert(object_model.clone().into_active_model()) + .exec(&self.db).await?; + + Ok(object_model) + } }