From 3a6e63244860056d5408574f96b41716922637b9 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 30 May 2024 23:58:22 +0200 Subject: [PATCH] fix: process announced activities too not super clean but should work. todo merge inbox/outbox and move common logic is side_effects, then this can be made nicer --- src/server/inbox.rs | 78 +++++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/src/server/inbox.rs b/src/server/inbox.rs index 5aaa5309..12f74b8c 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -1,9 +1,10 @@ use apb::{target::Addressed, Activity, Base, Object}; +use reqwest::StatusCode; use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}}; -use super::{fetcher::Fetcher, Context, side_effects::SideEffects}; +use super::{fetcher::{Fetcher, PullResult}, side_effects::SideEffects, Context}; #[axum::async_trait] @@ -261,34 +262,55 @@ impl apb::server::Inbox for Context { let actor = self.fetch_user(&uid).await?; let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?; - let activity_model = self.insert_activity(activity.clone(), Some(server)).await?; + + match self.pull(&announced_id).await? { + PullResult::Actor(_) => Err(UpubError::unprocessable()), + PullResult::Object(object) => { + let object_model = self.resolve_object(object).await?; + let activity_model = self.insert_activity(activity.clone(), Some(server.clone())).await?; - let announced = self.fetch_object(&announced_id).await?; - // relays send us activities as Announce, but we don't really want to count those towards the - // total shares count of an object, so just fetch the object and be done with it - if matches!(actor.actor_type, apb::ActorType::Person) { - tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id); - return Ok(()) + // relays send us objects as Announce, but we don't really want to count those towards the + // total shares count of an object, so just fetch the object and be done with it + if !matches!(actor.actor_type, apb::ActorType::Person) { + tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id); + return Ok(()) + } + + let share = model::announce::ActiveModel { + internal: NotSet, + actor: Set(internal_uid), + object: Set(object_model.internal), + published: Set(activity.published().unwrap_or(chrono::Utc::now())), + }; + + let expanded_addressing = self.expand_addressing(activity.addressed()).await?; + self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + model::announce::Entity::insert(share) + .exec(self.db()).await?; + model::object::Entity::update_many() + .col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1)) + .filter(model::object::Column::Internal.eq(object_model.internal)) + .exec(self.db()) + .await?; + + tracing::info!("{} shared {}", activity_model.actor, announced_id); + Ok(()) + }, + PullResult::Activity(activity) => { + // groups update all members of other things that happen inside, process those + let server = Context::server(activity.id().unwrap_or_default()); + match activity.activity_type().ok_or_else(UpubError::bad_request)? { + apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(self.like(server, activity).await?), + apb::ActivityType::Create => Ok(self.create(server, activity).await?), + apb::ActivityType::Undo => Ok(self.undo(server, activity).await?), + apb::ActivityType::Delete => Ok(self.delete(server, activity).await?), + apb::ActivityType::Update => Ok(self.update(server, activity).await?), + x => { + tracing::warn!("ignoring unhandled announced activity of type {x:?}"); + Err(StatusCode::NOT_IMPLEMENTED.into()) + }, + } + }, } - - let share = model::announce::ActiveModel { - internal: NotSet, - actor: Set(internal_uid), - object: Set(announced.internal), - published: Set(activity.published().unwrap_or(chrono::Utc::now())), - }; - - let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; - model::announce::Entity::insert(share) - .exec(self.db()).await?; - model::object::Entity::update_many() - .col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1)) - .filter(model::object::Column::Internal.eq(announced.internal)) - .exec(self.db()) - .await?; - - tracing::info!("{} shared {}", activity_model.actor, announced.id); - Ok(()) } }