fix: outbound->delivery, local->outbound

also stop discarding duplicated deliveries, ouchhhh
This commit is contained in:
əlemi 2024-06-06 19:04:08 +02:00
parent 0e779c3096
commit f6d30b3bec
Signed by: alemi
GPG key ID: A4895B84D311642C
9 changed files with 143 additions and 139 deletions

View file

@ -99,7 +99,7 @@ enum Mode {
#[derive(Debug, Clone, clap::ValueEnum)]
enum Filter {
All,
Local,
Delivery,
Inbound,
Outbound,
}
@ -188,7 +188,7 @@ impl From<Filter> for Option<upub::model::job::JobType> {
fn from(value: Filter) -> Self {
match value {
Filter::All => None,
Filter::Local => Some(upub::model::job::JobType::Local),
Filter::Delivery => Some(upub::model::job::JobType::Delivery),
Filter::Inbound => Some(upub::model::job::JobType::Inbound),
Filter::Outbound => Some(upub::model::job::JobType::Outbound),
}

View file

@ -5,7 +5,7 @@ use sea_orm::entity::prelude::*;
pub enum JobType {
Inbound = 1,
Outbound = 2,
Local = 3,
Delivery = 3,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]

View file

@ -88,7 +88,7 @@ impl Addresser for crate::Context {
crate::model::job::ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
actor: Set(from.to_string()),
job_type: Set(crate::model::job::JobType::Outbound),
job_type: Set(crate::model::job::JobType::Delivery),
payload: Set(None),
// TODO we should resolve each user by id and check its inbox because we can't assume
// it's /actors/{id}/inbox for every software, but oh well it's waaaaay easier now

View file

@ -56,7 +56,7 @@ pub async fn post(
let job = model::job::ActiveModel {
internal: NotSet,
activity: Set(aid.clone()),
job_type: Set(model::job::JobType::Local),
job_type: Set(model::job::JobType::Outbound),
actor: Set(uid.clone()),
target: Set(None),
published: Set(chrono::Utc::now()),

View file

@ -0,0 +1,63 @@
use sea_orm::EntityTrait;
use reqwest::Method;
use apb::{LD, Node, ActivityMut};
use upub::{Context, model, traits::Fetcher};
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
tracing::info!("delivering {} to {:?}", job.activity, job.target);
let payload = match model::activity::Entity::find_by_ap_id(&job.activity)
.find_also_related(model::object::Entity)
.one(ctx.db())
.await?
{
Some((activity, None)) => activity.ap().ld_context(),
Some((activity, Some(object))) => {
let always_embed = matches!(
activity.activity_type,
apb::ActivityType::Create
| apb::ActivityType::Undo
| apb::ActivityType::Update
| apb::ActivityType::Accept(_)
| apb::ActivityType::Reject(_)
);
if always_embed {
activity.ap().set_object(Node::object(object.ap())).ld_context()
} else {
activity.ap().ld_context()
}
},
None => {
tracing::info!("skipping dispatch for deleted object {}", job.activity);
return Ok(());
},
};
let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor)
.one(ctx.db())
.await?
else {
tracing::error!("abandoning delivery from non existant actor {}: {job:#?}", job.actor);
return Ok(());
};
let Some(key) = actor.private_key
else {
tracing::error!("abandoning delivery from actor without private key {}: {job:#?}", job.actor);
return Ok(());
};
if let Err(e) = Context::request(
Method::POST, job.target.as_deref().unwrap_or(""),
Some(&serde_json::to_string(&payload).unwrap()),
&job.actor, &key, ctx.domain()
).await {
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
model::job::Entity::insert(job.clone().repeat())
.exec(ctx.db())
.await?;
}
Ok(())
}

View file

@ -104,12 +104,16 @@ impl JobDispatcher for Context {
restart!(now);
}
if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity)
.one(self.db())
.await
{
tracing::info!("dropping already processed job '{}'", job.activity);
restart!(now);
if job.job_type != model::job::JobType::Delivery {
// delivery jobs are all pre-processed activities
// inbound/outbound jobs carry side effects which should only happen once
if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity)
.one(self.db())
.await
{
tracing::info!("dropping already processed job '{}'", job.activity);
restart!(now);
}
}
let _ctx = self.clone();
@ -117,7 +121,7 @@ impl JobDispatcher for Context {
let res = match job.job_type {
model::job::JobType::Inbound => crate::inbound::process(_ctx.clone(), &job).await,
model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await,
model::job::JobType::Local => crate::local::process(_ctx.clone(), &job).await,
model::job::JobType::Delivery => crate::delivery::process(_ctx.clone(), &job).await,
};
if let Err(e) = res {

View file

@ -1,7 +1,7 @@
pub mod dispatcher;
pub mod inbound;
pub mod outbound;
pub mod local;
pub mod delivery;
pub use dispatcher::{JobError, JobResult};

View file

@ -1,71 +0,0 @@
use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait};
use upub::{model, traits::{Addresser, Processor}, Context};
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?;
let mut activity : serde_json::Value = serde_json::from_str(payload)?;
let mut t = activity.object_type()?;
let tx = ctx.db().begin().await?;
if matches!(t, apb::ObjectType::Note) {
activity = apb::new()
.set_activity_type(Some(apb::ActivityType::Create))
.set_object(apb::Node::object(activity));
t = apb::ObjectType::Activity(apb::ActivityType::Create);
}
activity = activity
.set_id(Some(&job.activity))
.set_actor(apb::Node::link(job.actor.clone()))
.set_published(Some(chrono::Utc::now()));
if matches!(t, apb::ObjectType::Activity(apb::ActivityType::Create)) {
let raw_oid = Context::new_id();
let oid = ctx.oid(&raw_oid);
// object must be embedded, wont dereference here
let object = activity.object().extract().ok_or(apb::FieldErr("object"))?;
// TODO regex hell here i come...
let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern");
let mut content = object.content().map(|x| x.to_string()).ok();
if let Some(c) = content {
let mut tmp = mdhtml::safe_markdown(&c);
for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) {
if let Ok(Some(uid)) = model::actor::Entity::find()
.filter(model::actor::Column::PreferredUsername.eq(user))
.filter(model::actor::Column::Domain.eq(domain))
.select_only()
.select_column(model::actor::Column::Id)
.into_tuple::<String>()
.one(&tx)
.await
{
tmp = tmp.replacen(full, &format!("<a href=\"{uid}\" class=\"u-url mention\">@{user}</a>"), 1);
}
}
content = Some(tmp);
}
activity = activity
.set_object(apb::Node::object(
object
.set_id(Some(&oid))
.set_content(content.as_deref())
.set_attributed_to(apb::Node::link(job.actor.clone()))
.set_published(Some(chrono::Utc::now()))
.set_url(apb::Node::maybe_link(ctx.cfg().frontend_url(&format!("/objects/{raw_oid}")))),
));
}
// TODO we expand addressing twice, ugghhhhh
let targets = ctx.expand_addressing(activity.addressed()).await?;
ctx.process(activity, &tx).await?;
ctx.deliver_to(&job.activity, &job.actor, &targets, &tx).await?;
tx.commit().await?;
Ok(())
}

View file

@ -1,63 +1,71 @@
use sea_orm::EntityTrait;
use reqwest::Method;
use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait};
use upub::{model, traits::{Addresser, Processor}, Context};
use apb::{LD, Node, ActivityMut};
use upub::{Context, model, traits::Fetcher};
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
tracing::info!("delivering {} to {:?}", job.activity, job.target);
let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?;
let mut activity : serde_json::Value = serde_json::from_str(payload)?;
let mut t = activity.object_type()?;
let tx = ctx.db().begin().await?;
let payload = match model::activity::Entity::find_by_ap_id(&job.activity)
.find_also_related(model::object::Entity)
.one(ctx.db())
.await?
{
Some((activity, None)) => activity.ap().ld_context(),
Some((activity, Some(object))) => {
let always_embed = matches!(
activity.activity_type,
apb::ActivityType::Create
| apb::ActivityType::Undo
| apb::ActivityType::Update
| apb::ActivityType::Accept(_)
| apb::ActivityType::Reject(_)
);
if always_embed {
activity.ap().set_object(Node::object(object.ap())).ld_context()
} else {
activity.ap().ld_context()
}
},
None => {
tracing::info!("skipping dispatch for deleted object {}", job.activity);
return Ok(());
},
};
let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor)
.one(ctx.db())
.await?
else {
tracing::error!("abandoning delivery from non existant actor {}: {job:#?}", job.actor);
return Ok(());
};
let Some(key) = actor.private_key
else {
tracing::error!("abandoning delivery from actor without private key {}: {job:#?}", job.actor);
return Ok(());
};
if let Err(e) = Context::request(
Method::POST, job.target.as_deref().unwrap_or(""),
Some(&serde_json::to_string(&payload).unwrap()),
&job.actor, &key, ctx.domain()
).await {
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
model::job::Entity::insert(job.clone().repeat())
.exec(ctx.db())
.await?;
if matches!(t, apb::ObjectType::Note) {
activity = apb::new()
.set_activity_type(Some(apb::ActivityType::Create))
.set_object(apb::Node::object(activity));
t = apb::ObjectType::Activity(apb::ActivityType::Create);
}
activity = activity
.set_id(Some(&job.activity))
.set_actor(apb::Node::link(job.actor.clone()))
.set_published(Some(chrono::Utc::now()));
if matches!(t, apb::ObjectType::Activity(apb::ActivityType::Create)) {
let raw_oid = Context::new_id();
let oid = ctx.oid(&raw_oid);
// object must be embedded, wont dereference here
let object = activity.object().extract().ok_or(apb::FieldErr("object"))?;
// TODO regex hell here i come...
let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern");
let mut content = object.content().map(|x| x.to_string()).ok();
if let Some(c) = content {
let mut tmp = mdhtml::safe_markdown(&c);
for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) {
if let Ok(Some(uid)) = model::actor::Entity::find()
.filter(model::actor::Column::PreferredUsername.eq(user))
.filter(model::actor::Column::Domain.eq(domain))
.select_only()
.select_column(model::actor::Column::Id)
.into_tuple::<String>()
.one(&tx)
.await
{
tmp = tmp.replacen(full, &format!("<a href=\"{uid}\" class=\"u-url mention\">@{user}</a>"), 1);
}
}
content = Some(tmp);
}
activity = activity
.set_object(apb::Node::object(
object
.set_id(Some(&oid))
.set_content(content.as_deref())
.set_attributed_to(apb::Node::link(job.actor.clone()))
.set_published(Some(chrono::Utc::now()))
.set_url(apb::Node::maybe_link(ctx.cfg().frontend_url(&format!("/objects/{raw_oid}")))),
));
}
// TODO we expand addressing twice, ugghhhhh
let targets = ctx.expand_addressing(activity.addressed(), &tx).await?;
ctx.process(activity, &tx).await?;
ctx.deliver_to(&job.activity, &job.actor, &targets, &tx).await?;
tx.commit().await?;
Ok(())
}