Compare commits
No commits in common. "3698d1947dc8c7dd81955ddbd20727b1195d5e61" and "f3f176406e5a804ab38c0b8c9b16c8158760e0b0" have entirely different histories.
3698d1947d
...
f3f176406e
4 changed files with 80 additions and 153 deletions
|
@ -1,4 +1,4 @@
|
||||||
use sea_orm::{sea_query::{IntoColumnRef, IntoCondition}, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityName, EntityTrait, Iden, Insert, Iterable, Order, QueryFilter, QueryOrder, QuerySelect, RelationTrait, Select, SelectColumns};
|
use sea_orm::{sea_query::{IntoColumnRef, IntoCondition}, ColumnTrait, Condition, EntityName, EntityTrait, Iden, Iterable, Order, QueryFilter, QueryOrder, QuerySelect, RelationTrait, Select, SelectColumns};
|
||||||
use crate::model;
|
use crate::model;
|
||||||
|
|
||||||
pub struct Query;
|
pub struct Query;
|
||||||
|
@ -112,16 +112,4 @@ impl Query {
|
||||||
|
|
||||||
select
|
select
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn notify(activity: i64, actor: i64) -> Insert<model::notification::ActiveModel> {
|
|
||||||
model::notification::Entity::insert(
|
|
||||||
model::notification::ActiveModel {
|
|
||||||
internal: NotSet,
|
|
||||||
activity: Set(activity),
|
|
||||||
actor: Set(actor),
|
|
||||||
seen: Set(false),
|
|
||||||
published: Set(chrono::Utc::now()),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,19 +166,43 @@ impl Normalizer for crate::Context {
|
||||||
// since ActivityPub is a mess most software doesnt' really respect addressing, or care
|
// since ActivityPub is a mess most software doesnt' really respect addressing, or care
|
||||||
// about inserting it correctly.
|
// about inserting it correctly.
|
||||||
//
|
//
|
||||||
// * we can assume that Follow and Accept activities should *at least* be
|
// * we can assume that Follow activities should *at least* be
|
||||||
// addressed to their target, since how would anyone be able to accept it otherwise???
|
// addressed to their target, since how would anyone be able to accept it otherwise???
|
||||||
|
//
|
||||||
|
// * announces are messy because everything is in the `to` field. we should probably move
|
||||||
|
// followers to `cc` and keep just as#Public and the original poster in `to`, so that they
|
||||||
|
// get notified but everyone else sees the object again
|
||||||
|
//
|
||||||
|
// * misskey sends us mentions without including us in the `to` field! basically the Create
|
||||||
|
// will have a `tag` mention but it will not include the mentioned user in the `to` field
|
||||||
|
|
||||||
match activity_model.activity_type {
|
match activity_model.activity_type {
|
||||||
apb::ActivityType::Follow
|
apb::ActivityType::Follow => {
|
||||||
| apb::ActivityType::Accept(apb::AcceptType::Accept)
|
|
||||||
=> {
|
|
||||||
if let Some(ref target) = activity_model.object {
|
if let Some(ref target) = activity_model.object {
|
||||||
if !activity_model.to.0.contains(target) {
|
if !activity_model.to.0.contains(target) {
|
||||||
activity_model.to.0.push(target.clone());
|
activity_model.to.0.push(target.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
apb::ActivityType::Announce => {
|
||||||
|
for target in activity_model.to.0.iter() {
|
||||||
|
if target.ends_with("followers") {
|
||||||
|
activity_model.cc.0.push(target.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
activity_model.to.0.retain(|x| !x.ends_with("followers"));
|
||||||
|
},
|
||||||
|
apb::ActivityType::Create => {
|
||||||
|
if let Some(object) = activity.object().get() {
|
||||||
|
for tag in object.tag().flat() {
|
||||||
|
if let Node::Link(l) = tag {
|
||||||
|
if matches!(l.link_type(), Ok(apb::LinkType::Mention)) {
|
||||||
|
activity_model.to.0.push(l.href().to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -71,25 +71,9 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
|
||||||
tracing::warn!("failed fetching replies for received object: {e}");
|
tracing::warn!("failed fetching replies for received object: {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let notified = object_node.tag()
|
|
||||||
.flat()
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|x| Some(x.id().ok()?.to_string()))
|
|
||||||
.collect::<Vec<String>>();
|
|
||||||
|
|
||||||
let object_model = ctx.insert_object(object_node, tx).await?;
|
let object_model = ctx.insert_object(object_node, tx).await?;
|
||||||
|
|
||||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
ctx.insert_activity(activity, tx).await?;
|
||||||
|
|
||||||
for uid in notified {
|
|
||||||
if !ctx.is_local(&uid) { continue }
|
|
||||||
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&uid, tx).await? {
|
|
||||||
crate::Query::notify(activity_model.internal, actor_internal)
|
|
||||||
.exec(tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::debug!("{} posted {}", object_model.attributed_to.as_deref().unwrap_or("<anonymous>"), object_model.id);
|
tracing::debug!("{} posted {}", object_model.attributed_to.as_deref().unwrap_or("<anonymous>"), object_model.id);
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -119,20 +103,9 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
|
||||||
.exec(tx)
|
.exec(tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// likes without addressing are "silent likes", process them but dont store activity or notify
|
// likes without addressing are "silent likes", process them but dont store activity
|
||||||
if !activity.addressed().is_empty() {
|
if !activity.addressed().is_empty() {
|
||||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
ctx.insert_activity(activity, tx).await?;
|
||||||
|
|
||||||
// TODO check that object author is in this like addressing!!! otherwise skip notification
|
|
||||||
if let Some(ref attributed_to) = obj.attributed_to {
|
|
||||||
if ctx.is_local(attributed_to) {
|
|
||||||
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
|
|
||||||
crate::Query::notify(activity_model.internal, actor_internal)
|
|
||||||
.exec(tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!("{} liked {}", actor.id, obj.id);
|
tracing::debug!("{} liked {}", actor.id, obj.id);
|
||||||
|
@ -161,18 +134,7 @@ pub async fn dislike(ctx: &crate::Context, activity: impl apb::Activity, tx: &Da
|
||||||
|
|
||||||
// dislikes without addressing are "silent dislikes", process them but dont store activity
|
// dislikes without addressing are "silent dislikes", process them but dont store activity
|
||||||
if !activity.addressed().is_empty() {
|
if !activity.addressed().is_empty() {
|
||||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
ctx.insert_activity(activity, tx).await?;
|
||||||
|
|
||||||
// TODO check that object author is in this like addressing!!! otherwise skip notification
|
|
||||||
if let Some(ref attributed_to) = obj.attributed_to {
|
|
||||||
if ctx.is_local(attributed_to) {
|
|
||||||
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
|
|
||||||
crate::Query::notify(activity_model.internal, actor_internal)
|
|
||||||
.exec(tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!("{} disliked {}", actor.id, obj.id);
|
tracing::debug!("{} disliked {}", actor.id, obj.id);
|
||||||
|
@ -187,12 +149,6 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
|
||||||
let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?;
|
let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?;
|
||||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||||
|
|
||||||
if ctx.is_local(&target_actor.id) {
|
|
||||||
crate::Query::notify(activity_model.internal, target_actor.internal)
|
|
||||||
.exec(tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(relation) = crate::model::relation::Entity::find()
|
if let Some(relation) = crate::model::relation::Entity::find()
|
||||||
.filter(crate::model::relation::Column::Follower.eq(source_actor.internal))
|
.filter(crate::model::relation::Column::Follower.eq(source_actor.internal))
|
||||||
.filter(crate::model::relation::Column::Following.eq(target_actor.internal))
|
.filter(crate::model::relation::Column::Following.eq(target_actor.internal))
|
||||||
|
@ -250,14 +206,6 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
|
||||||
|
|
||||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||||
|
|
||||||
if ctx.is_local(&follow_activity.actor) {
|
|
||||||
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? {
|
|
||||||
crate::Query::notify(activity_model.internal, actor_internal)
|
|
||||||
.exec(tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let follower = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx)
|
let follower = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx)
|
||||||
.await?
|
.await?
|
||||||
.ok_or(ProcessorError::Incomplete)?;
|
.ok_or(ProcessorError::Incomplete)?;
|
||||||
|
@ -308,17 +256,6 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
|
||||||
|
|
||||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||||
|
|
||||||
// TODO most software doesn't show this, but i think instead we should?? if someone rejects it's
|
|
||||||
// better to know it clearly rather than not knowing if it got lost and maybe retry (being more
|
|
||||||
// annoying)
|
|
||||||
if ctx.is_local(&follow_activity.actor) {
|
|
||||||
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? {
|
|
||||||
crate::Query::notify(activity_model.internal, actor_internal)
|
|
||||||
.exec(tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
crate::model::relation::Entity::delete_many()
|
crate::model::relation::Entity::delete_many()
|
||||||
.filter(crate::model::relation::Column::Activity.eq(activity_model.internal))
|
.filter(crate::model::relation::Column::Activity.eq(activity_model.internal))
|
||||||
.exec(tx)
|
.exec(tx)
|
||||||
|
@ -339,7 +276,6 @@ pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
|
||||||
if !activity.addressed().is_empty() {
|
if !activity.addressed().is_empty() {
|
||||||
ctx.insert_activity(activity, tx).await?;
|
ctx.insert_activity(activity, tx).await?;
|
||||||
}
|
}
|
||||||
// TODO we should delete notifications from CREATEs related to objects we deleted
|
|
||||||
tracing::debug!("deleted '{oid}'");
|
tracing::debug!("deleted '{oid}'");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -462,38 +398,24 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
|
||||||
_ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
|
_ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO we should store undos to make local delete deliveries work and relations make sense
|
// we should store undos to make local delete deliveries work and relations make sense
|
||||||
// except when they have empty addressing
|
// except when they have empty addressing
|
||||||
// so that also remote "secret" undos dont get stored
|
// so that also remote "secret" undos dont get stored
|
||||||
if !activity.addressed().is_empty() {
|
if !activity.addressed().is_empty() {
|
||||||
ctx.insert_activity(activity, tx).await?;
|
ctx.insert_activity(activity, tx).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(internal) = crate::model::activity::Entity::ap_to_internal(undone_activity.id()?, tx).await? {
|
|
||||||
crate::model::notification::Entity::delete_many()
|
|
||||||
.filter(crate::model::notification::Column::Activity.eq(internal))
|
|
||||||
.exec(tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||||
let announced_id = activity.object().id()?.to_string();
|
let announced_id = activity.object().id()?.to_string();
|
||||||
|
|
||||||
let object = match ctx.find_internal(&announced_id).await? {
|
match match ctx.find_internal(&announced_id).await? {
|
||||||
// if we already have this activity, skip it
|
// if we already have this activity, skip it
|
||||||
Some(crate::context::Internal::Activity(_)) => return Ok(()), // already processed
|
Some(crate::context::Internal::Activity(_)) => return Ok(()), // already processed
|
||||||
// actors and objects which we already have
|
// actors and objects which we already have
|
||||||
Some(crate::context::Internal::Actor(_)) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
|
Some(x) => x,
|
||||||
// objects that we already have
|
|
||||||
Some(crate::context::Internal::Object(internal)) => {
|
|
||||||
crate::model::object::Entity::find_by_id(internal)
|
|
||||||
.one(tx)
|
|
||||||
.await?
|
|
||||||
.ok_or_else(|| sea_orm::DbErr::RecordNotFound(format!("object#{internal}")))?
|
|
||||||
},
|
|
||||||
// something new, fetch it!
|
// something new, fetch it!
|
||||||
None => {
|
None => {
|
||||||
match ctx.pull(&announced_id).await? {
|
match ctx.pull(&announced_id).await? {
|
||||||
|
@ -502,24 +424,29 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D
|
||||||
// actors are not processable at all
|
// actors are not processable at all
|
||||||
Pull::Actor(_) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
|
Pull::Actor(_) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
|
||||||
// objects are processed down below, make a mock Internal::Object(internal)
|
// objects are processed down below, make a mock Internal::Object(internal)
|
||||||
Pull::Object(x) => ctx.resolve_object(x, tx).await?,
|
Pull::Object(x) =>
|
||||||
|
crate::context::Internal::Object(
|
||||||
|
ctx.resolve_object(x, tx).await?.internal
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
} {
|
||||||
|
crate::context::Internal::Activity(_) => unreachable!(),
|
||||||
|
crate::context::Internal::Actor(_) => Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
|
||||||
|
crate::context::Internal::Object(internal) => {
|
||||||
let actor = ctx.fetch_user(activity.actor().id()?, tx).await?;
|
let actor = ctx.fetch_user(activity.actor().id()?, tx).await?;
|
||||||
|
|
||||||
// we only care about announces produced by "Person" actors, because there's intention
|
// we only care about announces produced by "Person" actors, because there's intention
|
||||||
// anything shared by groups, services or applications is automated: fetch it and be done
|
// anything shared by groups, services or applications is automated: fetch it and be done
|
||||||
if actor.actor_type == apb::ActorType::Person {
|
if actor.actor_type == apb::ActorType::Person {
|
||||||
// if this user never "boosted" this object before, increase its counter
|
// if this user never "boosted" this object before, increase its counter
|
||||||
if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, object.internal)
|
if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, internal)
|
||||||
.any(tx)
|
.any(tx)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
crate::model::object::Entity::update_many()
|
crate::model::object::Entity::update_many()
|
||||||
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
|
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
|
||||||
.filter(crate::model::object::Column::Internal.eq(object.internal))
|
.filter(crate::model::object::Column::Internal.eq(internal))
|
||||||
.exec(tx)
|
.exec(tx)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
@ -527,7 +454,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D
|
||||||
let share = crate::model::announce::ActiveModel {
|
let share = crate::model::announce::ActiveModel {
|
||||||
internal: NotSet,
|
internal: NotSet,
|
||||||
actor: Set(actor.internal),
|
actor: Set(actor.internal),
|
||||||
object: Set(object.internal),
|
object: Set(internal),
|
||||||
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
|
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -541,19 +468,11 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D
|
||||||
// or maybe create fake objects that reference the original one
|
// or maybe create fake objects that reference the original one
|
||||||
// idk!!!!
|
// idk!!!!
|
||||||
if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) {
|
if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) {
|
||||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
ctx.insert_activity(activity, tx).await?;
|
||||||
|
|
||||||
if let Some(ref attributed_to) = object.attributed_to {
|
|
||||||
if ctx.is_local(attributed_to) {
|
|
||||||
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
|
|
||||||
crate::Query::notify(activity_model.internal, actor_internal)
|
|
||||||
.exec(tx)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!("{} shared {}", actor.id, announced_id);
|
tracing::debug!("{} shared {}", actor.id, announced_id);
|
||||||
Ok(())
|
Ok(())
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,10 +55,6 @@ impl MigrationTrait for Migration {
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
manager
|
|
||||||
.create_index(Index::create().name("index-notifications-activity").table(Notifications::Table).col(Notifications::Activity).to_owned())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
manager
|
manager
|
||||||
.create_index(
|
.create_index(
|
||||||
Index::create()
|
Index::create()
|
||||||
|
|
Loading…
Add table
Reference in a new issue