diff --git a/upub/cli/src/fetch.rs b/upub/cli/src/fetch.rs index e512c66..6769097 100644 --- a/upub/cli/src/fetch.rs +++ b/upub/cli/src/fetch.rs @@ -1,5 +1,5 @@ use sea_orm::{EntityTrait, TransactionTrait}; -use upub::traits::{fetch::{Fetchable, PullError}, Normalizer}; +use upub::traits::{fetch::{Fetchable, PullError}, Addresser, Normalizer}; pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> { use apb::Base; @@ -16,15 +16,17 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), Pu let tx = ctx.db().begin().await?; match obj.base_type() { Ok(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { - upub::model::actor::Entity::insert( - upub::AP::actor_q(&obj, None).unwrap() - ).exec(&tx).await.unwrap(); + upub::model::actor::Entity::insert(upub::AP::actor_q(&obj, None)?) + .exec(&tx) + .await?; }, Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { - ctx.insert_activity(obj, &tx).await.unwrap(); + let act = ctx.insert_activity(obj, &tx).await?; + ctx.address((Some(&act), None), &tx).await?; }, Ok(apb::BaseType::Object(apb::ObjectType::Note)) => { - ctx.insert_object(obj, &tx).await.unwrap(); + let obj = ctx.insert_object(obj, &tx).await?; + ctx.address((None, Some(&obj)), &tx).await?; }, Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index d3ddf1f..f4bef2c 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use apb::target::Addressed; use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait, QuerySelect, SelectColumns}; @@ -6,8 +8,7 @@ use crate::traits::fetch::Fetcher; #[async_trait::async_trait] pub trait Addresser { async fn deliver(&self, to: Vec, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>; - async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>; - async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>; + async fn address(&self, (activity, object): (Option<&crate::model::activity::Model>, Option<&crate::model::object::Model>), tx: &impl ConnectionTrait) -> Result<(), DbErr>; } #[async_trait::async_trait] @@ -54,14 +55,29 @@ impl Addresser for crate::Context { Ok(()) } - async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> { - let to = expand_addressing(object.addressed(), tx).await?; - address_to(self, to, None, Some(object.internal), self.is_local(&object.id), tx).await - } - - async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> { - let to = expand_addressing(activity.addressed(), tx).await?; - address_to(self, to, Some(activity.internal), None, self.is_local(&activity.id), tx).await + async fn address(&self, (activity, object): (Option<&crate::model::activity::Model>, Option<&crate::model::object::Model>), tx: &impl ConnectionTrait) -> Result<(), DbErr> { + match (activity, object) { + (None, None) => Ok(()), + (Some(activity), None) => { + let to = expand_addressing(activity.addressed(), tx).await?; + address_to(self, to, Some(activity.internal), None, self.is_local(&activity.id), tx).await + }, + (None, Some(object)) => { + let to = expand_addressing(object.addressed(), tx).await?; + address_to(self, to, None, Some(object.internal), self.is_local(&object.id), tx).await + }, + (Some(activity), Some(object)) => { + let to_activity = BTreeSet::from_iter(expand_addressing(activity.addressed(), tx).await?); + let to_object = BTreeSet::from_iter(expand_addressing(object.addressed(), tx).await?); + let to_common = to_activity.intersection(&to_object).cloned().collect(); + address_to(self, to_common, Some(activity.internal), Some(object.internal), self.is_local(&activity.id), tx).await?; + let to_only_activity = (&to_activity - &to_object).into_iter().collect(); + address_to(self, to_only_activity, Some(activity.internal), None, self.is_local(&activity.id), tx).await?; + let to_only_object = (&to_object - &to_activity).into_iter().collect(); + address_to(self, to_only_object, None, Some(object.internal), self.is_local(&activity.id), tx).await?; + Ok(()) + }, + } } } diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index 728b513..43f8da6 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -6,7 +6,7 @@ use sea_orm::{ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet}; use crate::traits::normalize::AP; -use super::Normalizer; +use super::{Addresser, Normalizer}; use httpsign::HttpSignature; #[derive(Debug, Clone)] @@ -357,6 +357,7 @@ impl Fetcher for crate::Context { } let activity_model = self.insert_activity(activity, tx).await?; + self.address((Some(&activity_model), None), tx).await?; Ok(activity_model) } @@ -412,6 +413,7 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth } let object_model = ctx.insert_object(object, tx).await?; + ctx.address((None, Some(&object_model)), tx).await?; Ok(object_model) } diff --git a/upub/core/src/traits/normalize.rs b/upub/core/src/traits/normalize.rs index 085843a..53fd1c2 100644 --- a/upub/core/src/traits/normalize.rs +++ b/upub/core/src/traits/normalize.rs @@ -1,8 +1,6 @@ use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{Unchanged, NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; -use super::Addresser; - #[derive(Debug, thiserror::Error)] pub enum NormalizerError { #[error("normalized document misses required field: {0:?}")] @@ -53,8 +51,6 @@ impl Normalizer for crate::Context { .await? .ok_or_else(|| DbErr::RecordNotFound(object_model.id.clone()))?; - self.address_object(&object_model, tx).await?; - // update replies counter if let Some(ref in_reply_to) = object_model.in_reply_to { crate::model::object::Entity::update_many() @@ -193,8 +189,6 @@ impl Normalizer for crate::Context { .ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?; activity_model.internal = internal; - self.address_activity(&activity_model, tx).await?; - Ok(activity_model) } } diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 78eb24a..5756c90 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -1,6 +1,6 @@ use apb::{target::Addressed, Activity, Base, Object}; use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Fetcher, Normalizer}}; +use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}}; #[derive(Debug, thiserror::Error)] pub enum ProcessorError { @@ -79,8 +79,8 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .collect::>(); let object_model = ctx.insert_object(object_node, tx).await?; - let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), Some(&object_model)), tx).await?; for uid in notified { if !ctx.is_local(&uid) { continue } @@ -122,6 +122,7 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab // likes without addressing are "silent likes", process them but dont store activity or notify if !activity.addressed().is_empty() { let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; // TODO check that object author is in this like addressing!!! otherwise skip notification if let Some(ref attributed_to) = obj.attributed_to { @@ -162,6 +163,7 @@ pub async fn dislike(ctx: &crate::Context, activity: impl apb::Activity, tx: &Da // dislikes without addressing are "silent dislikes", process them but dont store activity if !activity.addressed().is_empty() { let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; // TODO check that object author is in this like addressing!!! otherwise skip notification if let Some(ref attributed_to) = obj.attributed_to { @@ -186,6 +188,7 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .ok_or(ProcessorError::Incomplete)?; let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; if ctx.is_local(&target_actor.id) { crate::Query::notify(activity_model.internal, target_actor.internal) @@ -249,6 +252,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat } let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; if ctx.is_local(&follow_activity.actor) { if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? { @@ -307,6 +311,7 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat } let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; // TODO most software doesn't show this, but i think instead we should?? if someone rejects it's // better to know it clearly rather than not knowing if it got lost and maybe retry (being more @@ -337,7 +342,8 @@ pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat // except when they have empty addressing // so that also remote "secret" deletes dont get stored if !activity.addressed().is_empty() { - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; } // TODO we should delete notifications from CREATEs related to objects we deleted tracing::debug!("deleted '{oid}'"); @@ -380,7 +386,8 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat // updates can be silently discarded except if local. we dont really care about knowing when // remote documents change, there's the "updated" field, just want the most recent version if ctx.is_local(&actor_id) { - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; } tracing::debug!("{} updated {}", actor_id, oid); @@ -466,7 +473,8 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab // except when they have empty addressing // so that also remote "secret" undos dont get stored if !activity.addressed().is_empty() { - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; } if let Some(internal) = crate::model::activity::Entity::ap_to_internal(undone_activity.id()?, tx).await? { @@ -542,6 +550,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D // idk!!!! if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) { let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address((Some(&activity_model), None), tx).await?; if let Some(ref attributed_to) = object.attributed_to { if ctx.is_local(attributed_to) {