Compare commits

...

3 commits

Author SHA1 Message Date
b086fe969f
fix: delivery passes error up 2024-07-06 06:08:58 +02:00
11b4ae8678
chore: renamed PullError in RequestError 2024-07-06 06:08:42 +02:00
9adeff6fbf
feat: added error field to jobs
basically just for manual inspection, maybe we could drop this again
once upub is much more stable?
2024-07-06 06:03:26 +02:00
17 changed files with 115 additions and 72 deletions

View file

@ -1,7 +1,7 @@
use sea_orm::{EntityTrait, TransactionTrait}; use sea_orm::{EntityTrait, TransactionTrait};
use upub::traits::{fetch::{Fetchable, PullError}, Addresser, Normalizer}; use upub::traits::{fetch::{Fetchable, RequestError}, Addresser, Normalizer};
pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> { pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), RequestError> {
use apb::Base; use apb::Base;
let mut node = apb::Node::link(uri.to_string()); let mut node = apb::Node::link(uri.to_string());

View file

@ -81,6 +81,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
attempt: Set(0), attempt: Set(0),
payload: Set(Some(undo_activity)), payload: Set(Some(undo_activity)),
error: Set(None),
}; };
tracing::info!("undoing {}", activity.id); tracing::info!("undoing {}", activity.id);
@ -121,6 +122,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
attempt: Set(0), attempt: Set(0),
payload: Set(Some(undo_activity)), payload: Set(Some(undo_activity)),
error: Set(None),
}; };
tracing::info!("deleting {}", object.id); tracing::info!("deleting {}", object.id);

View file

@ -1,6 +1,6 @@
use apb::{ActivityMut, BaseMut, ObjectMut}; use apb::{ActivityMut, BaseMut, ObjectMut};
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait, QueryFilter, ColumnTrait}; use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait, QueryFilter, ColumnTrait};
use upub::traits::{fetch::PullError, Fetcher}; use upub::traits::{fetch::RequestError, Fetcher};
#[derive(Debug, Clone, clap::Subcommand)] #[derive(Debug, Clone, clap::Subcommand)]
/// available actions to take on relays /// available actions to take on relays
@ -29,7 +29,7 @@ pub enum RelayCommand {
}, },
} }
pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullError> { pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), RequestError> {
let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db()) let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
.await? .await?
.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?; .ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
@ -84,6 +84,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
attempt: Set(0), attempt: Set(0),
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
error: Set(None),
}; };
tracing::info!("following relay {actor}"); tracing::info!("following relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?; upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
@ -119,6 +120,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
attempt: Set(0), attempt: Set(0),
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
error: Set(None),
}; };
tracing::info!("accepting relay {actor}"); tracing::info!("accepting relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?; upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
@ -155,6 +157,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
attempt: Set(0), attempt: Set(0),
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
error: Set(None),
}; };
tracing::info!("unfollowing relay {actor}"); tracing::info!("unfollowing relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?; upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
@ -190,6 +193,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
attempt: Set(0), attempt: Set(0),
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
error: Set(None),
}; };
tracing::info!("unfollowing relay {actor}"); tracing::info!("unfollowing relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?; upub::model::job::Entity::insert(job).exec(ctx.db()).await?;

View file

@ -1,7 +1,7 @@
use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
use upub::traits::{fetch::PullError, Fetcher}; use upub::traits::{fetch::RequestError, Fetcher};
pub async fn thread(ctx: upub::Context) -> Result<(), PullError> { pub async fn thread(ctx: upub::Context) -> Result<(), RequestError> {
use futures::TryStreamExt; use futures::TryStreamExt;
let db = ctx.db(); let db = ctx.db();

View file

@ -21,6 +21,7 @@ pub struct Model {
pub published: ChronoDateTimeUtc, pub published: ChronoDateTimeUtc,
pub not_before: ChronoDateTimeUtc, pub not_before: ChronoDateTimeUtc,
pub attempt: i16, pub attempt: i16,
pub error: Option<String>,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -41,7 +42,7 @@ impl Model {
} }
} }
pub fn repeat(self) -> ActiveModel { pub fn repeat(self, error: Option<String>) -> ActiveModel {
ActiveModel { ActiveModel {
internal: sea_orm::ActiveValue::NotSet, internal: sea_orm::ActiveValue::NotSet,
job_type: sea_orm::ActiveValue::Set(self.job_type), job_type: sea_orm::ActiveValue::Set(self.job_type),
@ -52,6 +53,7 @@ impl Model {
activity: sea_orm::ActiveValue::Set(self.activity), activity: sea_orm::ActiveValue::Set(self.activity),
published: sea_orm::ActiveValue::Set(self.published), published: sea_orm::ActiveValue::Set(self.published),
attempt: sea_orm::ActiveValue::Set(self.attempt + 1), attempt: sea_orm::ActiveValue::Set(self.attempt + 1),
error: sea_orm::ActiveValue::Set(error),
} }
} }
} }

View file

@ -36,6 +36,7 @@ impl Addresser for crate::Context {
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
attempt: Set(0), attempt: Set(0),
error: Set(None),
} }
), ),
Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"), Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"),

View file

@ -17,7 +17,7 @@ pub enum Pull<T> {
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum PullError { pub enum RequestError {
#[error("dereferenced resource ({0:?}) doesn't match requested type ({1:?})")] #[error("dereferenced resource ({0:?}) doesn't match requested type ({1:?})")]
Mismatch(apb::ObjectType, apb::ObjectType), Mismatch(apb::ObjectType, apb::ObjectType),
@ -46,33 +46,33 @@ pub enum PullError {
HttpSignature(#[from] httpsign::HttpSignatureError), HttpSignature(#[from] httpsign::HttpSignatureError),
} }
impl PullError { impl RequestError {
fn mismatch(expected: apb::ObjectType, found: apb::ObjectType) -> Self { fn mismatch(expected: apb::ObjectType, found: apb::ObjectType) -> Self {
PullError::Mismatch(expected, found) RequestError::Mismatch(expected, found)
} }
} }
impl Pull<serde_json::Value> { impl Pull<serde_json::Value> {
pub fn actor(self) -> Result<serde_json::Value, PullError> { pub fn actor(self) -> Result<serde_json::Value, RequestError> {
match self { match self {
Self::Actor(x) => Ok(x), Self::Actor(x) => Ok(x),
Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), Self::Activity(x) => Err(RequestError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))), Self::Object(x) => Err(RequestError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))),
} }
} }
pub fn activity(self) -> Result<serde_json::Value, PullError> { pub fn activity(self) -> Result<serde_json::Value, RequestError> {
match self { match self {
Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), Self::Actor(x) => Err(RequestError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
Self::Activity(x) => Ok(x), Self::Activity(x) => Ok(x),
Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))), Self::Object(x) => Err(RequestError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))),
} }
} }
pub fn object(self) -> Result<serde_json::Value, PullError> { pub fn object(self) -> Result<serde_json::Value, RequestError> {
match self { match self {
Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), Self::Actor(x) => Err(RequestError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), Self::Activity(x) => Err(RequestError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
Self::Object(x) => Ok(x), Self::Object(x) => Ok(x),
} }
} }
@ -80,24 +80,24 @@ impl Pull<serde_json::Value> {
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Fetcher { pub trait Fetcher {
async fn pull(&self, id: &str) -> Result<Pull<serde_json::Value>, PullError> { self.pull_r(id, 0).await } async fn pull(&self, id: &str) -> Result<Pull<serde_json::Value>, RequestError> { self.pull_r(id, 0).await }
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError>; async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, RequestError>;
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError>; async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, RequestError>;
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, PullError>; async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError>;
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError>; async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError>; async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError>; async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError>; async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError>; async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError>;
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError>; async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError>;
async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), PullError>; async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError>;
async fn request( async fn request(
method: reqwest::Method, method: reqwest::Method,
@ -106,7 +106,7 @@ pub trait Fetcher {
from: &str, from: &str,
key: &str, key: &str,
domain: &str, domain: &str,
) -> Result<Response, PullError> { ) -> Result<Response, RequestError> {
let host = crate::Context::server(url); let host = crate::Context::server(url);
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT" let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
let path = url.replace("https://", "").replace("http://", "").replace(&host, ""); let path = url.replace("https://", "").replace("http://", "").replace(&host, "");
@ -147,7 +147,7 @@ pub trait Fetcher {
match response.error_for_status_ref() { match response.error_for_status_ref() {
Ok(_) => Ok(response), Ok(_) => Ok(response),
Err(e) => Err(e) =>
Err(PullError::Fetch( Err(RequestError::Fetch(
e.status().unwrap_or_default(), e.status().unwrap_or_default(),
response.text().await?, response.text().await?,
)), )),
@ -158,7 +158,7 @@ pub trait Fetcher {
#[async_trait::async_trait] #[async_trait::async_trait]
impl Fetcher for crate::Context { impl Fetcher for crate::Context {
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError> { async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, RequestError> {
tracing::debug!("fetching {id}"); tracing::debug!("fetching {id}");
// let _domain = self.fetch_domain(&crate::Context::server(id)).await?; // let _domain = self.fetch_domain(&crate::Context::server(id)).await?;
@ -173,14 +173,14 @@ impl Fetcher for crate::Context {
let doc_id = document.id()?; let doc_id = document.id()?;
if id != doc_id { if id != doc_id {
if depth >= self.cfg().security.max_id_redirects { if depth >= self.cfg().security.max_id_redirects {
return Err(PullError::TooManyRedirects); return Err(RequestError::TooManyRedirects);
} }
return self.pull(doc_id).await; return self.pull(doc_id).await;
} }
match document.object_type()? { match document.object_type()? {
apb::ObjectType::Collection(x) => Err(PullError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))), apb::ObjectType::Collection(x) => Err(RequestError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))),
apb::ObjectType::Tombstone => Err(PullError::Tombstone), apb::ObjectType::Tombstone => Err(RequestError::Tombstone),
apb::ObjectType::Activity(_) => Ok(Pull::Activity(document)), apb::ObjectType::Activity(_) => Ok(Pull::Activity(document)),
apb::ObjectType::Actor(_) => Ok(Pull::Actor(document)), apb::ObjectType::Actor(_) => Ok(Pull::Actor(document)),
_ => Ok(Pull::Object(document)), _ => Ok(Pull::Object(document)),
@ -188,7 +188,7 @@ impl Fetcher for crate::Context {
} }
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError> { async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, RequestError> {
let subject = format!("acct:{user}@{host}"); let subject = format!("acct:{user}@{host}");
let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}"); let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}");
let resource = reqwest::Client::new() let resource = reqwest::Client::new()
@ -220,7 +220,7 @@ impl Fetcher for crate::Context {
Ok(None) Ok(None)
} }
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, PullError> { async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError> {
if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(tx).await? { if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(tx).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
@ -271,7 +271,7 @@ impl Fetcher for crate::Context {
Ok(instance_model) Ok(instance_model)
} }
async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError> { async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
let id = document.id()?.to_string(); let id = document.id()?.to_string();
let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?; let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?;
@ -321,7 +321,7 @@ impl Fetcher for crate::Context {
) )
} }
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError> { async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(tx).await? { if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(tx).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
@ -331,7 +331,7 @@ impl Fetcher for crate::Context {
self.resolve_user(document, tx).await self.resolve_user(document, tx).await
} }
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError> { async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {
if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(tx).await? { if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(tx).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
@ -341,7 +341,7 @@ impl Fetcher for crate::Context {
self.resolve_activity(activity, tx).await self.resolve_activity(activity, tx).await
} }
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError> { async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {
let _domain = self.fetch_domain(&crate::Context::server(activity.id()?), tx).await?; let _domain = self.fetch_domain(&crate::Context::server(activity.id()?), tx).await?;
if let Ok(activity_actor) = activity.actor().id() { if let Ok(activity_actor) = activity.actor().id() {
@ -362,22 +362,22 @@ impl Fetcher for crate::Context {
Ok(activity_model) Ok(activity_model)
} }
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), PullError> { async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> {
// crawl_replies(self, id, 0).await // crawl_replies(self, id, 0).await
todo!() todo!()
} }
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> { async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
fetch_object_r(self, id, 0, tx).await fetch_object_r(self, id, 0, tx).await
} }
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> { async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
resolve_object_r(self, object, 0, tx).await resolve_object_r(self, object, 0, tx).await
} }
} }
#[async_recursion::async_recursion] #[async_recursion::async_recursion]
async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> { async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(tx).await? { if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(tx).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
@ -387,7 +387,7 @@ async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl Co
resolve_object_r(ctx, object, depth, tx).await resolve_object_r(ctx, object, depth, tx).await
} }
async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> { async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
let id = object.id()?.to_string(); let id = object.id()?.to_string();
if let Ok(oid) = object.id() { if let Ok(oid) = object.id() {
@ -420,12 +420,12 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Fetchable : Sync + Send { pub trait Fetchable : Sync + Send {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>; async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl Fetchable for apb::Node<serde_json::Value> { impl Fetchable for apb::Node<serde_json::Value> {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError> { async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError> {
if let apb::Node::Link(uri) = self { if let apb::Node::Link(uri) = self {
if let Ok(href) = uri.href() { if let Ok(href) = uri.href() {
*self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain()) *self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())

View file

@ -26,7 +26,7 @@ pub enum ProcessorError {
NormalizerError(#[from] crate::traits::normalize::NormalizerError), NormalizerError(#[from] crate::traits::normalize::NormalizerError),
#[error("failed fetching resource: {0:?}")] #[error("failed fetching resource: {0:?}")]
PullError(#[from] crate::traits::fetch::PullError), PullError(#[from] crate::traits::fetch::RequestError),
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View file

@ -16,6 +16,7 @@ mod m20240628_000001_add_followers_following_indexes;
mod m20240628_000002_add_credentials_activated; mod m20240628_000002_add_credentials_activated;
mod m20240703_000001_add_audience_index; mod m20240703_000001_add_audience_index;
mod m20240703_000002_add_image_to_objects; mod m20240703_000002_add_image_to_objects;
mod m20240706_000001_add_error_to_jobs;
pub struct Migrator; pub struct Migrator;
@ -39,6 +40,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240628_000002_add_credentials_activated::Migration), Box::new(m20240628_000002_add_credentials_activated::Migration),
Box::new(m20240703_000001_add_audience_index::Migration), Box::new(m20240703_000001_add_audience_index::Migration),
Box::new(m20240703_000002_add_image_to_objects::Migration), Box::new(m20240703_000002_add_image_to_objects::Migration),
Box::new(m20240706_000001_add_error_to_jobs::Migration),
] ]
} }
} }

View file

@ -14,6 +14,7 @@ pub enum Jobs {
Published, Published,
NotBefore, NotBefore,
Attempt, Attempt,
Error, // added after
} }

View file

@ -0,0 +1,35 @@
use sea_orm_migration::prelude::*;
use crate::m20240605_000001_add_jobs_table::Jobs;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Jobs::Table)
.add_column(ColumnDef::new(Jobs::Error).string().null())
.to_owned()
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Jobs::Table)
.drop_column(Jobs::Error)
.to_owned()
)
.await?;
Ok(())
}
}

View file

@ -67,6 +67,7 @@ pub async fn post(
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
attempt: Set(0), attempt: Set(0),
payload: Set(Some(activity)), payload: Set(Some(activity)),
error: Set(None),
}; };
model::job::Entity::insert(job).exec(ctx.db()).await?; model::job::Entity::insert(job).exec(ctx.db()).await?;

View file

@ -79,7 +79,8 @@ pub async fn post(
payload: Set(Some(activity)), payload: Set(Some(activity)),
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()),
attempt: Set(0) attempt: Set(0),
error: Set(None),
}; };
upub::model::job::Entity::insert(job).exec(ctx.db()).await?; upub::model::job::Entity::insert(job).exec(ctx.db()).await?;

View file

@ -1,7 +1,7 @@
use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}}; use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}};
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter}; use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
use httpsign::HttpSignature; use httpsign::HttpSignature;
use upub::traits::{fetch::PullError, Fetcher}; use upub::traits::{fetch::RequestError, Fetcher};
use crate::ApiError; use crate::ApiError;
@ -118,7 +118,7 @@ where
.to_string(); .to_string();
match ctx.fetch_user(&user_id, ctx.db()).await { match ctx.fetch_user(&user_id, ctx.db()).await {
Err(PullError::Database(x)) => return Err(PullError::Database(x).into()), Err(RequestError::Database(x)) => return Err(RequestError::Database(x).into()),
Err(e) => tracing::debug!("could not fetch {user_id} to verify signature: {e}"), Err(e) => tracing::debug!("could not fetch {user_id} to verify signature: {e}"),
Ok(user) => { Ok(user) => {
let signature = http_signature.build_from_parts(parts); let signature = http_signature.build_from_parts(parts);

View file

@ -17,7 +17,7 @@ pub enum ApiError {
// TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut // TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut
// helps with debugging! // helps with debugging!
#[error("fetch error: {0:?}")] #[error("fetch error: {0:?}")]
FetchError(#[from] upub::traits::fetch::PullError), FetchError(#[from] upub::traits::fetch::RequestError),
// wrapper error to return arbitraty status codes // wrapper error to return arbitraty status codes
#[error("{0}")] #[error("{0}")]

View file

@ -1,4 +1,3 @@
use sea_orm::EntityTrait;
use reqwest::Method; use reqwest::Method;
use apb::{LD, ActivityMut}; use apb::{LD, ActivityMut};
@ -59,16 +58,11 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
return Ok(()); return Ok(());
}; };
if let Err(e) = Context::request( Context::request(
Method::POST, job.target.as_deref().unwrap_or(""), Method::POST, job.target.as_deref().unwrap_or(""),
Some(&serde_json::to_string(&payload.ld_context()).unwrap()), Some(&serde_json::to_string(&payload.ld_context()).unwrap()),
&job.actor, &key, ctx.domain() &job.actor, &key, ctx.domain()
).await { ).await?;
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
model::job::Entity::insert(job.clone().repeat())
.exec(ctx.db())
.await?;
}
Ok(()) Ok(())
} }

View file

@ -1,7 +1,7 @@
use reqwest::StatusCode; use reqwest::StatusCode;
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder}; use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
use upub::{model, traits::{fetch::PullError, process::ProcessorError}, Context}; use upub::{model, traits::{fetch::RequestError, process::ProcessorError}, Context};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum JobError { pub enum JobError {
@ -17,11 +17,11 @@ pub enum JobError {
#[error("malformed job: missing payload")] #[error("malformed job: missing payload")]
MissingPayload, MissingPayload,
#[error("no available implementation to process job")]
Unprocessable,
#[error("error processing activity: {0:?}")] #[error("error processing activity: {0:?}")]
ProcessorError(#[from] upub::traits::process::ProcessorError), ProcessorError(#[from] upub::traits::process::ProcessorError),
#[error("error delivering activity: {0}")]
DeliveryError(#[from] upub::traits::fetch::RequestError),
} }
pub type JobResult<T> = Result<T, JobError>; pub type JobResult<T> = Result<T, JobError>;
@ -137,16 +137,16 @@ impl JobDispatcher for Context {
tracing::error!("dropping job with malformed activity (missing field {f})"), tracing::error!("dropping job with malformed activity (missing field {f})"),
Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) => Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) =>
tracing::info!("dropping job already processed: {}", job.activity), tracing::info!("dropping job already processed: {}", job.activity),
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::FORBIDDEN, e)))) => Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::FORBIDDEN, e)))) =>
tracing::warn!("dropping job because requested resource is not accessible: {e}"), tracing::warn!("dropping job because requested resource is not accessible: {e}"),
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::NOT_FOUND, e)))) => Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::NOT_FOUND, e)))) =>
tracing::warn!("dropping job because requested resource is not available: {e}"), tracing::warn!("dropping job because requested resource is not available: {e}"),
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::GONE, e)))) => Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::GONE, e)))) =>
tracing::warn!("dropping job because requested resource is no longer available: {e}"), tracing::warn!("dropping job because requested resource is no longer available: {e}"),
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Malformed(f)))) => Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Malformed(f)))) =>
tracing::warn!("dropping job because requested resource could not be verified (fetch is invalid AP object: {f})"), tracing::warn!("dropping job because requested resource could not be verified (fetch is invalid AP object: {f})"),
Err(e) => { Err(e) => {
if let JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(status, ref e))) = e { if let JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(status, ref e))) = e {
// TODO maybe convert this in generic .is_client_error() check, but excluding 401s // TODO maybe convert this in generic .is_client_error() check, but excluding 401s
// and 400s because we want to retry those. also maybe 406s? idk theres a lot i // and 400s because we want to retry those. also maybe 406s? idk theres a lot i
// just want to drop lemmy.cafe jobs // just want to drop lemmy.cafe jobs
@ -156,7 +156,7 @@ impl JobDispatcher for Context {
} }
} }
tracing::error!("failed processing job '{}': {e}", job.activity); tracing::error!("failed processing job '{}': {e}", job.activity);
let active = job.clone().repeat(); let active = job.clone().repeat(Some(e.to_string()));
let mut count = 0; let mut count = 0;
loop { loop {
match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await { match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {