diff --git a/Cargo.lock b/Cargo.lock index ba4d0a5..64b1cec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,7 +140,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" [[package]] name = "apb" -version = "0.1.1" +version = "0.2.0" dependencies = [ "async-trait", "chrono", @@ -1688,6 +1688,17 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "httpsign" +version = "0.1.0" +dependencies = [ + "axum", + "base64 0.22.1", + "openssl", + "thiserror", + "tracing", +] + [[package]] name = "hyper" version = "1.3.1" @@ -4735,6 +4746,7 @@ dependencies = [ "sha256", "tracing", "upub", + "upub-processor", "uuid", ] @@ -4745,6 +4757,25 @@ dependencies = [ "sea-orm-migration", ] +[[package]] +name = "upub-processor" +version = "0.2.0" +dependencies = [ + "apb", + "async-trait", + "chrono", + "httpsign", + "jrd", + "mdhtml", + "reqwest", + "sea-orm", + "serde_json", + "thiserror", + "tokio", + "tracing", + "upub", +] + [[package]] name = "upub-routes" version = "0.2.0" @@ -4752,6 +4783,7 @@ dependencies = [ "apb", "axum", "chrono", + "httpsign", "jrd", "mastodon-async-entities", "nodeinfo", @@ -4761,6 +4793,7 @@ dependencies = [ "serde", "serde_json", "sha256", + "thiserror", "time", "tokio", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index 1ee9c27..cd3c787 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,9 @@ members = [ "upub/cli", "upub/migrations", "upub/routes", + "upub/processor", "web", + "utils/httpsign", "utils/mdhtml", "utils/uriproxy" ] diff --git a/apb/src/jsonld.rs b/apb/src/jsonld.rs index 7e3012a..2391d74 100644 --- a/apb/src/jsonld.rs +++ b/apb/src/jsonld.rs @@ -12,7 +12,7 @@ impl LD for serde_json::Value { ctx.insert("sensitive".to_string(), serde_json::Value::String("as:sensitive".into())); ctx.insert("quoteUrl".to_string(), serde_json::Value::String("as:quoteUrl".into())); match o_type { - Some(crate::ObjectType::Actor(_)) => { + Ok(crate::ObjectType::Actor(_)) => { ctx.insert("counters".to_string(), serde_json::Value::String("https://ns.alemi.dev/as/counters/#".into())); ctx.insert("followingCount".to_string(), serde_json::Value::String("counters:followingCount".into())); ctx.insert("followersCount".to_string(), serde_json::Value::String("counters:followersCount".into())); @@ -21,13 +21,13 @@ impl LD for serde_json::Value { ctx.insert("followingMe".to_string(), serde_json::Value::String("fe:followingMe".into())); ctx.insert("followedByMe".to_string(), serde_json::Value::String("fe:followedByMe".into())); }, - Some(_) => { + Ok(_) => { ctx.insert("fe".to_string(), serde_json::Value::String("https://ns.alemi.dev/as/fe/#".into())); ctx.insert("likedByMe".to_string(), serde_json::Value::String("fe:likedByMe".into())); ctx.insert("ostatus".to_string(), serde_json::Value::String("http://ostatus.org#".into())); ctx.insert("conversation".to_string(), serde_json::Value::String("ostatus:conversation".into())); }, - None => {}, + Err(_) => {}, } obj.insert( "@context".to_string(), diff --git a/upub/cli/Cargo.toml b/upub/cli/Cargo.toml index ec11cdf..2884a7d 100644 --- a/upub/cli/Cargo.toml +++ b/upub/cli/Cargo.toml @@ -13,6 +13,7 @@ readme = "README.md" [dependencies] apb = { path = "../../apb/" } upub = { path = "../core" } +upub-processor = { path = "../processor/" } tracing = "0.1" serde_json = "1" sha256 = "1.5" diff --git a/upub/cli/src/fetch.rs b/upub/cli/src/fetch.rs index f5af92d..ef99456 100644 --- a/upub/cli/src/fetch.rs +++ b/upub/cli/src/fetch.rs @@ -1,33 +1,32 @@ use sea_orm::EntityTrait; -use upub::server::{fetcher::Fetchable, normalizer::Normalizer}; +use upub_processor::{fetch::{Fetchable, PullError}, normalize::{AP, Normalizer}}; -pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> upub::Result<()> { +pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> { use apb::Base; let mut node = apb::Node::link(uri.to_string()); node.fetch(&ctx).await?; let obj = node.extract().expect("node still empty after fetch?"); - let server = upub::Context::server(&uri); println!("{}", serde_json::to_string_pretty(&obj).unwrap()); if save { match obj.base_type() { - Some(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { + Ok(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { upub::model::actor::Entity::insert( - upub::model::actor::ActiveModel::new(&obj).unwrap() + AP::actor_q(&obj).unwrap() ).exec(ctx.db()).await.unwrap(); }, - Some(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { - ctx.insert_activity(obj, Some(server)).await.unwrap(); + Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { + ctx.insert_activity(obj).await.unwrap(); }, - Some(apb::BaseType::Object(apb::ObjectType::Note)) => { - ctx.insert_object(obj, Some(server)).await.unwrap(); + Ok(apb::BaseType::Object(apb::ObjectType::Note)) => { + ctx.insert_object(obj).await.unwrap(); }, - Some(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), - Some(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), - None => tracing::error!("no type on object"), + Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), + Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), + Err(_) => tracing::error!("no type on object"), } } diff --git a/upub/cli/src/lib.rs b/upub/cli/src/lib.rs index 2636f9b..a8f9009 100644 --- a/upub/cli/src/lib.rs +++ b/upub/cli/src/lib.rs @@ -98,7 +98,7 @@ pub async fn run(ctx: upub::Context, command: CliCommand) -> upub::Result<()> { CliCommand::Faker { count } => Ok(faker(ctx, count as i64).await?), CliCommand::Fetch { uri, save } => - Ok(fetch(ctx, uri, save).await?), + Ok(fetch(ctx, uri, save).await.map_err(|_e| upub::Error::internal_server_error())?), CliCommand::Relay { actor, accept } => Ok(relay(ctx, actor, accept).await?), CliCommand::Fix { likes, shares, replies } => diff --git a/upub/cli/src/relay.rs b/upub/cli/src/relay.rs index 94b8dc2..c2a672e 100644 --- a/upub/cli/src/relay.rs +++ b/upub/cli/src/relay.rs @@ -1,6 +1,6 @@ use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder}; -use upub::server::addresser::Addresser; +use upub_processor::address::Addresser; pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> upub::Result<()> { let aid = ctx.aid(&uuid::Uuid::new_v4().to_string()); diff --git a/upub/cli/src/update.rs b/upub/cli/src/update.rs index ca5f48b..cac556a 100644 --- a/upub/cli/src/update.rs +++ b/upub/cli/src/update.rs @@ -1,7 +1,7 @@ use futures::TryStreamExt; use sea_orm::{ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter}; -use upub::server::fetcher::Fetcher; +use upub_processor::{fetch::Fetcher, normalize::AP}; pub async fn update_users(ctx: upub::Context, days: i64) -> upub::Result<()> { let mut count = 0; @@ -19,7 +19,7 @@ pub async fn update_users(ctx: upub::Context, days: i64) -> upub::Result<()> { match ctx.pull(&user.id).await.map(|x| x.actor()) { Err(e) => tracing::warn!("could not update user {}: {e}", user.id), Ok(Err(e)) => tracing::warn!("could not update user {}: {e}", user.id), - Ok(Ok(doc)) => match upub::model::actor::ActiveModel::new(&doc) { + Ok(Ok(doc)) => match AP::actor_q(&doc) { Ok(mut u) => { u.internal = Set(user.internal); u.updated = Set(chrono::Utc::now()); diff --git a/upub/core/src/errors.rs b/upub/core/src/errors.rs index 874d158..594f674 100644 --- a/upub/core/src/errors.rs +++ b/upub/core/src/errors.rs @@ -8,8 +8,8 @@ pub enum UpubError { #[error("{0}")] Status(axum::http::StatusCode), - #[error("missing field: {0}")] - Field(#[from] crate::model::FieldError), + #[error("{0}")] + Field(#[from] apb::FieldErr), #[error("openssl error: {0:?}")] OpenSSL(#[from] openssl::error::ErrorStack), @@ -68,10 +68,6 @@ impl UpubError { pub fn internal_server_error() -> Self { Self::Status(axum::http::StatusCode::INTERNAL_SERVER_ERROR) } - - pub fn field(field: &'static str) -> Self { - Self::Field(crate::model::FieldError(field)) - } } pub type UpubResult = Result; diff --git a/upub/core/src/ext.rs b/upub/core/src/ext.rs index 2ebe26e..7b11045 100644 --- a/upub/core/src/ext.rs +++ b/upub/core/src/ext.rs @@ -1,19 +1,19 @@ #[axum::async_trait] pub trait AnyQuery { - async fn any(self, db: &sea_orm::DatabaseConnection) -> crate::Result; + async fn any(self, db: &sea_orm::DatabaseConnection) -> Result; } #[axum::async_trait] impl AnyQuery for sea_orm::Select { - async fn any(self, db: &sea_orm::DatabaseConnection) -> crate::Result { + async fn any(self, db: &sea_orm::DatabaseConnection) -> Result { Ok(self.one(db).await?.is_some()) } } #[axum::async_trait] impl AnyQuery for sea_orm::Selector { - async fn any(self, db: &sea_orm::DatabaseConnection) -> crate::Result { + async fn any(self, db: &sea_orm::DatabaseConnection) -> Result { Ok(self.one(db).await?.is_some()) } } diff --git a/upub/core/src/model/activity.rs b/upub/core/src/model/activity.rs index 4b657ba..440df53 100644 --- a/upub/core/src/model/activity.rs +++ b/upub/core/src/model/activity.rs @@ -1,7 +1,7 @@ -use apb::{ActivityMut, ActivityType, BaseMut, ObjectMut}; +use apb::{field::OptionalString, ActivityMut, ActivityType, BaseMut, ObjectMut}; use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns}; -use crate::{model::Audience, errors::UpubError}; +use crate::model::Audience; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "activities")] @@ -76,28 +76,27 @@ impl Entity { Entity::find().filter(Column::Id.eq(id)) } - pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result { + pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result, DbErr> { Entity::find() .filter(Column::Id.eq(id)) .select_only() .select_column(Column::Internal) .into_tuple::() .one(db) - .await? - .ok_or_else(UpubError::not_found) + .await } } impl ActiveModel { - //#[deprecated = "should remove this, get models thru normalizer"] - pub fn new(activity: &impl apb::Activity) -> Result { + #[deprecated = "use AP::activity() from processor::normalize"] + pub fn new(activity: &impl apb::Activity) -> Result { Ok(ActiveModel { internal: sea_orm::ActiveValue::NotSet, - id: sea_orm::ActiveValue::Set(activity.id().ok_or(super::FieldError("id"))?.to_string()), - activity_type: sea_orm::ActiveValue::Set(activity.activity_type().ok_or(super::FieldError("type"))?), - actor: sea_orm::ActiveValue::Set(activity.actor().id().ok_or(super::FieldError("actor"))?), - object: sea_orm::ActiveValue::Set(activity.object().id()), - target: sea_orm::ActiveValue::Set(activity.target().id()), + id: sea_orm::ActiveValue::Set(activity.id()?.to_string()), + activity_type: sea_orm::ActiveValue::Set(activity.activity_type()?), + actor: sea_orm::ActiveValue::Set(activity.actor().id()?.to_string()), + object: sea_orm::ActiveValue::Set(activity.object().id().str()), + target: sea_orm::ActiveValue::Set(activity.target().id().str()), published: sea_orm::ActiveValue::Set(activity.published().unwrap_or(chrono::Utc::now())), to: sea_orm::ActiveValue::Set(activity.to().into()), bto: sea_orm::ActiveValue::Set(activity.bto().into()), diff --git a/upub/core/src/model/actor.rs b/upub/core/src/model/actor.rs index 47853e3..33816ff 100644 --- a/upub/core/src/model/actor.rs +++ b/upub/core/src/model/actor.rs @@ -1,8 +1,6 @@ use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns}; -use apb::{Actor, ActorMut, ActorType, BaseMut, DocumentMut, Endpoints, EndpointsMut, Object, ObjectMut, PublicKey, PublicKeyMut}; - -use crate::errors::UpubError; +use apb::{field::OptionalString, Actor, ActorMut, ActorType, BaseMut, DocumentMut, Endpoints, EndpointsMut, Object, ObjectMut, PublicKey, PublicKeyMut}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "actors")] @@ -149,43 +147,43 @@ impl Entity { Entity::delete_many().filter(Column::Id.eq(id)) } - pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result { + pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result, DbErr> { Entity::find() .filter(Column::Id.eq(id)) .select_only() .select_column(Column::Internal) .into_tuple::() .one(db) - .await? - .ok_or_else(UpubError::not_found) + .await } } impl ActiveModel { - pub fn new(object: &impl Actor) -> Result { - let ap_id = object.id().ok_or(super::FieldError("id"))?.to_string(); + #[deprecated = "use AP::actor() from processor::normalize"] + pub fn new(object: &impl Actor) -> Result { + let ap_id = object.id()?.to_string(); let (domain, fallback_preferred_username) = split_user_id(&ap_id); Ok(ActiveModel { internal: sea_orm::ActiveValue::NotSet, domain: sea_orm::ActiveValue::Set(domain), id: sea_orm::ActiveValue::Set(ap_id), preferred_username: sea_orm::ActiveValue::Set(object.preferred_username().unwrap_or(&fallback_preferred_username).to_string()), - actor_type: sea_orm::ActiveValue::Set(object.actor_type().ok_or(super::FieldError("type"))?), - name: sea_orm::ActiveValue::Set(object.name().map(|x| x.to_string())), - summary: sea_orm::ActiveValue::Set(object.summary().map(|x| x.to_string())), - icon: sea_orm::ActiveValue::Set(object.icon().get().and_then(|x| x.url().id())), - image: sea_orm::ActiveValue::Set(object.image().get().and_then(|x| x.url().id())), - inbox: sea_orm::ActiveValue::Set(object.inbox().id()), - outbox: sea_orm::ActiveValue::Set(object.outbox().id()), - shared_inbox: sea_orm::ActiveValue::Set(object.endpoints().get().and_then(|x| Some(x.shared_inbox()?.to_string()))), - followers: sea_orm::ActiveValue::Set(object.followers().id()), - following: sea_orm::ActiveValue::Set(object.following().id()), + actor_type: sea_orm::ActiveValue::Set(object.actor_type()?), + name: sea_orm::ActiveValue::Set(object.name().str()), + summary: sea_orm::ActiveValue::Set(object.summary().str()), + icon: sea_orm::ActiveValue::Set(object.icon().get().and_then(|x| x.url().id().str())), + image: sea_orm::ActiveValue::Set(object.image().get().and_then(|x| x.url().id().str())), + inbox: sea_orm::ActiveValue::Set(object.inbox().id().str()), + outbox: sea_orm::ActiveValue::Set(object.outbox().id().str()), + shared_inbox: sea_orm::ActiveValue::Set(object.endpoints().get().and_then(|x| x.shared_inbox().str())), + followers: sea_orm::ActiveValue::Set(object.followers().id().str()), + following: sea_orm::ActiveValue::Set(object.following().id().str()), published: sea_orm::ActiveValue::Set(object.published().unwrap_or(chrono::Utc::now())), updated: sea_orm::ActiveValue::Set(chrono::Utc::now()), following_count: sea_orm::ActiveValue::Set(object.following_count().unwrap_or(0) as i32), followers_count: sea_orm::ActiveValue::Set(object.followers_count().unwrap_or(0) as i32), statuses_count: sea_orm::ActiveValue::Set(object.statuses_count().unwrap_or(0) as i32), - public_key: sea_orm::ActiveValue::Set(object.public_key().get().ok_or(super::FieldError("publicKey"))?.public_key_pem().to_string()), + public_key: sea_orm::ActiveValue::Set(object.public_key().get().ok_or(apb::FieldErr("publicKey"))?.public_key_pem().to_string()), private_key: sea_orm::ActiveValue::Set(None), // there's no way to transport privkey over AP json, must come from DB }) } diff --git a/upub/core/src/model/instance.rs b/upub/core/src/model/instance.rs index 58039e5..c87b5bd 100644 --- a/upub/core/src/model/instance.rs +++ b/upub/core/src/model/instance.rs @@ -1,8 +1,6 @@ use nodeinfo::NodeInfoOwned; use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns}; -use crate::errors::UpubError; - #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "instances")] pub struct Model { @@ -48,23 +46,20 @@ impl Entity { Entity::find().filter(Column::Domain.eq(domain)) } - pub async fn domain_to_internal(domain: &str, db: &DatabaseConnection) -> crate::Result { + pub async fn domain_to_internal(domain: &str, db: &DatabaseConnection) -> Result, DbErr> { Entity::find() .filter(Column::Domain.eq(domain)) .select_only() .select_column(Column::Internal) .into_tuple::() .one(db) - .await? - .ok_or_else(UpubError::not_found) + .await } - pub async fn nodeinfo(domain: &str) -> crate::Result { - Ok( - reqwest::get(format!("https://{domain}/nodeinfo/2.0.json")) - .await? - .json() - .await? - ) + pub async fn nodeinfo(domain: &str) -> reqwest::Result { + reqwest::get(format!("https://{domain}/nodeinfo/2.0.json")) + .await? + .json() + .await } } diff --git a/upub/core/src/model/mod.rs b/upub/core/src/model/mod.rs index dc5fcf8..ac20903 100644 --- a/upub/core/src/model/mod.rs +++ b/upub/core/src/model/mod.rs @@ -6,8 +6,10 @@ pub mod config; pub mod credential; pub mod session; +pub mod addressing; pub mod instance; pub mod delivery; +pub mod processing; pub mod relation; pub mod announce; @@ -17,30 +19,20 @@ pub mod hashtag; pub mod mention; pub mod attachment; -pub mod addressing; - -#[derive(Debug, Clone, thiserror::Error)] -#[error("missing required field: '{0}'")] -pub struct FieldError(pub &'static str); - -impl From for axum::http::StatusCode { - fn from(value: FieldError) -> Self { - tracing::error!("bad request: {value}"); - axum::http::StatusCode::BAD_REQUEST - } -} #[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize, sea_orm::FromJsonQueryResult)] pub struct Audience(pub Vec); impl From> for Audience { fn from(value: apb::Node) -> Self { + use apb::field::OptionalString; + Audience( match value { apb::Node::Empty => vec![], apb::Node::Link(l) => vec![l.href().to_string()], - apb::Node::Object(o) => if let Some(id) = o.id() { vec![id.to_string()] } else { vec![] }, - apb::Node::Array(arr) => arr.into_iter().filter_map(|l| Some(l.id()?.to_string())).collect(), + apb::Node::Object(o) => if let Ok(id) = o.id() { vec![id.to_string()] } else { vec![] }, + apb::Node::Array(arr) => arr.into_iter().filter_map(|l| l.id().str()).collect(), } ) } diff --git a/upub/core/src/model/object.rs b/upub/core/src/model/object.rs index 2c97b76..a574a13 100644 --- a/upub/core/src/model/object.rs +++ b/upub/core/src/model/object.rs @@ -1,8 +1,6 @@ -use apb::{BaseMut, Collection, CollectionMut, ObjectMut, ObjectType}; +use apb::{field::OptionalString, BaseMut, Collection, CollectionMut, ObjectMut, ObjectType}; use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns}; -use crate::errors::UpubError; - use super::Audience; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] @@ -131,42 +129,42 @@ impl Entity { Entity::delete_many().filter(Column::Id.eq(id)) } - pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result { + pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result, DbErr> { Entity::find() .filter(Column::Id.eq(id)) .select_only() .select_column(Column::Internal) .into_tuple::() .one(db) - .await? - .ok_or_else(UpubError::not_found) + .await } } impl ActiveModel { - pub fn new(object: &impl apb::Object) -> Result { - let t = object.object_type().ok_or(super::FieldError("type"))?; + #[deprecated = "use AP::object() from processor::normalize"] + pub fn new(object: &impl apb::Object) -> Result { + let t = object.object_type()?; if matches!(t, apb::ObjectType::Activity(_) | apb::ObjectType::Actor(_) | apb::ObjectType::Collection(_) | apb::ObjectType::Document(_) ) { - return Err(super::FieldError("type")); + return Err(apb::FieldErr("type")); } Ok(ActiveModel { internal: sea_orm::ActiveValue::NotSet, - id: sea_orm::ActiveValue::Set(object.id().ok_or(super::FieldError("id"))?.to_string()), + id: sea_orm::ActiveValue::Set(object.id()?.to_string()), object_type: sea_orm::ActiveValue::Set(t), - attributed_to: sea_orm::ActiveValue::Set(object.attributed_to().id()), - name: sea_orm::ActiveValue::Set(object.name().map(|x| x.to_string())), - summary: sea_orm::ActiveValue::Set(object.summary().map(|x| x.to_string())), - content: sea_orm::ActiveValue::Set(object.content().map(|x| x.to_string())), - context: sea_orm::ActiveValue::Set(object.context().id()), - in_reply_to: sea_orm::ActiveValue::Set(object.in_reply_to().id()), - published: sea_orm::ActiveValue::Set(object.published().unwrap_or_else(chrono::Utc::now)), - updated: sea_orm::ActiveValue::Set(object.updated().unwrap_or_else(chrono::Utc::now)), - url: sea_orm::ActiveValue::Set(object.url().id()), + attributed_to: sea_orm::ActiveValue::Set(object.attributed_to().id().str()), + name: sea_orm::ActiveValue::Set(object.name().str()), + summary: sea_orm::ActiveValue::Set(object.summary().str()), + content: sea_orm::ActiveValue::Set(object.content().str()), + context: sea_orm::ActiveValue::Set(object.context().id().str()), + in_reply_to: sea_orm::ActiveValue::Set(object.in_reply_to().id().str()), + published: sea_orm::ActiveValue::Set(object.published().unwrap_or_else(|_| chrono::Utc::now())), + updated: sea_orm::ActiveValue::Set(object.updated().unwrap_or_else(|_| chrono::Utc::now())), + url: sea_orm::ActiveValue::Set(object.url().id().str()), replies: sea_orm::ActiveValue::Set(object.replies().get() .map_or(0, |x| x.total_items().unwrap_or(0)) as i32), likes: sea_orm::ActiveValue::Set(object.likes().get() diff --git a/upub/core/src/model/relation.rs b/upub/core/src/model/relation.rs index 724e398..1a6d43e 100644 --- a/upub/core/src/model/relation.rs +++ b/upub/core/src/model/relation.rs @@ -63,8 +63,11 @@ impl ActiveModelBehavior for ActiveModel {} impl Entity { // TODO this is 2 queries!!! can it be optimized down to 1? - pub async fn followers(uid: &str, db: &DatabaseConnection) -> crate::Result> { - let internal_id = super::actor::Entity::ap_to_internal(uid, db).await?; + pub async fn followers(uid: &str, db: &DatabaseConnection) -> Result>, DbErr> { + let Some(internal_id) = super::actor::Entity::ap_to_internal(uid, db).await? + else { + return Ok(None); + }; let out = Entity::find() .join( sea_orm::JoinType::InnerJoin, @@ -81,12 +84,15 @@ impl Entity { .all(db) .await?; - Ok(out) + Ok(Some(out)) } // TODO this is 2 queries!!! can it be optimized down to 1? - pub async fn following(uid: &str, db: &DatabaseConnection) -> crate::Result> { - let internal_id = super::actor::Entity::ap_to_internal(uid, db).await?; + pub async fn following(uid: &str, db: &DatabaseConnection) -> Result>, DbErr> { + let Some(internal_id) = super::actor::Entity::ap_to_internal(uid, db).await? + else { + return Ok(None); + }; let out = Entity::find() .join( sea_orm::JoinType::InnerJoin, @@ -103,7 +109,7 @@ impl Entity { .all(db) .await?; - Ok(out) + Ok(Some(out)) } // TODO this is 3 queries!!! can it be optimized down to 1? diff --git a/upub/core/src/server/admin.rs b/upub/core/src/server/admin.rs index bd7c2ba..35c10c2 100644 --- a/upub/core/src/server/admin.rs +++ b/upub/core/src/server/admin.rs @@ -1,4 +1,4 @@ -use sea_orm::{ActiveValue::{Set, NotSet}, EntityTrait}; +use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait}; #[axum::async_trait] pub trait Administrable { @@ -10,7 +10,7 @@ pub trait Administrable { summary: Option, avatar_url: Option, banner_url: Option, - ) -> crate::Result<()>; + ) -> Result<(), DbErr>; } #[axum::async_trait] @@ -23,7 +23,7 @@ impl Administrable for super::Context { summary: Option, avatar_url: Option, banner_url: Option, - ) -> crate::Result<()> { + ) -> Result<(), DbErr> { let key = openssl::rsa::Rsa::generate(2048).unwrap(); let ap_id = self.uid(&username); let db = self.db(); diff --git a/upub/core/src/server/context.rs b/upub/core/src/server/context.rs index fec2135..b7b26c3 100644 --- a/upub/core/src/server/context.rs +++ b/upub/core/src/server/context.rs @@ -1,13 +1,10 @@ use std::{collections::BTreeSet, sync::Arc}; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; +use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use crate::{config::Config, errors::UpubError, model, ext::AnyQuery}; use uriproxy::UriClass; -use super::dispatcher::Dispatcher; - - #[derive(Clone)] pub struct Context(Arc); struct ContextInner { @@ -16,7 +13,6 @@ struct ContextInner { domain: String, protocol: String, base_url: String, - dispatcher: Dispatcher, // TODO keep these pre-parsed actor: model::actor::Model, instance: model::instance::Model, @@ -49,10 +45,6 @@ impl Context { if domain.starts_with("http") { domain = domain.replace("https://", "").replace("http://", ""); } - let dispatcher = Dispatcher::default(); - for _ in 0..1 { // TODO customize delivery workers amount - dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! - } let base_url = format!("{}{}", protocol, domain); let (actor, instance) = super::init::application(domain.clone(), base_url.clone(), &db).await?; @@ -60,8 +52,8 @@ impl Context { // TODO maybe we could provide a more descriptive error... let pkey = actor.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string(); - let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?; - let relay_sources = model::relation::Entity::following(&actor.id, &db).await?; + let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?.ok_or_else(UpubError::internal_server_error)?; + let relay_sources = model::relation::Entity::following(&actor.id, &db).await?.ok_or_else(UpubError::internal_server_error)?; let relay = Relays { sources: BTreeSet::from_iter(relay_sources), @@ -69,7 +61,7 @@ impl Context { }; Ok(Context(Arc::new(ContextInner { - base_url, db, domain, protocol, actor, instance, dispatcher, config, pkey, relay, + base_url, db, domain, protocol, actor, instance, config, pkey, relay, }))) } @@ -106,10 +98,6 @@ impl Context { &self.0.base_url } - pub fn dispatcher(&self) -> &Dispatcher { - &self.0.dispatcher - } - /// get full user id uri pub fn uid(&self, id: &str) -> String { uriproxy::uri(self.base(), UriClass::Actor, id) @@ -148,7 +136,7 @@ impl Context { id.starts_with(self.base()) } - pub async fn is_local_internal_object(&self, internal: i64) -> crate::Result { + pub async fn is_local_internal_object(&self, internal: i64) -> Result { model::object::Entity::find() .filter(model::object::Column::Internal.eq(internal)) .select_only() @@ -158,7 +146,7 @@ impl Context { .await } - pub async fn is_local_internal_activity(&self, internal: i64) -> crate::Result { + pub async fn is_local_internal_activity(&self, internal: i64) -> Result { model::activity::Entity::find() .filter(model::activity::Column::Internal.eq(internal)) .select_only() @@ -169,7 +157,7 @@ impl Context { } #[allow(unused)] - pub async fn is_local_internal_actor(&self, internal: i64) -> crate::Result { + pub async fn is_local_internal_actor(&self, internal: i64) -> Result { model::actor::Entity::find() .filter(model::actor::Column::Internal.eq(internal)) .select_only() diff --git a/upub/core/src/server/inbox.rs b/upub/core/src/server/inbox.rs deleted file mode 100644 index f00b5eb..0000000 --- a/upub/core/src/server/inbox.rs +++ /dev/null @@ -1,316 +0,0 @@ -use apb::{target::Addressed, Activity, Base, Object}; -use reqwest::StatusCode; -use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; - -use crate::{errors::{LoggableError, UpubError}, model, ext::AnyQuery, server::{addresser::Addresser, normalizer::Normalizer}}; - -use super::{fetcher::{Fetcher, PullResult}, side_effects::SideEffects, Context}; - - -#[axum::async_trait] -impl apb::server::Inbox for Context { - type Error = UpubError; - type Activity = serde_json::Value; - - async fn create(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { - let Some(object_node) = activity.object().extract() else { - // TODO we could process non-embedded activities or arrays but im lazy rn - tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); - return Err(UpubError::unprocessable()); - }; - if let Some(reply) = object_node.in_reply_to().id() { - if let Err(e) = self.fetch_object(&reply).await { - tracing::warn!("failed fetching replies for received object: {e}"); - } - } - let activity_model = self.insert_activity(activity, Some(server.clone())).await?; - let object_model = self.insert_object(object_node, Some(server)).await?; - let expanded_addressing = self.expand_addressing(activity_model.addressed()).await?; - self.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing).await?; - tracing::info!("{} posted {}", activity_model.actor, object_model.id); - Ok(()) - } - - async fn like(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { - let uid = activity.actor().id().ok_or(UpubError::bad_request())?; - let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; - let object_uri = activity.object().id().ok_or(UpubError::bad_request())?; - let obj = self.fetch_object(&object_uri).await?; - if model::like::Entity::find_by_uid_oid(internal_uid, obj.internal) - .any(self.db()) - .await? - { - return Err(UpubError::not_modified()); - } - - let activity_model = self.insert_activity(activity, Some(server)).await?; - self.process_like(internal_uid, obj.internal, activity_model.internal, activity_model.published).await?; - let mut expanded_addressing = self.expand_addressing(activity_model.addressed()).await?; - if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!! - expanded_addressing.push( - model::object::Entity::find_by_id(obj.internal) - .select_only() - .select_column(model::object::Column::AttributedTo) - .into_tuple::() - .one(self.db()) - .await? - .ok_or_else(UpubError::not_found)? - ); - } - self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; - tracing::info!("{} liked {}", uid, obj.id); - Ok(()) - } - - async fn follow(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { - let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string(); - let source_actor = activity.actor().id().ok_or_else(UpubError::bad_request)?; - let source_actor_internal = model::actor::Entity::ap_to_internal(&source_actor, self.db()).await?; - let target_actor = activity.object().id().ok_or_else(UpubError::bad_request)?; - let usr = self.fetch_user(&target_actor).await?; - let activity_model = model::activity::ActiveModel::new(&activity)?; - model::activity::Entity::insert(activity_model) - .exec(self.db()).await?; - let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; - let relation_model = model::relation::ActiveModel { - internal: NotSet, - accept: Set(None), - activity: Set(internal_aid), - follower: Set(source_actor_internal), - following: Set(usr.internal), - }; - model::relation::Entity::insert(relation_model) - .exec(self.db()).await?; - let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?; - if !expanded_addressing.contains(&target_actor) { - expanded_addressing.push(target_actor); - } - self.address_to(Some(internal_aid), None, &expanded_addressing).await?; - tracing::info!("{} wants to follow {}", source_actor, usr.id); - Ok(()) - } - - async fn accept(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { - // TODO what about TentativeAccept - let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string(); - let target_actor = activity.actor().id().ok_or_else(UpubError::bad_request)?; - let follow_request_id = activity.object().id().ok_or_else(UpubError::bad_request)?; - let follow_activity = model::activity::Entity::find_by_ap_id(&follow_request_id) - .one(self.db()) - .await? - .ok_or_else(UpubError::not_found)?; - - if follow_activity.object.unwrap_or("".into()) != target_actor { - return Err(UpubError::forbidden()); - } - - let activity_model = model::activity::ActiveModel::new(&activity)?; - model::activity::Entity::insert(activity_model) - .exec(self.db()) - .await?; - let accept_internal_id = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; - - model::actor::Entity::update_many() - .col_expr( - model::actor::Column::FollowingCount, - Expr::col(model::actor::Column::FollowingCount).add(1) - ) - .filter(model::actor::Column::Id.eq(&follow_activity.actor)) - .exec(self.db()) - .await?; - model::actor::Entity::update_many() - .col_expr( - model::actor::Column::FollowersCount, - Expr::col(model::actor::Column::FollowersCount).add(1) - ) - .filter(model::actor::Column::Id.eq(&follow_activity.actor)) - .exec(self.db()) - .await?; - - model::relation::Entity::update_many() - .col_expr(model::relation::Column::Accept, Expr::value(Some(accept_internal_id))) - .filter(model::relation::Column::Activity.eq(follow_activity.internal)) - .exec(self.db()).await?; - - tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor); - - let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?; - if !expanded_addressing.contains(&follow_activity.actor) { - expanded_addressing.push(follow_activity.actor); - } - self.address_to(Some(accept_internal_id), None, &expanded_addressing).await?; - Ok(()) - } - - async fn reject(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { - // TODO what about TentativeReject? - let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string(); - let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?; - let follow_request_id = activity.object().id().ok_or_else(UpubError::bad_request)?; - let follow_activity = model::activity::Entity::find_by_ap_id(&follow_request_id) - .one(self.db()) - .await? - .ok_or_else(UpubError::not_found)?; - - if follow_activity.object.unwrap_or("".into()) != uid { - return Err(UpubError::forbidden()); - } - - let activity_model = model::activity::ActiveModel::new(&activity)?; - model::activity::Entity::insert(activity_model) - .exec(self.db()) - .await?; - let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; - - model::relation::Entity::delete_many() - .filter(model::relation::Column::Activity.eq(internal_aid)) - .exec(self.db()) - .await?; - - tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor); - - let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?; - if !expanded_addressing.contains(&follow_activity.actor) { - expanded_addressing.push(follow_activity.actor); - } - - self.address_to(Some(internal_aid), None, &expanded_addressing).await?; - Ok(()) - } - - async fn delete(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { - let oid = activity.object().id().ok_or_else(UpubError::bad_request)?; - model::actor::Entity::delete_by_ap_id(&oid).exec(self.db()).await.info_failed("failed deleting from users"); - model::object::Entity::delete_by_ap_id(&oid).exec(self.db()).await.info_failed("failed deleting from objects"); - tracing::debug!("deleted '{oid}'"); - Ok(()) - } - - async fn update(&self, _server: String, activity: serde_json::Value) -> crate::Result<()> { - let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?; - let aid = activity.id().ok_or_else(UpubError::bad_request)?; - let Some(object_node) = activity.object().extract() else { - // TODO we could process non-embedded activities or arrays but im lazy rn - tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); - return Err(UpubError::unprocessable()); - }; - let oid = object_node.id().ok_or_else(UpubError::bad_request)?.to_string(); - - let activity_model = model::activity::ActiveModel::new(&activity)?; - model::activity::Entity::insert(activity_model) - .exec(self.db()) - .await?; - let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?; - - let internal_oid = match object_node.object_type().ok_or_else(UpubError::bad_request)? { - apb::ObjectType::Actor(_) => { - let internal_uid = model::actor::Entity::ap_to_internal(&oid, self.db()).await?; - let mut actor_model = model::actor::ActiveModel::new(&object_node)?; - actor_model.internal = Set(internal_uid); - actor_model.updated = Set(chrono::Utc::now()); - model::actor::Entity::update(actor_model) - .exec(self.db()) - .await?; - Some(internal_uid) - }, - apb::ObjectType::Note => { - let internal_oid = model::object::Entity::ap_to_internal(&oid, self.db()).await?; - let mut object_model = model::object::ActiveModel::new(&object_node)?; - object_model.internal = Set(internal_oid); - object_model.updated = Set(chrono::Utc::now()); - model::object::Entity::update(object_model) - .exec(self.db()) - .await?; - Some(internal_oid) - }, - t => { - tracing::warn!("no side effects implemented for update type {t:?}"); - None - }, - }; - - tracing::info!("{} updated {}", uid, oid); - let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(Some(internal_aid), internal_oid, &expanded_addressing).await?; - Ok(()) - } - - async fn undo(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { - let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?; - let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; - // TODO in theory we could work with just object_id but right now only accept embedded - let undone_activity = activity.object().extract().ok_or_else(UpubError::bad_request)?; - let undone_activity_author = undone_activity.actor().id().ok_or_else(UpubError::bad_request)?; - - // can't undo activities from remote actors! - if server != Context::server(&undone_activity_author) { - return Err(UpubError::forbidden()); - }; - - let activity_model = self.insert_activity(activity.clone(), Some(server)).await?; - - let targets = self.expand_addressing(activity.addressed()).await?; - self.process_undo(internal_uid, activity).await?; - - self.address_to(Some(activity_model.internal), None, &targets).await?; - - Ok(()) - } - - async fn announce(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { - let uid = activity.actor().id().ok_or_else(|| UpubError::field("actor"))?; - let actor = self.fetch_user(&uid).await?; - let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; - let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?; - - match self.pull(&announced_id).await? { - PullResult::Actor(_) => Err(UpubError::unprocessable()), - PullResult::Object(object) => { - let object_model = self.resolve_object(object).await?; - let activity_model = self.insert_activity(activity.clone(), Some(server.clone())).await?; - - // relays send us objects as Announce, but we don't really want to count those towards the - // total shares count of an object, so just fetch the object and be done with it - if !matches!(actor.actor_type, apb::ActorType::Person) { - tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id); - return Ok(()) - } - - let share = model::announce::ActiveModel { - internal: NotSet, - actor: Set(internal_uid), - object: Set(object_model.internal), - published: Set(activity.published().unwrap_or(chrono::Utc::now())), - }; - - let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; - model::announce::Entity::insert(share) - .exec(self.db()).await?; - model::object::Entity::update_many() - .col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1)) - .filter(model::object::Column::Internal.eq(object_model.internal)) - .exec(self.db()) - .await?; - - tracing::info!("{} shared {}", activity_model.actor, announced_id); - Ok(()) - }, - PullResult::Activity(activity) => { - // groups update all members of other things that happen inside, process those - let server = Context::server(activity.id().unwrap_or_default()); - match activity.activity_type().ok_or_else(UpubError::bad_request)? { - apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(self.like(server, activity).await?), - apb::ActivityType::Create => Ok(self.create(server, activity).await?), - apb::ActivityType::Undo => Ok(self.undo(server, activity).await?), - apb::ActivityType::Delete => Ok(self.delete(server, activity).await?), - apb::ActivityType::Update => Ok(self.update(server, activity).await?), - x => { - tracing::warn!("ignoring unhandled announced activity of type {x:?}"); - Err(StatusCode::NOT_IMPLEMENTED.into()) - }, - } - }, - } - } -} diff --git a/upub/core/src/server/mod.rs b/upub/core/src/server/mod.rs index 8fcefff..90135f0 100644 --- a/upub/core/src/server/mod.rs +++ b/upub/core/src/server/mod.rs @@ -1,13 +1,5 @@ -pub mod addresser; pub mod admin; pub mod context; -pub mod dispatcher; -pub mod fetcher; -pub mod inbox; pub mod init; -pub mod outbox; -pub mod auth; -pub mod normalizer; -pub mod side_effects; pub use context::Context; diff --git a/upub/core/src/server/normalizer.rs b/upub/core/src/server/normalizer.rs deleted file mode 100644 index 7b56662..0000000 --- a/upub/core/src/server/normalizer.rs +++ /dev/null @@ -1,160 +0,0 @@ -use apb::{Node, Base, Object, Document}; -use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; -use crate::{errors::UpubError, model, server::Context}; - -#[axum::async_trait] -pub trait Normalizer { - async fn insert_object(&self, obj: impl apb::Object, server: Option) -> crate::Result; - async fn insert_activity(&self, act: impl apb::Activity, server: Option) -> crate::Result; -} - -#[axum::async_trait] -impl Normalizer for super::Context { - async fn insert_object(&self, object_node: impl apb::Object, server: Option) -> crate::Result { - let oid = object_node.id().ok_or_else(UpubError::bad_request)?.to_string(); - let uid = object_node.attributed_to().id(); - let mut object_model = model::object::ActiveModel::new(&object_node)?; - if let Some(server) = server { - // make sure we're allowed to create this object - if let Set(Some(object_author)) = &object_model.attributed_to { - if server != Context::server(object_author) { - return Err(UpubError::forbidden()); - } - } else if server != Context::server(&oid) { - return Err(UpubError::forbidden()); - }; - } - - // make sure content only contains a safe subset of html - if let Set(Some(content)) = object_model.content { - object_model.content = Set(Some(mdhtml::safe_html(&content))); - } - - // fix context for remote posts - // > note that this will effectively recursively try to fetch the parent object, in order to find - // > the context (which is id of topmost object). there's a recursion limit of 16 hidden inside - // > btw! also if any link is broken or we get rate limited, the whole insertion fails which is - // > kind of dumb. there should be a job system so this can be done in waves. or maybe there's - // > some whole other way to do this?? im thinking but misskey aaaa!! TODO - if let Set(Some(ref reply)) = object_model.in_reply_to { - if let Some(o) = model::object::Entity::find_by_ap_id(reply).one(self.db()).await? { - object_model.context = Set(o.context); - } else { - object_model.context = Set(None); // TODO to be filled by some other task - } - } else { - object_model.context = Set(Some(oid.clone())); - } - - model::object::Entity::insert(object_model.clone().into_active_model()).exec(self.db()).await?; - let object = model::object::Entity::find_by_ap_id(&oid).one(self.db()).await?.ok_or_else(UpubError::internal_server_error)?; - - // update replies counter - if let Set(Some(ref in_reply_to)) = object_model.in_reply_to { - model::object::Entity::update_many() - .filter(model::object::Column::Id.eq(in_reply_to)) - .col_expr(model::object::Column::Replies, Expr::col(model::object::Column::Replies).add(1)) - .exec(self.db()) - .await?; - } - // update statuses counter - if let Some(object_author) = uid { - model::actor::Entity::update_many() - .col_expr(model::actor::Column::StatusesCount, Expr::col(model::actor::Column::StatusesCount).add(1)) - .filter(model::actor::Column::Id.eq(&object_author)) - .exec(self.db()) - .await?; - } - - for attachment in object_node.attachment().flat() { - let attachment_model = match attachment { - Node::Empty => continue, - Node::Array(_) => { - tracing::warn!("ignoring array-in-array while processing attachments"); - continue - }, - Node::Link(l) => model::attachment::ActiveModel { - internal: sea_orm::ActiveValue::NotSet, - url: Set(l.href().to_string()), - object: Set(object.internal), - document_type: Set(apb::DocumentType::Page), - name: Set(l.link_name().map(|x| x.to_string())), - media_type: Set(l.link_media_type().unwrap_or("link").to_string()), - published: Set(chrono::Utc::now()), - }, - Node::Object(o) => model::attachment::ActiveModel { - internal: sea_orm::ActiveValue::NotSet, - url: Set(o.url().id().unwrap_or_else(|| o.id().map(|x| x.to_string()).unwrap_or_default())), - object: Set(object.internal), - document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), - name: Set(o.name().map(|x| x.to_string())), - media_type: Set(o.media_type().unwrap_or("link").to_string()), - published: Set(o.published().unwrap_or_else(chrono::Utc::now)), - }, - }; - model::attachment::Entity::insert(attachment_model) - .exec(self.db()) - .await?; - } - // lemmy sends us an image field in posts, treat it like an attachment i'd say - if let Some(img) = object_node.image().get() { - // TODO lemmy doesnt tell us the media type but we use it to display the thing... - let img_url = img.url().id().unwrap_or_default(); - let media_type = if img_url.ends_with("png") { - Some("image/png".to_string()) - } else if img_url.ends_with("webp") { - Some("image/webp".to_string()) - } else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") { - Some("image/jpeg".to_string()) - } else { - None - }; - - let attachment_model = model::attachment::ActiveModel { - internal: sea_orm::ActiveValue::NotSet, - url: Set(img.url().id().unwrap_or_else(|| img.id().map(|x| x.to_string()).unwrap_or_default())), - object: Set(object.internal), - document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), - name: Set(img.name().map(|x| x.to_string())), - media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()), - published: Set(img.published().unwrap_or_else(chrono::Utc::now)), - }; - model::attachment::Entity::insert(attachment_model) - .exec(self.db()) - .await?; - } - - Ok(object) - } - - async fn insert_activity(&self, activity: impl apb::Activity, server: Option) -> crate::Result { - let mut activity_model = model::activity::Model { - internal: 0, - id: activity.id().ok_or_else(|| UpubError::field("id"))?.to_string(), - activity_type: activity.activity_type().ok_or_else(|| UpubError::field("type"))?, - actor: activity.actor().id().ok_or_else(|| UpubError::field("actor"))?, - object: activity.object().id(), - target: activity.target().id(), - published: activity.published().unwrap_or(chrono::Utc::now()), - to: activity.to().into(), - bto: activity.bto().into(), - cc: activity.cc().into(), - bcc: activity.bcc().into(), - }; - if let Some(server) = server { - if Context::server(&activity_model.actor) != server - || Context::server(&activity_model.id) != server { - return Err(UpubError::forbidden()); - } - } - let mut active_model = activity_model.clone().into_active_model(); - active_model.internal = NotSet; - model::activity::Entity::insert(active_model) - .exec(self.db()) - .await?; - - let internal = model::activity::Entity::ap_to_internal(&activity_model.id, self.db()).await?; - activity_model.internal = internal; - Ok(activity_model) - } -} diff --git a/upub/core/src/server/side_effects.rs b/upub/core/src/server/side_effects.rs deleted file mode 100644 index 40092f8..0000000 --- a/upub/core/src/server/side_effects.rs +++ /dev/null @@ -1,79 +0,0 @@ -use reqwest::StatusCode; -use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter}; - -use crate::{errors::UpubError, model}; - -#[axum::async_trait] -pub trait SideEffects { - async fn process_like(&self, who: i64, what: i64, with: i64, when: chrono::DateTime) -> crate::Result<()>; - async fn process_undo(&self, who: i64, activity: impl apb::Activity) -> crate::Result<()>; -} - -#[axum::async_trait] -impl SideEffects for super::Context { - async fn process_like(&self, who: i64, what: i64, with: i64, when: chrono::DateTime) -> crate::Result<()> { - let like = model::like::ActiveModel { - internal: NotSet, - actor: Set(who), - object: Set(what), - activity: Set(with), - published: Set(when), - }; - model::like::Entity::insert(like).exec(self.db()).await?; - model::object::Entity::update_many() - .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) - .filter(model::object::Column::Internal.eq(what)) - .exec(self.db()) - .await?; - - Ok(()) - } - - async fn process_undo(&self, who: i64, activity: impl apb::Activity) -> crate::Result<()> { - let undone_object_id = activity.object().id().ok_or_else(UpubError::bad_request)?; - match activity.activity_type() { - Some(apb::ActivityType::Like) => { - let internal_oid = model::object::Entity::ap_to_internal(&undone_object_id, self.db()).await?; - model::like::Entity::delete_many() - .filter( - Condition::all() - .add(model::like::Column::Actor.eq(who)) - .add(model::like::Column::Object.eq(internal_oid)) - ) - .exec(self.db()) - .await?; - model::object::Entity::update_many() - .filter(model::object::Column::Internal.eq(internal_oid)) - .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).sub(1)) - .exec(self.db()) - .await?; - }, - Some(apb::ActivityType::Follow) => { - let undone_aid = activity.object().id().ok_or_else(UpubError::bad_request)?; - let internal_aid = model::activity::Entity::ap_to_internal(&undone_aid, self.db()).await?; - model::relation::Entity::delete_many() - .filter(model::relation::Column::Activity.eq(internal_aid)) - .exec(self.db()) - .await?; - model::actor::Entity::update_many() - .filter(model::actor::Column::Internal.eq(who)) - .col_expr(model::actor::Column::FollowingCount, Expr::col(model::actor::Column::FollowingCount).sub(1)) - .exec(self.db()) - .await?; - model::actor::Entity::update_many() - .filter(model::actor::Column::Id.eq(&undone_object_id)) - .col_expr(model::actor::Column::FollowersCount, Expr::col(model::actor::Column::FollowersCount).sub(1)) - .exec(self.db()) - .await?; - }, - t => { - tracing::error!("received 'Undo' for unimplemented activity type: {t:?}"); - return Err(StatusCode::NOT_IMPLEMENTED.into()); - }, - } - - - Ok(()) - } - -} diff --git a/upub/migrations/src/m20240524_000001_create_actor_activity_object_tables.rs b/upub/migrations/src/m20240524_000001_create_actor_activity_object_tables.rs index b5721bb..d840c98 100644 --- a/upub/migrations/src/m20240524_000001_create_actor_activity_object_tables.rs +++ b/upub/migrations/src/m20240524_000001_create_actor_activity_object_tables.rs @@ -1,4 +1,5 @@ use sea_orm_migration::prelude::*; + #[derive(DeriveIden)] pub enum Actors { Table, diff --git a/upub/processor/Cargo.toml b/upub/processor/Cargo.toml new file mode 100644 index 0000000..a0cac85 --- /dev/null +++ b/upub/processor/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "upub-processor" +version = "0.2.0" +edition = "2021" +authors = [ "alemi " ] +description = "upub background activity processing worker" +license = "AGPL-3.0" +repository = "https://git.alemi.dev/upub.git" +readme = "README.md" + +[lib] + +[dependencies] +thiserror = "1" +tracing = "0.1" +async-trait = "0.1" +serde_json = "1" +sea-orm = "0.12" +jrd = "0.1" +chrono = { version = "0.4", features = ["serde"] } +tokio = { version = "1.35", features = ["full"] } # TODO slim this down +reqwest = { version = "0.12", features = ["json"] } +apb = { path = "../../apb", features = ["unstructured", "orm", "activitypub-fe", "activitypub-counters", "litepub", "ostatus", "toot"] } +httpsign = { path = "../../utils/httpsign/" } +mdhtml = { path = "../../utils/mdhtml/" } +upub = { path = "../core/" } diff --git a/upub/core/src/server/server.rs b/upub/processor/README.md similarity index 100% rename from upub/core/src/server/server.rs rename to upub/processor/README.md diff --git a/upub/core/src/server/addresser.rs b/upub/processor/src/address.rs similarity index 62% rename from upub/core/src/server/addresser.rs rename to upub/processor/src/address.rs index 80de7b8..eca3d35 100644 --- a/upub/core/src/server/addresser.rs +++ b/upub/processor/src/address.rs @@ -1,27 +1,26 @@ -use sea_orm::{ActiveValue::{NotSet, Set}, EntityTrait}; +use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait}; -use crate::model; +use crate::fetch::Fetcher; -use super::{fetcher::Fetcher, Context}; - - -#[axum::async_trait] +#[async_trait::async_trait] pub trait Addresser { - async fn expand_addressing(&self, targets: Vec) -> crate::Result>; - async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> crate::Result<()>; - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()>; + async fn expand_addressing(&self, targets: Vec) -> Result, DbErr>; + async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> Result<(), DbErr>; + async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr>; //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] - async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()>; + async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> Result<(), DbErr>; } -#[axum::async_trait] -impl Addresser for super::Context { - async fn expand_addressing(&self, targets: Vec) -> crate::Result> { +#[async_trait::async_trait] +impl Addresser for upub::Context { + async fn expand_addressing(&self, targets: Vec) -> Result, DbErr> { let mut out = Vec::new(); for target in targets { if target.ends_with("/followers") { let target_id = target.replace("/followers", ""); - let mut followers = model::relation::Entity::followers(&target_id, self.db()).await?; + let mut followers = upub::model::relation::Entity::followers(&target_id, self.db()) + .await? + .unwrap_or_else(Vec::new); if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO followers.push(target_id); } @@ -35,7 +34,7 @@ impl Addresser for super::Context { Ok(out) } - async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> crate::Result<()> { + async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> Result<(), DbErr> { // TODO address_to became kind of expensive, with these two selects right away and then another // select for each target we're addressing to... can this be improved?? let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false }; @@ -49,17 +48,16 @@ impl Addresser for super::Context { { let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { match ( - model::instance::Entity::domain_to_internal(&Context::server(target), self.db()).await, - model::actor::Entity::ap_to_internal(target, self.db()).await, + upub::model::instance::Entity::domain_to_internal(&upub::Context::server(target), self.db()).await?, + upub::model::actor::Entity::ap_to_internal(target, self.db()).await?, ) { - (Ok(server), Ok(actor)) => (Some(server), Some(actor)), - (Err(e), Ok(_)) => { tracing::error!("failed resolving domain: {e}"); continue; }, - (Ok(_), Err(e)) => { tracing::error!("failed resolving actor: {e}"); continue; }, - (Err(es), Err(ea)) => { tracing::error!("failed resolving domain ({es}) and actor ({ea})"); continue; }, + (Some(server), Some(actor)) => (Some(server), Some(actor)), + (None, _) => { tracing::error!("failed resolving domain"); continue; }, + (_, None) => { tracing::error!("failed resolving actor"); continue; }, } }; addressing.push( - model::addressing::ActiveModel { + upub::model::addressing::ActiveModel { internal: NotSet, instance: Set(server), actor: Set(actor), @@ -71,7 +69,7 @@ impl Addresser for super::Context { } if !addressing.is_empty() { - model::addressing::Entity::insert_many(addressing) + upub::model::addressing::Entity::insert_many(addressing) .exec(self.db()) .await?; } @@ -79,17 +77,17 @@ impl Addresser for super::Context { Ok(()) } - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()> { + async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> { let mut deliveries = Vec::new(); for target in targets.iter() .filter(|to| !to.is_empty()) - .filter(|to| Context::server(to) != self.domain()) + .filter(|to| upub::Context::server(to) != self.domain()) .filter(|to| to != &apb::target::PUBLIC) { // TODO fetch concurrently match self.fetch_user(target).await { - Ok(model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( - model::delivery::ActiveModel { + Ok(upub::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( + upub::model::delivery::ActiveModel { internal: sea_orm::ActiveValue::NotSet, actor: Set(from.to_string()), // TODO we should resolve each user by id and check its inbox because we can't assume @@ -107,21 +105,30 @@ impl Addresser for super::Context { } if !deliveries.is_empty() { - model::delivery::Entity::insert_many(deliveries) + upub::model::delivery::Entity::insert_many(deliveries) .exec(self.db()) .await?; } - self.dispatcher().wakeup(); + // TODO can we make deliveries instant? for better UX + // self.dispatcher().wakeup(); Ok(()) } //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] - async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { + async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> Result<(), DbErr> { let addressed = self.expand_addressing(activity_targets).await?; - let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?; - let internal_oid = if let Some(o) = oid { Some(model::object::Entity::ap_to_internal(o, self.db()).await?) } else { None }; + let internal_aid = upub::model::activity::Entity::ap_to_internal(aid, self.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(aid.to_string()))?; + let internal_oid = if let Some(o) = oid { + Some( + upub::model::object::Entity::ap_to_internal(o, self.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(o.to_string()))? + ) + } else { None }; self.address_to(Some(internal_aid), internal_oid, &addressed).await?; self.deliver_to(aid, uid, &addressed).await?; Ok(()) diff --git a/upub/core/src/server/dispatcher.rs b/upub/processor/src/dispatcher.rs similarity index 100% rename from upub/core/src/server/dispatcher.rs rename to upub/processor/src/dispatcher.rs diff --git a/upub/core/src/server/fetcher.rs b/upub/processor/src/fetch.rs similarity index 54% rename from upub/core/src/server/fetcher.rs rename to upub/processor/src/fetch.rs index 0bf41f9..ca77f62 100644 --- a/upub/core/src/server/fetcher.rs +++ b/upub/processor/src/fetch.rs @@ -1,71 +1,105 @@ use std::collections::BTreeMap; use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object}; -use base64::Engine; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; -use sea_orm::{EntityTrait, IntoActiveModel, NotSet}; +use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet}; -use crate::{errors::UpubError, model, VERSION}; - -use super::{addresser::Addresser, httpsign::HttpSignature, normalizer::Normalizer, Context}; +use super::{address::Addresser, normalize::Normalizer}; +use httpsign::HttpSignature; #[derive(Debug, Clone)] -pub enum PullResult { +pub enum Pull { Actor(T), Activity(T), Object(T), } -impl PullResult { - pub fn actor(self) -> crate::Result { +#[derive(Debug, thiserror::Error)] +pub enum PullError { + #[error("dereferenced resource ({0:?}) doesn't match requested type ({1:?})")] + Mismatch(apb::ObjectType, apb::ObjectType), + + #[error("error fetching resource: {0:?}")] + Reqwest(#[from] reqwest::Error), + + #[error("fetch failed with status {0}: {1}")] + Fetch(reqwest::StatusCode, String), + + #[error("database error while fetching resource: {0:?}")] + Database(#[from] sea_orm::DbErr), + + #[error("dereferenced resource is malformed: {0:?}")] + Malformed(#[from] apb::FieldErr), + + #[error("error normalizing resource: {0:?}")] + Normalization(#[from] crate::normalize::NormalizerError), + + #[error("too many redirects while resolving resource id, aborting")] + TooManyRedirects, + + #[error("resource no longer exists")] + Tombstone, + + #[error("error constructing http signature: {0:?}")] + HttpSignature(#[from] httpsign::HttpSignatureError), +} + +impl PullError { + fn mismatch(expected: apb::ObjectType, found: apb::ObjectType) -> Self { + PullError::Mismatch(expected, found) + } +} + +impl Pull { + pub fn actor(self) -> Result { match self { Self::Actor(x) => Ok(x), - Self::Activity(x) => Err(UpubError::Mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), - Self::Object(x) => Err(UpubError::Mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))), + Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), + Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))), } } - pub fn activity(self) -> crate::Result { + pub fn activity(self) -> Result { match self { - Self::Actor(x) => Err(UpubError::Mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), + Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), Self::Activity(x) => Ok(x), - Self::Object(x) => Err(UpubError::Mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))), + Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))), } } - pub fn object(self) -> crate::Result { + pub fn object(self) -> Result { match self { - Self::Actor(x) => Err(UpubError::Mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), - Self::Activity(x) => Err(UpubError::Mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), + Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), + Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), Self::Object(x) => Ok(x), } } } -#[axum::async_trait] +#[async_trait::async_trait] pub trait Fetcher { - async fn pull(&self, id: &str) -> crate::Result> { self.pull_r(id, 0).await } - async fn pull_r(&self, id: &str, depth: u32) -> crate::Result>; + async fn pull(&self, id: &str) -> Result, PullError> { self.pull_r(id, 0).await } + async fn pull_r(&self, id: &str, depth: u32) -> Result, PullError>; - async fn webfinger(&self, user: &str, host: &str) -> crate::Result; + async fn webfinger(&self, user: &str, host: &str) -> Result, PullError>; - async fn fetch_domain(&self, domain: &str) -> crate::Result; + async fn fetch_domain(&self, domain: &str) -> Result; - async fn fetch_user(&self, id: &str) -> crate::Result; - async fn resolve_user(&self, actor: serde_json::Value) -> crate::Result; + async fn fetch_user(&self, id: &str) -> Result; + async fn resolve_user(&self, actor: serde_json::Value) -> Result; - async fn fetch_activity(&self, id: &str) -> crate::Result; - async fn resolve_activity(&self, activity: serde_json::Value) -> crate::Result; + async fn fetch_activity(&self, id: &str) -> Result; + async fn resolve_activity(&self, activity: serde_json::Value) -> Result; - async fn fetch_object(&self, id: &str) -> crate::Result { self.fetch_object_r(id, 0).await } - #[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> crate::Result { self.resolve_object_r(object, 0).await } + async fn fetch_object(&self, id: &str) -> Result { self.fetch_object_r(id, 0).await } + #[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result { self.resolve_object_r(object, 0).await } - async fn fetch_object_r(&self, id: &str, depth: u32) -> crate::Result; - async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> crate::Result; + async fn fetch_object_r(&self, id: &str, depth: u32) -> Result; + async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result; - async fn fetch_thread(&self, id: &str) -> crate::Result<()>; + async fn fetch_thread(&self, id: &str) -> Result<(), PullError>; async fn request( method: reqwest::Method, @@ -74,15 +108,11 @@ pub trait Fetcher { from: &str, key: &str, domain: &str, - ) -> crate::Result { - let host = Context::server(url); + ) -> Result { + let host = upub::Context::server(url); let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT" let path = url.replace("https://", "").replace("http://", "").replace(&host, ""); - let digest = format!("sha-256={}", - base64::prelude::BASE64_STANDARD.encode( - openssl::sha::sha256(payload.unwrap_or("").as_bytes()) - ) - ); + let digest = httpsign::digest(payload.unwrap_or_default()); let headers = vec!["(request-target)", "host", "date", "digest"]; let headers_map : BTreeMap = [ @@ -106,7 +136,7 @@ pub trait Fetcher { .request(method.clone(), url) .header(ACCEPT, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"") .header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"") - .header(USER_AGENT, format!("upub+{VERSION} ({domain})")) + .header(USER_AGENT, format!("upub+{} ({domain})", upub::VERSION)) .header("Host", host.clone()) .header("Date", date.clone()) .header("Digest", digest) @@ -118,16 +148,20 @@ pub trait Fetcher { // TODO this is ugly but i want to see the raw response text when it's a failure match response.error_for_status_ref() { Ok(_) => Ok(response), - Err(e) => Err(UpubError::FetchError(e, response.text().await?)), + Err(e) => + Err(PullError::Fetch( + e.status().unwrap_or_default(), + response.text().await?, + )), } } } -#[axum::async_trait] -impl Fetcher for Context { - async fn pull_r(&self, id: &str, depth: u32) -> crate::Result> { - let _domain = self.fetch_domain(&Context::server(id)).await?; +#[async_trait::async_trait] +impl Fetcher for upub::Context { + async fn pull_r(&self, id: &str, depth: u32) -> Result, PullError> { + let _domain = self.fetch_domain(&upub::Context::server(id)).await?; let document = Self::request( Method::GET, id, None, @@ -137,62 +171,62 @@ impl Fetcher for Context { .json::() .await?; - let doc_id = document.id().ok_or_else(|| UpubError::field("id"))?; + let doc_id = document.id()?; if id != doc_id { if depth >= self.cfg().security.max_id_redirects { - return Err(UpubError::unprocessable()); + return Err(PullError::TooManyRedirects); } return self.pull(doc_id).await; } - match document.object_type() { - None => Err(UpubError::bad_request()), - Some(apb::ObjectType::Collection(_)) => Err(UpubError::unprocessable()), - Some(apb::ObjectType::Tombstone) => Err(UpubError::not_found()), - Some(apb::ObjectType::Activity(_)) => Ok(PullResult::Activity(document)), - Some(apb::ObjectType::Actor(_)) => Ok(PullResult::Actor(document)), - _ => Ok(PullResult::Object(document)), + match document.object_type()? { + apb::ObjectType::Collection(x) => Err(PullError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))), + apb::ObjectType::Tombstone => Err(PullError::Tombstone), + apb::ObjectType::Activity(_) => Ok(Pull::Activity(document)), + apb::ObjectType::Actor(_) => Ok(Pull::Actor(document)), + _ => Ok(Pull::Object(document)), } } - async fn webfinger(&self, user: &str, host: &str) -> crate::Result { + async fn webfinger(&self, user: &str, host: &str) -> Result, PullError> { let subject = format!("acct:{user}@{host}"); let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}"); let resource = reqwest::Client::new() .get(webfinger_uri) .header(ACCEPT, "application/jrd+json") - .header(USER_AGENT, format!("upub+{VERSION} ({})", self.domain())) + .header(USER_AGENT, format!("upub+{} ({})", upub::VERSION, self.domain())) .send() .await? .json::() .await?; if resource.subject != subject { - return Err(UpubError::unprocessable()); + tracing::error!("webfinger result ({}) differs from expected subject ({})", resource.subject, subject); + return Ok(None); } for link in resource.links { if link.rel == "self" { if let Some(href) = link.href { - return Ok(href); + return Ok(Some(href)); } } } if let Some(alias) = resource.aliases.into_iter().next() { - return Ok(alias); + return Ok(Some(alias)); } - Err(UpubError::not_found()) + Ok(None) } - async fn fetch_domain(&self, domain: &str) -> crate::Result { - if let Some(x) = model::instance::Entity::find_by_domain(domain).one(self.db()).await? { + async fn fetch_domain(&self, domain: &str) -> Result { + if let Some(x) = upub::model::instance::Entity::find_by_domain(domain).one(self.db()).await? { return Ok(x); // already in db, easy } - let mut instance_model = model::instance::Model { + let mut instance_model = upub::model::instance::Model { internal: 0, domain: domain.to_string(), name: None, @@ -211,16 +245,16 @@ impl Fetcher for Context { &format!("https://{}/", self.domain()), self.pkey(), self.domain(), ).await { if let Ok(actor) = res.json::().await { - if let Some(name) = actor.name() { + if let Ok(name) = actor.name() { instance_model.name = Some(name.to_string()); } - if let Some(icon) = actor.icon().id() { - instance_model.icon = Some(icon); + if let Ok(icon) = actor.icon().id() { + instance_model.icon = Some(icon.to_string()); } } } - if let Ok(nodeinfo) = model::instance::Entity::nodeinfo(domain).await { + if let Ok(nodeinfo) = upub::model::instance::Entity::nodeinfo(domain).await { instance_model.software = Some(nodeinfo.software.name); instance_model.version = nodeinfo.software.version; instance_model.users = nodeinfo.usage.users.and_then(|x| x.total); @@ -229,64 +263,65 @@ impl Fetcher for Context { let mut active_model = instance_model.clone().into_active_model(); active_model.internal = NotSet; - model::instance::Entity::insert(active_model).exec(self.db()).await?; - - let internal = model::instance::Entity::domain_to_internal(domain, self.db()).await?; + upub::model::instance::Entity::insert(active_model).exec(self.db()).await?; + let internal = upub::model::instance::Entity::domain_to_internal(domain, self.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(domain.to_string()))?; instance_model.internal = internal; Ok(instance_model) } - async fn resolve_user(&self, mut document: serde_json::Value) -> crate::Result { - let id = document.id().ok_or_else(|| UpubError::field("id"))?.to_string(); + async fn resolve_user(&self, mut document: serde_json::Value) -> Result { + let id = document.id()?.to_string(); // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time - if let Some(followers_url) = &document.followers().id() { + if let Ok(followers_url) = document.followers().id() { let req = Self::request( Method::GET, followers_url, None, &format!("https://{}/", self.domain()), self.pkey(), self.domain(), ).await; if let Ok(res) = req { if let Ok(user_followers) = res.json::().await { - if let Some(total) = user_followers.total_items() { + if let Ok(total) = user_followers.total_items() { document = document.set_followers_count(Some(total)); } } } } - if let Some(following_url) = &document.following().id() { + if let Ok(following_url) = document.following().id() { let req = Self::request( Method::GET, following_url, None, &format!("https://{}/", self.domain()), self.pkey(), self.domain(), ).await; if let Ok(res) = req { if let Ok(user_following) = res.json::().await { - if let Some(total) = user_following.total_items() { + if let Ok(total) = user_following.total_items() { document = document.set_following_count(Some(total)); } } } } - let user_model = model::actor::ActiveModel::new(&document)?; + let user_model = upub::model::actor::ActiveModel::new(&document)?; // TODO this may fail: while fetching, remote server may fetch our service actor. // if it does so with http signature, we will fetch that actor in background // meaning that, once we reach here, it's already inserted and returns an UNIQUE error - model::actor::Entity::insert(user_model).exec(self.db()).await?; + upub::model::actor::Entity::insert(user_model).exec(self.db()).await?; // TODO fetch it back to get the internal id Ok( - model::actor::Entity::find_by_ap_id(&id) + upub::model::actor::Entity::find_by_ap_id(&id) .one(self.db()) .await? - .ok_or_else(UpubError::internal_server_error)? + .ok_or_else(|| DbErr::RecordNotFound(id.to_string()))? ) } - async fn fetch_user(&self, id: &str) -> crate::Result { - if let Some(x) = model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { + async fn fetch_user(&self, id: &str) -> Result { + if let Some(x) = upub::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } @@ -295,8 +330,8 @@ impl Fetcher for Context { self.resolve_user(document).await } - async fn fetch_activity(&self, id: &str) -> crate::Result { - if let Some(x) = model::activity::Entity::find_by_ap_id(id).one(self.db()).await? { + async fn fetch_activity(&self, id: &str) -> Result { + if let Some(x) = upub::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } @@ -305,22 +340,20 @@ impl Fetcher for Context { self.resolve_activity(activity).await } - async fn resolve_activity(&self, activity: serde_json::Value) -> crate::Result { - let id = activity.id().ok_or_else(|| UpubError::field("id"))?.to_string(); - - if let Some(activity_actor) = activity.actor().id() { + async fn resolve_activity(&self, activity: serde_json::Value) -> Result { + if let Ok(activity_actor) = activity.actor().id() { if let Err(e) = self.fetch_user(&activity_actor).await { tracing::warn!("could not get actor of fetched activity: {e}"); } } - if let Some(activity_object) = activity.object().id() { + if let Ok(activity_object) = activity.object().id() { if let Err(e) = self.fetch_object(&activity_object).await { tracing::warn!("could not get object of fetched activity: {e}"); } } - let activity_model = self.insert_activity(activity, Some(Context::server(&id))).await?; + let activity_model = self.insert_activity(activity).await?; let addressed = activity_model.addressed(); let expanded_addresses = self.expand_addressing(addressed).await?; @@ -329,13 +362,13 @@ impl Fetcher for Context { Ok(activity_model) } - async fn fetch_thread(&self, _id: &str) -> crate::Result<()> { + async fn fetch_thread(&self, _id: &str) -> Result<(), PullError> { // crawl_replies(self, id, 0).await todo!() } - async fn fetch_object_r(&self, id: &str, depth: u32) -> crate::Result { - if let Some(x) = model::object::Entity::find_by_ap_id(id).one(self.db()).await? { + async fn fetch_object_r(&self, id: &str, depth: u32) -> Result { + if let Some(x) = upub::model::object::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } @@ -344,34 +377,34 @@ impl Fetcher for Context { self.resolve_object_r(object, depth).await } - async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> crate::Result { - let id = object.id().ok_or_else(|| UpubError::field("id"))?.to_string(); + async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result { + let id = object.id()?.to_string(); - if let Some(oid) = object.id() { + if let Ok(oid) = object.id() { if oid != id { - if let Some(x) = model::object::Entity::find_by_ap_id(oid).one(self.db()).await? { + if let Some(x) = upub::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? { return Ok(x); // already in db, but with id different that given url } } } - if let Some(attributed_to) = object.attributed_to().id() { - if let Err(e) = self.fetch_user(&attributed_to).await { + if let Ok(attributed_to) = object.attributed_to().id() { + if let Err(e) = self.fetch_user(attributed_to).await { tracing::warn!("could not get actor of fetched object: {e}"); } } let addressed = object.addressed(); - if let Some(reply) = object.in_reply_to().id() { + if let Ok(reply) = object.in_reply_to().id() { if depth <= self.cfg().security.thread_crawl_depth { - self.fetch_object_r(&reply, depth + 1).await?; + self.fetch_object_r(reply, depth + 1).await?; } else { tracing::warn!("thread deeper than {}, giving up fetching more replies", self.cfg().security.thread_crawl_depth); } } - let object_model = self.insert_object(object, None).await?; + let object_model = self.insert_object(object).await?; let expanded_addresses = self.expand_addressing(addressed).await?; self.address_to(None, Some(object_model.internal), &expanded_addresses).await?; @@ -380,16 +413,16 @@ impl Fetcher for Context { } } -#[axum::async_trait] +#[async_trait::async_trait] pub trait Fetchable : Sync + Send { - async fn fetch(&mut self, ctx: &crate::server::Context) -> crate::Result<&mut Self>; + async fn fetch(&mut self, ctx: &upub::Context) -> Result<&mut Self, PullError>; } -#[axum::async_trait] +#[async_trait::async_trait] impl Fetchable for apb::Node { - async fn fetch(&mut self, ctx: &crate::server::Context) -> crate::Result<&mut Self> { + async fn fetch(&mut self, ctx: &upub::Context) -> Result<&mut Self, PullError> { if let apb::Node::Link(uri) = self { - *self = Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain()) + *self = upub::Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain()) .await? .json::() .await? @@ -401,14 +434,14 @@ impl Fetchable for apb::Node { } // #[async_recursion::async_recursion] -// async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> { +// async fn crawl_replies(ctx: &upub::Context, id: &str, depth: usize) -> Result<(), PullError> { // tracing::info!("crawling replies of '{id}'"); -// let object = Context::request( +// let object = upub::Context::request( // Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), // ).await?.json::().await?; // -// let object_model = model::object::Model::new(&object)?; -// match model::object::Entity::insert(object_model.into_active_model()) +// let object_model = upub::model::object::Model::new(&object)?; +// match upub::model::object::Entity::insert(object_model.into_active_model()) // .exec(ctx.db()).await // { // Ok(_) => {}, @@ -424,7 +457,7 @@ impl Fetchable for apb::Node { // // let mut page_url = match object.replies().get() { // Some(serde_json::Value::String(x)) => { -// let replies = Context::request( +// let replies = upub::Context::request( // Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), // ).await?.json::().await?; // replies.first().id() @@ -437,7 +470,7 @@ impl Fetchable for apb::Node { // }; // // while let Some(ref url) = page_url { -// let replies = Context::request( +// let replies = upub::Context::request( // Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), // ).await?.json::().await?; // diff --git a/upub/processor/src/lib.rs b/upub/processor/src/lib.rs new file mode 100644 index 0000000..db5aa6e --- /dev/null +++ b/upub/processor/src/lib.rs @@ -0,0 +1,6 @@ +pub mod address; +pub mod normalize; +pub mod process; +pub mod fetch; + +// pub mod dispatcher; diff --git a/upub/processor/src/normalize.rs b/upub/processor/src/normalize.rs new file mode 100644 index 0000000..30928ff --- /dev/null +++ b/upub/processor/src/normalize.rs @@ -0,0 +1,287 @@ +use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; + +#[derive(Debug, thiserror::Error)] +pub enum NormalizerError { + #[error("normalized document misses required field: {0:?}")] + Malformed(#[from] apb::FieldErr), + + #[error("database error while normalizing object: {0:?}")] + DbErr(#[from] sea_orm::DbErr), +} + +#[async_trait::async_trait] +pub trait Normalizer { + async fn insert_object(&self, obj: impl apb::Object) -> Result; + async fn insert_activity(&self, act: impl apb::Activity) -> Result; +} + +#[async_trait::async_trait] +impl Normalizer for upub::Context { + + async fn insert_object(&self, object: impl apb::Object) -> Result { + let oid = object.id()?.to_string(); + let uid = object.attributed_to().id().str(); + let t = object.object_type()?; + if matches!(t, + apb::ObjectType::Activity(_) + | apb::ObjectType::Actor(_) + | apb::ObjectType::Collection(_) + | apb::ObjectType::Document(_) + ) { + return Err(apb::FieldErr("type").into()); + } + let mut object_active_model = AP::object_q(&object)?; + + // make sure content only contains a safe subset of html + if let Set(Some(content)) = object_active_model.content { + object_active_model.content = Set(Some(mdhtml::safe_html(&content))); + } + + // fix context for remote posts + // > note that this will effectively recursively try to fetch the parent object, in order to find + // > the context (which is id of topmost object). there's a recursion limit of 16 hidden inside + // > btw! also if any link is broken or we get rate limited, the whole insertion fails which is + // > kind of dumb. there should be a job system so this can be done in waves. or maybe there's + // > some whole other way to do this?? im thinking but misskey aaaa!! TODO + if let Set(Some(ref reply)) = object_active_model.in_reply_to { + if let Some(o) = upub::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? { + object_active_model.context = Set(o.context); + } else { + object_active_model.context = Set(None); // TODO to be filled by some other task + } + } else { + object_active_model.context = Set(Some(oid.clone())); + } + + upub::model::object::Entity::insert(object_active_model).exec(self.db()).await?; + let object_model = upub::model::object::Entity::find_by_ap_id(&oid) + .one(self.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(oid.to_string()))?; + + // update replies counter + if let Some(ref in_reply_to) = object_model.in_reply_to { + upub::model::object::Entity::update_many() + .filter(upub::model::object::Column::Id.eq(in_reply_to)) + .col_expr(upub::model::object::Column::Replies, Expr::col(upub::model::object::Column::Replies).add(1)) + .exec(self.db()) + .await?; + } + // update statuses counter + if let Some(object_author) = uid { + upub::model::actor::Entity::update_many() + .col_expr(upub::model::actor::Column::StatusesCount, Expr::col(upub::model::actor::Column::StatusesCount).add(1)) + .filter(upub::model::actor::Column::Id.eq(&object_author)) + .exec(self.db()) + .await?; + } + + for attachment in object.attachment().flat() { + let attachment_model = match attachment { + Node::Empty => continue, + Node::Array(_) => { + tracing::warn!("ignoring array-in-array while processing attachments"); + continue + }, + Node::Link(l) => upub::model::attachment::ActiveModel { + internal: sea_orm::ActiveValue::NotSet, + url: Set(l.href().to_string()), + object: Set(object_model.internal), + document_type: Set(apb::DocumentType::Page), + name: Set(l.name().str()), + media_type: Set(l.media_type().unwrap_or("link").to_string()), + published: Set(chrono::Utc::now()), + }, + Node::Object(o) => + AP::attachment_q(o.as_document()?, object_model.internal)?, + }; + upub::model::attachment::Entity::insert(attachment_model) + .exec(self.db()) + .await?; + } + // lemmy sends us an image field in posts, treat it like an attachment i'd say + if let Some(img) = object.image().get() { + // TODO lemmy doesnt tell us the media type but we use it to display the thing... + let img_url = img.url().id().str().unwrap_or_default(); + let media_type = if img_url.ends_with("png") { + Some("image/png".to_string()) + } else if img_url.ends_with("webp") { + Some("image/webp".to_string()) + } else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") { + Some("image/jpeg".to_string()) + } else { + None + }; + + let mut attachment_model = AP::attachment_q(img, object_model.internal)?; + + // ugly fix for lemmy + if let Some(m) = media_type { + if img.media_type().ok().is_none() { + attachment_model.media_type = Set(m); + } + } + + upub::model::attachment::Entity::insert(attachment_model) + .exec(self.db()) + .await?; + } + + Ok(object_model) + } + + async fn insert_activity(&self, activity: impl apb::Activity) -> Result { + let mut activity_model = AP::activity(&activity)?; + + let mut active_model = activity_model.clone().into_active_model(); + active_model.internal = NotSet; + upub::model::activity::Entity::insert(active_model) + .exec(self.db()) + .await?; + + let internal = upub::model::activity::Entity::ap_to_internal(&activity_model.id, self.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?; + activity_model.internal = internal; + + Ok(activity_model) + } +} + +pub struct AP; + +impl AP { + pub fn activity(activity: &impl apb::Activity) -> Result { + Ok(upub::model::activity::Model { + internal: 0, + id: activity.id()?.to_string(), + activity_type: activity.activity_type()?, + actor: activity.actor().id()?.to_string(), + object: activity.object().id().str(), + target: activity.target().id().str(), + published: activity.published().unwrap_or(chrono::Utc::now()), + to: activity.to().into(), + bto: activity.bto().into(), + cc: activity.cc().into(), + bcc: activity.bcc().into(), + }) + } + + pub fn activity_q(activity: &impl apb::Activity) -> Result { + let mut m = AP::activity(activity)?.into_active_model(); + m.internal = NotSet; + Ok(m) + } + + + + + pub fn attachment(document: &impl apb::Document, parent: i64) -> Result { + Ok(upub::model::attachment::Model { + internal: 0, + url: document.url().id().str().unwrap_or_default(), + object: parent, + document_type: document.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page)), + name: document.name().str(), + media_type: document.media_type().unwrap_or("link").to_string(), + published: document.published().unwrap_or_else(|_| chrono::Utc::now()), + }) + } + + pub fn attachment_q(document: &impl apb::Document, parent: i64) -> Result { + let mut m = AP::attachment(document, parent)?.into_active_model(); + m.internal = NotSet; + Ok(m) + } + + + + pub fn object(object: &impl apb::Object) -> Result { + let t = object.object_type()?; + if matches!(t, + apb::ObjectType::Activity(_) + | apb::ObjectType::Actor(_) + | apb::ObjectType::Collection(_) + | apb::ObjectType::Document(_) + ) { + return Err(apb::FieldErr("type")); + } + Ok(upub::model::object::Model { + internal: 0, + id: object.id()?.to_string(), + object_type: t, + attributed_to: object.attributed_to().id().str(), + name: object.name().str(), + summary: object.summary().str(), + content: object.content().str(), + context: object.context().id().str(), + in_reply_to: object.in_reply_to().id().str(), + published: object.published().unwrap_or_else(|_| chrono::Utc::now()), + updated: object.updated().unwrap_or_else(|_| chrono::Utc::now()), + url: object.url().id().str(), + replies: object.replies().get() + .map_or(0, |x| x.total_items().unwrap_or(0)) as i32, + likes: object.likes().get() + .map_or(0, |x| x.total_items().unwrap_or(0)) as i32, + announces: object.shares().get() + .map_or(0, |x| x.total_items().unwrap_or(0)) as i32, + to: object.to().into(), + bto: object.bto().into(), + cc: object.cc().into(), + bcc: object.bcc().into(), + + sensitive: object.sensitive().unwrap_or(false), + }) + } + + pub fn object_q(object: &impl apb::Object) -> Result { + let mut m = AP::object(object)?.into_active_model(); + m.internal = NotSet; + Ok(m) + } + + + + pub fn actor(actor: &impl apb::Actor) -> Result { + let ap_id = actor.id()?.to_string(); + let (domain, fallback_preferred_username) = { + let clean = ap_id + .replace("http://", "") + .replace("https://", ""); + let mut splits = clean.split('/'); + let first = splits.next().unwrap_or(""); + let last = splits.last().unwrap_or(first); + (first.to_string(), last.to_string()) + }; + Ok(upub::model::actor::Model { + internal: 0, + domain, + id: ap_id, + preferred_username: actor.preferred_username().unwrap_or(&fallback_preferred_username).to_string(), + actor_type: actor.actor_type()?, + name: actor.name().str(), + summary: actor.summary().str(), + icon: actor.icon().get().and_then(|x| x.url().id().str()), + image: actor.image().get().and_then(|x| x.url().id().str()), + inbox: actor.inbox().id().str(), + outbox: actor.outbox().id().str(), + shared_inbox: actor.endpoints().get().and_then(|x| x.shared_inbox().str()), + followers: actor.followers().id().str(), + following: actor.following().id().str(), + published: actor.published().unwrap_or(chrono::Utc::now()), + updated: chrono::Utc::now(), + following_count: actor.following_count().unwrap_or(0) as i32, + followers_count: actor.followers_count().unwrap_or(0) as i32, + statuses_count: actor.statuses_count().unwrap_or(0) as i32, + public_key: actor.public_key().get().ok_or(apb::FieldErr("publicKey"))?.public_key_pem().to_string(), + private_key: None, // there's no way to transport privkey over AP json, must come from DB + }) + } + + pub fn actor_q(actor: &impl apb::Actor) -> Result { + let mut m = AP::actor(actor)?.into_active_model(); + m.internal = NotSet; + Ok(m) + } +} diff --git a/upub/core/src/server/outbox.rs b/upub/processor/src/outbox.rs similarity index 100% rename from upub/core/src/server/outbox.rs rename to upub/processor/src/outbox.rs diff --git a/upub/processor/src/process.rs b/upub/processor/src/process.rs new file mode 100644 index 0000000..ad4647f --- /dev/null +++ b/upub/processor/src/process.rs @@ -0,0 +1,409 @@ +use apb::{target::Addressed, Activity, Base, Object}; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; +use upub::{errors::LoggableError, ext::AnyQuery}; +use crate::{address::Addresser, fetch::{Fetcher, Pull}, normalize::Normalizer}; + +#[derive(Debug, thiserror::Error)] +pub enum ProcessorError { + #[error("activity already processed")] + AlreadyProcessed, + + #[error("processed activity misses required field: '{0}'")] + Malformed(#[from] apb::FieldErr), + + #[error("database error while processing: {0:?}")] + DbErr(#[from] sea_orm::DbErr), + + #[error("actor is not authorized to carry out this activity")] + Unauthorized, + + #[error("could not resolve all objects involved in this activity")] + Incomplete, + + #[error("activity not processable by this application")] + Unprocessable, + + #[error("failed normalizing and inserting entity: {0:?}")] + NormalizerError(#[from] crate::normalize::NormalizerError), + + #[error("failed fetching resource: {0:?}")] + PullError(#[from] crate::fetch::PullError), +} + +#[async_trait::async_trait] +pub trait Processor { + async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError>; +} + +#[async_trait::async_trait] +impl Processor for upub::Context { + async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError> { + // TODO we could process Links and bare Objects maybe, but probably out of AP spec? + match activity.activity_type()? { + // TODO emojireacts are NOT likes, but let's process them like ones for now maybe? + apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(like(self, activity).await?), + apb::ActivityType::Create => Ok(create(self, activity).await?), + apb::ActivityType::Follow => Ok(follow(self, activity).await?), + apb::ActivityType::Announce => Ok(announce(self, activity).await?), + apb::ActivityType::Accept(_) => Ok(accept(self, activity).await?), + apb::ActivityType::Reject(_) => Ok(reject(self, activity).await?), + apb::ActivityType::Undo => Ok(undo(self, activity).await?), + apb::ActivityType::Delete => Ok(delete(self, activity).await?), + apb::ActivityType::Update => Ok(update(self, activity).await?), + _ => Err(ProcessorError::Unprocessable), + } + } +} + +pub async fn create(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + let Some(object_node) = activity.object().extract() else { + // TODO we could process non-embedded activities or arrays but im lazy rn + tracing::error!("refusing to process activity without embedded object"); + return Err(ProcessorError::Unprocessable); + }; + if let Ok(reply) = object_node.in_reply_to().id() { + if let Err(e) = ctx.fetch_object(reply).await { + tracing::warn!("failed fetching replies for received object: {e}"); + } + } + let activity_model = ctx.insert_activity(activity).await?; + let object_model = ctx.insert_object(object_node).await?; + let expanded_addressing = ctx.expand_addressing(object_model.addressed()).await?; + ctx.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing).await?; + tracing::info!("{} posted {}", activity_model.actor, object_model.id); + Ok(()) +} + +pub async fn like(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + let uid = activity.actor().id()?.to_string(); + let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + let object_uri = activity.object().id()?.to_string(); + let published = activity.published().unwrap_or_else(|_|chrono::Utc::now()); + let obj = ctx.fetch_object(&object_uri).await?; + if upub::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal) + .any(ctx.db()) + .await? + { + return Err(ProcessorError::AlreadyProcessed); + } + + let activity_model = ctx.insert_activity(activity).await?; + + let like = upub::model::like::ActiveModel { + internal: NotSet, + actor: Set(internal_uid), + object: Set(obj.internal), + activity: Set(activity_model.internal), + published: Set(published), + }; + upub::model::like::Entity::insert(like).exec(ctx.db()).await?; + upub::model::object::Entity::update_many() + .col_expr(upub::model::object::Column::Likes, Expr::col(upub::model::object::Column::Likes).add(1)) + .filter(upub::model::object::Column::Internal.eq(obj.internal)) + .exec(ctx.db()) + .await?; + + let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!! + expanded_addressing.push( + upub::model::object::Entity::find_by_id(obj.internal) + .select_only() + .select_column(upub::model::object::Column::AttributedTo) + .into_tuple::() + .one(ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)? + ); + } + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + tracing::info!("{} liked {}", uid, obj.id); + Ok(()) +} + +pub async fn follow(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + let source_actor = activity.actor().id()?.to_string(); + let source_actor_internal = upub::model::actor::Entity::ap_to_internal(&source_actor, ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + let target_actor = activity.object().id()?.to_string(); + let usr = ctx.fetch_user(&target_actor).await?; + let activity_model = ctx.insert_activity(activity).await?; + let relation_model = upub::model::relation::ActiveModel { + internal: NotSet, + accept: Set(None), + activity: Set(activity_model.internal), + follower: Set(source_actor_internal), + following: Set(usr.internal), + }; + upub::model::relation::Entity::insert(relation_model) + .exec(ctx.db()).await?; + let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + if !expanded_addressing.contains(&target_actor) { + expanded_addressing.push(target_actor); + } + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + tracing::info!("{} wants to follow {}", source_actor, usr.id); + Ok(()) +} + +pub async fn accept(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + // TODO what about TentativeAccept + let target_actor = activity.actor().id()?.to_string(); + let follow_request_id = activity.object().id()?.to_string(); + let follow_activity = upub::model::activity::Entity::find_by_ap_id(&follow_request_id) + .one(ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + + if follow_activity.object.unwrap_or_default() != target_actor { + return Err(ProcessorError::Unauthorized); + } + + let activity_model = ctx.insert_activity(activity).await?; + + upub::model::actor::Entity::update_many() + .col_expr( + upub::model::actor::Column::FollowingCount, + Expr::col(upub::model::actor::Column::FollowingCount).add(1) + ) + .filter(upub::model::actor::Column::Id.eq(&follow_activity.actor)) + .exec(ctx.db()) + .await?; + upub::model::actor::Entity::update_many() + .col_expr( + upub::model::actor::Column::FollowersCount, + Expr::col(upub::model::actor::Column::FollowersCount).add(1) + ) + .filter(upub::model::actor::Column::Id.eq(&follow_activity.actor)) + .exec(ctx.db()) + .await?; + + upub::model::relation::Entity::update_many() + .col_expr(upub::model::relation::Column::Accept, Expr::value(Some(activity_model.internal))) + .filter(upub::model::relation::Column::Activity.eq(follow_activity.internal)) + .exec(ctx.db()).await?; + + tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor); + + let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + if !expanded_addressing.contains(&follow_activity.actor) { + expanded_addressing.push(follow_activity.actor); + } + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + Ok(()) +} + +pub async fn reject(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + // TODO what about TentativeReject? + let uid = activity.actor().id()?.to_string(); + let follow_request_id = activity.object().id()?.to_string(); + let follow_activity = upub::model::activity::Entity::find_by_ap_id(&follow_request_id) + .one(ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + + if follow_activity.object.unwrap_or_default() != uid { + return Err(ProcessorError::Unauthorized); + } + + let activity_model = ctx.insert_activity(activity).await?; + + upub::model::relation::Entity::delete_many() + .filter(upub::model::relation::Column::Activity.eq(activity_model.internal)) + .exec(ctx.db()) + .await?; + + tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor); + + let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + if !expanded_addressing.contains(&follow_activity.actor) { + expanded_addressing.push(follow_activity.actor); + } + + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + Ok(()) +} + +pub async fn delete(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + let oid = activity.object().id()?.to_string(); + upub::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users"); + upub::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects"); + tracing::debug!("deleted '{oid}'"); + Ok(()) +} + +pub async fn update(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + let uid = activity.actor().id()?.to_string(); + let aid = activity.id()?.to_string(); + let Some(object_node) = activity.object().extract() else { + tracing::error!("refusing to process activity without embedded object"); + return Err(ProcessorError::Unprocessable); + }; + let oid = object_node.id()?.to_string(); + + let activity_model = ctx.insert_activity(activity).await?; + + match object_node.object_type()? { + apb::ObjectType::Actor(_) => { + let internal_uid = upub::model::actor::Entity::ap_to_internal(&oid, ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + let mut actor_model = upub::model::actor::ActiveModel::new(object_node.as_actor()?)?; + actor_model.internal = Set(internal_uid); + actor_model.updated = Set(chrono::Utc::now()); + upub::model::actor::Entity::update(actor_model) + .exec(ctx.db()) + .await?; + }, + apb::ObjectType::Note => { + let internal_oid = upub::model::object::Entity::ap_to_internal(&oid, ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + let mut object_model = upub::model::object::ActiveModel::new(&object_node)?; + object_model.internal = Set(internal_oid); + object_model.updated = Set(chrono::Utc::now()); + upub::model::object::Entity::update(object_model) + .exec(ctx.db()) + .await?; + }, + t => tracing::warn!("no side effects implemented for update type {t:?}"), + } + + tracing::info!("{} updated {}", uid, oid); + let expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + Ok(()) +} + +pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + let uid = activity.actor().id()?.to_string(); + // TODO in theory we could work with just object_id but right now only accept embedded + let undone_activity = activity.object().extract().ok_or(apb::FieldErr("object"))?; + let undone_activity_id = undone_activity.id()?; + let undone_activity_author = undone_activity.as_activity()?.actor().id()?.to_string(); + + if uid != undone_activity_author { + return Err(ProcessorError::Unauthorized); + } + + let undone_activity_target = undone_activity.as_activity()?.object().id()?.to_string(); + + let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + + let activity_type = activity.activity_type()?; + let targets = ctx.expand_addressing(activity.addressed()).await?; + let activity_model = ctx.insert_activity(activity).await?; + ctx.address_to(Some(activity_model.internal), None, &targets).await?; + + match activity_type { + apb::ActivityType::Like => { + let internal_oid = upub::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + upub::model::like::Entity::delete_many() + .filter( + Condition::all() + .add(upub::model::like::Column::Actor.eq(internal_uid)) + .add(upub::model::like::Column::Object.eq(internal_oid)) + ) + .exec(ctx.db()) + .await?; + upub::model::object::Entity::update_many() + .filter(upub::model::object::Column::Internal.eq(internal_oid)) + .col_expr(upub::model::object::Column::Likes, Expr::col(upub::model::object::Column::Likes).sub(1)) + .exec(ctx.db()) + .await?; + }, + apb::ActivityType::Follow => { + let internal_uid_following = upub::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + upub::model::relation::Entity::delete_many() + .filter(upub::model::relation::Column::Follower.eq(internal_uid)) + .filter(upub::model::relation::Column::Following.eq(internal_uid_following)) + .exec(ctx.db()) + .await?; + upub::model::actor::Entity::update_many() + .filter(upub::model::actor::Column::Internal.eq(internal_uid)) + .col_expr(upub::model::actor::Column::FollowingCount, Expr::col(upub::model::actor::Column::FollowingCount).sub(1)) + .exec(ctx.db()) + .await?; + upub::model::actor::Entity::update_many() + .filter(upub::model::actor::Column::Internal.eq(internal_uid_following)) + .col_expr(upub::model::actor::Column::FollowersCount, Expr::col(upub::model::actor::Column::FollowersCount).sub(1)) + .exec(ctx.db()) + .await?; + }, + t => { + tracing::error!("received 'Undo' for unimplemented activity type: {t:?}"); + return Err(ProcessorError::Unprocessable); + }, + } + + Ok(()) +} + +pub async fn announce(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { + let uid = activity.actor().id()?.to_string(); + let actor = ctx.fetch_user(&uid).await?; + let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + .await? + .ok_or(ProcessorError::Incomplete)?; + let announced_id = activity.object().id()?.to_string(); + let published = activity.published().unwrap_or(chrono::Utc::now()); + let addressed = activity.addressed(); + + match ctx.pull(&announced_id).await? { + Pull::Actor(_) => Err(ProcessorError::Unprocessable), + Pull::Object(object) => { + + let object_model = ctx.resolve_object(object).await?; + let activity_model = ctx.insert_activity(activity).await?; + + // relays send us objects as Announce, but we don't really want to count those towards the + // total shares count of an object, so just fetch the object and be done with it + if !matches!(actor.actor_type, apb::ActorType::Person) { + tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id); + return Ok(()) + } + + let share = upub::model::announce::ActiveModel { + internal: NotSet, + actor: Set(internal_uid), + object: Set(object_model.internal), + published: Set(published), + }; + + let expanded_addressing = ctx.expand_addressing(addressed).await?; + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + upub::model::announce::Entity::insert(share) + .exec(ctx.db()).await?; + upub::model::object::Entity::update_many() + .col_expr(upub::model::object::Column::Announces, Expr::col(upub::model::object::Column::Announces).add(1)) + .filter(upub::model::object::Column::Internal.eq(object_model.internal)) + .exec(ctx.db()) + .await?; + + tracing::info!("{} shared {}", activity_model.actor, announced_id); + Ok(()) + }, + Pull::Activity(activity) => { + // groups update all members of other things that happen inside, process those + match activity.activity_type()? { + apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(like(ctx, activity).await?), + apb::ActivityType::Create => Ok(create(ctx, activity).await?), + apb::ActivityType::Undo => Ok(undo(ctx, activity).await?), + apb::ActivityType::Delete => Ok(delete(ctx, activity).await?), + apb::ActivityType::Update => Ok(update(ctx, activity).await?), + x => { + tracing::warn!("ignoring unhandled announced activity of type {x:?}"); + Err(ProcessorError::Unprocessable) + }, + } + }, + } +} diff --git a/upub/routes/Cargo.toml b/upub/routes/Cargo.toml index c8a95b1..1a996d5 100644 --- a/upub/routes/Cargo.toml +++ b/upub/routes/Cargo.toml @@ -11,6 +11,7 @@ readme = "README.md" [lib] [dependencies] +thiserror = "1" rand = "0.8" sha256 = "1.5" chrono = { version = "0.4", features = ["serde"] } @@ -23,7 +24,8 @@ tokio = { version = "1.35", features = ["full"] } # TODO slim this down reqwest = { version = "0.12", features = ["json"] } axum = "0.7" tower-http = { version = "0.5", features = ["cors", "trace"] } -apb = { path = "../../apb", features = ["unstructured", "orm", "activitypub-fe", "activitypub-counters", "litepub", "ostatus", "toot"] } +httpsign = { path = "../../utils/httpsign/", features = ["axum"] } +apb = { path = "../../apb", features = ["unstructured", "orm", "activitypub-fe", "activitypub-counters", "litepub", "ostatus", "toot", "jsonld"] } sea-orm = { version = "0.12", features = ["macros", "sqlx-sqlite", "runtime-tokio-rustls"] } # nodeinfo = "0.0.2" # the version on crates.io doesn't re-export necessary types to build the struct!!! nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "e865094804" } diff --git a/upub/routes/src/activitypub/activity.rs b/upub/routes/src/activitypub/activity.rs index 9939350..f3c87f9 100644 --- a/upub/routes/src/activitypub/activity.rs +++ b/upub/routes/src/activitypub/activity.rs @@ -1,8 +1,9 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, QueryFilter}; -use upub::{model::{self, addressing::Event, attachment::BatchFillable}, server::{auth::AuthIdentity, fetcher::Fetcher, jsonld::LD}, Context}; +use upub::{model::{self, addressing::Event, attachment::BatchFillable}, Context}; +use apb::LD; -use crate::builders::JsonLD; +use crate::{builders::JsonLD, AuthIdentity}; use super::TryFetch; @@ -11,14 +12,14 @@ pub async fn view( Path(id): Path, AuthIdentity(auth): AuthIdentity, Query(query): Query, -) -> upub::Result> { +) -> crate::ApiResult> { let aid = ctx.aid(&id); - if auth.is_local() && query.fetch && !ctx.is_local(&aid) { - let obj = ctx.fetch_activity(&aid).await?; - if obj.id != aid { - return Err(upub::Error::Redirect(obj.id)); - } - } + // if auth.is_local() && query.fetch && !ctx.is_local(&aid) { + // let obj = ctx.fetch_activity(&aid).await?; + // if obj.id != aid { + // return Err(crate::ApiError::Redirect(obj.id)); + // } + // } let row = model::addressing::Entity::find_addressed(auth.my_id()) .filter(model::activity::Column::Id.eq(&aid)) @@ -26,7 +27,7 @@ pub async fn view( .into_model::() .one(ctx.db()) .await? - .ok_or_else(upub::Error::not_found)?; + .ok_or_else(crate::ApiError::not_found)?; let mut attachments = row.load_attachments_batch(ctx.db()).await?; let attach = attachments.remove(&row.internal()); diff --git a/upub/routes/src/activitypub/application.rs b/upub/routes/src/activitypub/application.rs index 6b9d8d6..e3a5e23 100644 --- a/upub/routes/src/activitypub/application.rs +++ b/upub/routes/src/activitypub/application.rs @@ -1,15 +1,15 @@ -use apb::{ActorMut, BaseMut, ObjectMut, PublicKeyMut}; +use apb::{LD, ActorMut, BaseMut, ObjectMut, PublicKeyMut}; use axum::{extract::{Query, State}, http::HeaderMap, response::{IntoResponse, Redirect, Response}, Form, Json}; use reqwest::Method; -use upub::{server::{auth::AuthIdentity, fetcher::Fetcher, jsonld::LD}, Context}; +use upub::Context; -use crate::builders::JsonLD; +use crate::{builders::JsonLD, AuthIdentity}; pub async fn view( headers: HeaderMap, State(ctx): State, -) -> upub::Result { +) -> crate::ApiResult { if let Some(accept) = headers.get("Accept") { if let Ok(accept) = accept.to_str() { if accept.contains("text/html") && !accept.contains("application/ld+json") { @@ -47,46 +47,48 @@ pub async fn proxy_get( State(ctx): State, Query(query): Query, AuthIdentity(auth): AuthIdentity, -) -> upub::Result> { +) -> crate::ApiResult> { // only local users can request fetches if !ctx.cfg().security.allow_public_debugger && !auth.is_local() { - return Err(upub::Error::unauthorized()); + return Err(crate::ApiError::unauthorized()); } - Ok(Json( - Context::request( - Method::GET, - &query.id, - None, - ctx.base(), - ctx.pkey(), - &format!("{}+proxy", ctx.domain()), - ) - .await? - .json::() - .await? - )) + todo!() + // Ok(Json( + // Context::request( + // Method::GET, + // &query.id, + // None, + // ctx.base(), + // ctx.pkey(), + // &format!("{}+proxy", ctx.domain()), + // ) + // .await? + // .json::() + // .await? + // )) } pub async fn proxy_form( State(ctx): State, AuthIdentity(auth): AuthIdentity, Form(query): Form, -) -> upub::Result> { +) -> crate::ApiResult> { // only local users can request fetches if !ctx.cfg().security.allow_public_debugger && auth.is_local() { - return Err(upub::Error::unauthorized()); + return Err(crate::ApiError::unauthorized()); } - Ok(Json( - Context::request( - Method::GET, - &query.id, - None, - ctx.base(), - ctx.pkey(), - &format!("{}+proxy", ctx.domain()), - ) - .await? - .json::() - .await? - )) + todo!() + // Ok(Json( + // Context::request( + // Method::GET, + // &query.id, + // None, + // ctx.base(), + // ctx.pkey(), + // &format!("{}+proxy", ctx.domain()), + // ) + // .await? + // .json::() + // .await? + // )) } diff --git a/upub/routes/src/activitypub/auth.rs b/upub/routes/src/activitypub/auth.rs index 7dfece4..015eff7 100644 --- a/upub/routes/src/activitypub/auth.rs +++ b/upub/routes/src/activitypub/auth.rs @@ -29,7 +29,7 @@ fn token() -> String { pub async fn login( State(ctx): State, Json(login): Json -) -> upub::Result> { +) -> crate::ApiResult> { // TODO salt the pwd match upub::model::credential::Entity::find() .filter(Condition::all() @@ -57,7 +57,7 @@ pub async fn login( user: x.actor })) }, - None => Err(upub::Error::unauthorized()), + None => Err(crate::ApiError::unauthorized()), } } @@ -69,16 +69,16 @@ pub struct RefreshForm { pub async fn refresh( State(ctx): State, Json(login): Json -) -> upub::Result> { +) -> crate::ApiResult> { if !ctx.cfg().security.allow_login_refresh { - return Err(upub::Error::forbidden()); + return Err(crate::ApiError::forbidden()); } let prev = upub::model::session::Entity::find() .filter(upub::model::session::Column::Secret.eq(login.token)) .one(ctx.db()) .await? - .ok_or_else(upub::Error::unauthorized)?; + .ok_or_else(crate::ApiError::unauthorized)?; if prev.expires > chrono::Utc::now() { return Ok(Json(AuthSuccess { token: prev.secret, user: prev.actor, expires: prev.expires })); @@ -113,9 +113,9 @@ pub struct RegisterForm { pub async fn register( State(ctx): State, Json(registration): Json -) -> upub::Result> { +) -> crate::ApiResult> { if !ctx.cfg().security.allow_registration { - return Err(upub::Error::forbidden()); + return Err(crate::ApiError::forbidden()); } ctx.register_user( diff --git a/upub/routes/src/activitypub/context.rs b/upub/routes/src/activitypub/context.rs index 5f7936e..c6ec95d 100644 --- a/upub/routes/src/activitypub/context.rs +++ b/upub/routes/src/activitypub/context.rs @@ -1,8 +1,8 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter}; -use upub::{model, server::auth::AuthIdentity, Context}; +use upub::{model, Context}; -use crate::builders::JsonLD; +use crate::{AuthIdentity, builders::JsonLD}; use super::Pagination; @@ -10,7 +10,7 @@ pub async fn get( State(ctx): State, Path(id): Path, AuthIdentity(auth): AuthIdentity, -) -> upub::Result> { +) -> crate::ApiResult> { let local_context_id = upub::url!(ctx, "/context/{id}"); let context = ctx.oid(&id); @@ -28,7 +28,7 @@ pub async fn page( Path(id): Path, Query(page): Query, AuthIdentity(auth): AuthIdentity, -) -> upub::Result> { +) -> crate::ApiResult> { let context = ctx.oid(&id); crate::builders::paginate( diff --git a/upub/routes/src/activitypub/inbox.rs b/upub/routes/src/activitypub/inbox.rs index 6d26eab..500a339 100644 --- a/upub/routes/src/activitypub/inbox.rs +++ b/upub/routes/src/activitypub/inbox.rs @@ -1,16 +1,16 @@ -use apb::{server::Inbox, Activity, ActivityType, Base}; +use apb::{Activity, ActivityType, Base}; use axum::{extract::{Query, State}, http::StatusCode, Json}; use sea_orm::{sea_query::IntoCondition, ColumnTrait}; -use upub::{server::auth::{AuthIdentity, Identity}, Context}; +use upub::Context; -use crate::builders::JsonLD; +use crate::{AuthIdentity, Identity, builders::JsonLD}; use super::Pagination; pub async fn get( State(ctx): State, -) -> upub::Result> { +) -> crate::ApiResult> { crate::builders::collection(&upub::url!(ctx, "/inbox"), None) } @@ -18,7 +18,7 @@ pub async fn page( State(ctx): State, AuthIdentity(auth): AuthIdentity, Query(page): Query, -) -> upub::Result> { +) -> crate::ApiResult> { crate::builders::paginate( upub::url!(ctx, "/inbox/page"), upub::model::addressing::Column::Actor.is_null() @@ -42,9 +42,9 @@ pub async fn post( State(ctx): State, AuthIdentity(auth): AuthIdentity, Json(activity): Json -) -> upub::Result<()> { +) -> crate::ApiResult<()> { let Identity::Remote { domain: server, user: uid, .. } = auth else { - if activity.activity_type() == Some(ActivityType::Delete) { + if matches!(activity.activity_type(), Ok(ActivityType::Delete)) { // this is spammy af, ignore them! // we basically received a delete for a user we can't fetch and verify, meaning remote // deleted someone we never saw. technically we deleted nothing so we should return error, @@ -55,42 +55,44 @@ pub async fn post( } tracing::warn!("refusing unauthorized activity: {}", pretty_json!(activity)); if matches!(auth, Identity::Anonymous) { - return Err(upub::Error::unauthorized()); + return Err(crate::ApiError::unauthorized()); } else { - return Err(upub::Error::forbidden()); + return Err(crate::ApiError::forbidden()); } }; - let aid = activity.id().ok_or_else(|| upub::Error::field("id"))?.to_string(); - let actor = activity.actor().id().ok_or_else(|| upub::Error::field("actor"))?; + todo!() - if uid != actor { - return Err(upub::Error::unauthorized()); - } + // let aid = activity.id().ok_or_else(|| crate::ApiError::field("id"))?.to_string(); + // let actor = activity.actor().id().ok_or_else(|| crate::ApiError::field("actor"))?; - tracing::debug!("processing federated activity: '{:#}'", activity); + // if uid != actor { + // return Err(crate::ApiError::unauthorized()); + // } - // TODO we could process Links and bare Objects maybe, but probably out of AP spec? - match activity.activity_type().ok_or_else(upub::Error::bad_request)? { - ActivityType::Activity => { - tracing::warn!("skipping unprocessable base activity: {}", pretty_json!(activity)); - Err(StatusCode::UNPROCESSABLE_ENTITY.into()) // won't ingest useless stuff - }, + // tracing::debug!("processing federated activity: '{:#}'", activity); - // TODO emojireacts are NOT likes, but let's process them like ones for now maybe? - ActivityType::Like | ActivityType::EmojiReact => Ok(ctx.like(server, activity).await?), - ActivityType::Create => Ok(ctx.create(server, activity).await?), - ActivityType::Follow => Ok(ctx.follow(server, activity).await?), - ActivityType::Announce => Ok(ctx.announce(server, activity).await?), - ActivityType::Accept(_) => Ok(ctx.accept(server, activity).await?), - ActivityType::Reject(_) => Ok(ctx.reject(server, activity).await?), - ActivityType::Undo => Ok(ctx.undo(server, activity).await?), - ActivityType::Delete => Ok(ctx.delete(server, activity).await?), - ActivityType::Update => Ok(ctx.update(server, activity).await?), + // // TODO we could process Links and bare Objects maybe, but probably out of AP spec? + // match activity.activity_type().ok_or_else(crate::ApiError::bad_request)? { + // ActivityType::Activity => { + // tracing::warn!("skipping unprocessable base activity: {}", pretty_json!(activity)); + // Err(StatusCode::UNPROCESSABLE_ENTITY.into()) // won't ingest useless stuff + // }, - _x => { - tracing::info!("received unimplemented activity on inbox: {}", pretty_json!(activity)); - Err(StatusCode::NOT_IMPLEMENTED.into()) - }, - } + // // TODO emojireacts are NOT likes, but let's process them like ones for now maybe? + // ActivityType::Like | ActivityType::EmojiReact => Ok(ctx.like(server, activity).await?), + // ActivityType::Create => Ok(ctx.create(server, activity).await?), + // ActivityType::Follow => Ok(ctx.follow(server, activity).await?), + // ActivityType::Announce => Ok(ctx.announce(server, activity).await?), + // ActivityType::Accept(_) => Ok(ctx.accept(server, activity).await?), + // ActivityType::Reject(_) => Ok(ctx.reject(server, activity).await?), + // ActivityType::Undo => Ok(ctx.undo(server, activity).await?), + // ActivityType::Delete => Ok(ctx.delete(server, activity).await?), + // ActivityType::Update => Ok(ctx.update(server, activity).await?), + + // _x => { + // tracing::info!("received unimplemented activity on inbox: {}", pretty_json!(activity)); + // Err(StatusCode::NOT_IMPLEMENTED.into()) + // }, + // } } diff --git a/upub/routes/src/activitypub/object/mod.rs b/upub/routes/src/activitypub/object/mod.rs index ffb687b..69f7e34 100644 --- a/upub/routes/src/activitypub/object/mod.rs +++ b/upub/routes/src/activitypub/object/mod.rs @@ -1,11 +1,11 @@ pub mod replies; -use apb::{CollectionMut, ObjectMut}; +use apb::{CollectionMut, ObjectMut, LD}; use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns}; -use upub::{model::{self, addressing::Event}, server::{auth::AuthIdentity, fetcher::Fetcher, jsonld::LD}, Context}; +use upub::{model::{self, addressing::Event}, Context}; -use crate::builders::JsonLD; +use crate::{builders::JsonLD, AuthIdentity}; use super::TryFetch; @@ -14,15 +14,15 @@ pub async fn view( Path(id): Path, AuthIdentity(auth): AuthIdentity, Query(query): Query, -) -> upub::Result> { +) -> crate::ApiResult> { let oid = ctx.oid(&id); - if auth.is_local() && query.fetch && !ctx.is_local(&oid) { - let obj = ctx.fetch_object(&oid).await?; - // some implementations serve statuses on different urls than their AP id - if obj.id != oid { - return Err(upub::Error::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id)))); - } - } + // if auth.is_local() && query.fetch && !ctx.is_local(&oid) { + // let obj = ctx.fetch_object(&oid).await?; + // // some implementations serve statuses on different urls than their AP id + // if obj.id != oid { + // return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id)))); + // } + // } let item = model::addressing::Entity::find_addressed(auth.my_id()) .filter(model::object::Column::Id.eq(&oid)) @@ -30,11 +30,11 @@ pub async fn view( .into_model::() .one(ctx.db()) .await? - .ok_or_else(upub::Error::not_found)?; + .ok_or_else(crate::ApiError::not_found)?; let object = match item { - Event::Tombstone => return Err(upub::Error::not_found()), - Event::Activity(_) => return Err(upub::Error::not_found()), + Event::Tombstone => return Err(crate::ApiError::not_found()), + Event::Activity(_) => return Err(crate::ApiError::not_found()), Event::StrayObject { liked: _, object } => object, Event::DeepActivity { activity: _, liked: _, object } => object, }; diff --git a/upub/routes/src/activitypub/object/replies.rs b/upub/routes/src/activitypub/object/replies.rs index e8b16b1..d88804e 100644 --- a/upub/routes/src/activitypub/object/replies.rs +++ b/upub/routes/src/activitypub/object/replies.rs @@ -1,21 +1,21 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter}; -use upub::{model, server::{auth::AuthIdentity, fetcher::Fetcher}, Context}; +use upub::{model, Context}; -use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD}; +use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD, AuthIdentity}; pub async fn get( State(ctx): State, Path(id): Path, AuthIdentity(auth): AuthIdentity, Query(q): Query, -) -> upub::Result> { +) -> crate::ApiResult> { let replies_id = upub::url!(ctx, "/objects/{id}/replies"); let oid = ctx.oid(&id); - if auth.is_local() && q.fetch { - ctx.fetch_thread(&oid).await?; - } + // if auth.is_local() && q.fetch { + // ctx.fetch_thread(&oid).await?; + // } let count = model::addressing::Entity::find_addressed(auth.my_id()) .filter(auth.filter_condition()) @@ -31,7 +31,7 @@ pub async fn page( Path(id): Path, Query(page): Query, AuthIdentity(auth): AuthIdentity, -) -> upub::Result> { +) -> crate::ApiResult> { let page_id = upub::url!(ctx, "/objects/{id}/replies/page"); let oid = ctx.oid(&id); diff --git a/upub/routes/src/activitypub/outbox.rs b/upub/routes/src/activitypub/outbox.rs index 1c81203..4ce68b0 100644 --- a/upub/routes/src/activitypub/outbox.rs +++ b/upub/routes/src/activitypub/outbox.rs @@ -1,10 +1,10 @@ use axum::{extract::{Query, State}, http::StatusCode, Json}; use sea_orm::{ColumnTrait, Condition}; -use upub::{server::auth::AuthIdentity, Context}; +use upub::Context; -use crate::{activitypub::{CreationResult, Pagination}, builders::JsonLD}; +use crate::{activitypub::{CreationResult, Pagination}, AuthIdentity, builders::JsonLD}; -pub async fn get(State(ctx): State) -> upub::Result> { +pub async fn get(State(ctx): State) -> crate::ApiResult> { crate::builders::collection(&upub::url!(ctx, "/outbox"), None) } @@ -12,7 +12,7 @@ pub async fn page( State(ctx): State, Query(page): Query, AuthIdentity(auth): AuthIdentity, -) -> upub::Result> { +) -> crate::ApiResult> { crate::builders::paginate( upub::url!(ctx, "/outbox/page"), Condition::all() @@ -30,7 +30,7 @@ pub async fn post( State(_ctx): State, AuthIdentity(_auth): AuthIdentity, Json(_activity): Json, -) -> upub::Result { +) -> crate::ApiResult { // TODO administrative actions may be carried out against this outbox? Err(StatusCode::NOT_IMPLEMENTED.into()) } diff --git a/upub/routes/src/activitypub/user/following.rs b/upub/routes/src/activitypub/user/following.rs index 57184dd..56eda86 100644 --- a/upub/routes/src/activitypub/user/following.rs +++ b/upub/routes/src/activitypub/user/following.rs @@ -8,7 +8,7 @@ use crate::{activitypub::Pagination, builders::JsonLD}; pub async fn get( State(ctx): State, Path(id): Path, -) -> upub::Result> { +) -> crate::ApiResult> { let follow___ = if OUTGOING { "following" } else { "followers" }; use upub::model::relation::Column::{Follower, Following}; let count = model::relation::Entity::find() @@ -25,7 +25,7 @@ pub async fn page( State(ctx): State, Path(id): Path, Query(page): Query, -) -> upub::Result> { +) -> crate::ApiResult> { let follow___ = if OUTGOING { "following" } else { "followers" }; let limit = page.batch.unwrap_or(20).min(50); let offset = page.offset.unwrap_or(0); diff --git a/upub/routes/src/activitypub/user/inbox.rs b/upub/routes/src/activitypub/user/inbox.rs index 2278e56..a877798 100644 --- a/upub/routes/src/activitypub/user/inbox.rs +++ b/upub/routes/src/activitypub/user/inbox.rs @@ -1,22 +1,22 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; use sea_orm::{ColumnTrait, Condition}; -use upub::{model, server::auth::{AuthIdentity, Identity}, Context}; +use upub::{model, Context}; -use crate::{activitypub::Pagination, builders::JsonLD}; +use crate::{activitypub::Pagination, builders::JsonLD, AuthIdentity, Identity}; pub async fn get( State(ctx): State, Path(id): Path, AuthIdentity(auth): AuthIdentity, -) -> upub::Result> { +) -> crate::ApiResult> { match auth { - Identity::Anonymous => Err(StatusCode::FORBIDDEN.into()), - Identity::Remote { .. } => Err(StatusCode::FORBIDDEN.into()), + Identity::Anonymous => Err(crate::ApiError::forbidden()), + Identity::Remote { .. } => Err(crate::ApiError::forbidden()), Identity::Local { id: user, .. } => if ctx.uid(&id) == user { crate::builders::collection(&upub::url!(ctx, "/actors/{id}/inbox"), None) } else { - Err(StatusCode::FORBIDDEN.into()) + Err(crate::ApiError::forbidden()) }, } } @@ -26,13 +26,13 @@ pub async fn page( Path(id): Path, AuthIdentity(auth): AuthIdentity, Query(page): Query, -) -> upub::Result> { +) -> crate::ApiResult> { let Identity::Local { id: uid, internal } = &auth else { // local inbox is only for local users - return Err(upub::Error::forbidden()); + return Err(crate::ApiError::forbidden()); }; if uid != &ctx.uid(&id) { - return Err(upub::Error::forbidden()); + return Err(crate::ApiError::forbidden()); } crate::builders::paginate( @@ -54,7 +54,7 @@ pub async fn post( Path(_id): Path, AuthIdentity(_auth): AuthIdentity, Json(activity): Json, -) -> Result<(), upub::Error> { +) -> crate::ApiResult<()> { // POSTing to user inboxes is effectively the same as POSTing to the main inbox super::super::inbox::post(State(ctx), AuthIdentity(_auth), Json(activity)).await } diff --git a/upub/routes/src/activitypub/user/mod.rs b/upub/routes/src/activitypub/user/mod.rs index 091d82e..332089e 100644 --- a/upub/routes/src/activitypub/user/mod.rs +++ b/upub/routes/src/activitypub/user/mod.rs @@ -6,10 +6,10 @@ pub mod following; use axum::extract::{Path, Query, State}; -use apb::{ActorMut, EndpointsMut, Node, ObjectMut}; -use upub::{ext::AnyQuery, model, server::{auth::AuthIdentity, fetcher::Fetcher, jsonld::LD}, Context}; +use apb::{LD, ActorMut, EndpointsMut, Node, ObjectMut}; +use upub::{ext::AnyQuery, model, Context}; -use crate::builders::JsonLD; +use crate::{builders::JsonLD, ApiError, AuthIdentity}; use super::TryFetch; @@ -19,19 +19,21 @@ pub async fn view( AuthIdentity(auth): AuthIdentity, Path(id): Path, Query(query): Query, -) -> upub::Result> { +) -> crate::ApiResult> { let mut uid = ctx.uid(&id); - if auth.is_local() { - if id.starts_with('@') { - if let Some((user, host)) = id.replacen('@', "", 1).split_once('@') { - uid = ctx.webfinger(user, host).await?; - } - } - if query.fetch && !ctx.is_local(&uid) { - ctx.fetch_user(&uid).await?; - } - } - let internal_uid = model::actor::Entity::ap_to_internal(&uid, ctx.db()).await?; + // if auth.is_local() { + // if id.starts_with('@') { + // if let Some((user, host)) = id.replacen('@', "", 1).split_once('@') { + // uid = ctx.webfinger(user, host).await?; + // } + // } + // if query.fetch && !ctx.is_local(&uid) { + // ctx.fetch_user(&uid).await?; + // } + // } + let internal_uid = model::actor::Entity::ap_to_internal(&uid, ctx.db()) + .await? + .ok_or_else(ApiError::not_found)?; let (followed_by_me, following_me) = match auth.my_id() { None => (None, None), @@ -85,7 +87,7 @@ pub async fn view( .set_followed_by_me(followed_by_me) .ld_context() )), - None => Err(upub::Error::not_found()), + None => Err(crate::ApiError::not_found()), } } diff --git a/upub/routes/src/activitypub/user/outbox.rs b/upub/routes/src/activitypub/user/outbox.rs index 62b3d6e..cf2ba65 100644 --- a/upub/routes/src/activitypub/user/outbox.rs +++ b/upub/routes/src/activitypub/user/outbox.rs @@ -1,15 +1,15 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; use sea_orm::{ColumnTrait, Condition}; -use apb::{server::Outbox, AcceptType, ActivityType, Base, BaseType, ObjectType, RejectType}; -use upub::{model, server::auth::{AuthIdentity, Identity}, Context}; +use apb::{AcceptType, ActivityType, Base, BaseType, ObjectType, RejectType}; +use upub::{model, Context}; -use crate::{activitypub::{CreationResult, Pagination}, builders::JsonLD}; +use crate::{activitypub::{CreationResult, Pagination}, builders::JsonLD, AuthIdentity, Identity}; pub async fn get( State(ctx): State, Path(id): Path, -) -> upub::Result> { +) -> crate::ApiResult> { crate::builders::collection(&upub::url!(ctx, "/actors/{id}/outbox"), None) } @@ -18,7 +18,7 @@ pub async fn page( Path(id): Path, Query(page): Query, AuthIdentity(auth): AuthIdentity, -) -> upub::Result> { +) -> crate::ApiResult> { let uid = ctx.uid(&id); crate::builders::paginate( upub::url!(ctx, "/actors/{id}/outbox/page"), @@ -42,49 +42,48 @@ pub async fn post( Path(id): Path, AuthIdentity(auth): AuthIdentity, Json(activity): Json, -) -> upub::Result { +) -> crate::ApiResult { match auth { Identity::Anonymous => Err(StatusCode::UNAUTHORIZED.into()), Identity::Remote { .. } => Err(StatusCode::NOT_IMPLEMENTED.into()), Identity::Local { id: uid, .. } => if ctx.uid(&id) == uid { tracing::debug!("processing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default()); - match activity.base_type() { - None => Err(StatusCode::BAD_REQUEST.into()), + todo!() + // match activity.base_type()? { + // BaseType::Link(_) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()), - Some(BaseType::Link(_)) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()), + // BaseType::Object(ObjectType::Note) => + // Ok(CreationResult(ctx.create_note(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Note)) => - Ok(CreationResult(ctx.create_note(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Create)) => + // Ok(CreationResult(ctx.create(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Create))) => - Ok(CreationResult(ctx.create(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Like)) => + // Ok(CreationResult(ctx.like(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Like))) => - Ok(CreationResult(ctx.like(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Follow)) => + // Ok(CreationResult(ctx.follow(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Follow))) => - Ok(CreationResult(ctx.follow(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Announce)) => + // Ok(CreationResult(ctx.announce(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Announce))) => - Ok(CreationResult(ctx.announce(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept))) => + // Ok(CreationResult(ctx.accept(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept)))) => - Ok(CreationResult(ctx.accept(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject))) => + // Ok(CreationResult(ctx.reject(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject)))) => - Ok(CreationResult(ctx.reject(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Undo)) => + // Ok(CreationResult(ctx.undo(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Undo))) => - Ok(CreationResult(ctx.undo(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Delete)) => + // Ok(CreationResult(ctx.delete(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Delete))) => - Ok(CreationResult(ctx.delete(uid, activity).await?)), + // BaseType::Object(ObjectType::Activity(ActivityType::Update)) => + // Ok(CreationResult(ctx.update(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Update))) => - Ok(CreationResult(ctx.update(uid, activity).await?)), - - Some(_) => Err(StatusCode::NOT_IMPLEMENTED.into()), - } + // _ => Err(StatusCode::NOT_IMPLEMENTED.into()), + // } } else { Err(StatusCode::FORBIDDEN.into()) } diff --git a/upub/core/src/server/auth.rs b/upub/routes/src/auth.rs similarity index 56% rename from upub/core/src/server/auth.rs rename to upub/routes/src/auth.rs index 837e6a9..8a0af89 100644 --- a/upub/core/src/server/auth.rs +++ b/upub/routes/src/auth.rs @@ -1,10 +1,9 @@ use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}}; use reqwest::StatusCode; use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter}; +use httpsign::HttpSignature; -use crate::{errors::UpubError, model, server::Context}; - -use super::{fetcher::Fetcher, httpsign::HttpSignature}; +use crate::ApiError; #[derive(Debug, Clone)] pub enum Identity { @@ -22,15 +21,15 @@ pub enum Identity { impl Identity { pub fn filter_condition(&self) -> Condition { - let base_cond = Condition::any().add(model::addressing::Column::Actor.is_null()); + let base_cond = Condition::any().add(upub::model::addressing::Column::Actor.is_null()); match self { Identity::Anonymous => base_cond, - Identity::Remote { internal, .. } => base_cond.add(model::addressing::Column::Instance.eq(*internal)), + Identity::Remote { internal, .. } => base_cond.add(upub::model::addressing::Column::Instance.eq(*internal)), // TODO should we allow all users on same server to see? or just specific user?? Identity::Local { id, internal } => base_cond - .add(model::addressing::Column::Actor.eq(*internal)) - .add(model::activity::Column::Actor.eq(id)) - .add(model::object::Column::AttributedTo.eq(id)), + .add(upub::model::addressing::Column::Actor.eq(*internal)) + .add(upub::model::activity::Column::Actor.eq(id)) + .add(upub::model::object::Column::AttributedTo.eq(id)), } } @@ -70,13 +69,13 @@ pub struct AuthIdentity(pub Identity); #[axum::async_trait] impl FromRequestParts for AuthIdentity where - Context: FromRef, + upub::Context: FromRef, S: Send + Sync, { - type Rejection = UpubError; + type Rejection = ApiError; async fn from_request_parts(parts: &mut Parts, state: &S) -> Result { - let ctx = Context::from_ref(state); + let ctx = upub::Context::from_ref(state); let mut identity = Identity::Anonymous; let auth_header = parts @@ -86,23 +85,21 @@ where .unwrap_or(""); if auth_header.starts_with("Bearer ") { - match model::session::Entity::find() - .filter(model::session::Column::Secret.eq(auth_header.replace("Bearer ", ""))) - .filter(model::session::Column::Expires.gt(chrono::Utc::now())) + match upub::model::session::Entity::find() + .filter(upub::model::session::Column::Secret.eq(auth_header.replace("Bearer ", ""))) + .filter(upub::model::session::Column::Expires.gt(chrono::Utc::now())) .one(ctx.db()) - .await + .await? { - Ok(None) => return Err(UpubError::unauthorized()), - Ok(Some(x)) => { + None => return Err(ApiError::unauthorized()), + Some(x) => { // TODO could we store both actor ap id and internal id in session? to avoid this extra // lookup on *every* local authed request we receive... - let internal = model::actor::Entity::ap_to_internal(&x.actor, ctx.db()).await?; + let internal = upub::model::actor::Entity::ap_to_internal(&x.actor, ctx.db()) + .await? + .ok_or_else(ApiError::internal_server_error)?; identity = Identity::Local { id: x.actor, internal }; }, - Err(e) => { - tracing::error!("failed querying user session: {e}"); - return Err(UpubError::internal_server_error()) - }, } } @@ -114,37 +111,34 @@ where let mut http_signature = HttpSignature::parse(sig); // TODO assert payload's digest is equal to signature's + // really annoying to do here because we're streaming + // the request, maybe even impossible with this design? + let user_id = http_signature.key_id .replace("/main-key", "") // gotosocial whyyyyy .split('#') - .next().ok_or(UpubError::bad_request())? + .next().ok_or(ApiError::bad_request())? .to_string(); - match ctx.fetch_user(&user_id).await { - Ok(user) => match http_signature + match upub::model::actor::Entity::find_by_ap_id(&user_id) + .one(ctx.db()) + .await? + { + Some(user) => match http_signature .build_from_parts(parts) .verify(&user.public_key) { Ok(true) => { - let user = user.id; - let domain = Context::server(&user_id); - // TODO this will fail because we never fetch and insert into instance oops - let internal = model::instance::Entity::domain_to_internal(&domain, ctx.db()).await?; - identity = Identity::Remote { user, domain, internal }; + let internal = upub::model::instance::Entity::domain_to_internal(&user.domain, ctx.db()) + .await? + .ok_or_else(ApiError::internal_server_error)?; // user but not their domain??? + identity = Identity::Remote { user: user.id, domain: user.domain, internal }; }, Ok(false) => tracing::warn!("invalid signature: {http_signature:?}"), Err(e) => tracing::error!("error verifying signature: {e}"), }, - Err(e) => { - // since most activities are deletions for users we never saw, let's handle this case - // if while fetching we receive a GONE, it means we didn't have this user and it doesn't - // exist anymore, so it must be a deletion we can ignore - if let UpubError::Reqwest(ref x) = e { - if let Some(StatusCode::GONE) = x.status() { - return Err(UpubError::Status(StatusCode::OK)); // 200 so mastodon will shut uppp - } - } - tracing::warn!("could not fetch user (won't verify): {e}"); + None => { + // TODO enqueue fetching who tried signing this } } } diff --git a/upub/routes/src/builders.rs b/upub/routes/src/builders.rs index 70e6cb6..9522778 100644 --- a/upub/routes/src/builders.rs +++ b/upub/routes/src/builders.rs @@ -1,8 +1,8 @@ -use apb::{BaseMut, CollectionMut, CollectionPageMut}; +use apb::{BaseMut, CollectionMut, CollectionPageMut, LD}; use sea_orm::{Condition, DatabaseConnection, QueryFilter, QuerySelect, RelationTrait}; use axum::response::{IntoResponse, Response}; -use upub::{model::{addressing::Event, attachment::BatchFillable}, server::jsonld::LD}; +use upub::model::{addressing::Event, attachment::BatchFillable}; use crate::activitypub::Pagination; pub async fn paginate( @@ -12,7 +12,7 @@ pub async fn paginate( page: Pagination, my_id: Option, with_users: bool, // TODO ewww too many arguments for this weird function... -) -> upub::Result> { +) -> crate::ApiResult> { let limit = page.batch.unwrap_or(20).min(50); let offset = page.offset.unwrap_or(0); @@ -45,7 +45,7 @@ pub async fn paginate( collection_page(&id, offset, limit, items) } -pub fn collection_page(id: &str, offset: u64, limit: u64, items: Vec) -> upub::Result> { +pub fn collection_page(id: &str, offset: u64, limit: u64, items: Vec) -> crate::ApiResult> { let next = if items.len() < limit as usize { apb::Node::Empty } else { @@ -63,7 +63,7 @@ pub fn collection_page(id: &str, offset: u64, limit: u64, items: Vec) -> upub::Result> { +pub fn collection(id: &str, total_items: Option) -> crate::ApiResult> { Ok(JsonLD( apb::new() .set_id(Some(id)) diff --git a/upub/routes/src/error.rs b/upub/routes/src/error.rs new file mode 100644 index 0000000..8c7dbaf --- /dev/null +++ b/upub/routes/src/error.rs @@ -0,0 +1,114 @@ +use axum::{http::StatusCode, response::Redirect}; + +#[derive(Debug, thiserror::Error)] +pub enum ApiError { + #[error("database error: {0:?}")] + Database(#[from] sea_orm::DbErr), + + #[error("encountered malformed object: {0}")] + Field(#[from] apb::FieldErr), + + #[error("http signature error: {0:?}")] + HttpSignature(#[from] httpsign::HttpSignatureError), + + #[error("fetch error: {0:?}")] + Reqwest(#[from] reqwest::Error), + + // TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut + // helps with debugging! + #[error("fetch error: {0:?} -- server responded with {1}")] + FetchError(reqwest::Error, String), + + // wrapper error to return arbitraty status codes + #[error("{0}")] + Status(StatusCode), + + // TODO this isn't really an error but i need to redirect from some routes so this allows me to + // keep the type hints on the return type, still what the hell!!!! + #[error("redirecting to {0}")] + Redirect(String), +} + +impl ApiError { + pub fn bad_request() -> Self { + Self::Status(axum::http::StatusCode::BAD_REQUEST) + } + + pub fn unprocessable() -> Self { + Self::Status(axum::http::StatusCode::UNPROCESSABLE_ENTITY) + } + + pub fn not_found() -> Self { + Self::Status(axum::http::StatusCode::NOT_FOUND) + } + + pub fn forbidden() -> Self { + Self::Status(axum::http::StatusCode::FORBIDDEN) + } + + pub fn unauthorized() -> Self { + Self::Status(axum::http::StatusCode::UNAUTHORIZED) + } + + pub fn not_modified() -> Self { + Self::Status(axum::http::StatusCode::NOT_MODIFIED) + } + + pub fn internal_server_error() -> Self { + Self::Status(axum::http::StatusCode::INTERNAL_SERVER_ERROR) + } +} + +pub type ApiResult = Result; + +impl From for ApiError { + fn from(value: axum::http::StatusCode) -> Self { + ApiError::Status(value) + } +} + +impl axum::response::IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + // TODO it's kind of jank to hide this print down here, i should probably learn how spans work + // in tracing and use the library's features but ehhhh + tracing::debug!("emitting error response: {self:?}"); + let descr = self.to_string(); + match self { + ApiError::Redirect(to) => Redirect::to(&to).into_response(), + ApiError::Status(status) => status.into_response(), + ApiError::Database(e) => ( + StatusCode::SERVICE_UNAVAILABLE, + axum::Json(serde_json::json!({ + "error": "database", + "inner": format!("{e:#?}"), + })) + ).into_response(), + ApiError::Reqwest(x) | ApiError::FetchError(x, _) => ( + x.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), + axum::Json(serde_json::json!({ + "error": "request", + "status": x.status().map(|s| s.to_string()).unwrap_or_default(), + "url": x.url().map(|x| x.to_string()).unwrap_or_default(), + "description": descr, + "inner": format!("{x:#?}"), + })) + ).into_response(), + ApiError::Field(x) => ( + axum::http::StatusCode::BAD_REQUEST, + axum::Json(serde_json::json!({ + "error": "field", + "field": x.0.to_string(), + "description": descr, + })) + ).into_response(), + x => ( + StatusCode::INTERNAL_SERVER_ERROR, + axum::Json(serde_json::json!({ + "error": "unknown", + "description": descr, + "inner": format!("{x:#?}"), + })) + ).into_response(), + } + } +} diff --git a/upub/routes/src/lib.rs b/upub/routes/src/lib.rs index e8aca61..4abbddd 100644 --- a/upub/routes/src/lib.rs +++ b/upub/routes/src/lib.rs @@ -1,5 +1,11 @@ use tower_http::classify::{SharedClassifier, StatusInRangeAsFailures}; +pub mod auth; +pub use auth::{AuthIdentity, Identity}; + +pub mod error; +pub use error::{ApiError, ApiResult}; + pub mod activitypub; #[cfg(feature = "mastodon")] diff --git a/utils/httpsign/lib.rs b/utils/httpsign/lib.rs index 849849a..6d400a5 100644 --- a/utils/httpsign/lib.rs +++ b/utils/httpsign/lib.rs @@ -18,7 +18,13 @@ pub enum HttpSignatureError { Base64(#[from] base64::DecodeError), } -type Result = std::result::Result; +pub fn digest(data: &str) -> String { + format!("sha-256={}", + base64::prelude::BASE64_STANDARD.encode( + openssl::sha::sha256(data.as_bytes()) + ) + ) +} #[derive(Debug, Clone, Default)] pub struct HttpSignature { @@ -99,14 +105,14 @@ impl HttpSignature { self } - pub fn verify(&self, key: &str) -> Result { + pub fn verify(&self, key: &str) -> Result { let pubkey = PKey::public_key_from_pem(key.as_bytes())?; let mut verifier = Verifier::new(MessageDigest::sha256(), &pubkey)?; let signature = base64::prelude::BASE64_STANDARD.decode(&self.signature)?; Ok(verifier.verify_oneshot(&signature, self.control.as_bytes())?) } - pub fn sign(&mut self, key: &str) -> Result<&str> { + pub fn sign(&mut self, key: &str) -> Result<&str, HttpSignatureError> { let privkey = PKey::private_key_from_pem(key.as_bytes())?; let mut signer = openssl::sign::Signer::new(MessageDigest::sha256(), &privkey)?; signer.update(self.control.as_bytes())?;