From 54e6b517e2079d5aa5d2321d9a754f2be4b5e209 Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 28 Jun 2024 02:50:46 +0200 Subject: [PATCH] fix: merge Create addressing basically bring Addresser out of Normalizer again and address manually everywhere so that in Create we can join the row. in the future we may be able to resolve contexts more wisely and thus be able to insert merged rows for likes and announces too, but for now this should be quite enough --- upub/cli/src/fetch.rs | 14 ++++++------ upub/core/src/traits/address.rs | 36 ++++++++++++++++++++++--------- upub/core/src/traits/fetch.rs | 4 +++- upub/core/src/traits/normalize.rs | 6 ------ upub/core/src/traits/process.rs | 19 +++++++++++----- 5 files changed, 51 insertions(+), 28 deletions(-) diff --git a/upub/cli/src/fetch.rs b/upub/cli/src/fetch.rs index e512c664..6769097a 100644 --- a/upub/cli/src/fetch.rs +++ b/upub/cli/src/fetch.rs @@ -1,5 +1,5 @@ use sea_orm::{EntityTrait, TransactionTrait}; -use upub::traits::{fetch::{Fetchable, PullError}, Normalizer}; +use upub::traits::{fetch::{Fetchable, PullError}, Addresser, Normalizer}; pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> { use apb::Base; @@ -16,15 +16,17 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), Pu let tx = ctx.db().begin().await?; match obj.base_type() { Ok(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { - upub::model::actor::Entity::insert( - upub::AP::actor_q(&obj, None).unwrap() - ).exec(&tx).await.unwrap(); + upub::model::actor::Entity::insert(upub::AP::actor_q(&obj, None)?) + .exec(&tx) + .await?; }, Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { - ctx.insert_activity(obj, &tx).await.unwrap(); + let act = ctx.insert_activity(obj, &tx).await?; + ctx.address((Some(&act), None), &tx).await?; }, Ok(apb::BaseType::Object(apb::ObjectType::Note)) => { - ctx.insert_object(obj, &tx).await.unwrap(); + let obj = ctx.insert_object(obj, &tx).await?; + ctx.address((None, Some(&obj)), &tx).await?; }, Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index d3ddf1fd..f4bef2c4 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use apb::target::Addressed; use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait, QuerySelect, SelectColumns}; @@ -6,8 +8,7 @@ use crate::traits::fetch::Fetcher; #[async_trait::async_trait] pub trait Addresser { async fn deliver(&self, to: Vec, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>; - async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>; - async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>; + async fn address(&self, (activity, object): (Option<&crate::model::activity::Model>, Option<&crate::model::object::Model>), tx: &impl ConnectionTrait) -> Result<(), DbErr>; } #[async_trait::async_trait] @@ -54,14 +55,29 @@ impl Addresser for crate::Context { Ok(()) } - async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> { - let to = expand_addressing(object.addressed(), tx).await?; - address_to(self, to, None, Some(object.internal), self.is_local(&object.id), tx).await - } - - async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> { - let to = expand_addressing(activity.addressed(), tx).await?; - address_to(self, to, Some(activity.internal), None, self.is_local(&activity.id), tx).await + async fn address(&self, (activity, object): (Option<&crate::model::activity::Model>, Option<&crate::model::object::Model>), tx: &impl ConnectionTrait) -> Result<(), DbErr> { + match (activity, object) { + (None, None) => Ok(()), + (Some(activity), None) => { + let to = expand_addressing(activity.addressed(), tx).await?; + address_to(self, to, Some(activity.internal), None, self.is_local(&activity.id), tx).await + }, + (None, Some(object)) => { + let to = expand_addressing(object.addressed(), tx).await?; + address_to(self, to, None, Some(object.internal), self.is_local(&object.id), tx).await + }, + (Some(activity), Some(object)) => { + let to_activity = BTreeSet::from_iter(expand_addressing(activity.addressed(), tx).await?); + let to_object = BTreeSet::from_iter(expand_addressing(object.addressed(), tx).await?); + let to_common = to_activity.intersection(&to_object).cloned().collect(); + address_to(self, to_common, Some(activity.internal), Some(object.internal), self.is_local(&activity.id), tx).await?; + let to_only_activity = (&to_activity - &to_object).into_iter().collect(); + address_to(self, to_only_activity, Some(activity.internal), None, self.is_local(&activity.id), tx).await?; + let to_only_object = (&to_object - &to_activity).into_iter().collect(); + address_to(self, to_only_object, None, Some(object.internal), self.is_local(&activity.id), tx).await?; + Ok(()) + }, + } } } diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index 728b513d..43f8da6e 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -6,7 +6,7 @@ use sea_orm::{ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet}; use crate::traits::normalize::AP; -use super::Normalizer; +use super::{Addresser, Normalizer}; use httpsign::HttpSignature; #[derive(Debug, Clone)] @@ -357,6 +357,7 @@ impl Fetcher for crate::Context { } let activity_model = self.insert_activity(activity, tx).await?; + self.address((Some(&activity_model), None), tx).await?; Ok(activity_model) } @@ -412,6 +413,7 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth } let object_model = ctx.insert_object(object, tx).await?; + ctx.address((None, Some(&object_model)), tx).await?; Ok(object_model) } diff --git a/upub/core/src/traits/normalize.rs b/upub/core/src/traits/normalize.rs index 085843ae..53fd1c2b 100644 --- a/upub/core/src/traits/normalize.rs +++ b/upub/core/src/traits/normalize.rs @@ -1,8 +1,6 @@ use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{Unchanged, NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; -use super::Addresser; - #[derive(Debug, thiserror::Error)] pub enum NormalizerError { #[error("normalized document misses required field: {0:?}")] @@ -53,8 +51,6 @@ impl Normalizer for crate::Context { .await? .ok_or_else(|| DbErr::RecordNotFound(object_model.id.clone()))?; - self.address_object(&object_model, tx).await?; - // update replies counter if let Some(ref in_reply_to) = object_model.in_reply_to { crate::model::object::Entity::update_many() @@ -193,8 +189,6 @@ impl Normalizer for crate::Context { .ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?; activity_model.internal = internal; - self.address_activity(&activity_model, tx).await?; - Ok(activity_model) } } diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 78eb24ae..5756c901 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -1,6 +1,6 @@ use apb::{target::Addressed, Activity, Base, Object}; use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Fetcher, Normalizer}}; +use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}}; #[derive(Debug, thiserror::Error)] pub enum ProcessorError { @@ -79,8 +79,8 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .collect::>(); let object_model = ctx.insert_object(object_node, tx).await?; - let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), Some(&object_model)), tx).await?; for uid in notified { if !ctx.is_local(&uid) { continue } @@ -122,6 +122,7 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab // likes without addressing are "silent likes", process them but dont store activity or notify if !activity.addressed().is_empty() { let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; // TODO check that object author is in this like addressing!!! otherwise skip notification if let Some(ref attributed_to) = obj.attributed_to { @@ -162,6 +163,7 @@ pub async fn dislike(ctx: &crate::Context, activity: impl apb::Activity, tx: &Da // dislikes without addressing are "silent dislikes", process them but dont store activity if !activity.addressed().is_empty() { let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; // TODO check that object author is in this like addressing!!! otherwise skip notification if let Some(ref attributed_to) = obj.attributed_to { @@ -186,6 +188,7 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .ok_or(ProcessorError::Incomplete)?; let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; if ctx.is_local(&target_actor.id) { crate::Query::notify(activity_model.internal, target_actor.internal) @@ -249,6 +252,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat } let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; if ctx.is_local(&follow_activity.actor) { if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? { @@ -307,6 +311,7 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat } let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; // TODO most software doesn't show this, but i think instead we should?? if someone rejects it's // better to know it clearly rather than not knowing if it got lost and maybe retry (being more @@ -337,7 +342,8 @@ pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat // except when they have empty addressing // so that also remote "secret" deletes dont get stored if !activity.addressed().is_empty() { - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; } // TODO we should delete notifications from CREATEs related to objects we deleted tracing::debug!("deleted '{oid}'"); @@ -380,7 +386,8 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat // updates can be silently discarded except if local. we dont really care about knowing when // remote documents change, there's the "updated" field, just want the most recent version if ctx.is_local(&actor_id) { - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; } tracing::debug!("{} updated {}", actor_id, oid); @@ -466,7 +473,8 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab // except when they have empty addressing // so that also remote "secret" undos dont get stored if !activity.addressed().is_empty() { - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; } if let Some(internal) = crate::model::activity::Entity::ap_to_internal(undone_activity.id()?, tx).await? { @@ -542,6 +550,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D // idk!!!! if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) { let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; if let Some(ref attributed_to) = object.attributed_to { if ctx.is_local(attributed_to) {