fix: merge Create addressing

basically bring Addresser out of Normalizer again and address manually
everywhere so that in Create we can join the row. in the future we may
be able to resolve contexts more wisely and thus be able to insert
merged rows for likes and announces too, but for now this should be
quite enough
This commit is contained in:
əlemi 2024-06-28 02:50:46 +02:00
parent 75fce425ad
commit 54e6b517e2
Signed by: alemi
GPG key ID: A4895B84D311642C
5 changed files with 51 additions and 28 deletions

View file

@ -1,5 +1,5 @@
use sea_orm::{EntityTrait, TransactionTrait}; use sea_orm::{EntityTrait, TransactionTrait};
use upub::traits::{fetch::{Fetchable, PullError}, Normalizer}; use upub::traits::{fetch::{Fetchable, PullError}, Addresser, Normalizer};
pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> { pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> {
use apb::Base; use apb::Base;
@ -16,15 +16,17 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), Pu
let tx = ctx.db().begin().await?; let tx = ctx.db().begin().await?;
match obj.base_type() { match obj.base_type() {
Ok(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { Ok(apb::BaseType::Object(apb::ObjectType::Actor(_))) => {
upub::model::actor::Entity::insert( upub::model::actor::Entity::insert(upub::AP::actor_q(&obj, None)?)
upub::AP::actor_q(&obj, None).unwrap() .exec(&tx)
).exec(&tx).await.unwrap(); .await?;
}, },
Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
ctx.insert_activity(obj, &tx).await.unwrap(); let act = ctx.insert_activity(obj, &tx).await?;
ctx.address((Some(&act), None), &tx).await?;
}, },
Ok(apb::BaseType::Object(apb::ObjectType::Note)) => { Ok(apb::BaseType::Object(apb::ObjectType::Note)) => {
ctx.insert_object(obj, &tx).await.unwrap(); let obj = ctx.insert_object(obj, &tx).await?;
ctx.address((None, Some(&obj)), &tx).await?;
}, },
Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t),
Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"),

View file

@ -1,3 +1,5 @@
use std::collections::BTreeSet;
use apb::target::Addressed; use apb::target::Addressed;
use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait, QuerySelect, SelectColumns}; use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait, QuerySelect, SelectColumns};
@ -6,8 +8,7 @@ use crate::traits::fetch::Fetcher;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Addresser { pub trait Addresser {
async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>; async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>; async fn address(&self, (activity, object): (Option<&crate::model::activity::Model>, Option<&crate::model::object::Model>), tx: &impl ConnectionTrait) -> Result<(), DbErr>;
async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -54,14 +55,29 @@ impl Addresser for crate::Context {
Ok(()) Ok(())
} }
async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> { async fn address(&self, (activity, object): (Option<&crate::model::activity::Model>, Option<&crate::model::object::Model>), tx: &impl ConnectionTrait) -> Result<(), DbErr> {
let to = expand_addressing(object.addressed(), tx).await?; match (activity, object) {
address_to(self, to, None, Some(object.internal), self.is_local(&object.id), tx).await (None, None) => Ok(()),
} (Some(activity), None) => {
async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> {
let to = expand_addressing(activity.addressed(), tx).await?; let to = expand_addressing(activity.addressed(), tx).await?;
address_to(self, to, Some(activity.internal), None, self.is_local(&activity.id), tx).await address_to(self, to, Some(activity.internal), None, self.is_local(&activity.id), tx).await
},
(None, Some(object)) => {
let to = expand_addressing(object.addressed(), tx).await?;
address_to(self, to, None, Some(object.internal), self.is_local(&object.id), tx).await
},
(Some(activity), Some(object)) => {
let to_activity = BTreeSet::from_iter(expand_addressing(activity.addressed(), tx).await?);
let to_object = BTreeSet::from_iter(expand_addressing(object.addressed(), tx).await?);
let to_common = to_activity.intersection(&to_object).cloned().collect();
address_to(self, to_common, Some(activity.internal), Some(object.internal), self.is_local(&activity.id), tx).await?;
let to_only_activity = (&to_activity - &to_object).into_iter().collect();
address_to(self, to_only_activity, Some(activity.internal), None, self.is_local(&activity.id), tx).await?;
let to_only_object = (&to_object - &to_activity).into_iter().collect();
address_to(self, to_only_object, None, Some(object.internal), self.is_local(&activity.id), tx).await?;
Ok(())
},
}
} }
} }

View file

@ -6,7 +6,7 @@ use sea_orm::{ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet};
use crate::traits::normalize::AP; use crate::traits::normalize::AP;
use super::Normalizer; use super::{Addresser, Normalizer};
use httpsign::HttpSignature; use httpsign::HttpSignature;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -357,6 +357,7 @@ impl Fetcher for crate::Context {
} }
let activity_model = self.insert_activity(activity, tx).await?; let activity_model = self.insert_activity(activity, tx).await?;
self.address((Some(&activity_model), None), tx).await?;
Ok(activity_model) Ok(activity_model)
} }
@ -412,6 +413,7 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
} }
let object_model = ctx.insert_object(object, tx).await?; let object_model = ctx.insert_object(object, tx).await?;
ctx.address((None, Some(&object_model)), tx).await?;
Ok(object_model) Ok(object_model)
} }

View file

@ -1,8 +1,6 @@
use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey};
use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{Unchanged, NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{Unchanged, NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter};
use super::Addresser;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum NormalizerError { pub enum NormalizerError {
#[error("normalized document misses required field: {0:?}")] #[error("normalized document misses required field: {0:?}")]
@ -53,8 +51,6 @@ impl Normalizer for crate::Context {
.await? .await?
.ok_or_else(|| DbErr::RecordNotFound(object_model.id.clone()))?; .ok_or_else(|| DbErr::RecordNotFound(object_model.id.clone()))?;
self.address_object(&object_model, tx).await?;
// update replies counter // update replies counter
if let Some(ref in_reply_to) = object_model.in_reply_to { if let Some(ref in_reply_to) = object_model.in_reply_to {
crate::model::object::Entity::update_many() crate::model::object::Entity::update_many()
@ -193,8 +189,6 @@ impl Normalizer for crate::Context {
.ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?; .ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?;
activity_model.internal = internal; activity_model.internal = internal;
self.address_activity(&activity_model, tx).await?;
Ok(activity_model) Ok(activity_model)
} }
} }

View file

@ -1,6 +1,6 @@
use apb::{target::Addressed, Activity, Base, Object}; use apb::{target::Addressed, Activity, Base, Object};
use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Fetcher, Normalizer}}; use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ProcessorError { pub enum ProcessorError {
@ -79,8 +79,8 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let object_model = ctx.insert_object(object_node, tx).await?; let object_model = ctx.insert_object(object_node, tx).await?;
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), Some(&object_model)), tx).await?;
for uid in notified { for uid in notified {
if !ctx.is_local(&uid) { continue } if !ctx.is_local(&uid) { continue }
@ -122,6 +122,7 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
// likes without addressing are "silent likes", process them but dont store activity or notify // likes without addressing are "silent likes", process them but dont store activity or notify
if !activity.addressed().is_empty() { if !activity.addressed().is_empty() {
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
// TODO check that object author is in this like addressing!!! otherwise skip notification // TODO check that object author is in this like addressing!!! otherwise skip notification
if let Some(ref attributed_to) = obj.attributed_to { if let Some(ref attributed_to) = obj.attributed_to {
@ -162,6 +163,7 @@ pub async fn dislike(ctx: &crate::Context, activity: impl apb::Activity, tx: &Da
// dislikes without addressing are "silent dislikes", process them but dont store activity // dislikes without addressing are "silent dislikes", process them but dont store activity
if !activity.addressed().is_empty() { if !activity.addressed().is_empty() {
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
// TODO check that object author is in this like addressing!!! otherwise skip notification // TODO check that object author is in this like addressing!!! otherwise skip notification
if let Some(ref attributed_to) = obj.attributed_to { if let Some(ref attributed_to) = obj.attributed_to {
@ -186,6 +188,7 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
.ok_or(ProcessorError::Incomplete)?; .ok_or(ProcessorError::Incomplete)?;
let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?; let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?;
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
if ctx.is_local(&target_actor.id) { if ctx.is_local(&target_actor.id) {
crate::Query::notify(activity_model.internal, target_actor.internal) crate::Query::notify(activity_model.internal, target_actor.internal)
@ -249,6 +252,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
} }
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
if ctx.is_local(&follow_activity.actor) { if ctx.is_local(&follow_activity.actor) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? { if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? {
@ -307,6 +311,7 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
} }
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
// TODO most software doesn't show this, but i think instead we should?? if someone rejects it's // TODO most software doesn't show this, but i think instead we should?? if someone rejects it's
// better to know it clearly rather than not knowing if it got lost and maybe retry (being more // better to know it clearly rather than not knowing if it got lost and maybe retry (being more
@ -337,7 +342,8 @@ pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
// except when they have empty addressing // except when they have empty addressing
// so that also remote "secret" deletes dont get stored // so that also remote "secret" deletes dont get stored
if !activity.addressed().is_empty() { if !activity.addressed().is_empty() {
ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
} }
// TODO we should delete notifications from CREATEs related to objects we deleted // TODO we should delete notifications from CREATEs related to objects we deleted
tracing::debug!("deleted '{oid}'"); tracing::debug!("deleted '{oid}'");
@ -380,7 +386,8 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
// updates can be silently discarded except if local. we dont really care about knowing when // updates can be silently discarded except if local. we dont really care about knowing when
// remote documents change, there's the "updated" field, just want the most recent version // remote documents change, there's the "updated" field, just want the most recent version
if ctx.is_local(&actor_id) { if ctx.is_local(&actor_id) {
ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
} }
tracing::debug!("{} updated {}", actor_id, oid); tracing::debug!("{} updated {}", actor_id, oid);
@ -466,7 +473,8 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
// except when they have empty addressing // except when they have empty addressing
// so that also remote "secret" undos dont get stored // so that also remote "secret" undos dont get stored
if !activity.addressed().is_empty() { if !activity.addressed().is_empty() {
ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
} }
if let Some(internal) = crate::model::activity::Entity::ap_to_internal(undone_activity.id()?, tx).await? { if let Some(internal) = crate::model::activity::Entity::ap_to_internal(undone_activity.id()?, tx).await? {
@ -542,6 +550,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D
// idk!!!! // idk!!!!
if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) { if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) {
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
ctx.address((Some(&activity_model), None), tx).await?;
if let Some(ref attributed_to) = object.attributed_to { if let Some(ref attributed_to) = object.attributed_to {
if ctx.is_local(attributed_to) { if ctx.is_local(attributed_to) {