diff --git a/Cargo.lock b/Cargo.lock index de555e8f..a1607bd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4693,6 +4693,7 @@ name = "upub" version = "0.2.0" dependencies = [ "apb", + "async-recursion", "async-trait", "base64 0.22.1", "chrono", diff --git a/upub/core/Cargo.toml b/upub/core/Cargo.toml index f369062e..06a643ee 100644 --- a/upub/core/Cargo.toml +++ b/upub/core/Cargo.toml @@ -12,6 +12,7 @@ readme = "README.md" [dependencies] thiserror = "1" +async-recursion = "1.1" async-trait = "0.1" sha256 = "1.5" openssl = "0.10" # TODO handle pubkeys with a smaller crate diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index 52292dc7..853a5785 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -1,12 +1,12 @@ -use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseTransaction, DbErr, EntityTrait}; +use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait}; use crate::traits::fetch::Fetcher; #[async_trait::async_trait] pub trait Addresser { async fn expand_addressing(&self, targets: Vec) -> Result, DbErr>; - async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr>; - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr>; + async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>; + async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>; } #[async_trait::async_trait] @@ -32,7 +32,7 @@ impl Addresser for crate::Context { Ok(out) } - async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr> { + async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr> { // TODO address_to became kind of expensive, with these two selects right away and then another // select for each target we're addressing to... can this be improved?? let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false }; @@ -75,7 +75,7 @@ impl Addresser for crate::Context { Ok(()) } - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr> { + async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr> { let mut deliveries = Vec::new(); for target in targets.iter() .filter(|to| !to.is_empty()) @@ -83,7 +83,7 @@ impl Addresser for crate::Context { .filter(|to| to != &apb::target::PUBLIC) { // TODO fetch concurrently - match self.fetch_user(target).await { + match self.fetch_user(target, tx).await { Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( crate::model::job::ActiveModel { internal: sea_orm::ActiveValue::NotSet, diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index c3773a2f..2b7f4a2b 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object}; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; -use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet, TransactionTrait}; +use sea_orm::{ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet}; use crate::traits::normalize::AP; @@ -86,22 +86,18 @@ pub trait Fetcher { async fn webfinger(&self, user: &str, host: &str) -> Result, PullError>; - async fn fetch_domain(&self, domain: &str) -> Result; + async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result; - async fn fetch_user(&self, id: &str) -> Result; - async fn resolve_user(&self, actor: serde_json::Value) -> Result; + async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result; + async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result; - async fn fetch_activity(&self, id: &str) -> Result; - async fn resolve_activity(&self, activity: serde_json::Value) -> Result; + async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result; + async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result; - async fn fetch_object(&self, id: &str) -> Result { self.fetch_object_r(id, 0).await } - async fn resolve_object(&self, object: serde_json::Value) -> Result { self.resolve_object_r(object, 0).await } + async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result; + async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result; - async fn fetch_object_r(&self, id: &str, depth: u32) -> Result; - async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result; - - - async fn fetch_thread(&self, id: &str) -> Result<(), PullError>; + async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), PullError>; async fn request( method: reqwest::Method, @@ -163,7 +159,7 @@ pub trait Fetcher { #[async_trait::async_trait] impl Fetcher for crate::Context { async fn pull_r(&self, id: &str, depth: u32) -> Result, PullError> { - let _domain = self.fetch_domain(&crate::Context::server(id)).await?; + // let _domain = self.fetch_domain(&crate::Context::server(id)).await?; let document = Self::request( Method::GET, id, None, @@ -223,8 +219,8 @@ impl Fetcher for crate::Context { Ok(None) } - async fn fetch_domain(&self, domain: &str) -> Result { - if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(self.db()).await? { + async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result { + if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(tx).await? { return Ok(x); // already in db, easy } @@ -265,8 +261,8 @@ impl Fetcher for crate::Context { let mut active_model = instance_model.clone().into_active_model(); active_model.internal = NotSet; - crate::model::instance::Entity::insert(active_model).exec(self.db()).await?; - let internal = crate::model::instance::Entity::domain_to_internal(domain, self.db()) + crate::model::instance::Entity::insert(active_model).exec(tx).await?; + let internal = crate::model::instance::Entity::domain_to_internal(domain, tx) .await? .ok_or_else(|| DbErr::RecordNotFound(domain.to_string()))?; instance_model.internal = internal; @@ -274,9 +270,11 @@ impl Fetcher for crate::Context { Ok(instance_model) } - async fn resolve_user(&self, mut document: serde_json::Value) -> Result { + async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result { let id = document.id()?.to_string(); + let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?; + // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time if let Ok(followers_url) = document.followers().id() { let req = Self::request( @@ -311,118 +309,121 @@ impl Fetcher for crate::Context { // TODO this may fail: while fetching, remote server may fetch our service actor. // if it does so with http signature, we will fetch that actor in background // meaning that, once we reach here, it's already inserted and returns an UNIQUE error - crate::model::actor::Entity::insert(user_model).exec(self.db()).await?; + crate::model::actor::Entity::insert(user_model).exec(tx).await?; // TODO fetch it back to get the internal id Ok( crate::model::actor::Entity::find_by_ap_id(&id) - .one(self.db()) + .one(tx) .await? .ok_or_else(|| DbErr::RecordNotFound(id.to_string()))? ) } - async fn fetch_user(&self, id: &str) -> Result { - if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { + async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result { + if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(tx).await? { return Ok(x); // already in db, easy } let document = self.pull(id).await?.actor()?; - self.resolve_user(document).await + self.resolve_user(document, tx).await } - async fn fetch_activity(&self, id: &str) -> Result { - if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? { + async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result { + if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(tx).await? { return Ok(x); // already in db, easy } let activity = self.pull(id).await?.activity()?; - self.resolve_activity(activity).await + self.resolve_activity(activity, tx).await } - async fn resolve_activity(&self, activity: serde_json::Value) -> Result { + async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result { + let _domain = self.fetch_domain(&crate::Context::server(activity.id()?), tx).await?; + if let Ok(activity_actor) = activity.actor().id() { - if let Err(e) = self.fetch_user(activity_actor).await { + if let Err(e) = self.fetch_user(activity_actor, tx).await { tracing::warn!("could not get actor of fetched activity: {e}"); } } if let Ok(activity_object) = activity.object().id() { - if let Err(e) = self.fetch_object(activity_object).await { + if let Err(e) = self.fetch_object(activity_object, tx).await { tracing::warn!("could not get object of fetched activity: {e}"); } } - let tx = self.db().begin().await?; - - let activity_model = self.insert_activity(activity, &tx).await?; + let activity_model = self.insert_activity(activity, tx).await?; let addressed = activity_model.addressed(); let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(Some(activity_model.internal), None, &expanded_addresses, &tx).await?; - - tx.commit().await?; + self.address_to(Some(activity_model.internal), None, &expanded_addresses, tx).await?; Ok(activity_model) } - async fn fetch_thread(&self, _id: &str) -> Result<(), PullError> { + async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), PullError> { // crawl_replies(self, id, 0).await todo!() } - async fn fetch_object_r(&self, id: &str, depth: u32) -> Result { - if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(self.db()).await? { - return Ok(x); // already in db, easy - } - - let object = self.pull(id).await?.object()?; - - self.resolve_object_r(object, depth).await + async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result { + fetch_object_r(self, id, 0, tx).await } - async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result { - let id = object.id()?.to_string(); - - if let Ok(oid) = object.id() { - if oid != id { - if let Some(x) = crate::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? { - return Ok(x); // already in db, but with id different that given url - } - } - } - - if let Ok(attributed_to) = object.attributed_to().id() { - if let Err(e) = self.fetch_user(attributed_to).await { - tracing::warn!("could not get actor of fetched object: {e}"); - } - } - - let addressed = object.addressed(); - - if let Ok(reply) = object.in_reply_to().id() { - if depth <= self.cfg().security.thread_crawl_depth { - self.fetch_object_r(reply, depth + 1).await?; - } else { - tracing::warn!("thread deeper than {}, giving up fetching more replies", self.cfg().security.thread_crawl_depth); - } - } - - let tx = self.db().begin().await?; - - let object_model = self.insert_object(object, &tx).await?; - - let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(None, Some(object_model.internal), &expanded_addresses, &tx).await?; - - tx.commit().await?; - - Ok(object_model) + async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result { + resolve_object_r(self, object, 0, tx).await } } +#[async_recursion::async_recursion] +async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result { + if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(tx).await? { + return Ok(x); // already in db, easy + } + + let object = ctx.pull(id).await?.object()?; + + resolve_object_r(ctx, object, depth, tx).await +} + +async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result { + let id = object.id()?.to_string(); + + if let Ok(oid) = object.id() { + if oid != id { + if let Some(x) = crate::model::object::Entity::find_by_ap_id(oid).one(tx).await? { + return Ok(x); // already in db, but with id different that given url + } + } + } + + if let Ok(attributed_to) = object.attributed_to().id() { + if let Err(e) = ctx.fetch_user(attributed_to, tx).await { + tracing::warn!("could not get actor of fetched object: {e}"); + } + } + + let addressed = object.addressed(); + + if let Ok(reply) = object.in_reply_to().id() { + if depth <= ctx.cfg().security.thread_crawl_depth { + fetch_object_r(ctx, reply, depth + 1, tx).await?; + } else { + tracing::warn!("thread deeper than {}, giving up fetching more replies", ctx.cfg().security.thread_crawl_depth); + } + } + + let object_model = ctx.insert_object(object, tx).await?; + + let expanded_addresses = ctx.expand_addressing(addressed).await?; + ctx.address_to(None, Some(object_model.internal), &expanded_addresses, tx).await?; + + Ok(object_model) +} + #[async_trait::async_trait] pub trait Fetchable : Sync + Send { async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>; diff --git a/upub/core/src/traits/normalize.rs b/upub/core/src/traits/normalize.rs index 137fbbfb..16a469e5 100644 --- a/upub/core/src/traits/normalize.rs +++ b/upub/core/src/traits/normalize.rs @@ -1,5 +1,5 @@ use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; -use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, DatabaseTransaction, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; #[derive(Debug, thiserror::Error)] pub enum NormalizerError { @@ -12,14 +12,14 @@ pub enum NormalizerError { #[async_trait::async_trait] pub trait Normalizer { - async fn insert_object(&self, obj: impl apb::Object, tx: &DatabaseTransaction) -> Result; - async fn insert_activity(&self, act: impl apb::Activity, tx: &DatabaseTransaction) -> Result; + async fn insert_object(&self, obj: impl apb::Object, tx: &impl ConnectionTrait) -> Result; + async fn insert_activity(&self, act: impl apb::Activity, tx: &impl ConnectionTrait) -> Result; } #[async_trait::async_trait] impl Normalizer for crate::Context { - async fn insert_object(&self, object: impl apb::Object, tx: &DatabaseTransaction) -> Result { + async fn insert_object(&self, object: impl apb::Object, tx: &impl ConnectionTrait) -> Result { let oid = object.id()?.to_string(); let uid = object.attributed_to().id().str(); let t = object.object_type()?; @@ -131,7 +131,7 @@ impl Normalizer for crate::Context { Ok(object_model) } - async fn insert_activity(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result { + async fn insert_activity(&self, activity: impl apb::Activity, tx: &impl ConnectionTrait) -> Result { let mut activity_model = AP::activity(&activity)?; let mut active_model = activity_model.clone().into_active_model(); diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 9ccfd800..a55158f4 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -61,7 +61,7 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat return Err(ProcessorError::Unprocessable); }; if let Ok(reply) = object_node.in_reply_to().id() { - if let Err(e) = ctx.fetch_object(reply).await { + if let Err(e) = ctx.fetch_object(reply, tx).await { tracing::warn!("failed fetching replies for received object: {e}"); } } @@ -80,7 +80,7 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab .ok_or(ProcessorError::Incomplete)?; let object_uri = activity.object().id()?.to_string(); let published = activity.published().unwrap_or_else(|_|chrono::Utc::now()); - let obj = ctx.fetch_object(&object_uri).await?; + let obj = ctx.fetch_object(&object_uri, tx).await?; if crate::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal) .any(tx) .await? @@ -127,7 +127,7 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .await? .ok_or(ProcessorError::Incomplete)?; let target_actor = activity.object().id()?.to_string(); - let usr = ctx.fetch_user(&target_actor).await?; + let usr = ctx.fetch_user(&target_actor, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?; let relation_model = crate::model::relation::ActiveModel { internal: NotSet, @@ -346,7 +346,7 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); - let actor = ctx.fetch_user(&uid).await?; + let actor = ctx.fetch_user(&uid, tx).await?; let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx) .await? .ok_or(ProcessorError::Incomplete)?; @@ -369,7 +369,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D // objects are processed down below, make a mock Internal::Object(internal) Pull::Object(x) => crate::context::Internal::Object( - ctx.resolve_object(x).await?.internal + ctx.resolve_object(x, tx).await?.internal ), } } diff --git a/upub/routes/src/activitypub/activity.rs b/upub/routes/src/activitypub/activity.rs index 91cefd50..3efd69f9 100644 --- a/upub/routes/src/activitypub/activity.rs +++ b/upub/routes/src/activitypub/activity.rs @@ -1,5 +1,5 @@ use axum::extract::{Path, Query, State}; -use sea_orm::{ColumnTrait, QueryFilter}; +use sea_orm::{ColumnTrait, QueryFilter, TransactionTrait}; use upub::{model::{self, addressing::Event, attachment::BatchFillable}, traits::Fetcher, Context}; use apb::LD; @@ -15,7 +15,9 @@ pub async fn view( ) -> crate::ApiResult> { let aid = ctx.aid(&id); if auth.is_local() && query.fetch && !ctx.is_local(&aid) { - let obj = ctx.fetch_activity(&aid).await?; + let tx = ctx.db().begin().await?; + let obj = ctx.fetch_activity(&aid, &tx).await?; + tx.commit().await?; if obj.id != aid { return Err(crate::ApiError::Redirect(obj.id)); } diff --git a/upub/routes/src/activitypub/object/mod.rs b/upub/routes/src/activitypub/object/mod.rs index baad5b37..58cb4c83 100644 --- a/upub/routes/src/activitypub/object/mod.rs +++ b/upub/routes/src/activitypub/object/mod.rs @@ -2,7 +2,7 @@ pub mod replies; use apb::{CollectionMut, ObjectMut, LD}; use axum::extract::{Path, Query, State}; -use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns}; +use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait}; use upub::{model::{self, addressing::Event}, traits::Fetcher, Context}; use crate::{builders::JsonLD, AuthIdentity}; @@ -17,7 +17,9 @@ pub async fn view( ) -> crate::ApiResult> { let oid = ctx.oid(&id); if auth.is_local() && query.fetch && !ctx.is_local(&oid) { - let obj = ctx.fetch_object(&oid).await?; + let tx = ctx.db().begin().await?; + let obj = ctx.fetch_object(&oid, &tx).await?; + tx.commit().await?; // some implementations serve statuses on different urls than their AP id if obj.id != oid { return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id)))); diff --git a/upub/routes/src/activitypub/user/mod.rs b/upub/routes/src/activitypub/user/mod.rs index 88cd4326..91a0daf9 100644 --- a/upub/routes/src/activitypub/user/mod.rs +++ b/upub/routes/src/activitypub/user/mod.rs @@ -30,7 +30,7 @@ pub async fn view( } } if query.fetch && !ctx.is_local(&uid) { - ctx.fetch_user(&uid).await?; + ctx.fetch_user(&uid, ctx.db()).await?; } } let internal_uid = model::actor::Entity::ap_to_internal(&uid, ctx.db()) diff --git a/upub/routes/src/auth.rs b/upub/routes/src/auth.rs index 0a63d2ae..2a0fa3f9 100644 --- a/upub/routes/src/auth.rs +++ b/upub/routes/src/auth.rs @@ -120,7 +120,7 @@ where .next().ok_or(ApiError::bad_request())? .to_string(); - match ctx.fetch_user(&user_id).await { + match ctx.fetch_user(&user_id, ctx.db()).await { Err(e) => tracing::warn!("failed resolving http signature actor: {e}"), Ok(user) => match http_signature .build_from_parts(parts)