diff --git a/upub/core/src/model/job.rs b/upub/core/src/model/job.rs index 30a5570d..1cc0c6e8 100644 --- a/upub/core/src/model/job.rs +++ b/upub/core/src/model/job.rs @@ -17,7 +17,7 @@ pub struct Model { pub actor: String, pub target: Option, pub activity: String, - pub payload: Option, + pub payload: Option, pub published: ChronoDateTimeUtc, pub not_before: ChronoDateTimeUtc, pub attempt: i16, diff --git a/upub/migrations/src/m20240605_000001_add_jobs_table.rs b/upub/migrations/src/m20240605_000001_add_jobs_table.rs index 9aabbc4b..cdbf4b06 100644 --- a/upub/migrations/src/m20240605_000001_add_jobs_table.rs +++ b/upub/migrations/src/m20240605_000001_add_jobs_table.rs @@ -44,7 +44,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Jobs::Actor).string().not_null()) .col(ColumnDef::new(Jobs::Target).string().null()) .col(ColumnDef::new(Jobs::Activity).string().not_null()) - .col(ColumnDef::new(Jobs::Payload).string().null()) + .col(ColumnDef::new(Jobs::Payload).json_binary().null()) .col(ColumnDef::new(Jobs::Published).timestamp_with_time_zone().not_null().default(Expr::current_timestamp())) .col(ColumnDef::new(Jobs::NotBefore).timestamp_with_time_zone().not_null().default(Expr::current_timestamp())) .col(ColumnDef::new(Jobs::Attempt).small_integer().not_null().default(0)) diff --git a/upub/routes/src/activitypub/actor/outbox.rs b/upub/routes/src/activitypub/actor/outbox.rs index 8a3e3c74..e8b9ea49 100644 --- a/upub/routes/src/activitypub/actor/outbox.rs +++ b/upub/routes/src/activitypub/actor/outbox.rs @@ -62,7 +62,7 @@ pub async fn post( published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), attempt: Set(0), - payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing back json object"))), + payload: Set(Some(activity)), }; model::job::Entity::insert(job).exec(ctx.db()).await?; diff --git a/upub/routes/src/activitypub/inbox.rs b/upub/routes/src/activitypub/inbox.rs index b9646919..4d645947 100644 --- a/upub/routes/src/activitypub/inbox.rs +++ b/upub/routes/src/activitypub/inbox.rs @@ -76,7 +76,7 @@ pub async fn post( actor: Set(uid), target: Set(None), activity: Set(aid), - payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing json payload"))), + payload: Set(Some(activity)), published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), attempt: Set(0) diff --git a/upub/worker/src/inbound.rs b/upub/worker/src/inbound.rs index 9431706e..4ac4002a 100644 --- a/upub/worker/src/inbound.rs +++ b/upub/worker/src/inbound.rs @@ -3,18 +3,14 @@ use upub::traits::Processor; pub async fn process(ctx: upub::Context, job: &upub::model::job::Model) -> crate::JobResult<()> { - let Some(ref payload) = job.payload else { + let Some(ref activity) = job.payload else { tracing::error!("abandoning inbound job without payload: {job:#?}"); return Ok(()); }; - let Ok(activity) = serde_json::from_str::(payload) else { - tracing::error!("abandoning inbound job with invalid payload: {job:#?}"); - return Ok(()); - }; - let tx = ctx.db().begin().await?; - ctx.process(activity, &tx).await?; + // TODO can we get rid of this clone? + ctx.process(activity.clone(), &tx).await?; tx.commit().await?; Ok(()) diff --git a/upub/worker/src/outbound.rs b/upub/worker/src/outbound.rs index 438040ac..f2b97612 100644 --- a/upub/worker/src/outbound.rs +++ b/upub/worker/src/outbound.rs @@ -4,8 +4,8 @@ use upub::{model, traits::{Addresser, Processor}, Context}; pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> { - let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?; - let mut activity : serde_json::Value = serde_json::from_str(payload)?; + // TODO can we get rid of this cloned?? + let mut activity = job.payload.as_ref().cloned().ok_or(crate::JobError::MissingPayload)?; let mut t = activity.object_type()?; let tx = ctx.db().begin().await?;