From 86ed372a54de63a5d75aa65ee5e54e47d2d42ad7 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 30 May 2024 19:52:12 +0200 Subject: [PATCH] chore: deduplicated side effects code --- src/server/inbox.rs | 62 +++++------------------------- src/server/mod.rs | 1 + src/server/outbox.rs | 59 +++++++--------------------- src/server/side_effects.rs | 78 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 99 deletions(-) create mode 100644 src/server/side_effects.rs diff --git a/src/server/inbox.rs b/src/server/inbox.rs index 1689323f..ad7f186f 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, Conditio use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}}; -use super::{fetcher::Fetcher, Context}; +use super::{fetcher::Fetcher, Context, side_effects::SideEffects}; #[axum::async_trait] @@ -44,19 +44,7 @@ impl apb::server::Inbox for Context { } let activity_model = self.insert_activity(activity, Some(server)).await?; - let like = model::like::ActiveModel { - internal: NotSet, - actor: Set(internal_uid), - object: Set(obj.internal), - activity: Set(activity_model.internal), - published: Set(activity_model.published), - }; - model::like::Entity::insert(like).exec(self.db()).await?; - model::object::Entity::update_many() - .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) - .filter(model::object::Column::Internal.eq(obj.internal)) - .exec(self.db()) - .await?; + self.process_like(internal_uid, obj.internal, activity_model.internal, activity_model.published).await?; let mut expanded_addressing = self.expand_addressing(activity_model.addressed()).await?; if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!! expanded_addressing.push( @@ -249,10 +237,9 @@ impl apb::server::Inbox for Context { async fn undo(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?; + let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; // TODO in theory we could work with just object_id but right now only accept embedded let undone_activity = activity.object().extract().ok_or_else(UpubError::bad_request)?; - let activity_type = undone_activity.activity_type().ok_or_else(UpubError::bad_request)?; - let undone_object_id = undone_activity.object().id().ok_or_else(UpubError::bad_request)?; let undone_activity_author = undone_activity.actor().id().ok_or_else(UpubError::bad_request)?; // can't undo activities from remote actors! @@ -260,50 +247,19 @@ impl apb::server::Inbox for Context { return Err(UpubError::forbidden()); }; - self.insert_activity(activity.clone(), Some(server)).await?; + let activity_model = self.insert_activity(activity.clone(), Some(server)).await?; - match activity_type { - apb::ActivityType::Like => { - let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; - let internal_oid = model::object::Entity::ap_to_internal(&undone_object_id, self.db()).await?; - model::like::Entity::delete_many() - .filter( - Condition::all() - .add(model::like::Column::Actor.eq(internal_uid)) - .add(model::like::Column::Object.eq(internal_oid)) - ) - .exec(self.db()) - .await?; - model::object::Entity::update_many() - .filter(model::object::Column::Internal.eq(internal_oid)) - .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).sub(1)) - .exec(self.db()) - .await?; - }, - apb::ActivityType::Follow => { - let undone_aid = undone_activity.id().ok_or_else(UpubError::bad_request)?; - let internal_aid = model::activity::Entity::ap_to_internal(undone_aid, self.db()).await?; - model::relation::Entity::delete_many() - .filter(model::relation::Column::Activity.eq(internal_aid)) - .exec(self.db()) - .await?; - model::actor::Entity::update_many() - .filter(model::actor::Column::Id.eq(&undone_object_id)) - .col_expr(model::actor::Column::FollowersCount, Expr::col(model::actor::Column::FollowersCount).sub(1)) - .exec(self.db()) - .await?; - }, - _ => { - tracing::error!("received 'Undo' for unimplemented activity: {}", serde_json::to_string_pretty(&activity).unwrap()); - return Err(StatusCode::NOT_IMPLEMENTED.into()); - }, - } + let targets = self.expand_addressing(activity.addressed()).await?; + self.process_undo(internal_uid, activity).await?; + + self.address_to(Some(activity_model.internal), None, &targets).await?; Ok(()) } async fn announce(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { let uid = activity.actor().id().ok_or_else(|| UpubError::field("actor"))?; + let actor = self.fetch_user(&uid).await?; let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?; let activity_model = self.insert_activity(activity.clone(), Some(server)).await?; diff --git a/src/server/mod.rs b/src/server/mod.rs index 457fa08b..1083e67f 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -10,5 +10,6 @@ pub mod auth; pub mod builders; pub mod httpsign; pub mod normalizer; +pub mod side_effects; pub use context::Context; diff --git a/src/server/outbox.rs b/src/server/outbox.rs index e36993a4..d3b55cdb 100644 --- a/src/server/outbox.rs +++ b/src/server/outbox.rs @@ -2,9 +2,9 @@ use apb::{target::Addressed, Activity, ActivityMut, Base, BaseMut, Node, Object, use reqwest::StatusCode; use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet, Unchanged}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns}; -use crate::{errors::UpubError, model}; +use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD}; -use super::{addresser::Addresser, builders::AnyQuery, fetcher::Fetcher, normalizer::Normalizer, Context}; +use super::{addresser::Addresser, builders::AnyQuery, fetcher::Fetcher, normalizer::Normalizer, side_effects::SideEffects, Context}; #[axum::async_trait] @@ -107,20 +107,7 @@ impl apb::server::Outbox for Context { Some(self.domain().to_string()), ).await?; - let like_model = model::like::ActiveModel { - internal: NotSet, - actor: Set(internal_uid), - object: Set(obj_model.internal), - activity: Set(activity_model.internal), - published: Set(chrono::Utc::now()), - }; - - model::like::Entity::insert(like_model).exec(self.db()).await?; - model::object::Entity::update_many() - .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) - .filter(model::object::Column::Internal.eq(obj_model.internal)) - .exec(self.db()) - .await?; + self.process_like(internal_uid, obj_model.internal, activity_model.internal, chrono::Utc::now()).await?; self.dispatch(&uid, activity_targets, &aid, None).await?; @@ -250,7 +237,7 @@ impl apb::server::Outbox for Context { async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result { let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); + let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; let old_aid = activity.object().id().ok_or_else(UpubError::bad_request)?; let old_activity = model::activity::Entity::find_by_ap_id(&old_aid) .one(self.db()) @@ -259,40 +246,20 @@ impl apb::server::Outbox for Context { if old_activity.actor != uid { return Err(UpubError::forbidden()); } - let activity_object = old_activity.object.ok_or_else(UpubError::bad_request)?; - let actor_internal = model::actor::Entity::ap_to_internal(&old_activity.actor, self.db()).await?; - let activity_model = model::activity::ActiveModel::new( - &activity + let activity_model = self.insert_activity( + activity.clone() .set_id(Some(&aid)) .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()) - .await?; + .set_published(Some(chrono::Utc::now())), + Some(self.domain().to_string()) + ).await?; - match old_activity.activity_type { - apb::ActivityType::Like => { - let object_internal = model::object::Entity::ap_to_internal(&activity_object, self.db()).await?; - model::like::Entity::delete_many() - .filter(model::like::Column::Actor.eq(actor_internal)) - .filter(model::like::Column::Object.eq(object_internal)) - .exec(self.db()) - .await?; - }, - apb::ActivityType::Follow => { - let target_internal = model::actor::Entity::ap_to_internal(&activity_object, self.db()).await?; - model::relation::Entity::delete_many() - .filter(model::relation::Column::Follower.eq(actor_internal)) - .filter(model::relation::Column::Following.eq(target_internal)) - .exec(self.db()) - .await?; - }, - t => tracing::error!("extra side effects for activity {t:?} not implemented"), - } + let targets = self.expand_addressing(activity.addressed()).await?; + self.process_undo(internal_uid, activity).await?; - self.dispatch(&uid, activity_targets, &aid, None).await?; + self.address_to(Some(activity_model.internal), None, &targets).await?; + self.deliver_to(&activity_model.id, &uid, &targets).await?; Ok(aid) } diff --git a/src/server/side_effects.rs b/src/server/side_effects.rs new file mode 100644 index 00000000..57e0d260 --- /dev/null +++ b/src/server/side_effects.rs @@ -0,0 +1,78 @@ +use reqwest::StatusCode; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter}; + +use crate::{errors::UpubError, model}; + +#[axum::async_trait] +pub trait SideEffects { + async fn process_like(&self, who: i64, what: i64, with: i64, when: chrono::DateTime) -> crate::Result<()>; + async fn process_undo(&self, who: i64, activity: impl apb::Activity) -> crate::Result<()>; +} + +#[axum::async_trait] +impl SideEffects for super::Context { + async fn process_like(&self, who: i64, what: i64, with: i64, when: chrono::DateTime) -> crate::Result<()> { + let like = model::like::ActiveModel { + internal: NotSet, + actor: Set(who), + object: Set(what), + activity: Set(with), + published: Set(when), + }; + model::like::Entity::insert(like).exec(self.db()).await?; + model::object::Entity::update_many() + .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) + .filter(model::object::Column::Internal.eq(what)) + .exec(self.db()) + .await?; + + Ok(()) + } + + async fn process_undo(&self, who: i64, activity: impl apb::Activity) -> crate::Result<()> { + let undone_object_id = activity.object().id().ok_or_else(UpubError::bad_request)?; + match activity.activity_type() { + Some(apb::ActivityType::Like) => { + let internal_oid = model::object::Entity::ap_to_internal(&undone_object_id, self.db()).await?; + model::like::Entity::delete_many() + .filter( + Condition::all() + .add(model::like::Column::Actor.eq(who)) + .add(model::like::Column::Object.eq(internal_oid)) + ) + .exec(self.db()) + .await?; + model::object::Entity::update_many() + .filter(model::object::Column::Internal.eq(internal_oid)) + .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).sub(1)) + .exec(self.db()) + .await?; + }, + Some(apb::ActivityType::Follow) => { + let undone_aid = activity.object().id().ok_or_else(UpubError::bad_request)?; + let internal_aid = model::activity::Entity::ap_to_internal(&undone_aid, self.db()).await?; + model::relation::Entity::delete_many() + .filter(model::relation::Column::Activity.eq(internal_aid)) + .exec(self.db()) + .await?; + model::actor::Entity::update_many() + .filter(model::actor::Column::Internal.eq(who)) + .col_expr(model::actor::Column::FollowingCount, Expr::col(model::actor::Column::FollowingCount).sub(1)) + .exec(self.db()) + .await?; + model::actor::Entity::update_many() + .filter(model::actor::Column::Id.eq(&undone_object_id)) + .col_expr(model::actor::Column::FollowersCount, Expr::col(model::actor::Column::FollowersCount).sub(1)) + .exec(self.db()) + .await?; + }, + t => { + tracing::error!("received 'Undo' for unimplemented activity type: {t:?}"); + return Err(StatusCode::NOT_IMPLEMENTED.into()); + }, + } + + + Ok(()) + } +}