fix: delivery also embeds activities in activities
it was supposed to do already but i was just joining on objects, oops
This commit is contained in:
parent
7079662391
commit
c14531afc8
1 changed files with 35 additions and 24 deletions
|
@ -7,33 +7,44 @@ use upub::{Context, model, traits::Fetcher};
|
||||||
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
||||||
tracing::info!("delivering {} to {:?}", job.activity, job.target);
|
tracing::info!("delivering {} to {:?}", job.activity, job.target);
|
||||||
|
|
||||||
let payload = match model::activity::Entity::find_by_ap_id(&job.activity)
|
let Some(activity) = model::activity::Entity::find_by_ap_id(&job.activity)
|
||||||
.find_also_related(model::object::Entity)
|
|
||||||
.one(ctx.db())
|
.one(ctx.db())
|
||||||
.await?
|
.await?
|
||||||
{
|
else {
|
||||||
Some((activity, None)) => activity.ap().ld_context(),
|
tracing::info!("skipping dispatch for deleted object {}", job.activity);
|
||||||
Some((activity, Some(object))) => {
|
return Ok(());
|
||||||
let always_embed = matches!(
|
|
||||||
activity.activity_type,
|
|
||||||
apb::ActivityType::Create
|
|
||||||
| apb::ActivityType::Undo
|
|
||||||
| apb::ActivityType::Update
|
|
||||||
| apb::ActivityType::Accept(_)
|
|
||||||
| apb::ActivityType::Reject(_)
|
|
||||||
);
|
|
||||||
if always_embed {
|
|
||||||
activity.ap().set_object(Node::object(object.ap())).ld_context()
|
|
||||||
} else {
|
|
||||||
activity.ap().ld_context()
|
|
||||||
}
|
|
||||||
},
|
|
||||||
None => {
|
|
||||||
tracing::info!("skipping dispatch for deleted object {}", job.activity);
|
|
||||||
return Ok(());
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let object = if let Some(ref oid) = activity.object {
|
||||||
|
match activity.activity_type {
|
||||||
|
apb::ActivityType::Create =>
|
||||||
|
model::object::Entity::find_by_ap_id(oid)
|
||||||
|
.one(ctx.db())
|
||||||
|
.await?
|
||||||
|
.map(|x| x.ap()),
|
||||||
|
apb::ActivityType::Accept(_) | apb::ActivityType::Reject(_) | apb::ActivityType::Undo =>
|
||||||
|
model::activity::Entity::find_by_ap_id(oid)
|
||||||
|
.one(ctx.db())
|
||||||
|
.await?
|
||||||
|
.map(|x| x.ap()),
|
||||||
|
apb::ActivityType::Update => {
|
||||||
|
if let Some(o) = model::object::Entity::find_by_ap_id(oid).one(ctx.db()).await? {
|
||||||
|
Some(o.ap())
|
||||||
|
} else if let Some(a) = model::actor::Entity::find_by_ap_id(oid).one(ctx.db()).await? {
|
||||||
|
Some(a.ap())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
} else { None };
|
||||||
|
|
||||||
|
let mut payload = activity.ap();
|
||||||
|
if let Some(object) = object {
|
||||||
|
payload = payload.set_object(apb::Node::object(object));
|
||||||
|
}
|
||||||
|
|
||||||
let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor)
|
let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor)
|
||||||
.one(ctx.db())
|
.one(ctx.db())
|
||||||
.await?
|
.await?
|
||||||
|
@ -50,7 +61,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
|
||||||
|
|
||||||
if let Err(e) = Context::request(
|
if let Err(e) = Context::request(
|
||||||
Method::POST, job.target.as_deref().unwrap_or(""),
|
Method::POST, job.target.as_deref().unwrap_or(""),
|
||||||
Some(&serde_json::to_string(&payload).unwrap()),
|
Some(&serde_json::to_string(&payload.ld_context()).unwrap()),
|
||||||
&job.actor, &key, ctx.domain()
|
&job.actor, &key, ctx.domain()
|
||||||
).await {
|
).await {
|
||||||
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
|
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
|
||||||
|
|
Loading…
Reference in a new issue