Compare commits

...

3 commits

4 changed files with 152 additions and 79 deletions

View file

@ -1,4 +1,4 @@
use sea_orm::{sea_query::{IntoColumnRef, IntoCondition}, ColumnTrait, Condition, EntityName, EntityTrait, Iden, Iterable, Order, QueryFilter, QueryOrder, QuerySelect, RelationTrait, Select, SelectColumns}; use sea_orm::{sea_query::{IntoColumnRef, IntoCondition}, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityName, EntityTrait, Iden, Insert, Iterable, Order, QueryFilter, QueryOrder, QuerySelect, RelationTrait, Select, SelectColumns};
use crate::model; use crate::model;
pub struct Query; pub struct Query;
@ -112,4 +112,16 @@ impl Query {
select select
} }
pub fn notify(activity: i64, actor: i64) -> Insert<model::notification::ActiveModel> {
model::notification::Entity::insert(
model::notification::ActiveModel {
internal: NotSet,
activity: Set(activity),
actor: Set(actor),
seen: Set(false),
published: Set(chrono::Utc::now()),
}
)
}
} }

View file

@ -166,43 +166,19 @@ impl Normalizer for crate::Context {
// since ActivityPub is a mess most software doesnt' really respect addressing, or care // since ActivityPub is a mess most software doesnt' really respect addressing, or care
// about inserting it correctly. // about inserting it correctly.
// //
// * we can assume that Follow activities should *at least* be // * we can assume that Follow and Accept activities should *at least* be
// addressed to their target, since how would anyone be able to accept it otherwise??? // addressed to their target, since how would anyone be able to accept it otherwise???
//
// * announces are messy because everything is in the `to` field. we should probably move
// followers to `cc` and keep just as#Public and the original poster in `to`, so that they
// get notified but everyone else sees the object again
//
// * misskey sends us mentions without including us in the `to` field! basically the Create
// will have a `tag` mention but it will not include the mentioned user in the `to` field
match activity_model.activity_type { match activity_model.activity_type {
apb::ActivityType::Follow => { apb::ActivityType::Follow
| apb::ActivityType::Accept(apb::AcceptType::Accept)
=> {
if let Some(ref target) = activity_model.object { if let Some(ref target) = activity_model.object {
if !activity_model.to.0.contains(target) { if !activity_model.to.0.contains(target) {
activity_model.to.0.push(target.clone()); activity_model.to.0.push(target.clone());
} }
} }
}, },
apb::ActivityType::Announce => {
for target in activity_model.to.0.iter() {
if target.ends_with("followers") {
activity_model.cc.0.push(target.clone());
}
}
activity_model.to.0.retain(|x| !x.ends_with("followers"));
},
apb::ActivityType::Create => {
if let Some(object) = activity.object().get() {
for tag in object.tag().flat() {
if let Node::Link(l) = tag {
if matches!(l.link_type(), Ok(apb::LinkType::Mention)) {
activity_model.to.0.push(l.href().to_string());
}
}
}
}
},
_ => {}, _ => {},
} }

View file

@ -71,9 +71,25 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
tracing::warn!("failed fetching replies for received object: {e}"); tracing::warn!("failed fetching replies for received object: {e}");
} }
} }
let notified = object_node.tag()
.flat()
.into_iter()
.filter_map(|x| Some(x.id().ok()?.to_string()))
.collect::<Vec<String>>();
let object_model = ctx.insert_object(object_node, tx).await?; let object_model = ctx.insert_object(object_node, tx).await?;
ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
for uid in notified {
if !ctx.is_local(&uid) { continue }
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&uid, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
.exec(tx)
.await?;
}
}
tracing::debug!("{} posted {}", object_model.attributed_to.as_deref().unwrap_or("<anonymous>"), object_model.id); tracing::debug!("{} posted {}", object_model.attributed_to.as_deref().unwrap_or("<anonymous>"), object_model.id);
Ok(()) Ok(())
@ -103,9 +119,20 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
.exec(tx) .exec(tx)
.await?; .await?;
// likes without addressing are "silent likes", process them but dont store activity // likes without addressing are "silent likes", process them but dont store activity or notify
if !activity.addressed().is_empty() { if !activity.addressed().is_empty() {
ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
// TODO check that object author is in this like addressing!!! otherwise skip notification
if let Some(ref attributed_to) = obj.attributed_to {
if ctx.is_local(attributed_to) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
.exec(tx)
.await?;
}
}
}
} }
tracing::debug!("{} liked {}", actor.id, obj.id); tracing::debug!("{} liked {}", actor.id, obj.id);
@ -134,7 +161,18 @@ pub async fn dislike(ctx: &crate::Context, activity: impl apb::Activity, tx: &Da
// dislikes without addressing are "silent dislikes", process them but dont store activity // dislikes without addressing are "silent dislikes", process them but dont store activity
if !activity.addressed().is_empty() { if !activity.addressed().is_empty() {
ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
// TODO check that object author is in this like addressing!!! otherwise skip notification
if let Some(ref attributed_to) = obj.attributed_to {
if ctx.is_local(attributed_to) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
.exec(tx)
.await?;
}
}
}
} }
tracing::debug!("{} disliked {}", actor.id, obj.id); tracing::debug!("{} disliked {}", actor.id, obj.id);
@ -149,6 +187,12 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?; let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?;
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
if ctx.is_local(&target_actor.id) {
crate::Query::notify(activity_model.internal, target_actor.internal)
.exec(tx)
.await?;
}
if let Some(relation) = crate::model::relation::Entity::find() if let Some(relation) = crate::model::relation::Entity::find()
.filter(crate::model::relation::Column::Follower.eq(source_actor.internal)) .filter(crate::model::relation::Column::Follower.eq(source_actor.internal))
.filter(crate::model::relation::Column::Following.eq(target_actor.internal)) .filter(crate::model::relation::Column::Following.eq(target_actor.internal))
@ -206,6 +250,14 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
if ctx.is_local(&follow_activity.actor) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
.exec(tx)
.await?;
}
}
let follower = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx) let follower = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx)
.await? .await?
.ok_or(ProcessorError::Incomplete)?; .ok_or(ProcessorError::Incomplete)?;
@ -256,6 +308,17 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
// TODO most software doesn't show this, but i think instead we should?? if someone rejects it's
// better to know it clearly rather than not knowing if it got lost and maybe retry (being more
// annoying)
if ctx.is_local(&follow_activity.actor) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
.exec(tx)
.await?;
}
}
crate::model::relation::Entity::delete_many() crate::model::relation::Entity::delete_many()
.filter(crate::model::relation::Column::Activity.eq(activity_model.internal)) .filter(crate::model::relation::Column::Activity.eq(activity_model.internal))
.exec(tx) .exec(tx)
@ -276,6 +339,7 @@ pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
if !activity.addressed().is_empty() { if !activity.addressed().is_empty() {
ctx.insert_activity(activity, tx).await?; ctx.insert_activity(activity, tx).await?;
} }
// TODO we should delete notifications from CREATEs related to objects we deleted
tracing::debug!("deleted '{oid}'"); tracing::debug!("deleted '{oid}'");
Ok(()) Ok(())
} }
@ -398,24 +462,38 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
_ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())), _ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
} }
// we should store undos to make local delete deliveries work and relations make sense // TODO we should store undos to make local delete deliveries work and relations make sense
// except when they have empty addressing // except when they have empty addressing
// so that also remote "secret" undos dont get stored // so that also remote "secret" undos dont get stored
if !activity.addressed().is_empty() { if !activity.addressed().is_empty() {
ctx.insert_activity(activity, tx).await?; ctx.insert_activity(activity, tx).await?;
} }
if let Some(internal) = crate::model::activity::Entity::ap_to_internal(undone_activity.id()?, tx).await? {
crate::model::notification::Entity::delete_many()
.filter(crate::model::notification::Column::Activity.eq(internal))
.exec(tx)
.await?;
}
Ok(()) Ok(())
} }
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let announced_id = activity.object().id()?.to_string(); let announced_id = activity.object().id()?.to_string();
match match ctx.find_internal(&announced_id).await? { let object = match ctx.find_internal(&announced_id).await? {
// if we already have this activity, skip it // if we already have this activity, skip it
Some(crate::context::Internal::Activity(_)) => return Ok(()), // already processed Some(crate::context::Internal::Activity(_)) => return Ok(()), // already processed
// actors and objects which we already have // actors and objects which we already have
Some(x) => x, Some(crate::context::Internal::Actor(_)) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
// objects that we already have
Some(crate::context::Internal::Object(internal)) => {
crate::model::object::Entity::find_by_id(internal)
.one(tx)
.await?
.ok_or_else(|| sea_orm::DbErr::RecordNotFound(format!("object#{internal}")))?
},
// something new, fetch it! // something new, fetch it!
None => { None => {
match ctx.pull(&announced_id).await? { match ctx.pull(&announced_id).await? {
@ -424,55 +502,58 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D
// actors are not processable at all // actors are not processable at all
Pull::Actor(_) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())), Pull::Actor(_) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
// objects are processed down below, make a mock Internal::Object(internal) // objects are processed down below, make a mock Internal::Object(internal)
Pull::Object(x) => Pull::Object(x) => ctx.resolve_object(x, tx).await?,
crate::context::Internal::Object(
ctx.resolve_object(x, tx).await?.internal
),
} }
} }
} { };
crate::context::Internal::Activity(_) => unreachable!(),
crate::context::Internal::Actor(_) => Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
crate::context::Internal::Object(internal) => {
let actor = ctx.fetch_user(activity.actor().id()?, tx).await?;
// we only care about announces produced by "Person" actors, because there's intention let actor = ctx.fetch_user(activity.actor().id()?, tx).await?;
// anything shared by groups, services or applications is automated: fetch it and be done
if actor.actor_type == apb::ActorType::Person { // we only care about announces produced by "Person" actors, because there's intention
// if this user never "boosted" this object before, increase its counter // anything shared by groups, services or applications is automated: fetch it and be done
if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, internal) if actor.actor_type == apb::ActorType::Person {
.any(tx) // if this user never "boosted" this object before, increase its counter
.await? if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, object.internal)
{ .any(tx)
crate::model::object::Entity::update_many() .await?
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1)) {
.filter(crate::model::object::Column::Internal.eq(internal)) crate::model::object::Entity::update_many()
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
.filter(crate::model::object::Column::Internal.eq(object.internal))
.exec(tx)
.await?;
}
let share = crate::model::announce::ActiveModel {
internal: NotSet,
actor: Set(actor.internal),
object: Set(object.internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
};
crate::model::announce::Entity::insert(share)
.exec(tx).await?;
}
// TODO we should probably insert an activity, otherwise this won't appear on timelines!!
// or maybe go update all addressing records for this object, pushing them up
// or maybe create new addressing rows with more recent dates
// or maybe create fake objects that reference the original one
// idk!!!!
if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) {
let activity_model = ctx.insert_activity(activity, tx).await?;
if let Some(ref attributed_to) = object.attributed_to {
if ctx.is_local(attributed_to) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
.exec(tx) .exec(tx)
.await?; .await?;
} }
let share = crate::model::announce::ActiveModel {
internal: NotSet,
actor: Set(actor.internal),
object: Set(internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
};
crate::model::announce::Entity::insert(share)
.exec(tx).await?;
} }
}
// TODO we should probably insert an activity, otherwise this won't appear on timelines!!
// or maybe go update all addressing records for this object, pushing them up
// or maybe create new addressing rows with more recent dates
// or maybe create fake objects that reference the original one
// idk!!!!
if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) {
ctx.insert_activity(activity, tx).await?;
}
tracing::debug!("{} shared {}", actor.id, announced_id);
Ok(())
},
} }
tracing::debug!("{} shared {}", actor.id, announced_id);
Ok(())
} }

View file

@ -55,6 +55,10 @@ impl MigrationTrait for Migration {
) )
.await?; .await?;
manager
.create_index(Index::create().name("index-notifications-activity").table(Notifications::Table).col(Notifications::Activity).to_owned())
.await?;
manager manager
.create_index( .create_index(
Index::create() Index::create()