1
0
Fork 0
forked from alemi/upub

fix!: oops actually use jsonb here too

sorry last breaking db change i swear (<---lie)
This commit is contained in:
əlemi 2024-06-08 16:58:16 +02:00
parent e7b25bfe1d
commit fb0242221b
Signed by: alemi
GPG key ID: A4895B84D311642C
6 changed files with 9 additions and 13 deletions

View file

@ -17,7 +17,7 @@ pub struct Model {
pub actor: String,
pub target: Option<String>,
pub activity: String,
pub payload: Option<String>,
pub payload: Option<serde_json::Value>,
pub published: ChronoDateTimeUtc,
pub not_before: ChronoDateTimeUtc,
pub attempt: i16,

View file

@ -44,7 +44,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Jobs::Actor).string().not_null())
.col(ColumnDef::new(Jobs::Target).string().null())
.col(ColumnDef::new(Jobs::Activity).string().not_null())
.col(ColumnDef::new(Jobs::Payload).string().null())
.col(ColumnDef::new(Jobs::Payload).json_binary().null())
.col(ColumnDef::new(Jobs::Published).timestamp_with_time_zone().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Jobs::NotBefore).timestamp_with_time_zone().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Jobs::Attempt).small_integer().not_null().default(0))

View file

@ -62,7 +62,7 @@ pub async fn post(
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing back json object"))),
payload: Set(Some(activity)),
};
model::job::Entity::insert(job).exec(ctx.db()).await?;

View file

@ -76,7 +76,7 @@ pub async fn post(
actor: Set(uid),
target: Set(None),
activity: Set(aid),
payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing json payload"))),
payload: Set(Some(activity)),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0)

View file

@ -3,18 +3,14 @@ use upub::traits::Processor;
pub async fn process(ctx: upub::Context, job: &upub::model::job::Model) -> crate::JobResult<()> {
let Some(ref payload) = job.payload else {
let Some(ref activity) = job.payload else {
tracing::error!("abandoning inbound job without payload: {job:#?}");
return Ok(());
};
let Ok(activity) = serde_json::from_str::<serde_json::Value>(payload) else {
tracing::error!("abandoning inbound job with invalid payload: {job:#?}");
return Ok(());
};
let tx = ctx.db().begin().await?;
ctx.process(activity, &tx).await?;
// TODO can we get rid of this clone?
ctx.process(activity.clone(), &tx).await?;
tx.commit().await?;
Ok(())

View file

@ -4,8 +4,8 @@ use upub::{model, traits::{Addresser, Processor}, Context};
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?;
let mut activity : serde_json::Value = serde_json::from_str(payload)?;
// TODO can we get rid of this cloned??
let mut activity = job.payload.as_ref().cloned().ok_or(crate::JobError::MissingPayload)?;
let mut t = activity.object_type()?;
let tx = ctx.db().begin().await?;