From a53c93c1c52481d3dcba939cfefd170c207e2d49 Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 7 Jun 2024 23:16:51 +0200 Subject: [PATCH] feat: dont store activities unless necessary --- upub/core/src/model/announce.rs | 6 + upub/core/src/traits/process.rs | 231 +++++++++++++------------------- 2 files changed, 101 insertions(+), 136 deletions(-) diff --git a/upub/core/src/model/announce.rs b/upub/core/src/model/announce.rs index 5a800b41..9a999442 100644 --- a/upub/core/src/model/announce.rs +++ b/upub/core/src/model/announce.rs @@ -43,3 +43,9 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl Entity { + pub fn find_by_uid_oid(uid: i64, oid: i64) -> Select { + Entity::find().filter(Column::Actor.eq(uid)).filter(Column::Object.eq(oid)) + } +} diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 2a817447..c6fa710c 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -1,6 +1,6 @@ use apb::{target::Addressed, Activity, Base, Object}; -use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}}; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter}; +use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Fetcher, Normalizer}}; #[derive(Debug, thiserror::Error)] pub enum ProcessorError { @@ -60,33 +60,30 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat tracing::error!("refusing to process activity without embedded object"); return Err(ProcessorError::Unprocessable(activity.id()?.to_string())); }; - let oid = object_node.id()?.to_string(); - let addressed = object_node.addressed(); - let activity_model = ctx.insert_activity(activity, tx).await?; - let internal_oid = if let Some(internal) = model::object::Entity::ap_to_internal(&oid, tx).await? { - tracing::debug!("skipping insertion of already known object #{internal}"); - internal - } else { - if let Ok(reply) = object_node.in_reply_to().id() { - if let Err(e) = ctx.fetch_object(reply, tx).await { - tracing::warn!("failed fetching replies for received object: {e}"); - } + if model::object::Entity::ap_to_internal(object_node.id()?, tx).await?.is_some() { + return Err(ProcessorError::AlreadyProcessed); + } + if object_node.attributed_to().id()? != activity.actor().id()? { + return Err(ProcessorError::Unauthorized); + } + if let Ok(reply) = object_node.in_reply_to().id() { + if let Err(e) = ctx.fetch_object(reply, tx).await { + tracing::warn!("failed fetching replies for received object: {e}"); } - let object_model = ctx.insert_object(object_node, tx).await?; - object_model.internal - }; - let expanded_addressing = ctx.expand_addressing(addressed, tx).await?; - ctx.address_to(Some(activity_model.internal), Some(internal_oid), &expanded_addressing, tx).await?; - tracing::info!("{} posted {}", activity_model.actor, oid); + } + let object_model = ctx.insert_object(object_node, tx).await?; + // only likes mentioning local users are stored to generate notifications, everything else + // produces side effects but no activity, and thus no notification + if activity.mentioning().iter().any(|x| ctx.is_local(x)) { + ctx.insert_activity(activity, tx).await?; + } + tracing::debug!("{} posted {}", object_model.attributed_to.as_deref().unwrap_or(""), object_model.id); Ok(()) } pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { - let uid = activity.actor().id()?.to_string(); - let actor = ctx.fetch_user(&uid, tx).await?; - let object_uri = activity.object().id()?.to_string(); - let published = activity.published().unwrap_or_else(|_|chrono::Utc::now()); - let obj = ctx.fetch_object(&object_uri, tx).await?; + let actor = ctx.fetch_user(activity.actor().id()?, tx).await?; + let obj = ctx.fetch_object(activity.object().id()?, tx).await?; if crate::model::like::Entity::find_by_uid_oid(actor.internal, obj.internal) .any(tx) .await? @@ -94,15 +91,13 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab return Err(ProcessorError::AlreadyProcessed); } - let activity_model = ctx.insert_activity(activity, tx).await?; - let like = crate::model::like::ActiveModel { internal: NotSet, actor: Set(actor.internal), object: Set(obj.internal), - activity: Set(activity_model.internal), - published: Set(published), + published: Set(activity.published().unwrap_or_else(|_|chrono::Utc::now())), }; + crate::model::like::Entity::insert(like).exec(tx).await?; crate::model::object::Entity::update_many() .col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).add(1)) @@ -110,30 +105,21 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab .exec(tx) .await?; - let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; - if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!! - expanded_addressing.push( - crate::model::object::Entity::find_by_id(obj.internal) - .select_only() - .select_column(crate::model::object::Column::AttributedTo) - .into_tuple::() - .one(tx) - .await? - .ok_or(ProcessorError::Incomplete)? - ); + // only likes mentioning local users are stored to generate notifications, everything else + // produces side effects but no activity, and thus no notification + if activity.mentioning().iter().any(|x| ctx.is_local(x)) { + ctx.insert_activity(activity, tx).await?; } - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; - tracing::info!("{} liked {}", uid, obj.id); + + tracing::debug!("{} liked {}", actor.id, obj.id); Ok(()) } pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { - let source_actor = activity.actor().id()?.to_string(); - let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, tx) + let source_actor_internal = crate::model::actor::Entity::ap_to_internal(activity.actor().id()?, tx) .await? .ok_or(ProcessorError::Incomplete)?; - let target_actor = activity.object().id()?.to_string(); - let usr = ctx.fetch_user(&target_actor, tx).await?; + let usr = ctx.fetch_user(activity.object().id()?, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?; let relation_model = crate::model::relation::ActiveModel { internal: NotSet, @@ -144,30 +130,28 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat }; crate::model::relation::Entity::insert(relation_model) .exec(tx).await?; - let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; - if !expanded_addressing.contains(&target_actor) { - expanded_addressing.push(target_actor); - } - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; - tracing::info!("{} wants to follow {}", source_actor, usr.id); + tracing::info!("{} wants to follow {}", activity_model.actor, usr.id); Ok(()) } pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { // TODO what about TentativeAccept - let target_actor = activity.actor().id()?.to_string(); - let follow_request_id = activity.object().id()?.to_string(); - let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id) + let follow_activity = crate::model::activity::Entity::find_by_ap_id(activity.object().id()?) .one(tx) .await? .ok_or(ProcessorError::Incomplete)?; - if follow_activity.object.unwrap_or_default() != target_actor { + if follow_activity.object.unwrap_or_default() != activity.actor().id()? { return Err(ProcessorError::Unauthorized); } let activity_model = ctx.insert_activity(activity, tx).await?; + crate::model::relation::Entity::update_many() + .col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal))) + .filter(crate::model::relation::Column::Activity.eq(follow_activity.internal)) + .exec(tx).await?; + crate::model::actor::Entity::update_many() .col_expr( crate::model::actor::Column::FollowingCount, @@ -185,31 +169,19 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .exec(tx) .await?; - crate::model::relation::Entity::update_many() - .col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal))) - .filter(crate::model::relation::Column::Activity.eq(follow_activity.internal)) - .exec(tx).await?; + tracing::debug!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor); - tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor); - - let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; - if !expanded_addressing.contains(&follow_activity.actor) { - expanded_addressing.push(follow_activity.actor); - } - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; Ok(()) } pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { // TODO what about TentativeReject? - let uid = activity.actor().id()?.to_string(); - let follow_request_id = activity.object().id()?.to_string(); - let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id) + let follow_activity = crate::model::activity::Entity::find_by_ap_id(activity.object().id()?) .one(tx) .await? .ok_or(ProcessorError::Incomplete)?; - if follow_activity.object.unwrap_or_default() != uid { + if follow_activity.object.unwrap_or_default() != activity.actor().id()? { return Err(ProcessorError::Unauthorized); } @@ -220,14 +192,8 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .exec(tx) .await?; - tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor); + tracing::debug!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor); - let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; - if !expanded_addressing.contains(&follow_activity.actor) { - expanded_addressing.push(follow_activity.actor); - } - - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; Ok(()) } @@ -239,16 +205,13 @@ pub async fn delete(_ctx: &crate::Context, activity: impl apb::Activity, tx: &Da Ok(()) } -pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { - let uid = activity.actor().id()?.to_string(); +pub async fn update(_ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let Some(object_node) = activity.object().extract() else { tracing::error!("refusing to process activity without embedded object"); return Err(ProcessorError::Unprocessable(activity.id()?.to_string())); }; let oid = object_node.id()?.to_string(); - let activity_model = ctx.insert_activity(activity, tx).await?; - match object_node.object_type()? { apb::ObjectType::Actor(_) => { let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, tx) @@ -272,38 +235,34 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .exec(tx) .await?; }, - t => tracing::warn!("no side effects implemented for update type {t:?}"), + _ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())), } - tracing::info!("{} updated {}", uid, oid); - let expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?; - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; + tracing::debug!("{} updated {}", activity.actor().id()?, oid); Ok(()) } -pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { - let uid = activity.actor().id()?.to_string(); +pub async fn undo(_ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { // TODO in theory we could work with just object_id but right now only accept embedded - let undone_activity = activity.object().extract().ok_or(apb::FieldErr("object"))?; - let undone_activity_author = undone_activity.as_activity()?.actor().id()?.to_string(); - - if uid != undone_activity_author { - return Err(ProcessorError::Unauthorized); - } - - let undone_activity_target = undone_activity.as_activity()?.object().id()?.to_string(); + let undone_activity = activity.object() + .extract() + .ok_or(apb::FieldErr("object"))?; + let uid = activity.actor().id()?.to_string(); let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx) .await? .ok_or(ProcessorError::Incomplete)?; - let targets = ctx.expand_addressing(activity.addressed(), tx).await?; - let activity_model = ctx.insert_activity(activity, tx).await?; - ctx.address_to(Some(activity_model.internal), None, &targets, tx).await?; + if uid != undone_activity.as_activity()?.actor().id()? { + return Err(ProcessorError::Unauthorized); + } match undone_activity.as_activity()?.activity_type()? { apb::ActivityType::Like => { - let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, tx) + let internal_oid = crate::model::object::Entity::ap_to_internal( + undone_activity.as_activity()?.object().id()?, + tx + ) .await? .ok_or(ProcessorError::Incomplete)?; crate::model::like::Entity::delete_many() @@ -321,7 +280,10 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab .await?; }, apb::ActivityType::Follow => { - let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, tx) + let internal_uid_following = crate::model::actor::Entity::ap_to_internal( + undone_activity.as_activity()?.object().id()?, + tx, + ) .await? .ok_or(ProcessorError::Incomplete)?; crate::model::relation::Entity::delete_many() @@ -340,21 +302,14 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab .exec(tx) .await?; }, - t => { - tracing::error!("received 'Undo' activity '{}' for unimplemented activity type: {t:?}", activity_model.id); - return Err(ProcessorError::Unprocessable(activity_model.id)); - }, + _ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())), } Ok(()) } pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { - let uid = activity.actor().id()?.to_string(); - let actor = ctx.fetch_user(&uid, tx).await?; let announced_id = activity.object().id()?.to_string(); - let published = activity.published().unwrap_or(chrono::Utc::now()); - let addressed = activity.addressed(); match match ctx.find_internal(&announced_id).await? { // if we already have this activity, skip it @@ -379,37 +334,41 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D crate::context::Internal::Actor(_) => Err(ProcessorError::Unprocessable(activity.id()?.to_string())), crate::context::Internal::Activity(_) => Err(ProcessorError::AlreadyProcessed), // ??? crate::context::Internal::Object(internal) => { - let object_model = model::object::Entity::find_by_id(internal) - .one(tx) - .await? - .ok_or_else(|| sea_orm::DbErr::RecordNotFound(internal.to_string()))?; - let activity_model = ctx.insert_activity(activity, tx).await?; + let actor = ctx.fetch_user(activity.actor().id()?, tx).await?; - // relays send us objects as Announce, but we don't really want to count those towards the - // total shares count of an object, so just fetch the object and be done with it - if !matches!(actor.actor_type, apb::ActorType::Person) { - tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id); - return Ok(()) + // we only care about "organic" announces, as in those produced by people + // anything shared by groups, services or applications is just mirroring: fetch it and be done + if actor.actor_type == apb::ActorType::Person { + let share = crate::model::announce::ActiveModel { + internal: NotSet, + actor: Set(actor.internal), + object: Set(internal), + published: Set(activity.published().unwrap_or(chrono::Utc::now())), + }; + + crate::model::announce::Entity::insert(share) + .exec(tx).await?; + + // if this user never "boosted" this object before, increase its counter + if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, internal) + .any(tx) + .await? + { + crate::model::object::Entity::update_many() + .col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1)) + .filter(crate::model::object::Column::Internal.eq(internal)) + .exec(tx) + .await?; + } + + // TODO we should probably insert an activity, otherwise this won't appear on timelines!! + // or maybe go update all addressing records for this object, pushing them up + // or maybe create new addressing rows with more recent dates + // or maybe create fake objects that reference the original one + // idk!!!! } - let share = crate::model::announce::ActiveModel { - internal: NotSet, - actor: Set(actor.internal), - object: Set(object_model.internal), - published: Set(published), - }; - - let expanded_addressing = ctx.expand_addressing(addressed, tx).await?; - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; - crate::model::announce::Entity::insert(share) - .exec(tx).await?; - crate::model::object::Entity::update_many() - .col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1)) - .filter(crate::model::object::Column::Internal.eq(object_model.internal)) - .exec(tx) - .await?; - - tracing::info!("{} shared {}", activity_model.actor, announced_id); + tracing::debug!("{} shared {}", actor.id, announced_id); Ok(()) }, }