feat: dont store activities unless necessary

This commit is contained in:
əlemi 2024-06-07 23:16:51 +02:00
parent 87d0d7b6d2
commit a53c93c1c5
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 101 additions and 136 deletions

View file

@ -43,3 +43,9 @@ impl Related<super::object::Entity> for Entity {
}
impl ActiveModelBehavior for ActiveModel {}
impl Entity {
pub fn find_by_uid_oid(uid: i64, oid: i64) -> Select<Entity> {
Entity::find().filter(Column::Actor.eq(uid)).filter(Column::Object.eq(oid))
}
}

View file

@ -1,6 +1,6 @@
use apb::{target::Addressed, Activity, Base, Object};
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}};
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter};
use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Fetcher, Normalizer}};
#[derive(Debug, thiserror::Error)]
pub enum ProcessorError {
@ -60,33 +60,30 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
tracing::error!("refusing to process activity without embedded object");
return Err(ProcessorError::Unprocessable(activity.id()?.to_string()));
};
let oid = object_node.id()?.to_string();
let addressed = object_node.addressed();
let activity_model = ctx.insert_activity(activity, tx).await?;
let internal_oid = if let Some(internal) = model::object::Entity::ap_to_internal(&oid, tx).await? {
tracing::debug!("skipping insertion of already known object #{internal}");
internal
} else {
if model::object::Entity::ap_to_internal(object_node.id()?, tx).await?.is_some() {
return Err(ProcessorError::AlreadyProcessed);
}
if object_node.attributed_to().id()? != activity.actor().id()? {
return Err(ProcessorError::Unauthorized);
}
if let Ok(reply) = object_node.in_reply_to().id() {
if let Err(e) = ctx.fetch_object(reply, tx).await {
tracing::warn!("failed fetching replies for received object: {e}");
}
}
let object_model = ctx.insert_object(object_node, tx).await?;
object_model.internal
};
let expanded_addressing = ctx.expand_addressing(addressed, tx).await?;
ctx.address_to(Some(activity_model.internal), Some(internal_oid), &expanded_addressing, tx).await?;
tracing::info!("{} posted {}", activity_model.actor, oid);
// only likes mentioning local users are stored to generate notifications, everything else
// produces side effects but no activity, and thus no notification
if activity.mentioning().iter().any(|x| ctx.is_local(x)) {
ctx.insert_activity(activity, tx).await?;
}
tracing::debug!("{} posted {}", object_model.attributed_to.as_deref().unwrap_or("<anonymous>"), object_model.id);
Ok(())
}
pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let uid = activity.actor().id()?.to_string();
let actor = ctx.fetch_user(&uid, tx).await?;
let object_uri = activity.object().id()?.to_string();
let published = activity.published().unwrap_or_else(|_|chrono::Utc::now());
let obj = ctx.fetch_object(&object_uri, tx).await?;
let actor = ctx.fetch_user(activity.actor().id()?, tx).await?;
let obj = ctx.fetch_object(activity.object().id()?, tx).await?;
if crate::model::like::Entity::find_by_uid_oid(actor.internal, obj.internal)
.any(tx)
.await?
@ -94,15 +91,13 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
return Err(ProcessorError::AlreadyProcessed);
}
let activity_model = ctx.insert_activity(activity, tx).await?;
let like = crate::model::like::ActiveModel {
internal: NotSet,
actor: Set(actor.internal),
object: Set(obj.internal),
activity: Set(activity_model.internal),
published: Set(published),
published: Set(activity.published().unwrap_or_else(|_|chrono::Utc::now())),
};
crate::model::like::Entity::insert(like).exec(tx).await?;
crate::model::object::Entity::update_many()
.col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).add(1))
@ -110,30 +105,21 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
.exec(tx)
.await?;
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?;
if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!!
expanded_addressing.push(
crate::model::object::Entity::find_by_id(obj.internal)
.select_only()
.select_column(crate::model::object::Column::AttributedTo)
.into_tuple::<String>()
.one(tx)
.await?
.ok_or(ProcessorError::Incomplete)?
);
// only likes mentioning local users are stored to generate notifications, everything else
// produces side effects but no activity, and thus no notification
if activity.mentioning().iter().any(|x| ctx.is_local(x)) {
ctx.insert_activity(activity, tx).await?;
}
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
tracing::info!("{} liked {}", uid, obj.id);
tracing::debug!("{} liked {}", actor.id, obj.id);
Ok(())
}
pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let source_actor = activity.actor().id()?.to_string();
let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, tx)
let source_actor_internal = crate::model::actor::Entity::ap_to_internal(activity.actor().id()?, tx)
.await?
.ok_or(ProcessorError::Incomplete)?;
let target_actor = activity.object().id()?.to_string();
let usr = ctx.fetch_user(&target_actor, tx).await?;
let usr = ctx.fetch_user(activity.object().id()?, tx).await?;
let activity_model = ctx.insert_activity(activity, tx).await?;
let relation_model = crate::model::relation::ActiveModel {
internal: NotSet,
@ -144,30 +130,28 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
};
crate::model::relation::Entity::insert(relation_model)
.exec(tx).await?;
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?;
if !expanded_addressing.contains(&target_actor) {
expanded_addressing.push(target_actor);
}
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
tracing::info!("{} wants to follow {}", source_actor, usr.id);
tracing::info!("{} wants to follow {}", activity_model.actor, usr.id);
Ok(())
}
pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
// TODO what about TentativeAccept
let target_actor = activity.actor().id()?.to_string();
let follow_request_id = activity.object().id()?.to_string();
let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id)
let follow_activity = crate::model::activity::Entity::find_by_ap_id(activity.object().id()?)
.one(tx)
.await?
.ok_or(ProcessorError::Incomplete)?;
if follow_activity.object.unwrap_or_default() != target_actor {
if follow_activity.object.unwrap_or_default() != activity.actor().id()? {
return Err(ProcessorError::Unauthorized);
}
let activity_model = ctx.insert_activity(activity, tx).await?;
crate::model::relation::Entity::update_many()
.col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal)))
.filter(crate::model::relation::Column::Activity.eq(follow_activity.internal))
.exec(tx).await?;
crate::model::actor::Entity::update_many()
.col_expr(
crate::model::actor::Column::FollowingCount,
@ -185,31 +169,19 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
.exec(tx)
.await?;
crate::model::relation::Entity::update_many()
.col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal)))
.filter(crate::model::relation::Column::Activity.eq(follow_activity.internal))
.exec(tx).await?;
tracing::debug!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor);
tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor);
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?;
if !expanded_addressing.contains(&follow_activity.actor) {
expanded_addressing.push(follow_activity.actor);
}
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
Ok(())
}
pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
// TODO what about TentativeReject?
let uid = activity.actor().id()?.to_string();
let follow_request_id = activity.object().id()?.to_string();
let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id)
let follow_activity = crate::model::activity::Entity::find_by_ap_id(activity.object().id()?)
.one(tx)
.await?
.ok_or(ProcessorError::Incomplete)?;
if follow_activity.object.unwrap_or_default() != uid {
if follow_activity.object.unwrap_or_default() != activity.actor().id()? {
return Err(ProcessorError::Unauthorized);
}
@ -220,14 +192,8 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
.exec(tx)
.await?;
tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor);
tracing::debug!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor);
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?;
if !expanded_addressing.contains(&follow_activity.actor) {
expanded_addressing.push(follow_activity.actor);
}
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
Ok(())
}
@ -239,16 +205,13 @@ pub async fn delete(_ctx: &crate::Context, activity: impl apb::Activity, tx: &Da
Ok(())
}
pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let uid = activity.actor().id()?.to_string();
pub async fn update(_ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let Some(object_node) = activity.object().extract() else {
tracing::error!("refusing to process activity without embedded object");
return Err(ProcessorError::Unprocessable(activity.id()?.to_string()));
};
let oid = object_node.id()?.to_string();
let activity_model = ctx.insert_activity(activity, tx).await?;
match object_node.object_type()? {
apb::ObjectType::Actor(_) => {
let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, tx)
@ -272,38 +235,34 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
.exec(tx)
.await?;
},
t => tracing::warn!("no side effects implemented for update type {t:?}"),
_ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
}
tracing::info!("{} updated {}", uid, oid);
let expanded_addressing = ctx.expand_addressing(activity_model.addressed(), tx).await?;
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
tracing::debug!("{} updated {}", activity.actor().id()?, oid);
Ok(())
}
pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let uid = activity.actor().id()?.to_string();
pub async fn undo(_ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
// TODO in theory we could work with just object_id but right now only accept embedded
let undone_activity = activity.object().extract().ok_or(apb::FieldErr("object"))?;
let undone_activity_author = undone_activity.as_activity()?.actor().id()?.to_string();
if uid != undone_activity_author {
return Err(ProcessorError::Unauthorized);
}
let undone_activity_target = undone_activity.as_activity()?.object().id()?.to_string();
let undone_activity = activity.object()
.extract()
.ok_or(apb::FieldErr("object"))?;
let uid = activity.actor().id()?.to_string();
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx)
.await?
.ok_or(ProcessorError::Incomplete)?;
let targets = ctx.expand_addressing(activity.addressed(), tx).await?;
let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address_to(Some(activity_model.internal), None, &targets, tx).await?;
if uid != undone_activity.as_activity()?.actor().id()? {
return Err(ProcessorError::Unauthorized);
}
match undone_activity.as_activity()?.activity_type()? {
apb::ActivityType::Like => {
let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, tx)
let internal_oid = crate::model::object::Entity::ap_to_internal(
undone_activity.as_activity()?.object().id()?,
tx
)
.await?
.ok_or(ProcessorError::Incomplete)?;
crate::model::like::Entity::delete_many()
@ -321,7 +280,10 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
.await?;
},
apb::ActivityType::Follow => {
let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, tx)
let internal_uid_following = crate::model::actor::Entity::ap_to_internal(
undone_activity.as_activity()?.object().id()?,
tx,
)
.await?
.ok_or(ProcessorError::Incomplete)?;
crate::model::relation::Entity::delete_many()
@ -340,21 +302,14 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
.exec(tx)
.await?;
},
t => {
tracing::error!("received 'Undo' activity '{}' for unimplemented activity type: {t:?}", activity_model.id);
return Err(ProcessorError::Unprocessable(activity_model.id));
},
_ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
}
Ok(())
}
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let uid = activity.actor().id()?.to_string();
let actor = ctx.fetch_user(&uid, tx).await?;
let announced_id = activity.object().id()?.to_string();
let published = activity.published().unwrap_or(chrono::Utc::now());
let addressed = activity.addressed();
match match ctx.find_internal(&announced_id).await? {
// if we already have this activity, skip it
@ -379,37 +334,41 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D
crate::context::Internal::Actor(_) => Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
crate::context::Internal::Activity(_) => Err(ProcessorError::AlreadyProcessed), // ???
crate::context::Internal::Object(internal) => {
let object_model = model::object::Entity::find_by_id(internal)
.one(tx)
.await?
.ok_or_else(|| sea_orm::DbErr::RecordNotFound(internal.to_string()))?;
let activity_model = ctx.insert_activity(activity, tx).await?;
// relays send us objects as Announce, but we don't really want to count those towards the
// total shares count of an object, so just fetch the object and be done with it
if !matches!(actor.actor_type, apb::ActorType::Person) {
tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id);
return Ok(())
}
let actor = ctx.fetch_user(activity.actor().id()?, tx).await?;
// we only care about "organic" announces, as in those produced by people
// anything shared by groups, services or applications is just mirroring: fetch it and be done
if actor.actor_type == apb::ActorType::Person {
let share = crate::model::announce::ActiveModel {
internal: NotSet,
actor: Set(actor.internal),
object: Set(object_model.internal),
published: Set(published),
object: Set(internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
};
let expanded_addressing = ctx.expand_addressing(addressed, tx).await?;
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
crate::model::announce::Entity::insert(share)
.exec(tx).await?;
// if this user never "boosted" this object before, increase its counter
if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, internal)
.any(tx)
.await?
{
crate::model::object::Entity::update_many()
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
.filter(crate::model::object::Column::Internal.eq(object_model.internal))
.filter(crate::model::object::Column::Internal.eq(internal))
.exec(tx)
.await?;
}
tracing::info!("{} shared {}", activity_model.actor, announced_id);
// TODO we should probably insert an activity, otherwise this won't appear on timelines!!
// or maybe go update all addressing records for this object, pushing them up
// or maybe create new addressing rows with more recent dates
// or maybe create fake objects that reference the original one
// idk!!!!
}
tracing::debug!("{} shared {}", actor.id, announced_id);
Ok(())
},
}