From 1bb8df0ac51f243d478e021a52db575db8811c0a Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 6 Jun 2024 19:04:39 +0200 Subject: [PATCH] fix: pass transaction to expand_addressing too --- upub/core/src/model/relation.rs | 4 ++-- upub/core/src/traits/address.rs | 6 +++--- upub/core/src/traits/fetch.rs | 4 ++-- upub/core/src/traits/process.rs | 16 ++++++++-------- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/upub/core/src/model/relation.rs b/upub/core/src/model/relation.rs index 1a6d43e..df7fe36 100644 --- a/upub/core/src/model/relation.rs +++ b/upub/core/src/model/relation.rs @@ -63,7 +63,7 @@ impl ActiveModelBehavior for ActiveModel {} impl Entity { // TODO this is 2 queries!!! can it be optimized down to 1? - pub async fn followers(uid: &str, db: &DatabaseConnection) -> Result>, DbErr> { + pub async fn followers(uid: &str, db: &impl ConnectionTrait) -> Result>, DbErr> { let Some(internal_id) = super::actor::Entity::ap_to_internal(uid, db).await? else { return Ok(None); @@ -88,7 +88,7 @@ impl Entity { } // TODO this is 2 queries!!! can it be optimized down to 1? - pub async fn following(uid: &str, db: &DatabaseConnection) -> Result>, DbErr> { + pub async fn following(uid: &str, db: &impl ConnectionTrait) -> Result>, DbErr> { let Some(internal_id) = super::actor::Entity::ap_to_internal(uid, db).await? else { return Ok(None); diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index a8357ed..6e0e888 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -4,19 +4,19 @@ use crate::traits::fetch::Fetcher; #[async_trait::async_trait] pub trait Addresser { - async fn expand_addressing(&self, targets: Vec) -> Result, DbErr>; + async fn expand_addressing(&self, targets: Vec, tx: &impl ConnectionTrait) -> Result, DbErr>; async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>; async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>; } #[async_trait::async_trait] impl Addresser for crate::Context { - async fn expand_addressing(&self, targets: Vec) -> Result, DbErr> { + async fn expand_addressing(&self, targets: Vec, tx: &impl ConnectionTrait) -> Result, DbErr> { let mut out = Vec::new(); for target in targets { if target.ends_with("/followers") { let target_id = target.replace("/followers", ""); - let mut followers = crate::model::relation::Entity::followers(&target_id, self.db()) + let mut followers = crate::model::relation::Entity::followers(&target_id, tx) .await? .unwrap_or_else(Vec::new); if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index 2b7f4a2..d54bee9 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -358,7 +358,7 @@ impl Fetcher for crate::Context { let activity_model = self.insert_activity(activity, tx).await?; let addressed = activity_model.addressed(); - let expanded_addresses = self.expand_addressing(addressed).await?; + let expanded_addresses = self.expand_addressing(addressed, tx).await?; self.address_to(Some(activity_model.internal), None, &expanded_addresses, tx).await?; Ok(activity_model) @@ -418,7 +418,7 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth let object_model = ctx.insert_object(object, tx).await?; - let expanded_addresses = ctx.expand_addressing(addressed).await?; + let expanded_addresses = ctx.expand_addressing(addressed, tx).await?; ctx.address_to(None, Some(object_model.internal), &expanded_addresses, tx).await?; Ok(object_model) diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 0b2f0da..5230aa3 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -75,7 +75,7 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat let object_model = ctx.insert_object(object_node, tx).await?; object_model.internal }; - let expanded_addressing = ctx.expand_addressing(addressed).await?; + let expanded_addressing = ctx.expand_addressing(addressed, tx).await?; ctx.address_to(Some(activity_model.internal), Some(internal_oid), &expanded_addressing, tx).await?; tracing::info!("{} posted {}", activity_model.actor, oid); Ok(()) @@ -110,7 +110,7 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab .exec(tx) .await?; - let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!! expanded_addressing.push( crate::model::object::Entity::find_by_id(obj.internal) @@ -144,7 +144,7 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat }; crate::model::relation::Entity::insert(relation_model) .exec(tx).await?; - let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; if !expanded_addressing.contains(&target_actor) { expanded_addressing.push(target_actor); } @@ -192,7 +192,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor); - let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; if !expanded_addressing.contains(&follow_activity.actor) { expanded_addressing.push(follow_activity.actor); } @@ -222,7 +222,7 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor); - let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; if !expanded_addressing.contains(&follow_activity.actor) { expanded_addressing.push(follow_activity.actor); } @@ -276,7 +276,7 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat } tracing::info!("{} updated {}", uid, oid); - let expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; + let expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; Ok(()) } @@ -298,7 +298,7 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab .ok_or(ProcessorError::Incomplete)?; let activity_type = activity.activity_type()?; - let targets = ctx.expand_addressing(activity.addressed()).await?; + let targets = ctx.expand_addressing(activity.addressed(), tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?; ctx.address_to(Some(activity_model.internal), None, &targets, tx).await?; @@ -400,7 +400,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D published: Set(published), }; - let expanded_addressing = ctx.expand_addressing(addressed).await?; + let expanded_addressing = ctx.expand_addressing(addressed, tx).await?; ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; crate::model::announce::Entity::insert(share) .exec(tx).await?;