diff --git a/upub/worker/src/delivery.rs b/upub/worker/src/delivery.rs index b8a59d45..5745d9d0 100644 --- a/upub/worker/src/delivery.rs +++ b/upub/worker/src/delivery.rs @@ -7,33 +7,44 @@ use upub::{Context, model, traits::Fetcher}; pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> { tracing::info!("delivering {} to {:?}", job.activity, job.target); - let payload = match model::activity::Entity::find_by_ap_id(&job.activity) - .find_also_related(model::object::Entity) + let Some(activity) = model::activity::Entity::find_by_ap_id(&job.activity) .one(ctx.db()) .await? - { - Some((activity, None)) => activity.ap().ld_context(), - Some((activity, Some(object))) => { - let always_embed = matches!( - activity.activity_type, - apb::ActivityType::Create - | apb::ActivityType::Undo - | apb::ActivityType::Update - | apb::ActivityType::Accept(_) - | apb::ActivityType::Reject(_) - ); - if always_embed { - activity.ap().set_object(Node::object(object.ap())).ld_context() - } else { - activity.ap().ld_context() - } - }, - None => { - tracing::info!("skipping dispatch for deleted object {}", job.activity); - return Ok(()); - }, + else { + tracing::info!("skipping dispatch for deleted object {}", job.activity); + return Ok(()); }; + let object = if let Some(ref oid) = activity.object { + match activity.activity_type { + apb::ActivityType::Create => + model::object::Entity::find_by_ap_id(oid) + .one(ctx.db()) + .await? + .map(|x| x.ap()), + apb::ActivityType::Accept(_) | apb::ActivityType::Reject(_) | apb::ActivityType::Undo => + model::activity::Entity::find_by_ap_id(oid) + .one(ctx.db()) + .await? + .map(|x| x.ap()), + apb::ActivityType::Update => { + if let Some(o) = model::object::Entity::find_by_ap_id(oid).one(ctx.db()).await? { + Some(o.ap()) + } else if let Some(a) = model::actor::Entity::find_by_ap_id(oid).one(ctx.db()).await? { + Some(a.ap()) + } else { + None + } + }, + _ => None, + } + } else { None }; + + let mut payload = activity.ap(); + if let Some(object) = object { + payload = payload.set_object(apb::Node::object(object)); + } + let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor) .one(ctx.db()) .await? @@ -50,7 +61,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult< if let Err(e) = Context::request( Method::POST, job.target.as_deref().unwrap_or(""), - Some(&serde_json::to_string(&payload).unwrap()), + Some(&serde_json::to_string(&payload.ld_context()).unwrap()), &job.actor, &key, ctx.domain() ).await { tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);