From db8ecc7c3d7b277685b594a9ca3b9fcd1aa6d4b9 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 27 Mar 2024 04:00:18 +0100 Subject: [PATCH] chore: moved around stuff --- src/activitypub/user/outbox.rs | 104 ++------------------------------- src/auth.rs | 68 ++++++++++----------- src/dispatcher.rs | 2 +- src/errors.rs | 33 +++++++++++ src/server.rs | 67 ++++++++++++++++++++- 5 files changed, 139 insertions(+), 135 deletions(-) diff --git a/src/activitypub/user/outbox.rs b/src/activitypub/user/outbox.rs index 0fcb3ad..bed9468 100644 --- a/src/activitypub/user/outbox.rs +++ b/src/activitypub/user/outbox.rs @@ -1,7 +1,7 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, Json}; -use sea_orm::{ColumnTrait, Condition, DbErr, EntityTrait, IntoActiveModel, Order, QueryFilter, QueryOrder, QuerySelect, SelectColumns, Set}; +use sea_orm::{EntityTrait, IntoActiveModel, Order, QueryOrder, QuerySelect, Set}; -use crate::{activitypub::{jsonld::LD, JsonLD, Pagination, PUBLIC_TARGET}, activitystream::{object::{activity::{accept::AcceptType, Activity, ActivityMut, ActivityType}, collection::{page::CollectionPageMut, CollectionMut, CollectionType}, Addressed, ObjectMut}, Base, BaseMut, BaseType, Node, ObjectType}, auth::{AuthIdentity, Identity}, model::{self, activity, object, FieldError}, server::Context, url}; +use crate::{activitypub::{jsonld::LD, JsonLD, Pagination}, activitystream::{object::{activity::{accept::AcceptType, Activity, ActivityMut, ActivityType}, collection::{page::CollectionPageMut, CollectionMut, CollectionType}, Addressed, ObjectMut}, Base, BaseMut, BaseType, Node, ObjectType}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; pub async fn get( State(ctx): State, @@ -16,36 +16,6 @@ pub async fn get( )) } -#[derive(Debug, thiserror::Error)] -pub enum UpubError { - #[error("database error: {0}")] - Database(#[from] sea_orm::DbErr), - - #[error("api returned {0}")] - Status(StatusCode), - - #[error("missing field: {0}")] - Field(#[from] FieldError), - - #[error("openssl error: {0}")] - OpenSSL(#[from] openssl::error::ErrorStack), - - #[error("fetch error: {0}")] - Reqwest(#[from] reqwest::Error), -} - -impl From for UpubError { - fn from(value: StatusCode) -> Self { - UpubError::Status(value) - } -} - -impl IntoResponse for UpubError { - fn into_response(self) -> axum::response::Response { - (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response() - } -} - pub struct CreationResult(pub String); impl IntoResponse for CreationResult { fn into_response(self) -> axum::response::Response { @@ -77,9 +47,9 @@ pub async fn page( // conditions = conditions.add(model::addressing::Column::Server.eq(x)); // } - match activity::Entity::find() - .find_also_related(object::Entity) - .order_by(activity::Column::Published, Order::Desc) + match model::activity::Entity::find() + .find_also_related(model::object::Entity) + .order_by(model::activity::Column::Published, Order::Desc) .limit(limit) .offset(offset) .all(ctx.db()).await @@ -309,67 +279,3 @@ pub async fn post( } } } - -impl Context { - async fn expand_addressing(&self, uid: &str, mut targets: Vec) -> Result, DbErr> { - let following_addr = format!("{uid}/followers"); - if let Some(i) = targets.iter().position(|x| x == &following_addr) { - targets.remove(i); - model::relation::Entity::find() - .filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string()))) - .select_column(model::relation::Column::Follower) - .into_tuple::() - .all(self.db()) - .await? - .into_iter() - .for_each(|x| targets.push(x)); - } - Ok(targets) - } - - async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> Result<(), DbErr> { - let addressings : Vec = targets - .iter() - .map(|to| model::addressing::ActiveModel { - server: Set(Context::server(to)), - actor: Set(to.to_string()), - activity: Set(aid.to_string()), - object: Set(oid.map(|x| x.to_string())), - published: Set(chrono::Utc::now()), - ..Default::default() - }) - .collect(); - - model::addressing::Entity::insert_many(addressings) - .exec(self.db()) - .await?; - - Ok(()) - } - - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> { - let deliveries : Vec = targets - .iter() - .filter(|to| Context::server(to) != self.base()) - .filter(|to| to != &PUBLIC_TARGET) - .map(|to| model::delivery::ActiveModel { - actor: Set(from.to_string()), - // TODO we should resolve each user by id and check its inbox because we can't assume - // it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now - target: Set(format!("{}/inbox", to)), - activity: Set(aid.to_string()), - created: Set(chrono::Utc::now()), - not_before: Set(chrono::Utc::now()), - attempt: Set(0), - ..Default::default() - }) - .collect(); - - model::delivery::Entity::insert_many(deliveries) - .exec(self.db()) - .await?; - - Ok(()) - } -} - diff --git a/src/auth.rs b/src/auth.rs index ccca3ce..51eaea3 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts, StatusCode}}; -use openssl::{hash::MessageDigest, pkey::PKey, sign::Verifier}; +use openssl::hash::MessageDigest; use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter}; use crate::{model, server::Context}; @@ -48,44 +48,45 @@ where } } - if let Some(sig) = parts - .headers - .get("Signature") - .map(|v| v.to_str().unwrap_or("")) - { - let signature = HttpSignature::try_from(sig)?; - let user_id = signature.key_id.split('#').next().unwrap_or("").to_string(); - let data : String = signature.headers.iter() - .map(|header| { - if header == "(request-target)" { - format!("(request-target): {} {}", parts.method, parts.uri) - } else { - format!( - "{header}: {}", - parts.headers.get(header) - .map(|h| h.to_str().unwrap_or("")) - .unwrap_or("") - ) - } - }) - .collect::>() // TODO can we avoid this unneeded allocation? - .join("\n"); + // if let Some(sig) = parts + // .headers + // .get("Signature") + // .map(|v| v.to_str().unwrap_or("")) + // { + // let signature = HttpSignature::try_from(sig)?; + // let user_id = signature.key_id.split('#').next().unwrap_or("").to_string(); + // let data : String = signature.headers.iter() + // .map(|header| { + // if header == "(request-target)" { + // format!("(request-target): {} {}", parts.method, parts.uri) + // } else { + // format!( + // "{header}: {}", + // parts.headers.get(header) + // .map(|h| h.to_str().unwrap_or("")) + // .unwrap_or("") + // ) + // } + // }) + // .collect::>() // TODO can we avoid this unneeded allocation? + // .join("\n"); - let user = ctx.fetch().user(&user_id).await.map_err(|_e| StatusCode::UNAUTHORIZED)?; - let pubkey = PKey::public_key_from_pem(user.public_key.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; - let mut verifier = Verifier::new(signature.digest(), &pubkey).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; - verifier.update(data.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; - if verifier.verify(signature.signature.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)? { - identity = Identity::Remote(user_id); - } else { - return Err(StatusCode::FORBIDDEN); - } - } + // let user = ctx.fetch().user(&user_id).await.map_err(|_e| StatusCode::UNAUTHORIZED)?; + // let pubkey = PKey::public_key_from_pem(user.public_key.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; + // let mut verifier = Verifier::new(signature.digest(), &pubkey).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; + // verifier.update(data.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; + // if verifier.verify(signature.signature.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)? { + // identity = Identity::Remote(user_id); + // } else { + // return Err(StatusCode::FORBIDDEN); + // } + // } Ok(AuthIdentity(identity)) } } +#[allow(unused)] // TODO am i gonna reimplement http signatures for verification? pub struct HttpSignature { key_id: String, algorithm: String, @@ -94,6 +95,7 @@ pub struct HttpSignature { } impl HttpSignature { + #[allow(unused)] // TODO am i gonna reimplement http signatures for verification? pub fn digest(&self) -> MessageDigest { match self.algorithm.as_str() { "rsa-sha512" => MessageDigest::sha512(), diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 907d480..53d5f5c 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -4,7 +4,7 @@ use reqwest::header::{CONTENT_TYPE, USER_AGENT}; use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder}; use tokio::task::JoinHandle; -use crate::{activitypub::{activity::ap_activity, object::ap_object, user::outbox::UpubError}, activitystream::{object::activity::ActivityMut, Node}, model, server::Context, VERSION}; +use crate::{activitypub::{activity::ap_activity, object::ap_object}, activitystream::{object::activity::ActivityMut, Node}, errors::UpubError, model, server::Context, VERSION}; pub struct Dispatcher; diff --git a/src/errors.rs b/src/errors.rs index 6e62053..ebc1911 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,3 +1,36 @@ +#[derive(Debug, thiserror::Error)] +pub enum UpubError { + #[error("database error: {0}")] + Database(#[from] sea_orm::DbErr), + + #[error("api returned {0}")] + Status(axum::http::StatusCode), + + #[error("missing field: {0}")] + Field(#[from] crate::model::FieldError), + + #[error("openssl error: {0}")] + OpenSSL(#[from] openssl::error::ErrorStack), + + #[error("fetch error: {0}")] + Reqwest(#[from] reqwest::Error), +} + +impl From for UpubError { + fn from(value: axum::http::StatusCode) -> Self { + UpubError::Status(value) + } +} + +impl axum::response::IntoResponse for UpubError { + fn into_response(self) -> axum::response::Response { + ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + self.to_string() + ).into_response() + } +} + pub trait LoggableError { fn info_failed(self, msg: &str); fn warn_failed(self, msg: &str); diff --git a/src/server.rs b/src/server.rs index afa6e02..8b1ee3b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,9 +1,9 @@ use std::{str::Utf8Error, sync::Arc}; use openssl::rsa::Rsa; -use sea_orm::{DatabaseConnection, DbErr, EntityTrait}; +use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, QueryFilter, SelectColumns, Set}; -use crate::{dispatcher::Dispatcher, fetcher::Fetcher, model}; +use crate::{activitypub::PUBLIC_TARGET, dispatcher::Dispatcher, fetcher::Fetcher, model}; #[derive(Clone)] pub struct Context(Arc); @@ -133,4 +133,67 @@ impl Context { .unwrap_or("") .to_string() } + + pub async fn expand_addressing(&self, uid: &str, mut targets: Vec) -> Result, DbErr> { + let following_addr = format!("{uid}/followers"); + if let Some(i) = targets.iter().position(|x| x == &following_addr) { + targets.remove(i); + model::relation::Entity::find() + .filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string()))) + .select_column(model::relation::Column::Follower) + .into_tuple::() + .all(self.db()) + .await? + .into_iter() + .for_each(|x| targets.push(x)); + } + Ok(targets) + } + + pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> Result<(), DbErr> { + let addressings : Vec = targets + .iter() + .filter(|x| !x.ends_with("/followers")) + .map(|to| model::addressing::ActiveModel { + server: Set(Context::server(to)), + actor: Set(to.to_string()), + activity: Set(aid.to_string()), + object: Set(oid.map(|x| x.to_string())), + published: Set(chrono::Utc::now()), + ..Default::default() + }) + .collect(); + + model::addressing::Entity::insert_many(addressings) + .exec(self.db()) + .await?; + + Ok(()) + } + + pub async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> { + let deliveries : Vec = targets + .iter() + .filter(|to| Context::server(to) != self.base()) + .filter(|to| to != &PUBLIC_TARGET) + .map(|to| model::delivery::ActiveModel { + actor: Set(from.to_string()), + // TODO we should resolve each user by id and check its inbox because we can't assume + // it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now + target: Set(format!("{}/inbox", to)), + activity: Set(aid.to_string()), + created: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + attempt: Set(0), + ..Default::default() + }) + .collect(); + + model::delivery::Entity::insert_many(deliveries) + .exec(self.db()) + .await?; + + Ok(()) + } } +