diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index 6e0e8886..2729cd3c 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -1,89 +1,27 @@ +use apb::target::Addressed; use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait}; use crate::traits::fetch::Fetcher; #[async_trait::async_trait] pub trait Addresser { - async fn expand_addressing(&self, targets: Vec, tx: &impl ConnectionTrait) -> Result, DbErr>; - async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>; - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>; + async fn deliver(&self, to: Vec, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>; + async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>; + async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr>; } #[async_trait::async_trait] impl Addresser for crate::Context { - async fn expand_addressing(&self, targets: Vec, tx: &impl ConnectionTrait) -> Result, DbErr> { - let mut out = Vec::new(); - for target in targets { - if target.ends_with("/followers") { - let target_id = target.replace("/followers", ""); - let mut followers = crate::model::relation::Entity::followers(&target_id, tx) - .await? - .unwrap_or_else(Vec::new); - if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO - followers.push(target_id); - } - for follower in followers { - out.push(follower); - } - } else { - out.push(target); - } - } - Ok(out) - } - - async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr> { - // TODO address_to became kind of expensive, with these two selects right away and then another - // select for each target we're addressing to... can this be improved?? - let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false }; - let local_object = if let Some(x) = oid { self.is_local_internal_object(x).await.unwrap_or(false) } else { false }; - let mut addressing = Vec::new(); - for target in targets - .iter() - .filter(|to| !to.is_empty()) - .filter(|to| !to.ends_with("/followers")) - .filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to)) - { - let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { - match ( - crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), tx).await?, - crate::model::actor::Entity::ap_to_internal(target, tx).await?, - ) { - (Some(server), Some(actor)) => (Some(server), Some(actor)), - (None, _) => { tracing::error!("failed resolving domain of {target}"); continue; }, - (_, None) => { tracing::error!("failed resolving actor {target}"); continue; }, - } - }; - addressing.push( - crate::model::addressing::ActiveModel { - internal: NotSet, - instance: Set(server), - actor: Set(actor), - activity: Set(aid), - object: Set(oid), - published: Set(chrono::Utc::now()), - } - ); - } - - if !addressing.is_empty() { - crate::model::addressing::Entity::insert_many(addressing) - .exec(tx) - .await?; - } - - Ok(()) - } - - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr> { + async fn deliver(&self, to: Vec, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr> { + let to = expand_addressing(to, tx).await?; let mut deliveries = Vec::new(); - for target in targets.iter() + for target in to.into_iter() .filter(|to| !to.is_empty()) .filter(|to| crate::Context::server(to) != self.domain()) - .filter(|to| to != &apb::target::PUBLIC) + .filter(|to| to != apb::target::PUBLIC) { // TODO fetch concurrently - match self.fetch_user(target, tx).await { + match self.fetch_user(&target, tx).await { Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( crate::model::job::ActiveModel { internal: sea_orm::ActiveValue::NotSet, @@ -115,4 +53,77 @@ impl Addresser for crate::Context { Ok(()) } + + async fn address_object(&self, object: &crate::model::object::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> { + let to = expand_addressing(object.addressed(), tx).await?; + address_to(self, to, None, Some(object.internal), tx).await + } + + async fn address_activity(&self, activity: &crate::model::activity::Model, tx: &impl ConnectionTrait) -> Result<(), DbErr> { + let to = expand_addressing(activity.mentioning(), tx).await?; + address_to(self, to, Some(activity.internal), None, tx).await + } +} + +async fn address_to(ctx: &crate::Context, to: Vec, aid: Option, oid: Option, tx: &impl ConnectionTrait) -> Result<(), DbErr> { + // TODO address_to became kind of expensive, with these two selects right away and then another + // select for each target we're addressing to... can this be improved?? + let local_activity = if let Some(x) = aid { ctx.is_local_internal_activity(x).await.unwrap_or(false) } else { false }; + let local_object = if let Some(x) = oid { ctx.is_local_internal_object(x).await.unwrap_or(false) } else { false }; + let mut addressing = Vec::new(); + for target in to.into_iter() + .filter(|to| !to.is_empty()) + .filter(|to| !to.ends_with("/followers")) + .filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || ctx.is_local(to)) + { + let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { + match ( + crate::model::instance::Entity::domain_to_internal(&crate::Context::server(&target), tx).await?, + crate::model::actor::Entity::ap_to_internal(&target, tx).await?, + ) { + (Some(server), Some(actor)) => (Some(server), Some(actor)), + (None, _) => { tracing::error!("failed resolving domain of {target}"); continue; }, + (_, None) => { tracing::error!("failed resolving actor {target}"); continue; }, + } + }; + addressing.push( + crate::model::addressing::ActiveModel { + internal: NotSet, + instance: Set(server), + actor: Set(actor), + activity: Set(aid), + object: Set(oid), + published: Set(chrono::Utc::now()), + } + ); + } + + if !addressing.is_empty() { + crate::model::addressing::Entity::insert_many(addressing) + .exec(tx) + .await?; + } + + Ok(()) +} + +async fn expand_addressing(targets: Vec, tx: &impl ConnectionTrait) -> Result, DbErr> { + let mut out = Vec::new(); + for target in targets { + if target.ends_with("/followers") { + let target_id = target.replace("/followers", ""); + let mut followers = crate::model::relation::Entity::followers(&target_id, tx) + .await? + .unwrap_or_else(Vec::new); + if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO + followers.push(target_id); + } + for follower in followers { + out.push(follower); + } + } else { + out.push(target); + } + } + Ok(out) } diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index a9628a78..edeeefe4 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -1,12 +1,12 @@ use std::collections::BTreeMap; -use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object}; +use apb::{Activity, Actor, ActorMut, Base, Collection, Object}; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use sea_orm::{ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet}; use crate::traits::normalize::AP; -use super::{Addresser, Normalizer}; +use super::Normalizer; use httpsign::HttpSignature; #[derive(Debug, Clone)] @@ -358,10 +358,6 @@ impl Fetcher for crate::Context { let activity_model = self.insert_activity(activity, tx).await?; - let addressed = activity_model.addressed(); - let expanded_addresses = self.expand_addressing(addressed, tx).await?; - self.address_to(Some(activity_model.internal), None, &expanded_addresses, tx).await?; - Ok(activity_model) } @@ -407,8 +403,6 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth } } - let addressed = object.addressed(); - if let Ok(reply) = object.in_reply_to().id() { if depth <= ctx.cfg().security.thread_crawl_depth { fetch_object_r(ctx, reply, depth + 1, tx).await?; @@ -419,9 +413,6 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth let object_model = ctx.insert_object(object, tx).await?; - let expanded_addresses = ctx.expand_addressing(addressed, tx).await?; - ctx.address_to(None, Some(object_model.internal), &expanded_addresses, tx).await?; - Ok(object_model) } diff --git a/upub/core/src/traits/normalize.rs b/upub/core/src/traits/normalize.rs index 57bd472a..35248630 100644 --- a/upub/core/src/traits/normalize.rs +++ b/upub/core/src/traits/normalize.rs @@ -1,6 +1,8 @@ use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; +use super::Addresser; + #[derive(Debug, thiserror::Error)] pub enum NormalizerError { #[error("normalized document misses required field: {0:?}")] @@ -23,13 +25,11 @@ pub trait Normalizer { impl Normalizer for crate::Context { async fn insert_object(&self, object: impl apb::Object, tx: &impl ConnectionTrait) -> Result { - let oid = object.id()?.to_string(); - let uid = object.attributed_to().id().str(); - let mut object_active_model = AP::object_q(&object)?; + let mut object_model = AP::object(&object)?; // make sure content only contains a safe subset of html - if let Ok(content) = object.content() { - object_active_model.content = Set(Some(mdhtml::safe_html(content))); + if let Some(content) = object_model.content { + object_model.content = Some(mdhtml::safe_html(&content)); } // fix context for remote posts @@ -38,19 +38,22 @@ impl Normalizer for crate::Context { // > some whole other way to do this?? im thinking but misskey aaaa!! TODO if let Ok(reply) = object.in_reply_to().id() { if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(tx).await? { - object_active_model.context = Set(o.context); + object_model.context = o.context; } else { - object_active_model.context = Set(None); // TODO to be filled by some other task + object_model.context = None; // TODO to be filled by some other task } } else { - object_active_model.context = Set(Some(oid.clone())); + object_model.context = Some(object_model.id.clone()); } + let mut object_active_model = object_model.clone().into_active_model(); + object_active_model.internal = NotSet; crate::model::object::Entity::insert(object_active_model).exec(tx).await?; - let object_model = crate::model::object::Entity::find_by_ap_id(&oid) - .one(tx) + object_model.internal = crate::model::object::Entity::ap_to_internal(&object_model.id, tx) .await? - .ok_or_else(|| DbErr::RecordNotFound(oid.to_string()))?; + .ok_or_else(|| DbErr::RecordNotFound(object_model.id.clone()))?; + + self.address_object(&object_model, tx).await?; // update replies counter if let Some(ref in_reply_to) = object_model.in_reply_to { @@ -61,10 +64,10 @@ impl Normalizer for crate::Context { .await?; } // update statuses counter - if let Some(object_author) = uid { + if let Some(ref object_author) = object_model.attributed_to { crate::model::actor::Entity::update_many() .col_expr(crate::model::actor::Column::StatusesCount, Expr::col(crate::model::actor::Column::StatusesCount).add(1)) - .filter(crate::model::actor::Column::Id.eq(&object_author)) + .filter(crate::model::actor::Column::Id.eq(object_author)) .exec(tx) .await?; } @@ -137,6 +140,8 @@ impl Normalizer for crate::Context { .ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?; activity_model.internal = internal; + self.address_activity(&activity_model, tx).await?; + Ok(activity_model) } } diff --git a/upub/worker/src/outbound.rs b/upub/worker/src/outbound.rs index 19c2c93e..47d75461 100644 --- a/upub/worker/src/outbound.rs +++ b/upub/worker/src/outbound.rs @@ -58,12 +58,9 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult< )); } - // TODO we expand addressing twice, ugghhhhh - let targets = ctx.expand_addressing(activity.addressed(), &tx).await?; - + let targets = activity.addressed(); ctx.process(activity, &tx).await?; - - ctx.deliver_to(&job.activity, &job.actor, &targets, &tx).await?; + ctx.deliver(targets, &job.activity, &job.actor, &tx).await?; tx.commit().await?;