feat: signed fetches, improved signatures code

should make sure that it still works
This commit is contained in:
əlemi 2024-04-13 00:44:53 +02:00
parent 95ab7a50ef
commit 5863bdf04e
Signed by: alemi
GPG key ID: A4895B84D311642C
3 changed files with 114 additions and 75 deletions

View file

@ -40,7 +40,7 @@ impl Context {
if domain.starts_with("http") { if domain.starts_with("http") {
domain = domain.replace("https://", "").replace("http://", ""); domain = domain.replace("https://", "").replace("http://", "");
} }
let dispatcher = Dispatcher::new(); let dispatcher = Dispatcher::default();
for _ in 0..1 { // TODO customize delivery workers amount for _ in 0..1 { // TODO customize delivery workers amount
dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!!
} }

View file

@ -1,11 +1,10 @@
use base64::Engine; use openssl::pkey::PKey;
use openssl::{hash::MessageDigest, pkey::{PKey, Private}, sign::Signer}; use reqwest::Method;
use reqwest::header::{CONTENT_TYPE, USER_AGENT};
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder}; use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder};
use tokio::{sync::broadcast, task::JoinHandle}; use tokio::{sync::broadcast, task::JoinHandle};
use apb::{ActivityMut, Node}; use apb::{ActivityMut, Node};
use crate::{routes::activitypub::{activity::ap_activity, object::ap_object}, errors::UpubError, model, server::Context, VERSION}; use crate::{errors::UpubError, model, routes::activitypub::{activity::ap_activity, object::ap_object}, server::fetcher::Fetcher};
pub struct Dispatcher { pub struct Dispatcher {
waker: broadcast::Sender<()>, waker: broadcast::Sender<()>,
@ -99,7 +98,11 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
continue; continue;
}; };
if let Err(e) = deliver(&key, &delivery.target, &delivery.actor, payload, &domain).await { if let Err(e) = Fetcher::request::<()>(
Method::POST, &delivery.target,
Some(&serde_json::to_string(&payload).unwrap()),
&delivery.actor, &key, &domain
).await {
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target); tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
let new_delivery = model::delivery::ActiveModel { let new_delivery = model::delivery::ActiveModel {
id: sea_orm::ActiveValue::NotSet, id: sea_orm::ActiveValue::NotSet,
@ -114,58 +117,3 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
} }
} }
} }
async fn deliver(key: &PKey<Private>, to: &str, from: &str, payload: serde_json::Value, domain: &str) -> Result<(), UpubError> {
let payload = serde_json::to_string(&payload).unwrap();
let digest = format!("sha-256={}", base64::prelude::BASE64_STANDARD.encode(openssl::sha::sha256(payload.as_bytes())));
let host = Context::server(to);
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
let path = to.replace("https://", "").replace("http://", "").replace(&host, "");
// let headers : BTreeMap<String, String> = [
// ("Host".to_string(), host.clone()),
// ("Date".to_string(), date.clone()),
// ("Digest".to_string(), digest.clone()),
// ].into();
// let signature_header = Config::new()
// .dont_use_created_field()
// .require_header("host")
// .require_header("date")
// .require_header("digest")
// .begin_sign("POST", &path, headers)
// .unwrap()
// .sign(format!("{from}#main-key"), |to_sign| {
// tracing::info!("signing '{to_sign}'");
// let mut signer = Signer::new(MessageDigest::sha256(), key)?;
// signer.update(to_sign.as_bytes())?;
// let signature = base64::prelude::BASE64_URL_SAFE.encode(signer.sign_to_vec()?);
// Ok(signature) as Result<_, UpubError>
// })
// .unwrap()
// .signature_header();
let signature_header = {
let to_sign = format!("(request-target): post {path}\nhost: {host}\ndate: {date}\ndigest: {digest}");
let mut signer = Signer::new(MessageDigest::sha256(), key)?;
signer.update(to_sign.as_bytes())?;
let signature = base64::prelude::BASE64_STANDARD.encode(signer.sign_to_vec()?);
format!("keyId=\"{from}#main-key\",algorithm=\"rsa-sha256\",headers=\"(request-target) host date digest\",signature=\"{signature}\"")
};
reqwest::Client::new()
.post(to)
.header("Host", host)
.header("Date", date)
.header("Digest", digest)
.header("Signature", signature_header)
.header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"")
.header(USER_AGENT, format!("upub+{VERSION} ({domain})")) // TODO put instance admin email
.body(payload)
.send()
.await?
.error_for_status()?;
Ok(())
}

View file

@ -1,9 +1,15 @@
use openssl::pkey::{PKey, Private}; use std::collections::BTreeMap;
use reqwest::header::USER_AGENT;
use base64::Engine;
use http_signature_normalization::Config;
use openssl::{hash::MessageDigest, pkey::{PKey, Private}, sign::Signer};
use reqwest::{header::{CONTENT_TYPE, USER_AGENT}, Method};
use sea_orm::{DatabaseConnection, EntityTrait, IntoActiveModel}; use sea_orm::{DatabaseConnection, EntityTrait, IntoActiveModel};
use crate::{VERSION, model}; use crate::{VERSION, model};
use super::Context;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum FetchError { pub enum FetchError {
@ -19,13 +25,73 @@ pub enum FetchError {
pub struct Fetcher { pub struct Fetcher {
db: DatabaseConnection, db: DatabaseConnection,
_key: PKey<Private>, // TODO store pre-parsed key: PKey<Private>, // TODO store pre-parsed
domain: String, // TODO merge directly with Context so we don't need to copy this domain: String, // TODO merge directly with Context so we don't need to copy this
} }
impl Fetcher { impl Fetcher {
pub fn new(db: DatabaseConnection, domain: String, key: String) -> Self { pub fn new(db: DatabaseConnection, domain: String, key: String) -> Self {
Fetcher { db, domain, _key: PKey::private_key_from_pem(key.as_bytes()).unwrap() } Fetcher { db, domain, key: PKey::private_key_from_pem(key.as_bytes()).unwrap() }
}
pub async fn request<T: serde::de::DeserializeOwned>(
method: reqwest::Method,
url: &str,
payload: Option<&str>,
from: &str,
key: &PKey<Private>,
domain: &str,
) -> reqwest::Result<T> {
let host = Context::server(url);
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
let path = url.replace("https://", "").replace("http://", "").replace(&host, "");
let mut headers : BTreeMap<String, String> = [
("Host".to_string(), host.clone()),
("Date".to_string(), date.clone()),
].into();
let mut client =
reqwest::Client::new()
.request(method, url)
.header("Host", host)
.header("Date", date);
let mut signature_cfg = Config::new();
if let Some(payload) = payload {
let digest = format!("sha-256={}", base64::prelude::BASE64_STANDARD.encode(openssl::sha::sha256(payload.as_bytes())));
headers.insert("Digest".to_string(), digest.clone());
signature_cfg = signature_cfg.require_header("digest");
client = client
.header("Digest", digest)
.body(payload.to_string());
}
let signature_header = signature_cfg
.dont_use_created_field()
.require_header("host")
.require_header("date")
.begin_sign("POST", &path, headers)
.unwrap()
.sign(format!("{from}#main-key"), |to_sign| {
tracing::info!("signing '{to_sign}'");
let mut signer = Signer::new(MessageDigest::sha256(), key)?;
signer.update(to_sign.as_bytes())?;
let signature = base64::prelude::BASE64_URL_SAFE.encode(signer.sign_to_vec()?);
Ok(signature) as crate::Result<_>
})
.unwrap()
.signature_header();
client
.header("Signature", signature_header)
.header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"")
.header(USER_AGENT, format!("upub+{VERSION} ({domain})")) // TODO put instance admin email
.send()
.await?
.error_for_status()?
.json()
.await
} }
pub async fn user(&self, id: &str) -> Result<model::user::Model, FetchError> { pub async fn user(&self, id: &str) -> Result<model::user::Model, FetchError> {
@ -33,16 +99,9 @@ impl Fetcher {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
// TODO sign http fetches, we got the app key and db to get user keys just in case let user = Self::request::<serde_json::Value>(
tracing::info!("fetching {id}"); Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain,
let user = reqwest::Client::new() ).await?;
.get(id)
.header(USER_AGENT, format!("upub+{VERSION} ({})", self.domain)) // TODO put instance admin email
.send()
.await?
.json::<serde_json::Value>()
.await?;
let user_model = model::user::Model::new(&user)?; let user_model = model::user::Model::new(&user)?;
model::user::Entity::insert(user_model.clone().into_active_model()) model::user::Entity::insert(user_model.clone().into_active_model())
@ -50,4 +109,36 @@ impl Fetcher {
Ok(user_model) Ok(user_model)
} }
pub async fn activity(&self, id: &str) -> Result<model::activity::Model, FetchError> {
if let Some(x) = model::activity::Entity::find_by_id(id).one(&self.db).await? {
return Ok(x); // already in db, easy
}
let activity = Self::request::<serde_json::Value>(
Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain,
).await?;
let activity_model = model::activity::Model::new(&activity)?;
model::activity::Entity::insert(activity_model.clone().into_active_model())
.exec(&self.db).await?;
Ok(activity_model)
}
pub async fn object(&self, id: &str) -> Result<model::object::Model, FetchError> {
if let Some(x) = model::object::Entity::find_by_id(id).one(&self.db).await? {
return Ok(x); // already in db, easy
}
let object = Self::request::<serde_json::Value>(
Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain,
).await?;
let object_model = model::object::Model::new(&object)?;
model::object::Entity::insert(object_model.clone().into_active_model())
.exec(&self.db).await?;
Ok(object_model)
}
} }