1
0
Fork 0
forked from alemi/upub

chore: moved around stuff

This commit is contained in:
əlemi 2024-03-27 04:00:18 +01:00
parent c3face463e
commit db8ecc7c3d
Signed by: alemi
GPG key ID: A4895B84D311642C
5 changed files with 139 additions and 135 deletions

View file

@ -1,7 +1,7 @@
use axum::{extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, Json};
use sea_orm::{ColumnTrait, Condition, DbErr, EntityTrait, IntoActiveModel, Order, QueryFilter, QueryOrder, QuerySelect, SelectColumns, Set};
use sea_orm::{EntityTrait, IntoActiveModel, Order, QueryOrder, QuerySelect, Set};
use crate::{activitypub::{jsonld::LD, JsonLD, Pagination, PUBLIC_TARGET}, activitystream::{object::{activity::{accept::AcceptType, Activity, ActivityMut, ActivityType}, collection::{page::CollectionPageMut, CollectionMut, CollectionType}, Addressed, ObjectMut}, Base, BaseMut, BaseType, Node, ObjectType}, auth::{AuthIdentity, Identity}, model::{self, activity, object, FieldError}, server::Context, url};
use crate::{activitypub::{jsonld::LD, JsonLD, Pagination}, activitystream::{object::{activity::{accept::AcceptType, Activity, ActivityMut, ActivityType}, collection::{page::CollectionPageMut, CollectionMut, CollectionType}, Addressed, ObjectMut}, Base, BaseMut, BaseType, Node, ObjectType}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url};
pub async fn get(
State(ctx): State<Context>,
@ -16,36 +16,6 @@ pub async fn get(
))
}
#[derive(Debug, thiserror::Error)]
pub enum UpubError {
#[error("database error: {0}")]
Database(#[from] sea_orm::DbErr),
#[error("api returned {0}")]
Status(StatusCode),
#[error("missing field: {0}")]
Field(#[from] FieldError),
#[error("openssl error: {0}")]
OpenSSL(#[from] openssl::error::ErrorStack),
#[error("fetch error: {0}")]
Reqwest(#[from] reqwest::Error),
}
impl From<StatusCode> for UpubError {
fn from(value: StatusCode) -> Self {
UpubError::Status(value)
}
}
impl IntoResponse for UpubError {
fn into_response(self) -> axum::response::Response {
(StatusCode::INTERNAL_SERVER_ERROR, self.to_string()).into_response()
}
}
pub struct CreationResult(pub String);
impl IntoResponse for CreationResult {
fn into_response(self) -> axum::response::Response {
@ -77,9 +47,9 @@ pub async fn page(
// conditions = conditions.add(model::addressing::Column::Server.eq(x));
// }
match activity::Entity::find()
.find_also_related(object::Entity)
.order_by(activity::Column::Published, Order::Desc)
match model::activity::Entity::find()
.find_also_related(model::object::Entity)
.order_by(model::activity::Column::Published, Order::Desc)
.limit(limit)
.offset(offset)
.all(ctx.db()).await
@ -309,67 +279,3 @@ pub async fn post(
}
}
}
impl Context {
async fn expand_addressing(&self, uid: &str, mut targets: Vec<String>) -> Result<Vec<String>, DbErr> {
let following_addr = format!("{uid}/followers");
if let Some(i) = targets.iter().position(|x| x == &following_addr) {
targets.remove(i);
model::relation::Entity::find()
.filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string())))
.select_column(model::relation::Column::Follower)
.into_tuple::<String>()
.all(self.db())
.await?
.into_iter()
.for_each(|x| targets.push(x));
}
Ok(targets)
}
async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> Result<(), DbErr> {
let addressings : Vec<model::addressing::ActiveModel> = targets
.iter()
.map(|to| model::addressing::ActiveModel {
server: Set(Context::server(to)),
actor: Set(to.to_string()),
activity: Set(aid.to_string()),
object: Set(oid.map(|x| x.to_string())),
published: Set(chrono::Utc::now()),
..Default::default()
})
.collect();
model::addressing::Entity::insert_many(addressings)
.exec(self.db())
.await?;
Ok(())
}
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> {
let deliveries : Vec<model::delivery::ActiveModel> = targets
.iter()
.filter(|to| Context::server(to) != self.base())
.filter(|to| to != &PUBLIC_TARGET)
.map(|to| model::delivery::ActiveModel {
actor: Set(from.to_string()),
// TODO we should resolve each user by id and check its inbox because we can't assume
// it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now
target: Set(format!("{}/inbox", to)),
activity: Set(aid.to_string()),
created: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
..Default::default()
})
.collect();
model::delivery::Entity::insert_many(deliveries)
.exec(self.db())
.await?;
Ok(())
}
}

View file

@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts, StatusCode}};
use openssl::{hash::MessageDigest, pkey::PKey, sign::Verifier};
use openssl::hash::MessageDigest;
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
use crate::{model, server::Context};
@ -48,44 +48,45 @@ where
}
}
if let Some(sig) = parts
.headers
.get("Signature")
.map(|v| v.to_str().unwrap_or(""))
{
let signature = HttpSignature::try_from(sig)?;
let user_id = signature.key_id.split('#').next().unwrap_or("").to_string();
let data : String = signature.headers.iter()
.map(|header| {
if header == "(request-target)" {
format!("(request-target): {} {}", parts.method, parts.uri)
} else {
format!(
"{header}: {}",
parts.headers.get(header)
.map(|h| h.to_str().unwrap_or(""))
.unwrap_or("")
)
}
})
.collect::<Vec<String>>() // TODO can we avoid this unneeded allocation?
.join("\n");
// if let Some(sig) = parts
// .headers
// .get("Signature")
// .map(|v| v.to_str().unwrap_or(""))
// {
// let signature = HttpSignature::try_from(sig)?;
// let user_id = signature.key_id.split('#').next().unwrap_or("").to_string();
// let data : String = signature.headers.iter()
// .map(|header| {
// if header == "(request-target)" {
// format!("(request-target): {} {}", parts.method, parts.uri)
// } else {
// format!(
// "{header}: {}",
// parts.headers.get(header)
// .map(|h| h.to_str().unwrap_or(""))
// .unwrap_or("")
// )
// }
// })
// .collect::<Vec<String>>() // TODO can we avoid this unneeded allocation?
// .join("\n");
let user = ctx.fetch().user(&user_id).await.map_err(|_e| StatusCode::UNAUTHORIZED)?;
let pubkey = PKey::public_key_from_pem(user.public_key.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut verifier = Verifier::new(signature.digest(), &pubkey).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
verifier.update(data.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
if verifier.verify(signature.signature.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)? {
identity = Identity::Remote(user_id);
} else {
return Err(StatusCode::FORBIDDEN);
}
}
// let user = ctx.fetch().user(&user_id).await.map_err(|_e| StatusCode::UNAUTHORIZED)?;
// let pubkey = PKey::public_key_from_pem(user.public_key.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
// let mut verifier = Verifier::new(signature.digest(), &pubkey).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
// verifier.update(data.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
// if verifier.verify(signature.signature.as_bytes()).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)? {
// identity = Identity::Remote(user_id);
// } else {
// return Err(StatusCode::FORBIDDEN);
// }
// }
Ok(AuthIdentity(identity))
}
}
#[allow(unused)] // TODO am i gonna reimplement http signatures for verification?
pub struct HttpSignature {
key_id: String,
algorithm: String,
@ -94,6 +95,7 @@ pub struct HttpSignature {
}
impl HttpSignature {
#[allow(unused)] // TODO am i gonna reimplement http signatures for verification?
pub fn digest(&self) -> MessageDigest {
match self.algorithm.as_str() {
"rsa-sha512" => MessageDigest::sha512(),

View file

@ -4,7 +4,7 @@ use reqwest::header::{CONTENT_TYPE, USER_AGENT};
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder};
use tokio::task::JoinHandle;
use crate::{activitypub::{activity::ap_activity, object::ap_object, user::outbox::UpubError}, activitystream::{object::activity::ActivityMut, Node}, model, server::Context, VERSION};
use crate::{activitypub::{activity::ap_activity, object::ap_object}, activitystream::{object::activity::ActivityMut, Node}, errors::UpubError, model, server::Context, VERSION};
pub struct Dispatcher;

View file

@ -1,3 +1,36 @@
#[derive(Debug, thiserror::Error)]
pub enum UpubError {
#[error("database error: {0}")]
Database(#[from] sea_orm::DbErr),
#[error("api returned {0}")]
Status(axum::http::StatusCode),
#[error("missing field: {0}")]
Field(#[from] crate::model::FieldError),
#[error("openssl error: {0}")]
OpenSSL(#[from] openssl::error::ErrorStack),
#[error("fetch error: {0}")]
Reqwest(#[from] reqwest::Error),
}
impl From<axum::http::StatusCode> for UpubError {
fn from(value: axum::http::StatusCode) -> Self {
UpubError::Status(value)
}
}
impl axum::response::IntoResponse for UpubError {
fn into_response(self) -> axum::response::Response {
(
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
self.to_string()
).into_response()
}
}
pub trait LoggableError {
fn info_failed(self, msg: &str);
fn warn_failed(self, msg: &str);

View file

@ -1,9 +1,9 @@
use std::{str::Utf8Error, sync::Arc};
use openssl::rsa::Rsa;
use sea_orm::{DatabaseConnection, DbErr, EntityTrait};
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, QueryFilter, SelectColumns, Set};
use crate::{dispatcher::Dispatcher, fetcher::Fetcher, model};
use crate::{activitypub::PUBLIC_TARGET, dispatcher::Dispatcher, fetcher::Fetcher, model};
#[derive(Clone)]
pub struct Context(Arc<ContextInner>);
@ -133,4 +133,67 @@ impl Context {
.unwrap_or("")
.to_string()
}
pub async fn expand_addressing(&self, uid: &str, mut targets: Vec<String>) -> Result<Vec<String>, DbErr> {
let following_addr = format!("{uid}/followers");
if let Some(i) = targets.iter().position(|x| x == &following_addr) {
targets.remove(i);
model::relation::Entity::find()
.filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string())))
.select_column(model::relation::Column::Follower)
.into_tuple::<String>()
.all(self.db())
.await?
.into_iter()
.for_each(|x| targets.push(x));
}
Ok(targets)
}
pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> Result<(), DbErr> {
let addressings : Vec<model::addressing::ActiveModel> = targets
.iter()
.filter(|x| !x.ends_with("/followers"))
.map(|to| model::addressing::ActiveModel {
server: Set(Context::server(to)),
actor: Set(to.to_string()),
activity: Set(aid.to_string()),
object: Set(oid.map(|x| x.to_string())),
published: Set(chrono::Utc::now()),
..Default::default()
})
.collect();
model::addressing::Entity::insert_many(addressings)
.exec(self.db())
.await?;
Ok(())
}
pub async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> {
let deliveries : Vec<model::delivery::ActiveModel> = targets
.iter()
.filter(|to| Context::server(to) != self.base())
.filter(|to| to != &PUBLIC_TARGET)
.map(|to| model::delivery::ActiveModel {
actor: Set(from.to_string()),
// TODO we should resolve each user by id and check its inbox because we can't assume
// it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now
target: Set(format!("{}/inbox", to)),
activity: Set(aid.to_string()),
created: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
..Default::default()
})
.collect();
model::delivery::Entity::insert_many(deliveries)
.exec(self.db())
.await?;
Ok(())
}
}