diff --git a/upub/core/src/selector/query.rs b/upub/core/src/selector/query.rs index d0d3707..501f23d 100644 --- a/upub/core/src/selector/query.rs +++ b/upub/core/src/selector/query.rs @@ -1,4 +1,4 @@ -use sea_orm::{sea_query::{IntoColumnRef, IntoCondition}, ColumnTrait, Condition, EntityName, EntityTrait, Iden, Iterable, Order, QueryFilter, QueryOrder, QuerySelect, RelationTrait, Select, SelectColumns}; +use sea_orm::{sea_query::{IntoColumnRef, IntoCondition}, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityName, EntityTrait, Iden, Insert, Iterable, Order, QueryFilter, QueryOrder, QuerySelect, RelationTrait, Select, SelectColumns}; use crate::model; pub struct Query; @@ -112,4 +112,16 @@ impl Query { select } + + pub fn notify(activity: i64, actor: i64) -> Insert { + model::notification::Entity::insert( + model::notification::ActiveModel { + internal: NotSet, + activity: Set(activity), + actor: Set(actor), + seen: Set(false), + published: Set(chrono::Utc::now()), + } + ) + } } diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 2e62822..78eb24a 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -71,9 +71,25 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat tracing::warn!("failed fetching replies for received object: {e}"); } } + + let notified = object_node.tag() + .flat() + .into_iter() + .filter_map(|x| Some(x.id().ok()?.to_string())) + .collect::>(); + let object_model = ctx.insert_object(object_node, tx).await?; - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + + for uid in notified { + if !ctx.is_local(&uid) { continue } + if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&uid, tx).await? { + crate::Query::notify(activity_model.internal, actor_internal) + .exec(tx) + .await?; + } + } tracing::debug!("{} posted {}", object_model.attributed_to.as_deref().unwrap_or(""), object_model.id); Ok(()) @@ -103,9 +119,20 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab .exec(tx) .await?; - // likes without addressing are "silent likes", process them but dont store activity + // likes without addressing are "silent likes", process them but dont store activity or notify if !activity.addressed().is_empty() { - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + + // TODO check that object author is in this like addressing!!! otherwise skip notification + if let Some(ref attributed_to) = obj.attributed_to { + if ctx.is_local(attributed_to) { + if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? { + crate::Query::notify(activity_model.internal, actor_internal) + .exec(tx) + .await?; + } + } + } } tracing::debug!("{} liked {}", actor.id, obj.id); @@ -134,7 +161,18 @@ pub async fn dislike(ctx: &crate::Context, activity: impl apb::Activity, tx: &Da // dislikes without addressing are "silent dislikes", process them but dont store activity if !activity.addressed().is_empty() { - ctx.insert_activity(activity, tx).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + + // TODO check that object author is in this like addressing!!! otherwise skip notification + if let Some(ref attributed_to) = obj.attributed_to { + if ctx.is_local(attributed_to) { + if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? { + crate::Query::notify(activity_model.internal, actor_internal) + .exec(tx) + .await?; + } + } + } } tracing::debug!("{} disliked {}", actor.id, obj.id); @@ -149,6 +187,12 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?; + if ctx.is_local(&target_actor.id) { + crate::Query::notify(activity_model.internal, target_actor.internal) + .exec(tx) + .await?; + } + if let Some(relation) = crate::model::relation::Entity::find() .filter(crate::model::relation::Column::Follower.eq(source_actor.internal)) .filter(crate::model::relation::Column::Following.eq(target_actor.internal)) @@ -206,6 +250,14 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat let activity_model = ctx.insert_activity(activity, tx).await?; + if ctx.is_local(&follow_activity.actor) { + if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? { + crate::Query::notify(activity_model.internal, actor_internal) + .exec(tx) + .await?; + } + } + let follower = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx) .await? .ok_or(ProcessorError::Incomplete)?; @@ -256,6 +308,17 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat let activity_model = ctx.insert_activity(activity, tx).await?; + // TODO most software doesn't show this, but i think instead we should?? if someone rejects it's + // better to know it clearly rather than not knowing if it got lost and maybe retry (being more + // annoying) + if ctx.is_local(&follow_activity.actor) { + if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? { + crate::Query::notify(activity_model.internal, actor_internal) + .exec(tx) + .await?; + } + } + crate::model::relation::Entity::delete_many() .filter(crate::model::relation::Column::Activity.eq(activity_model.internal)) .exec(tx) @@ -276,6 +339,7 @@ pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat if !activity.addressed().is_empty() { ctx.insert_activity(activity, tx).await?; } + // TODO we should delete notifications from CREATEs related to objects we deleted tracing::debug!("deleted '{oid}'"); Ok(()) } @@ -398,24 +462,38 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab _ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())), } - // we should store undos to make local delete deliveries work and relations make sense + // TODO we should store undos to make local delete deliveries work and relations make sense // except when they have empty addressing // so that also remote "secret" undos dont get stored if !activity.addressed().is_empty() { ctx.insert_activity(activity, tx).await?; } + if let Some(internal) = crate::model::activity::Entity::ap_to_internal(undone_activity.id()?, tx).await? { + crate::model::notification::Entity::delete_many() + .filter(crate::model::notification::Column::Activity.eq(internal)) + .exec(tx) + .await?; + } + Ok(()) } pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let announced_id = activity.object().id()?.to_string(); - match match ctx.find_internal(&announced_id).await? { + let object = match ctx.find_internal(&announced_id).await? { // if we already have this activity, skip it Some(crate::context::Internal::Activity(_)) => return Ok(()), // already processed // actors and objects which we already have - Some(x) => x, + Some(crate::context::Internal::Actor(_)) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())), + // objects that we already have + Some(crate::context::Internal::Object(internal)) => { + crate::model::object::Entity::find_by_id(internal) + .one(tx) + .await? + .ok_or_else(|| sea_orm::DbErr::RecordNotFound(format!("object#{internal}")))? + }, // something new, fetch it! None => { match ctx.pull(&announced_id).await? { @@ -424,55 +502,58 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D // actors are not processable at all Pull::Actor(_) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())), // objects are processed down below, make a mock Internal::Object(internal) - Pull::Object(x) => - crate::context::Internal::Object( - ctx.resolve_object(x, tx).await?.internal - ), + Pull::Object(x) => ctx.resolve_object(x, tx).await?, } } - } { - crate::context::Internal::Activity(_) => unreachable!(), - crate::context::Internal::Actor(_) => Err(ProcessorError::Unprocessable(activity.id()?.to_string())), - crate::context::Internal::Object(internal) => { - let actor = ctx.fetch_user(activity.actor().id()?, tx).await?; + }; - // we only care about announces produced by "Person" actors, because there's intention - // anything shared by groups, services or applications is automated: fetch it and be done - if actor.actor_type == apb::ActorType::Person { - // if this user never "boosted" this object before, increase its counter - if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, internal) - .any(tx) - .await? - { - crate::model::object::Entity::update_many() - .col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1)) - .filter(crate::model::object::Column::Internal.eq(internal)) + let actor = ctx.fetch_user(activity.actor().id()?, tx).await?; + + // we only care about announces produced by "Person" actors, because there's intention + // anything shared by groups, services or applications is automated: fetch it and be done + if actor.actor_type == apb::ActorType::Person { + // if this user never "boosted" this object before, increase its counter + if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, object.internal) + .any(tx) + .await? + { + crate::model::object::Entity::update_many() + .col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1)) + .filter(crate::model::object::Column::Internal.eq(object.internal)) + .exec(tx) + .await?; + } + + let share = crate::model::announce::ActiveModel { + internal: NotSet, + actor: Set(actor.internal), + object: Set(object.internal), + published: Set(activity.published().unwrap_or(chrono::Utc::now())), + }; + + crate::model::announce::Entity::insert(share) + .exec(tx).await?; + } + + // TODO we should probably insert an activity, otherwise this won't appear on timelines!! + // or maybe go update all addressing records for this object, pushing them up + // or maybe create new addressing rows with more recent dates + // or maybe create fake objects that reference the original one + // idk!!!! + if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) { + let activity_model = ctx.insert_activity(activity, tx).await?; + + if let Some(ref attributed_to) = object.attributed_to { + if ctx.is_local(attributed_to) { + if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? { + crate::Query::notify(activity_model.internal, actor_internal) .exec(tx) .await?; } - - let share = crate::model::announce::ActiveModel { - internal: NotSet, - actor: Set(actor.internal), - object: Set(internal), - published: Set(activity.published().unwrap_or(chrono::Utc::now())), - }; - - crate::model::announce::Entity::insert(share) - .exec(tx).await?; } - - // TODO we should probably insert an activity, otherwise this won't appear on timelines!! - // or maybe go update all addressing records for this object, pushing them up - // or maybe create new addressing rows with more recent dates - // or maybe create fake objects that reference the original one - // idk!!!! - if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) { - ctx.insert_activity(activity, tx).await?; - } - - tracing::debug!("{} shared {}", actor.id, announced_id); - Ok(()) - }, + } } + + tracing::debug!("{} shared {}", actor.id, announced_id); + Ok(()) }