chore: refactor addressing: expand inside methods

This commit is contained in:
əlemi 2024-06-07 23:14:49 +02:00
parent 827fb287db
commit 06fcf09a5f
Signed by: alemi
GPG key ID: A4895B84D311642C
4 changed files with 104 additions and 100 deletions

View file

@ -1,89 +1,27 @@
use apb::target::Addressed;
use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait}; use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait};
use crate::traits::fetch::Fetcher; use crate::traits::fetch::Fetcher;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Addresser { pub trait Addresser {
async fn expand_addressing(&self, targets: Vec<String>, tx: &impl ConnectionTrait) -> Result<Vec<String>, DbErr>; async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>; async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>; async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl Addresser for crate::Context { impl Addresser for crate::Context {
async fn expand_addressing(&self, targets: Vec<String>, tx: &impl ConnectionTrait) -> Result<Vec<String>, DbErr> { async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr> {
let mut out = Vec::new(); let to = expand_addressing(to, tx).await?;
for target in targets {
if target.ends_with("/followers") {
let target_id = target.replace("/followers", "");
let mut followers = crate::model::relation::Entity::followers(&target_id, tx)
.await?
.unwrap_or_else(Vec::new);
if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO
followers.push(target_id);
}
for follower in followers {
out.push(follower);
}
} else {
out.push(target);
}
}
Ok(out)
}
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr> {
// TODO address_to became kind of expensive, with these two selects right away and then another
// select for each target we're addressing to... can this be improved??
let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false };
let local_object = if let Some(x) = oid { self.is_local_internal_object(x).await.unwrap_or(false) } else { false };
let mut addressing = Vec::new();
for target in targets
.iter()
.filter(|to| !to.is_empty())
.filter(|to| !to.ends_with("/followers"))
.filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to))
{
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
match (
crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), tx).await?,
crate::model::actor::Entity::ap_to_internal(target, tx).await?,
) {
(Some(server), Some(actor)) => (Some(server), Some(actor)),
(None, _) => { tracing::error!("failed resolving domain of {target}"); continue; },
(_, None) => { tracing::error!("failed resolving actor {target}"); continue; },
}
};
addressing.push(
crate::model::addressing::ActiveModel {
internal: NotSet,
instance: Set(server),
actor: Set(actor),
activity: Set(aid),
object: Set(oid),
published: Set(chrono::Utc::now()),
}
);
}
if !addressing.is_empty() {
crate::model::addressing::Entity::insert_many(addressing)
.exec(tx)
.await?;
}
Ok(())
}
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr> {
let mut deliveries = Vec::new(); let mut deliveries = Vec::new();
for target in targets.iter() for target in to.into_iter()
.filter(|to| !to.is_empty()) .filter(|to| !to.is_empty())
.filter(|to| crate::Context::server(to) != self.domain()) .filter(|to| crate::Context::server(to) != self.domain())
.filter(|to| to != &apb::target::PUBLIC) .filter(|to| to != apb::target::PUBLIC)
{ {
// TODO fetch concurrently // TODO fetch concurrently
match self.fetch_user(target, tx).await { match self.fetch_user(&target, tx).await {
Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
crate::model::job::ActiveModel { crate::model::job::ActiveModel {
internal: sea_orm::ActiveValue::NotSet, internal: sea_orm::ActiveValue::NotSet,
@ -115,4 +53,77 @@ impl Addresser for crate::Context {
Ok(()) Ok(())
} }
async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> {
let to = expand_addressing(object.addressed(), tx).await?;
address_to(self, to, None, Some(object.internal), tx).await
}
async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> {
let to = expand_addressing(activity.mentioning(), tx).await?;
address_to(self, to, Some(activity.internal), None, tx).await
}
}
async fn address_to(ctx: &crate::Context, to: Vec<String>, aid: Option<i64>, oid: Option<i64>, tx: &impl ConnectionTrait) -> Result<(), DbErr> {
// TODO address_to became kind of expensive, with these two selects right away and then another
// select for each target we're addressing to... can this be improved??
let local_activity = if let Some(x) = aid { ctx.is_local_internal_activity(x).await.unwrap_or(false) } else { false };
let local_object = if let Some(x) = oid { ctx.is_local_internal_object(x).await.unwrap_or(false) } else { false };
let mut addressing = Vec::new();
for target in to.into_iter()
.filter(|to| !to.is_empty())
.filter(|to| !to.ends_with("/followers"))
.filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || ctx.is_local(to))
{
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
match (
crate::model::instance::Entity::domain_to_internal(&crate::Context::server(&target), tx).await?,
crate::model::actor::Entity::ap_to_internal(&target, tx).await?,
) {
(Some(server), Some(actor)) => (Some(server), Some(actor)),
(None, _) => { tracing::error!("failed resolving domain of {target}"); continue; },
(_, None) => { tracing::error!("failed resolving actor {target}"); continue; },
}
};
addressing.push(
crate::model::addressing::ActiveModel {
internal: NotSet,
instance: Set(server),
actor: Set(actor),
activity: Set(aid),
object: Set(oid),
published: Set(chrono::Utc::now()),
}
);
}
if !addressing.is_empty() {
crate::model::addressing::Entity::insert_many(addressing)
.exec(tx)
.await?;
}
Ok(())
}
async fn expand_addressing(targets: Vec<String>, tx: &impl ConnectionTrait) -> Result<Vec<String>, DbErr> {
let mut out = Vec::new();
for target in targets {
if target.ends_with("/followers") {
let target_id = target.replace("/followers", "");
let mut followers = crate::model::relation::Entity::followers(&target_id, tx)
.await?
.unwrap_or_else(Vec::new);
if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO
followers.push(target_id);
}
for follower in followers {
out.push(follower);
}
} else {
out.push(target);
}
}
Ok(out)
} }

View file

@ -1,12 +1,12 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object}; use apb::{Activity, Actor, ActorMut, Base, Collection, Object};
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::{ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet}; use sea_orm::{ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet};
use crate::traits::normalize::AP; use crate::traits::normalize::AP;
use super::{Addresser, Normalizer}; use super::Normalizer;
use httpsign::HttpSignature; use httpsign::HttpSignature;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -358,10 +358,6 @@ impl Fetcher for crate::Context {
let activity_model = self.insert_activity(activity, tx).await?; let activity_model = self.insert_activity(activity, tx).await?;
let addressed = activity_model.addressed();
let expanded_addresses = self.expand_addressing(addressed, tx).await?;
self.address_to(Some(activity_model.internal), None, &expanded_addresses, tx).await?;
Ok(activity_model) Ok(activity_model)
} }
@ -407,8 +403,6 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
} }
} }
let addressed = object.addressed();
if let Ok(reply) = object.in_reply_to().id() { if let Ok(reply) = object.in_reply_to().id() {
if depth <= ctx.cfg().security.thread_crawl_depth { if depth <= ctx.cfg().security.thread_crawl_depth {
fetch_object_r(ctx, reply, depth + 1, tx).await?; fetch_object_r(ctx, reply, depth + 1, tx).await?;
@ -419,9 +413,6 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
let object_model = ctx.insert_object(object, tx).await?; let object_model = ctx.insert_object(object, tx).await?;
let expanded_addresses = ctx.expand_addressing(addressed, tx).await?;
ctx.address_to(None, Some(object_model.internal), &expanded_addresses, tx).await?;
Ok(object_model) Ok(object_model)
} }

View file

@ -1,6 +1,8 @@
use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey};
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter};
use super::Addresser;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum NormalizerError { pub enum NormalizerError {
#[error("normalized document misses required field: {0:?}")] #[error("normalized document misses required field: {0:?}")]
@ -23,13 +25,11 @@ pub trait Normalizer {
impl Normalizer for crate::Context { impl Normalizer for crate::Context {
async fn insert_object(&self, object: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError> { async fn insert_object(&self, object: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError> {
let oid = object.id()?.to_string(); let mut object_model = AP::object(&object)?;
let uid = object.attributed_to().id().str();
let mut object_active_model = AP::object_q(&object)?;
// make sure content only contains a safe subset of html // make sure content only contains a safe subset of html
if let Ok(content) = object.content() { if let Some(content) = object_model.content {
object_active_model.content = Set(Some(mdhtml::safe_html(content))); object_model.content = Some(mdhtml::safe_html(&content));
} }
// fix context for remote posts // fix context for remote posts
@ -38,19 +38,22 @@ impl Normalizer for crate::Context {
// > some whole other way to do this?? im thinking but misskey aaaa!! TODO // > some whole other way to do this?? im thinking but misskey aaaa!! TODO
if let Ok(reply) = object.in_reply_to().id() { if let Ok(reply) = object.in_reply_to().id() {
if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(tx).await? { if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(tx).await? {
object_active_model.context = Set(o.context); object_model.context = o.context;
} else { } else {
object_active_model.context = Set(None); // TODO to be filled by some other task object_model.context = None; // TODO to be filled by some other task
} }
} else { } else {
object_active_model.context = Set(Some(oid.clone())); object_model.context = Some(object_model.id.clone());
} }
let mut object_active_model = object_model.clone().into_active_model();
object_active_model.internal = NotSet;
crate::model::object::Entity::insert(object_active_model).exec(tx).await?; crate::model::object::Entity::insert(object_active_model).exec(tx).await?;
let object_model = crate::model::object::Entity::find_by_ap_id(&oid) object_model.internal = crate::model::object::Entity::ap_to_internal(&object_model.id, tx)
.one(tx)
.await? .await?
.ok_or_else(|| DbErr::RecordNotFound(oid.to_string()))?; .ok_or_else(|| DbErr::RecordNotFound(object_model.id.clone()))?;
self.address_object(&object_model, tx).await?;
// update replies counter // update replies counter
if let Some(ref in_reply_to) = object_model.in_reply_to { if let Some(ref in_reply_to) = object_model.in_reply_to {
@ -61,10 +64,10 @@ impl Normalizer for crate::Context {
.await?; .await?;
} }
// update statuses counter // update statuses counter
if let Some(object_author) = uid { if let Some(ref object_author) = object_model.attributed_to {
crate::model::actor::Entity::update_many() crate::model::actor::Entity::update_many()
.col_expr(crate::model::actor::Column::StatusesCount, Expr::col(crate::model::actor::Column::StatusesCount).add(1)) .col_expr(crate::model::actor::Column::StatusesCount, Expr::col(crate::model::actor::Column::StatusesCount).add(1))
.filter(crate::model::actor::Column::Id.eq(&object_author)) .filter(crate::model::actor::Column::Id.eq(object_author))
.exec(tx) .exec(tx)
.await?; .await?;
} }
@ -137,6 +140,8 @@ impl Normalizer for crate::Context {
.ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?; .ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?;
activity_model.internal = internal; activity_model.internal = internal;
self.address_activity(&activity_model, tx).await?;
Ok(activity_model) Ok(activity_model)
} }
} }

View file

@ -58,12 +58,9 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
)); ));
} }
// TODO we expand addressing twice, ugghhhhh let targets = activity.addressed();
let targets = ctx.expand_addressing(activity.addressed(), &tx).await?;
ctx.process(activity, &tx).await?; ctx.process(activity, &tx).await?;
ctx.deliver(targets, &job.activity, &job.actor, &tx).await?;
ctx.deliver_to(&job.activity, &job.actor, &targets, &tx).await?;
tx.commit().await?; tx.commit().await?;