1
0
Fork 0
forked from alemi/upub

fix: process announced activities too

not super clean but should work. todo merge inbox/outbox and move common
logic is side_effects, then this can be made nicer
This commit is contained in:
əlemi 2024-05-30 23:58:22 +02:00
parent d8b53c7c93
commit 3a6e632448
Signed by: alemi
GPG key ID: A4895B84D311642C

View file

@ -1,9 +1,10 @@
use apb::{target::Addressed, Activity, Base, Object}; use apb::{target::Addressed, Activity, Base, Object};
use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}}; use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}};
use super::{fetcher::Fetcher, Context, side_effects::SideEffects}; use super::{fetcher::{Fetcher, PullResult}, side_effects::SideEffects, Context};
#[axum::async_trait] #[axum::async_trait]
@ -261,34 +262,55 @@ impl apb::server::Inbox for Context {
let actor = self.fetch_user(&uid).await?; let actor = self.fetch_user(&uid).await?;
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?; let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?;
let activity_model = self.insert_activity(activity.clone(), Some(server)).await?;
let announced = self.fetch_object(&announced_id).await?; match self.pull(&announced_id).await? {
// relays send us activities as Announce, but we don't really want to count those towards the PullResult::Actor(_) => Err(UpubError::unprocessable()),
// total shares count of an object, so just fetch the object and be done with it PullResult::Object(object) => {
if matches!(actor.actor_type, apb::ActorType::Person) { let object_model = self.resolve_object(object).await?;
tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id); let activity_model = self.insert_activity(activity.clone(), Some(server.clone())).await?;
return Ok(())
// relays send us objects as Announce, but we don't really want to count those towards the
// total shares count of an object, so just fetch the object and be done with it
if !matches!(actor.actor_type, apb::ActorType::Person) {
tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id);
return Ok(())
}
let share = model::announce::ActiveModel {
internal: NotSet,
actor: Set(internal_uid),
object: Set(object_model.internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
};
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
model::announce::Entity::insert(share)
.exec(self.db()).await?;
model::object::Entity::update_many()
.col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1))
.filter(model::object::Column::Internal.eq(object_model.internal))
.exec(self.db())
.await?;
tracing::info!("{} shared {}", activity_model.actor, announced_id);
Ok(())
},
PullResult::Activity(activity) => {
// groups update all members of other things that happen inside, process those
let server = Context::server(activity.id().unwrap_or_default());
match activity.activity_type().ok_or_else(UpubError::bad_request)? {
apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(self.like(server, activity).await?),
apb::ActivityType::Create => Ok(self.create(server, activity).await?),
apb::ActivityType::Undo => Ok(self.undo(server, activity).await?),
apb::ActivityType::Delete => Ok(self.delete(server, activity).await?),
apb::ActivityType::Update => Ok(self.update(server, activity).await?),
x => {
tracing::warn!("ignoring unhandled announced activity of type {x:?}");
Err(StatusCode::NOT_IMPLEMENTED.into())
},
}
},
} }
let share = model::announce::ActiveModel {
internal: NotSet,
actor: Set(internal_uid),
object: Set(announced.internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
};
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
model::announce::Entity::insert(share)
.exec(self.db()).await?;
model::object::Entity::update_many()
.col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1))
.filter(model::object::Column::Internal.eq(announced.internal))
.exec(self.db())
.await?;
tracing::info!("{} shared {}", activity_model.actor, announced.id);
Ok(())
} }
} }