From f6d30b3becab72cf95455dfbeae9418ab515c270 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 6 Jun 2024 19:04:08 +0200 Subject: [PATCH] fix: outbound->delivery, local->outbound also stop discarding duplicated deliveries, ouchhhh --- main.rs | 4 +- upub/core/src/model/job.rs | 2 +- upub/core/src/traits/address.rs | 2 +- upub/routes/src/activitypub/user/outbox.rs | 2 +- upub/worker/src/delivery.rs | 63 +++++++++++ upub/worker/src/dispatcher.rs | 18 ++-- upub/worker/src/lib.rs | 2 +- upub/worker/src/local.rs | 71 ------------- upub/worker/src/outbound.rs | 118 +++++++++++---------- 9 files changed, 143 insertions(+), 139 deletions(-) create mode 100644 upub/worker/src/delivery.rs delete mode 100644 upub/worker/src/local.rs diff --git a/main.rs b/main.rs index 69df0d4..ca9d375 100644 --- a/main.rs +++ b/main.rs @@ -99,7 +99,7 @@ enum Mode { #[derive(Debug, Clone, clap::ValueEnum)] enum Filter { All, - Local, + Delivery, Inbound, Outbound, } @@ -188,7 +188,7 @@ impl From for Option { fn from(value: Filter) -> Self { match value { Filter::All => None, - Filter::Local => Some(upub::model::job::JobType::Local), + Filter::Delivery => Some(upub::model::job::JobType::Delivery), Filter::Inbound => Some(upub::model::job::JobType::Inbound), Filter::Outbound => Some(upub::model::job::JobType::Outbound), } diff --git a/upub/core/src/model/job.rs b/upub/core/src/model/job.rs index ff9d4cf..b46eecc 100644 --- a/upub/core/src/model/job.rs +++ b/upub/core/src/model/job.rs @@ -5,7 +5,7 @@ use sea_orm::entity::prelude::*; pub enum JobType { Inbound = 1, Outbound = 2, - Local = 3, + Delivery = 3, } #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index 853a578..a8357ed 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -88,7 +88,7 @@ impl Addresser for crate::Context { crate::model::job::ActiveModel { internal: sea_orm::ActiveValue::NotSet, actor: Set(from.to_string()), - job_type: Set(crate::model::job::JobType::Outbound), + job_type: Set(crate::model::job::JobType::Delivery), payload: Set(None), // TODO we should resolve each user by id and check its inbox because we can't assume // it's /actors/{id}/inbox for every software, but oh well it's waaaaay easier now diff --git a/upub/routes/src/activitypub/user/outbox.rs b/upub/routes/src/activitypub/user/outbox.rs index 2913953..5abb87d 100644 --- a/upub/routes/src/activitypub/user/outbox.rs +++ b/upub/routes/src/activitypub/user/outbox.rs @@ -56,7 +56,7 @@ pub async fn post( let job = model::job::ActiveModel { internal: NotSet, activity: Set(aid.clone()), - job_type: Set(model::job::JobType::Local), + job_type: Set(model::job::JobType::Outbound), actor: Set(uid.clone()), target: Set(None), published: Set(chrono::Utc::now()), diff --git a/upub/worker/src/delivery.rs b/upub/worker/src/delivery.rs new file mode 100644 index 0000000..b8a59d4 --- /dev/null +++ b/upub/worker/src/delivery.rs @@ -0,0 +1,63 @@ +use sea_orm::EntityTrait; +use reqwest::Method; + +use apb::{LD, Node, ActivityMut}; +use upub::{Context, model, traits::Fetcher}; + +pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> { + tracing::info!("delivering {} to {:?}", job.activity, job.target); + + let payload = match model::activity::Entity::find_by_ap_id(&job.activity) + .find_also_related(model::object::Entity) + .one(ctx.db()) + .await? + { + Some((activity, None)) => activity.ap().ld_context(), + Some((activity, Some(object))) => { + let always_embed = matches!( + activity.activity_type, + apb::ActivityType::Create + | apb::ActivityType::Undo + | apb::ActivityType::Update + | apb::ActivityType::Accept(_) + | apb::ActivityType::Reject(_) + ); + if always_embed { + activity.ap().set_object(Node::object(object.ap())).ld_context() + } else { + activity.ap().ld_context() + } + }, + None => { + tracing::info!("skipping dispatch for deleted object {}", job.activity); + return Ok(()); + }, + }; + + let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor) + .one(ctx.db()) + .await? + else { + tracing::error!("abandoning delivery from non existant actor {}: {job:#?}", job.actor); + return Ok(()); + }; + + let Some(key) = actor.private_key + else { + tracing::error!("abandoning delivery from actor without private key {}: {job:#?}", job.actor); + return Ok(()); + }; + + if let Err(e) = Context::request( + Method::POST, job.target.as_deref().unwrap_or(""), + Some(&serde_json::to_string(&payload).unwrap()), + &job.actor, &key, ctx.domain() + ).await { + tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target); + model::job::Entity::insert(job.clone().repeat()) + .exec(ctx.db()) + .await?; + } + + Ok(()) +} diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index ae696db..90e9ef4 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -104,12 +104,16 @@ impl JobDispatcher for Context { restart!(now); } - if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity) - .one(self.db()) - .await - { - tracing::info!("dropping already processed job '{}'", job.activity); - restart!(now); + if job.job_type != model::job::JobType::Delivery { + // delivery jobs are all pre-processed activities + // inbound/outbound jobs carry side effects which should only happen once + if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity) + .one(self.db()) + .await + { + tracing::info!("dropping already processed job '{}'", job.activity); + restart!(now); + } } let _ctx = self.clone(); @@ -117,7 +121,7 @@ impl JobDispatcher for Context { let res = match job.job_type { model::job::JobType::Inbound => crate::inbound::process(_ctx.clone(), &job).await, model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await, - model::job::JobType::Local => crate::local::process(_ctx.clone(), &job).await, + model::job::JobType::Delivery => crate::delivery::process(_ctx.clone(), &job).await, }; if let Err(e) = res { diff --git a/upub/worker/src/lib.rs b/upub/worker/src/lib.rs index 7bffa97..0b2bbf9 100644 --- a/upub/worker/src/lib.rs +++ b/upub/worker/src/lib.rs @@ -1,7 +1,7 @@ pub mod dispatcher; pub mod inbound; pub mod outbound; -pub mod local; +pub mod delivery; pub use dispatcher::{JobError, JobResult}; diff --git a/upub/worker/src/local.rs b/upub/worker/src/local.rs deleted file mode 100644 index 6385901..0000000 --- a/upub/worker/src/local.rs +++ /dev/null @@ -1,71 +0,0 @@ -use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut}; -use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait}; -use upub::{model, traits::{Addresser, Processor}, Context}; - - -pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> { - let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?; - let mut activity : serde_json::Value = serde_json::from_str(payload)?; - let mut t = activity.object_type()?; - let tx = ctx.db().begin().await?; - - if matches!(t, apb::ObjectType::Note) { - activity = apb::new() - .set_activity_type(Some(apb::ActivityType::Create)) - .set_object(apb::Node::object(activity)); - t = apb::ObjectType::Activity(apb::ActivityType::Create); - } - - activity = activity - .set_id(Some(&job.activity)) - .set_actor(apb::Node::link(job.actor.clone())) - .set_published(Some(chrono::Utc::now())); - - if matches!(t, apb::ObjectType::Activity(apb::ActivityType::Create)) { - let raw_oid = Context::new_id(); - let oid = ctx.oid(&raw_oid); - // object must be embedded, wont dereference here - let object = activity.object().extract().ok_or(apb::FieldErr("object"))?; - // TODO regex hell here i come... - let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern"); - let mut content = object.content().map(|x| x.to_string()).ok(); - if let Some(c) = content { - let mut tmp = mdhtml::safe_markdown(&c); - for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) { - if let Ok(Some(uid)) = model::actor::Entity::find() - .filter(model::actor::Column::PreferredUsername.eq(user)) - .filter(model::actor::Column::Domain.eq(domain)) - .select_only() - .select_column(model::actor::Column::Id) - .into_tuple::() - .one(&tx) - .await - { - tmp = tmp.replacen(full, &format!("@{user}"), 1); - } - } - content = Some(tmp); - } - - activity = activity - .set_object(apb::Node::object( - object - .set_id(Some(&oid)) - .set_content(content.as_deref()) - .set_attributed_to(apb::Node::link(job.actor.clone())) - .set_published(Some(chrono::Utc::now())) - .set_url(apb::Node::maybe_link(ctx.cfg().frontend_url(&format!("/objects/{raw_oid}")))), - )); - } - - // TODO we expand addressing twice, ugghhhhh - let targets = ctx.expand_addressing(activity.addressed()).await?; - - ctx.process(activity, &tx).await?; - - ctx.deliver_to(&job.activity, &job.actor, &targets, &tx).await?; - - tx.commit().await?; - - Ok(()) -} diff --git a/upub/worker/src/outbound.rs b/upub/worker/src/outbound.rs index b8a59d4..19c2c93 100644 --- a/upub/worker/src/outbound.rs +++ b/upub/worker/src/outbound.rs @@ -1,63 +1,71 @@ -use sea_orm::EntityTrait; -use reqwest::Method; +use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait}; +use upub::{model, traits::{Addresser, Processor}, Context}; -use apb::{LD, Node, ActivityMut}; -use upub::{Context, model, traits::Fetcher}; pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> { - tracing::info!("delivering {} to {:?}", job.activity, job.target); + let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?; + let mut activity : serde_json::Value = serde_json::from_str(payload)?; + let mut t = activity.object_type()?; + let tx = ctx.db().begin().await?; - let payload = match model::activity::Entity::find_by_ap_id(&job.activity) - .find_also_related(model::object::Entity) - .one(ctx.db()) - .await? - { - Some((activity, None)) => activity.ap().ld_context(), - Some((activity, Some(object))) => { - let always_embed = matches!( - activity.activity_type, - apb::ActivityType::Create - | apb::ActivityType::Undo - | apb::ActivityType::Update - | apb::ActivityType::Accept(_) - | apb::ActivityType::Reject(_) - ); - if always_embed { - activity.ap().set_object(Node::object(object.ap())).ld_context() - } else { - activity.ap().ld_context() - } - }, - None => { - tracing::info!("skipping dispatch for deleted object {}", job.activity); - return Ok(()); - }, - }; - - let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor) - .one(ctx.db()) - .await? - else { - tracing::error!("abandoning delivery from non existant actor {}: {job:#?}", job.actor); - return Ok(()); - }; - - let Some(key) = actor.private_key - else { - tracing::error!("abandoning delivery from actor without private key {}: {job:#?}", job.actor); - return Ok(()); - }; - - if let Err(e) = Context::request( - Method::POST, job.target.as_deref().unwrap_or(""), - Some(&serde_json::to_string(&payload).unwrap()), - &job.actor, &key, ctx.domain() - ).await { - tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target); - model::job::Entity::insert(job.clone().repeat()) - .exec(ctx.db()) - .await?; + if matches!(t, apb::ObjectType::Note) { + activity = apb::new() + .set_activity_type(Some(apb::ActivityType::Create)) + .set_object(apb::Node::object(activity)); + t = apb::ObjectType::Activity(apb::ActivityType::Create); } + activity = activity + .set_id(Some(&job.activity)) + .set_actor(apb::Node::link(job.actor.clone())) + .set_published(Some(chrono::Utc::now())); + + if matches!(t, apb::ObjectType::Activity(apb::ActivityType::Create)) { + let raw_oid = Context::new_id(); + let oid = ctx.oid(&raw_oid); + // object must be embedded, wont dereference here + let object = activity.object().extract().ok_or(apb::FieldErr("object"))?; + // TODO regex hell here i come... + let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern"); + let mut content = object.content().map(|x| x.to_string()).ok(); + if let Some(c) = content { + let mut tmp = mdhtml::safe_markdown(&c); + for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) { + if let Ok(Some(uid)) = model::actor::Entity::find() + .filter(model::actor::Column::PreferredUsername.eq(user)) + .filter(model::actor::Column::Domain.eq(domain)) + .select_only() + .select_column(model::actor::Column::Id) + .into_tuple::() + .one(&tx) + .await + { + tmp = tmp.replacen(full, &format!("@{user}"), 1); + } + } + content = Some(tmp); + } + + activity = activity + .set_object(apb::Node::object( + object + .set_id(Some(&oid)) + .set_content(content.as_deref()) + .set_attributed_to(apb::Node::link(job.actor.clone())) + .set_published(Some(chrono::Utc::now())) + .set_url(apb::Node::maybe_link(ctx.cfg().frontend_url(&format!("/objects/{raw_oid}")))), + )); + } + + // TODO we expand addressing twice, ugghhhhh + let targets = ctx.expand_addressing(activity.addressed(), &tx).await?; + + ctx.process(activity, &tx).await?; + + ctx.deliver_to(&job.activity, &job.actor, &targets, &tx).await?; + + tx.commit().await?; + Ok(()) }