diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index a55158f..2cf549e 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -60,16 +60,24 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat tracing::error!("refusing to process activity without embedded object"); return Err(ProcessorError::Unprocessable); }; - if let Ok(reply) = object_node.in_reply_to().id() { - if let Err(e) = ctx.fetch_object(reply, tx).await { - tracing::warn!("failed fetching replies for received object: {e}"); - } - } + let oid = object_node.id()?.to_string(); + let addressed = object_node.addressed(); let activity_model = ctx.insert_activity(activity, tx).await?; - let object_model = ctx.insert_object(object_node, tx).await?; - let expanded_addressing = ctx.expand_addressing(object_model.addressed()).await?; - ctx.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing, tx).await?; - tracing::info!("{} posted {}", activity_model.actor, object_model.id); + let internal_oid = if let Some(internal) = model::object::Entity::ap_to_internal(&oid, tx).await? { + tracing::debug!("skipping insertion of already known object #{internal}"); + internal + } else { + if let Ok(reply) = object_node.in_reply_to().id() { + if let Err(e) = ctx.fetch_object(reply, tx).await { + tracing::warn!("failed fetching replies for received object: {e}"); + } + } + let object_model = ctx.insert_object(object_node, tx).await?; + object_model.internal + }; + let expanded_addressing = ctx.expand_addressing(addressed).await?; + ctx.address_to(Some(activity_model.internal), Some(internal_oid), &expanded_addressing, tx).await?; + tracing::info!("{} posted {}", activity_model.actor, oid); Ok(()) }