1
0
Fork 0
forked from alemi/upub

chore: deduplicated side effects code

This commit is contained in:
əlemi 2024-05-30 19:52:12 +02:00
parent a3921622cb
commit 86ed372a54
Signed by: alemi
GPG key ID: A4895B84D311642C
4 changed files with 101 additions and 99 deletions

View file

@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, Conditio
use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}}; use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}};
use super::{fetcher::Fetcher, Context}; use super::{fetcher::Fetcher, Context, side_effects::SideEffects};
#[axum::async_trait] #[axum::async_trait]
@ -44,19 +44,7 @@ impl apb::server::Inbox for Context {
} }
let activity_model = self.insert_activity(activity, Some(server)).await?; let activity_model = self.insert_activity(activity, Some(server)).await?;
let like = model::like::ActiveModel { self.process_like(internal_uid, obj.internal, activity_model.internal, activity_model.published).await?;
internal: NotSet,
actor: Set(internal_uid),
object: Set(obj.internal),
activity: Set(activity_model.internal),
published: Set(activity_model.published),
};
model::like::Entity::insert(like).exec(self.db()).await?;
model::object::Entity::update_many()
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1))
.filter(model::object::Column::Internal.eq(obj.internal))
.exec(self.db())
.await?;
let mut expanded_addressing = self.expand_addressing(activity_model.addressed()).await?; let mut expanded_addressing = self.expand_addressing(activity_model.addressed()).await?;
if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!! if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!!
expanded_addressing.push( expanded_addressing.push(
@ -249,10 +237,9 @@ impl apb::server::Inbox for Context {
async fn undo(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { async fn undo(&self, server: String, activity: serde_json::Value) -> crate::Result<()> {
let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?; let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?;
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
// TODO in theory we could work with just object_id but right now only accept embedded // TODO in theory we could work with just object_id but right now only accept embedded
let undone_activity = activity.object().extract().ok_or_else(UpubError::bad_request)?; let undone_activity = activity.object().extract().ok_or_else(UpubError::bad_request)?;
let activity_type = undone_activity.activity_type().ok_or_else(UpubError::bad_request)?;
let undone_object_id = undone_activity.object().id().ok_or_else(UpubError::bad_request)?;
let undone_activity_author = undone_activity.actor().id().ok_or_else(UpubError::bad_request)?; let undone_activity_author = undone_activity.actor().id().ok_or_else(UpubError::bad_request)?;
// can't undo activities from remote actors! // can't undo activities from remote actors!
@ -260,50 +247,19 @@ impl apb::server::Inbox for Context {
return Err(UpubError::forbidden()); return Err(UpubError::forbidden());
}; };
self.insert_activity(activity.clone(), Some(server)).await?; let activity_model = self.insert_activity(activity.clone(), Some(server)).await?;
match activity_type { let targets = self.expand_addressing(activity.addressed()).await?;
apb::ActivityType::Like => { self.process_undo(internal_uid, activity).await?;
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let internal_oid = model::object::Entity::ap_to_internal(&undone_object_id, self.db()).await?; self.address_to(Some(activity_model.internal), None, &targets).await?;
model::like::Entity::delete_many()
.filter(
Condition::all()
.add(model::like::Column::Actor.eq(internal_uid))
.add(model::like::Column::Object.eq(internal_oid))
)
.exec(self.db())
.await?;
model::object::Entity::update_many()
.filter(model::object::Column::Internal.eq(internal_oid))
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).sub(1))
.exec(self.db())
.await?;
},
apb::ActivityType::Follow => {
let undone_aid = undone_activity.id().ok_or_else(UpubError::bad_request)?;
let internal_aid = model::activity::Entity::ap_to_internal(undone_aid, self.db()).await?;
model::relation::Entity::delete_many()
.filter(model::relation::Column::Activity.eq(internal_aid))
.exec(self.db())
.await?;
model::actor::Entity::update_many()
.filter(model::actor::Column::Id.eq(&undone_object_id))
.col_expr(model::actor::Column::FollowersCount, Expr::col(model::actor::Column::FollowersCount).sub(1))
.exec(self.db())
.await?;
},
_ => {
tracing::error!("received 'Undo' for unimplemented activity: {}", serde_json::to_string_pretty(&activity).unwrap());
return Err(StatusCode::NOT_IMPLEMENTED.into());
},
}
Ok(()) Ok(())
} }
async fn announce(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { async fn announce(&self, server: String, activity: serde_json::Value) -> crate::Result<()> {
let uid = activity.actor().id().ok_or_else(|| UpubError::field("actor"))?; let uid = activity.actor().id().ok_or_else(|| UpubError::field("actor"))?;
let actor = self.fetch_user(&uid).await?;
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?; let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?;
let activity_model = self.insert_activity(activity.clone(), Some(server)).await?; let activity_model = self.insert_activity(activity.clone(), Some(server)).await?;

View file

@ -10,5 +10,6 @@ pub mod auth;
pub mod builders; pub mod builders;
pub mod httpsign; pub mod httpsign;
pub mod normalizer; pub mod normalizer;
pub mod side_effects;
pub use context::Context; pub use context::Context;

View file

@ -2,9 +2,9 @@ use apb::{target::Addressed, Activity, ActivityMut, Base, BaseMut, Node, Object,
use reqwest::StatusCode; use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet, Unchanged}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet, Unchanged}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns};
use crate::{errors::UpubError, model}; use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD};
use super::{addresser::Addresser, builders::AnyQuery, fetcher::Fetcher, normalizer::Normalizer, Context}; use super::{addresser::Addresser, builders::AnyQuery, fetcher::Fetcher, normalizer::Normalizer, side_effects::SideEffects, Context};
#[axum::async_trait] #[axum::async_trait]
@ -107,20 +107,7 @@ impl apb::server::Outbox for Context {
Some(self.domain().to_string()), Some(self.domain().to_string()),
).await?; ).await?;
let like_model = model::like::ActiveModel { self.process_like(internal_uid, obj_model.internal, activity_model.internal, chrono::Utc::now()).await?;
internal: NotSet,
actor: Set(internal_uid),
object: Set(obj_model.internal),
activity: Set(activity_model.internal),
published: Set(chrono::Utc::now()),
};
model::like::Entity::insert(like_model).exec(self.db()).await?;
model::object::Entity::update_many()
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1))
.filter(model::object::Column::Internal.eq(obj_model.internal))
.exec(self.db())
.await?;
self.dispatch(&uid, activity_targets, &aid, None).await?; self.dispatch(&uid, activity_targets, &aid, None).await?;
@ -250,7 +237,7 @@ impl apb::server::Outbox for Context {
async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> { async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(&uuid::Uuid::new_v4().to_string()); let aid = self.aid(&uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed(); let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let old_aid = activity.object().id().ok_or_else(UpubError::bad_request)?; let old_aid = activity.object().id().ok_or_else(UpubError::bad_request)?;
let old_activity = model::activity::Entity::find_by_ap_id(&old_aid) let old_activity = model::activity::Entity::find_by_ap_id(&old_aid)
.one(self.db()) .one(self.db())
@ -259,40 +246,20 @@ impl apb::server::Outbox for Context {
if old_activity.actor != uid { if old_activity.actor != uid {
return Err(UpubError::forbidden()); return Err(UpubError::forbidden());
} }
let activity_object = old_activity.object.ok_or_else(UpubError::bad_request)?;
let actor_internal = model::actor::Entity::ap_to_internal(&old_activity.actor, self.db()).await?;
let activity_model = model::activity::ActiveModel::new( let activity_model = self.insert_activity(
&activity activity.clone()
.set_id(Some(&aid)) .set_id(Some(&aid))
.set_actor(Node::link(uid.clone())) .set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now())) .set_published(Some(chrono::Utc::now())),
)?; Some(self.domain().to_string())
model::activity::Entity::insert(activity_model.into_active_model()) ).await?;
.exec(self.db())
.await?;
match old_activity.activity_type { let targets = self.expand_addressing(activity.addressed()).await?;
apb::ActivityType::Like => { self.process_undo(internal_uid, activity).await?;
let object_internal = model::object::Entity::ap_to_internal(&activity_object, self.db()).await?;
model::like::Entity::delete_many()
.filter(model::like::Column::Actor.eq(actor_internal))
.filter(model::like::Column::Object.eq(object_internal))
.exec(self.db())
.await?;
},
apb::ActivityType::Follow => {
let target_internal = model::actor::Entity::ap_to_internal(&activity_object, self.db()).await?;
model::relation::Entity::delete_many()
.filter(model::relation::Column::Follower.eq(actor_internal))
.filter(model::relation::Column::Following.eq(target_internal))
.exec(self.db())
.await?;
},
t => tracing::error!("extra side effects for activity {t:?} not implemented"),
}
self.dispatch(&uid, activity_targets, &aid, None).await?; self.address_to(Some(activity_model.internal), None, &targets).await?;
self.deliver_to(&activity_model.id, &uid, &targets).await?;
Ok(aid) Ok(aid)
} }

View file

@ -0,0 +1,78 @@
use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter};
use crate::{errors::UpubError, model};
#[axum::async_trait]
pub trait SideEffects {
async fn process_like(&self, who: i64, what: i64, with: i64, when: chrono::DateTime<chrono::Utc>) -> crate::Result<()>;
async fn process_undo(&self, who: i64, activity: impl apb::Activity) -> crate::Result<()>;
}
#[axum::async_trait]
impl SideEffects for super::Context {
async fn process_like(&self, who: i64, what: i64, with: i64, when: chrono::DateTime<chrono::Utc>) -> crate::Result<()> {
let like = model::like::ActiveModel {
internal: NotSet,
actor: Set(who),
object: Set(what),
activity: Set(with),
published: Set(when),
};
model::like::Entity::insert(like).exec(self.db()).await?;
model::object::Entity::update_many()
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1))
.filter(model::object::Column::Internal.eq(what))
.exec(self.db())
.await?;
Ok(())
}
async fn process_undo(&self, who: i64, activity: impl apb::Activity) -> crate::Result<()> {
let undone_object_id = activity.object().id().ok_or_else(UpubError::bad_request)?;
match activity.activity_type() {
Some(apb::ActivityType::Like) => {
let internal_oid = model::object::Entity::ap_to_internal(&undone_object_id, self.db()).await?;
model::like::Entity::delete_many()
.filter(
Condition::all()
.add(model::like::Column::Actor.eq(who))
.add(model::like::Column::Object.eq(internal_oid))
)
.exec(self.db())
.await?;
model::object::Entity::update_many()
.filter(model::object::Column::Internal.eq(internal_oid))
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).sub(1))
.exec(self.db())
.await?;
},
Some(apb::ActivityType::Follow) => {
let undone_aid = activity.object().id().ok_or_else(UpubError::bad_request)?;
let internal_aid = model::activity::Entity::ap_to_internal(&undone_aid, self.db()).await?;
model::relation::Entity::delete_many()
.filter(model::relation::Column::Activity.eq(internal_aid))
.exec(self.db())
.await?;
model::actor::Entity::update_many()
.filter(model::actor::Column::Internal.eq(who))
.col_expr(model::actor::Column::FollowingCount, Expr::col(model::actor::Column::FollowingCount).sub(1))
.exec(self.db())
.await?;
model::actor::Entity::update_many()
.filter(model::actor::Column::Id.eq(&undone_object_id))
.col_expr(model::actor::Column::FollowersCount, Expr::col(model::actor::Column::FollowersCount).sub(1))
.exec(self.db())
.await?;
},
t => {
tracing::error!("received 'Undo' for unimplemented activity type: {t:?}");
return Err(StatusCode::NOT_IMPLEMENTED.into());
},
}
Ok(())
}
}