Compare commits
3 commits
90f483a0ba
...
b086fe969f
Author | SHA1 | Date | |
---|---|---|---|
b086fe969f | |||
11b4ae8678 | |||
9adeff6fbf |
17 changed files with 115 additions and 72 deletions
|
@ -1,7 +1,7 @@
|
|||
use sea_orm::{EntityTrait, TransactionTrait};
|
||||
use upub::traits::{fetch::{Fetchable, PullError}, Addresser, Normalizer};
|
||||
use upub::traits::{fetch::{Fetchable, RequestError}, Addresser, Normalizer};
|
||||
|
||||
pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> {
|
||||
pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), RequestError> {
|
||||
use apb::Base;
|
||||
|
||||
let mut node = apb::Node::link(uri.to_string());
|
||||
|
|
|
@ -81,6 +81,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res
|
|||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0),
|
||||
payload: Set(Some(undo_activity)),
|
||||
error: Set(None),
|
||||
};
|
||||
|
||||
tracing::info!("undoing {}", activity.id);
|
||||
|
@ -121,6 +122,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res
|
|||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0),
|
||||
payload: Set(Some(undo_activity)),
|
||||
error: Set(None),
|
||||
};
|
||||
|
||||
tracing::info!("deleting {}", object.id);
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use apb::{ActivityMut, BaseMut, ObjectMut};
|
||||
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait, QueryFilter, ColumnTrait};
|
||||
use upub::traits::{fetch::PullError, Fetcher};
|
||||
use upub::traits::{fetch::RequestError, Fetcher};
|
||||
|
||||
#[derive(Debug, Clone, clap::Subcommand)]
|
||||
/// available actions to take on relays
|
||||
|
@ -29,7 +29,7 @@ pub enum RelayCommand {
|
|||
},
|
||||
}
|
||||
|
||||
pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullError> {
|
||||
pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), RequestError> {
|
||||
let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
|
||||
|
@ -84,6 +84,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
|
|||
attempt: Set(0),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
error: Set(None),
|
||||
};
|
||||
tracing::info!("following relay {actor}");
|
||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
|
@ -119,6 +120,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
|
|||
attempt: Set(0),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
error: Set(None),
|
||||
};
|
||||
tracing::info!("accepting relay {actor}");
|
||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
|
@ -155,6 +157,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
|
|||
attempt: Set(0),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
error: Set(None),
|
||||
};
|
||||
tracing::info!("unfollowing relay {actor}");
|
||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
|
@ -190,6 +193,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
|
|||
attempt: Set(0),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
error: Set(None),
|
||||
};
|
||||
tracing::info!("unfollowing relay {actor}");
|
||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
|
||||
use upub::traits::{fetch::PullError, Fetcher};
|
||||
use upub::traits::{fetch::RequestError, Fetcher};
|
||||
|
||||
pub async fn thread(ctx: upub::Context) -> Result<(), PullError> {
|
||||
pub async fn thread(ctx: upub::Context) -> Result<(), RequestError> {
|
||||
use futures::TryStreamExt;
|
||||
let db = ctx.db();
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ pub struct Model {
|
|||
pub published: ChronoDateTimeUtc,
|
||||
pub not_before: ChronoDateTimeUtc,
|
||||
pub attempt: i16,
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
|
@ -41,7 +42,7 @@ impl Model {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn repeat(self) -> ActiveModel {
|
||||
pub fn repeat(self, error: Option<String>) -> ActiveModel {
|
||||
ActiveModel {
|
||||
internal: sea_orm::ActiveValue::NotSet,
|
||||
job_type: sea_orm::ActiveValue::Set(self.job_type),
|
||||
|
@ -52,6 +53,7 @@ impl Model {
|
|||
activity: sea_orm::ActiveValue::Set(self.activity),
|
||||
published: sea_orm::ActiveValue::Set(self.published),
|
||||
attempt: sea_orm::ActiveValue::Set(self.attempt + 1),
|
||||
error: sea_orm::ActiveValue::Set(error),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ impl Addresser for crate::Context {
|
|||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0),
|
||||
error: Set(None),
|
||||
}
|
||||
),
|
||||
Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"),
|
||||
|
|
|
@ -17,7 +17,7 @@ pub enum Pull<T> {
|
|||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum PullError {
|
||||
pub enum RequestError {
|
||||
#[error("dereferenced resource ({0:?}) doesn't match requested type ({1:?})")]
|
||||
Mismatch(apb::ObjectType, apb::ObjectType),
|
||||
|
||||
|
@ -46,33 +46,33 @@ pub enum PullError {
|
|||
HttpSignature(#[from] httpsign::HttpSignatureError),
|
||||
}
|
||||
|
||||
impl PullError {
|
||||
impl RequestError {
|
||||
fn mismatch(expected: apb::ObjectType, found: apb::ObjectType) -> Self {
|
||||
PullError::Mismatch(expected, found)
|
||||
RequestError::Mismatch(expected, found)
|
||||
}
|
||||
}
|
||||
|
||||
impl Pull<serde_json::Value> {
|
||||
pub fn actor(self) -> Result<serde_json::Value, PullError> {
|
||||
pub fn actor(self) -> Result<serde_json::Value, RequestError> {
|
||||
match self {
|
||||
Self::Actor(x) => Ok(x),
|
||||
Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
|
||||
Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))),
|
||||
Self::Activity(x) => Err(RequestError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
|
||||
Self::Object(x) => Err(RequestError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn activity(self) -> Result<serde_json::Value, PullError> {
|
||||
pub fn activity(self) -> Result<serde_json::Value, RequestError> {
|
||||
match self {
|
||||
Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
|
||||
Self::Actor(x) => Err(RequestError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
|
||||
Self::Activity(x) => Ok(x),
|
||||
Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))),
|
||||
Self::Object(x) => Err(RequestError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn object(self) -> Result<serde_json::Value, PullError> {
|
||||
pub fn object(self) -> Result<serde_json::Value, RequestError> {
|
||||
match self {
|
||||
Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
|
||||
Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
|
||||
Self::Actor(x) => Err(RequestError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
|
||||
Self::Activity(x) => Err(RequestError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
|
||||
Self::Object(x) => Ok(x),
|
||||
}
|
||||
}
|
||||
|
@ -80,24 +80,24 @@ impl Pull<serde_json::Value> {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Fetcher {
|
||||
async fn pull(&self, id: &str) -> Result<Pull<serde_json::Value>, PullError> { self.pull_r(id, 0).await }
|
||||
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError>;
|
||||
async fn pull(&self, id: &str) -> Result<Pull<serde_json::Value>, RequestError> { self.pull_r(id, 0).await }
|
||||
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, RequestError>;
|
||||
|
||||
|
||||
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError>;
|
||||
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, RequestError>;
|
||||
|
||||
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, PullError>;
|
||||
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError>;
|
||||
|
||||
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError>;
|
||||
async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError>;
|
||||
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
|
||||
async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
|
||||
|
||||
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError>;
|
||||
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError>;
|
||||
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
|
||||
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
|
||||
|
||||
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError>;
|
||||
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError>;
|
||||
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError>;
|
||||
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError>;
|
||||
|
||||
async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), PullError>;
|
||||
async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError>;
|
||||
|
||||
async fn request(
|
||||
method: reqwest::Method,
|
||||
|
@ -106,7 +106,7 @@ pub trait Fetcher {
|
|||
from: &str,
|
||||
key: &str,
|
||||
domain: &str,
|
||||
) -> Result<Response, PullError> {
|
||||
) -> Result<Response, RequestError> {
|
||||
let host = crate::Context::server(url);
|
||||
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
|
||||
let path = url.replace("https://", "").replace("http://", "").replace(&host, "");
|
||||
|
@ -147,7 +147,7 @@ pub trait Fetcher {
|
|||
match response.error_for_status_ref() {
|
||||
Ok(_) => Ok(response),
|
||||
Err(e) =>
|
||||
Err(PullError::Fetch(
|
||||
Err(RequestError::Fetch(
|
||||
e.status().unwrap_or_default(),
|
||||
response.text().await?,
|
||||
)),
|
||||
|
@ -158,7 +158,7 @@ pub trait Fetcher {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
impl Fetcher for crate::Context {
|
||||
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError> {
|
||||
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, RequestError> {
|
||||
tracing::debug!("fetching {id}");
|
||||
// let _domain = self.fetch_domain(&crate::Context::server(id)).await?;
|
||||
|
||||
|
@ -173,14 +173,14 @@ impl Fetcher for crate::Context {
|
|||
let doc_id = document.id()?;
|
||||
if id != doc_id {
|
||||
if depth >= self.cfg().security.max_id_redirects {
|
||||
return Err(PullError::TooManyRedirects);
|
||||
return Err(RequestError::TooManyRedirects);
|
||||
}
|
||||
return self.pull(doc_id).await;
|
||||
}
|
||||
|
||||
match document.object_type()? {
|
||||
apb::ObjectType::Collection(x) => Err(PullError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))),
|
||||
apb::ObjectType::Tombstone => Err(PullError::Tombstone),
|
||||
apb::ObjectType::Collection(x) => Err(RequestError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))),
|
||||
apb::ObjectType::Tombstone => Err(RequestError::Tombstone),
|
||||
apb::ObjectType::Activity(_) => Ok(Pull::Activity(document)),
|
||||
apb::ObjectType::Actor(_) => Ok(Pull::Actor(document)),
|
||||
_ => Ok(Pull::Object(document)),
|
||||
|
@ -188,7 +188,7 @@ impl Fetcher for crate::Context {
|
|||
}
|
||||
|
||||
|
||||
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError> {
|
||||
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, RequestError> {
|
||||
let subject = format!("acct:{user}@{host}");
|
||||
let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}");
|
||||
let resource = reqwest::Client::new()
|
||||
|
@ -220,7 +220,7 @@ impl Fetcher for crate::Context {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, PullError> {
|
||||
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError> {
|
||||
if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(tx).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
@ -271,7 +271,7 @@ impl Fetcher for crate::Context {
|
|||
Ok(instance_model)
|
||||
}
|
||||
|
||||
async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError> {
|
||||
async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
|
||||
let id = document.id()?.to_string();
|
||||
|
||||
let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?;
|
||||
|
@ -321,7 +321,7 @@ impl Fetcher for crate::Context {
|
|||
)
|
||||
}
|
||||
|
||||
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError> {
|
||||
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
|
||||
if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(tx).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
@ -331,7 +331,7 @@ impl Fetcher for crate::Context {
|
|||
self.resolve_user(document, tx).await
|
||||
}
|
||||
|
||||
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError> {
|
||||
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {
|
||||
if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(tx).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
@ -341,7 +341,7 @@ impl Fetcher for crate::Context {
|
|||
self.resolve_activity(activity, tx).await
|
||||
}
|
||||
|
||||
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError> {
|
||||
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {
|
||||
let _domain = self.fetch_domain(&crate::Context::server(activity.id()?), tx).await?;
|
||||
|
||||
if let Ok(activity_actor) = activity.actor().id() {
|
||||
|
@ -362,22 +362,22 @@ impl Fetcher for crate::Context {
|
|||
Ok(activity_model)
|
||||
}
|
||||
|
||||
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), PullError> {
|
||||
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> {
|
||||
// crawl_replies(self, id, 0).await
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
|
||||
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||
fetch_object_r(self, id, 0, tx).await
|
||||
}
|
||||
|
||||
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
|
||||
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||
resolve_object_r(self, object, 0, tx).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_recursion::async_recursion]
|
||||
async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
|
||||
async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||
if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(tx).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
@ -387,7 +387,7 @@ async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl Co
|
|||
resolve_object_r(ctx, object, depth, tx).await
|
||||
}
|
||||
|
||||
async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
|
||||
async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||
let id = object.id()?.to_string();
|
||||
|
||||
if let Ok(oid) = object.id() {
|
||||
|
@ -420,12 +420,12 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
|
|||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Fetchable : Sync + Send {
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>;
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Fetchable for apb::Node<serde_json::Value> {
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError> {
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError> {
|
||||
if let apb::Node::Link(uri) = self {
|
||||
if let Ok(href) = uri.href() {
|
||||
*self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())
|
||||
|
|
|
@ -26,7 +26,7 @@ pub enum ProcessorError {
|
|||
NormalizerError(#[from] crate::traits::normalize::NormalizerError),
|
||||
|
||||
#[error("failed fetching resource: {0:?}")]
|
||||
PullError(#[from] crate::traits::fetch::PullError),
|
||||
PullError(#[from] crate::traits::fetch::RequestError),
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
|
|
@ -16,6 +16,7 @@ mod m20240628_000001_add_followers_following_indexes;
|
|||
mod m20240628_000002_add_credentials_activated;
|
||||
mod m20240703_000001_add_audience_index;
|
||||
mod m20240703_000002_add_image_to_objects;
|
||||
mod m20240706_000001_add_error_to_jobs;
|
||||
|
||||
pub struct Migrator;
|
||||
|
||||
|
@ -39,6 +40,7 @@ impl MigratorTrait for Migrator {
|
|||
Box::new(m20240628_000002_add_credentials_activated::Migration),
|
||||
Box::new(m20240703_000001_add_audience_index::Migration),
|
||||
Box::new(m20240703_000002_add_image_to_objects::Migration),
|
||||
Box::new(m20240706_000001_add_error_to_jobs::Migration),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ pub enum Jobs {
|
|||
Published,
|
||||
NotBefore,
|
||||
Attempt,
|
||||
Error, // added after
|
||||
}
|
||||
|
||||
|
||||
|
|
35
upub/migrations/src/m20240706_000001_add_error_to_jobs.rs
Normal file
35
upub/migrations/src/m20240706_000001_add_error_to_jobs.rs
Normal file
|
@ -0,0 +1,35 @@
|
|||
use sea_orm_migration::prelude::*;
|
||||
|
||||
use crate::m20240605_000001_add_jobs_table::Jobs;
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.alter_table(
|
||||
Table::alter()
|
||||
.table(Jobs::Table)
|
||||
.add_column(ColumnDef::new(Jobs::Error).string().null())
|
||||
.to_owned()
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.alter_table(
|
||||
Table::alter()
|
||||
.table(Jobs::Table)
|
||||
.drop_column(Jobs::Error)
|
||||
.to_owned()
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -67,6 +67,7 @@ pub async fn post(
|
|||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0),
|
||||
payload: Set(Some(activity)),
|
||||
error: Set(None),
|
||||
};
|
||||
|
||||
model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
|
|
|
@ -79,7 +79,8 @@ pub async fn post(
|
|||
payload: Set(Some(activity)),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0)
|
||||
attempt: Set(0),
|
||||
error: Set(None),
|
||||
};
|
||||
|
||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}};
|
||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
|
||||
use httpsign::HttpSignature;
|
||||
use upub::traits::{fetch::PullError, Fetcher};
|
||||
use upub::traits::{fetch::RequestError, Fetcher};
|
||||
|
||||
use crate::ApiError;
|
||||
|
||||
|
@ -118,7 +118,7 @@ where
|
|||
.to_string();
|
||||
|
||||
match ctx.fetch_user(&user_id, ctx.db()).await {
|
||||
Err(PullError::Database(x)) => return Err(PullError::Database(x).into()),
|
||||
Err(RequestError::Database(x)) => return Err(RequestError::Database(x).into()),
|
||||
Err(e) => tracing::debug!("could not fetch {user_id} to verify signature: {e}"),
|
||||
Ok(user) => {
|
||||
let signature = http_signature.build_from_parts(parts);
|
||||
|
|
|
@ -17,7 +17,7 @@ pub enum ApiError {
|
|||
// TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut
|
||||
// helps with debugging!
|
||||
#[error("fetch error: {0:?}")]
|
||||
FetchError(#[from] upub::traits::fetch::PullError),
|
||||
FetchError(#[from] upub::traits::fetch::RequestError),
|
||||
|
||||
// wrapper error to return arbitraty status codes
|
||||
#[error("{0}")]
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use sea_orm::EntityTrait;
|
||||
use reqwest::Method;
|
||||
|
||||
use apb::{LD, ActivityMut};
|
||||
|
@ -59,16 +58,11 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
|
|||
return Ok(());
|
||||
};
|
||||
|
||||
if let Err(e) = Context::request(
|
||||
Context::request(
|
||||
Method::POST, job.target.as_deref().unwrap_or(""),
|
||||
Some(&serde_json::to_string(&payload.ld_context()).unwrap()),
|
||||
&job.actor, &key, ctx.domain()
|
||||
).await {
|
||||
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
|
||||
model::job::Entity::insert(job.clone().repeat())
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
}
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use reqwest::StatusCode;
|
||||
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
|
||||
|
||||
use upub::{model, traits::{fetch::PullError, process::ProcessorError}, Context};
|
||||
use upub::{model, traits::{fetch::RequestError, process::ProcessorError}, Context};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum JobError {
|
||||
|
@ -17,11 +17,11 @@ pub enum JobError {
|
|||
#[error("malformed job: missing payload")]
|
||||
MissingPayload,
|
||||
|
||||
#[error("no available implementation to process job")]
|
||||
Unprocessable,
|
||||
|
||||
#[error("error processing activity: {0:?}")]
|
||||
ProcessorError(#[from] upub::traits::process::ProcessorError),
|
||||
|
||||
#[error("error delivering activity: {0}")]
|
||||
DeliveryError(#[from] upub::traits::fetch::RequestError),
|
||||
}
|
||||
|
||||
pub type JobResult<T> = Result<T, JobError>;
|
||||
|
@ -137,16 +137,16 @@ impl JobDispatcher for Context {
|
|||
tracing::error!("dropping job with malformed activity (missing field {f})"),
|
||||
Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) =>
|
||||
tracing::info!("dropping job already processed: {}", job.activity),
|
||||
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::FORBIDDEN, e)))) =>
|
||||
Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::FORBIDDEN, e)))) =>
|
||||
tracing::warn!("dropping job because requested resource is not accessible: {e}"),
|
||||
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::NOT_FOUND, e)))) =>
|
||||
Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::NOT_FOUND, e)))) =>
|
||||
tracing::warn!("dropping job because requested resource is not available: {e}"),
|
||||
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::GONE, e)))) =>
|
||||
Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::GONE, e)))) =>
|
||||
tracing::warn!("dropping job because requested resource is no longer available: {e}"),
|
||||
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Malformed(f)))) =>
|
||||
Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Malformed(f)))) =>
|
||||
tracing::warn!("dropping job because requested resource could not be verified (fetch is invalid AP object: {f})"),
|
||||
Err(e) => {
|
||||
if let JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(status, ref e))) = e {
|
||||
if let JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(status, ref e))) = e {
|
||||
// TODO maybe convert this in generic .is_client_error() check, but excluding 401s
|
||||
// and 400s because we want to retry those. also maybe 406s? idk theres a lot i
|
||||
// just want to drop lemmy.cafe jobs
|
||||
|
@ -156,7 +156,7 @@ impl JobDispatcher for Context {
|
|||
}
|
||||
}
|
||||
tracing::error!("failed processing job '{}': {e}", job.activity);
|
||||
let active = job.clone().repeat();
|
||||
let active = job.clone().repeat(Some(e.to_string()));
|
||||
let mut count = 0;
|
||||
loop {
|
||||
match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {
|
||||
|
|
Loading…
Reference in a new issue