diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 94cb40be..3b197474 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -1,5 +1,5 @@ use apb::{target::Addressed, Activity, Base, Object}; -use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter}; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Fetcher, Normalizer}}; #[derive(Debug, thiserror::Error)] @@ -121,15 +121,37 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat .ok_or(ProcessorError::Incomplete)?; let usr = ctx.fetch_user(activity.object().id()?, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?; - let relation_model = crate::model::relation::ActiveModel { - internal: NotSet, - accept: Set(None), - activity: Set(activity_model.internal), - follower: Set(source_actor_internal), - following: Set(usr.internal), - }; - crate::model::relation::Entity::insert(relation_model) - .exec(tx).await?; + + if let Some(relation) = crate::model::relation::Entity::find() + .filter(crate::model::relation::Column::Follower.eq(source_actor_internal)) + .filter(crate::model::relation::Column::Following.eq(usr.internal)) + .select_only() + .select_column(crate::model::relation::Column::Internal) + .into_tuple::() + .one(tx) + .await? + { + // already requested, update pending row + crate::model::relation::Entity::update_many() + .col_expr(crate::model::relation::Column::Activity, Expr::value(Some(activity_model.internal))) + .filter(crate::model::relation::Column::Internal.eq(relation)) + .exec(tx) + .await?; + + } else { + + // new follow request, make new row + let relation_model = crate::model::relation::ActiveModel { + internal: NotSet, + accept: Set(None), + activity: Set(activity_model.internal), + follower: Set(source_actor_internal), + following: Set(usr.internal), + }; + crate::model::relation::Entity::insert(relation_model) + .exec(tx).await?; + } + tracing::info!("{} wants to follow {}", activity_model.actor, usr.id); Ok(()) } @@ -147,9 +169,18 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat let activity_model = ctx.insert_activity(activity, tx).await?; + let follower = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx) + .await? + .ok_or(ProcessorError::Incomplete)?; + let following = crate::model::actor::Entity::ap_to_internal(&activity_model.actor, tx) + .await? + .ok_or(ProcessorError::Incomplete)?; + crate::model::relation::Entity::update_many() .col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal))) - .filter(crate::model::relation::Column::Activity.eq(follow_activity.internal)) + .col_expr(crate::model::relation::Column::Activity, Expr::value(Some(follow_activity.internal))) + .filter(crate::model::relation::Column::Follower.eq(follower)) + .filter(crate::model::relation::Column::Following.eq(following)) .exec(tx).await?; crate::model::actor::Entity::update_many() @@ -157,7 +188,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat crate::model::actor::Column::FollowingCount, Expr::col(crate::model::actor::Column::FollowingCount).add(1) ) - .filter(crate::model::actor::Column::Id.eq(&follow_activity.actor)) + .filter(crate::model::actor::Column::Internal.eq(follower)) .exec(tx) .await?; crate::model::actor::Entity::update_many() @@ -165,7 +196,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat crate::model::actor::Column::FollowersCount, Expr::col(crate::model::actor::Column::FollowersCount).add(1) ) - .filter(crate::model::actor::Column::Id.eq(&follow_activity.actor)) + .filter(crate::model::actor::Column::Internal.eq(following)) .exec(tx) .await?;