Compare commits
3 commits
90f483a0ba
...
b086fe969f
Author | SHA1 | Date | |
---|---|---|---|
b086fe969f | |||
11b4ae8678 | |||
9adeff6fbf |
17 changed files with 115 additions and 72 deletions
|
@ -1,7 +1,7 @@
|
||||||
use sea_orm::{EntityTrait, TransactionTrait};
|
use sea_orm::{EntityTrait, TransactionTrait};
|
||||||
use upub::traits::{fetch::{Fetchable, PullError}, Addresser, Normalizer};
|
use upub::traits::{fetch::{Fetchable, RequestError}, Addresser, Normalizer};
|
||||||
|
|
||||||
pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> {
|
pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), RequestError> {
|
||||||
use apb::Base;
|
use apb::Base;
|
||||||
|
|
||||||
let mut node = apb::Node::link(uri.to_string());
|
let mut node = apb::Node::link(uri.to_string());
|
||||||
|
|
|
@ -81,6 +81,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
attempt: Set(0),
|
attempt: Set(0),
|
||||||
payload: Set(Some(undo_activity)),
|
payload: Set(Some(undo_activity)),
|
||||||
|
error: Set(None),
|
||||||
};
|
};
|
||||||
|
|
||||||
tracing::info!("undoing {}", activity.id);
|
tracing::info!("undoing {}", activity.id);
|
||||||
|
@ -121,6 +122,7 @@ pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Res
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
attempt: Set(0),
|
attempt: Set(0),
|
||||||
payload: Set(Some(undo_activity)),
|
payload: Set(Some(undo_activity)),
|
||||||
|
error: Set(None),
|
||||||
};
|
};
|
||||||
|
|
||||||
tracing::info!("deleting {}", object.id);
|
tracing::info!("deleting {}", object.id);
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use apb::{ActivityMut, BaseMut, ObjectMut};
|
use apb::{ActivityMut, BaseMut, ObjectMut};
|
||||||
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait, QueryFilter, ColumnTrait};
|
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait, QueryFilter, ColumnTrait};
|
||||||
use upub::traits::{fetch::PullError, Fetcher};
|
use upub::traits::{fetch::RequestError, Fetcher};
|
||||||
|
|
||||||
#[derive(Debug, Clone, clap::Subcommand)]
|
#[derive(Debug, Clone, clap::Subcommand)]
|
||||||
/// available actions to take on relays
|
/// available actions to take on relays
|
||||||
|
@ -29,7 +29,7 @@ pub enum RelayCommand {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullError> {
|
pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), RequestError> {
|
||||||
let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
|
let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
|
.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
|
||||||
|
@ -84,6 +84,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
|
||||||
attempt: Set(0),
|
attempt: Set(0),
|
||||||
published: Set(chrono::Utc::now()),
|
published: Set(chrono::Utc::now()),
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
|
error: Set(None),
|
||||||
};
|
};
|
||||||
tracing::info!("following relay {actor}");
|
tracing::info!("following relay {actor}");
|
||||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||||
|
@ -119,6 +120,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
|
||||||
attempt: Set(0),
|
attempt: Set(0),
|
||||||
published: Set(chrono::Utc::now()),
|
published: Set(chrono::Utc::now()),
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
|
error: Set(None),
|
||||||
};
|
};
|
||||||
tracing::info!("accepting relay {actor}");
|
tracing::info!("accepting relay {actor}");
|
||||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||||
|
@ -155,6 +157,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
|
||||||
attempt: Set(0),
|
attempt: Set(0),
|
||||||
published: Set(chrono::Utc::now()),
|
published: Set(chrono::Utc::now()),
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
|
error: Set(None),
|
||||||
};
|
};
|
||||||
tracing::info!("unfollowing relay {actor}");
|
tracing::info!("unfollowing relay {actor}");
|
||||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||||
|
@ -190,6 +193,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE
|
||||||
attempt: Set(0),
|
attempt: Set(0),
|
||||||
published: Set(chrono::Utc::now()),
|
published: Set(chrono::Utc::now()),
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
|
error: Set(None),
|
||||||
};
|
};
|
||||||
tracing::info!("unfollowing relay {actor}");
|
tracing::info!("unfollowing relay {actor}");
|
||||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
|
use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
|
||||||
use upub::traits::{fetch::PullError, Fetcher};
|
use upub::traits::{fetch::RequestError, Fetcher};
|
||||||
|
|
||||||
pub async fn thread(ctx: upub::Context) -> Result<(), PullError> {
|
pub async fn thread(ctx: upub::Context) -> Result<(), RequestError> {
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
let db = ctx.db();
|
let db = ctx.db();
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ pub struct Model {
|
||||||
pub published: ChronoDateTimeUtc,
|
pub published: ChronoDateTimeUtc,
|
||||||
pub not_before: ChronoDateTimeUtc,
|
pub not_before: ChronoDateTimeUtc,
|
||||||
pub attempt: i16,
|
pub attempt: i16,
|
||||||
|
pub error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
@ -41,7 +42,7 @@ impl Model {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn repeat(self) -> ActiveModel {
|
pub fn repeat(self, error: Option<String>) -> ActiveModel {
|
||||||
ActiveModel {
|
ActiveModel {
|
||||||
internal: sea_orm::ActiveValue::NotSet,
|
internal: sea_orm::ActiveValue::NotSet,
|
||||||
job_type: sea_orm::ActiveValue::Set(self.job_type),
|
job_type: sea_orm::ActiveValue::Set(self.job_type),
|
||||||
|
@ -52,6 +53,7 @@ impl Model {
|
||||||
activity: sea_orm::ActiveValue::Set(self.activity),
|
activity: sea_orm::ActiveValue::Set(self.activity),
|
||||||
published: sea_orm::ActiveValue::Set(self.published),
|
published: sea_orm::ActiveValue::Set(self.published),
|
||||||
attempt: sea_orm::ActiveValue::Set(self.attempt + 1),
|
attempt: sea_orm::ActiveValue::Set(self.attempt + 1),
|
||||||
|
error: sea_orm::ActiveValue::Set(error),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ impl Addresser for crate::Context {
|
||||||
published: Set(chrono::Utc::now()),
|
published: Set(chrono::Utc::now()),
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
attempt: Set(0),
|
attempt: Set(0),
|
||||||
|
error: Set(None),
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"),
|
Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"),
|
||||||
|
|
|
@ -17,7 +17,7 @@ pub enum Pull<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum PullError {
|
pub enum RequestError {
|
||||||
#[error("dereferenced resource ({0:?}) doesn't match requested type ({1:?})")]
|
#[error("dereferenced resource ({0:?}) doesn't match requested type ({1:?})")]
|
||||||
Mismatch(apb::ObjectType, apb::ObjectType),
|
Mismatch(apb::ObjectType, apb::ObjectType),
|
||||||
|
|
||||||
|
@ -46,33 +46,33 @@ pub enum PullError {
|
||||||
HttpSignature(#[from] httpsign::HttpSignatureError),
|
HttpSignature(#[from] httpsign::HttpSignatureError),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PullError {
|
impl RequestError {
|
||||||
fn mismatch(expected: apb::ObjectType, found: apb::ObjectType) -> Self {
|
fn mismatch(expected: apb::ObjectType, found: apb::ObjectType) -> Self {
|
||||||
PullError::Mismatch(expected, found)
|
RequestError::Mismatch(expected, found)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Pull<serde_json::Value> {
|
impl Pull<serde_json::Value> {
|
||||||
pub fn actor(self) -> Result<serde_json::Value, PullError> {
|
pub fn actor(self) -> Result<serde_json::Value, RequestError> {
|
||||||
match self {
|
match self {
|
||||||
Self::Actor(x) => Ok(x),
|
Self::Actor(x) => Ok(x),
|
||||||
Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
|
Self::Activity(x) => Err(RequestError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
|
||||||
Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))),
|
Self::Object(x) => Err(RequestError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn activity(self) -> Result<serde_json::Value, PullError> {
|
pub fn activity(self) -> Result<serde_json::Value, RequestError> {
|
||||||
match self {
|
match self {
|
||||||
Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
|
Self::Actor(x) => Err(RequestError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
|
||||||
Self::Activity(x) => Ok(x),
|
Self::Activity(x) => Ok(x),
|
||||||
Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))),
|
Self::Object(x) => Err(RequestError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn object(self) -> Result<serde_json::Value, PullError> {
|
pub fn object(self) -> Result<serde_json::Value, RequestError> {
|
||||||
match self {
|
match self {
|
||||||
Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
|
Self::Actor(x) => Err(RequestError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
|
||||||
Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
|
Self::Activity(x) => Err(RequestError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
|
||||||
Self::Object(x) => Ok(x),
|
Self::Object(x) => Ok(x),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,24 +80,24 @@ impl Pull<serde_json::Value> {
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait Fetcher {
|
pub trait Fetcher {
|
||||||
async fn pull(&self, id: &str) -> Result<Pull<serde_json::Value>, PullError> { self.pull_r(id, 0).await }
|
async fn pull(&self, id: &str) -> Result<Pull<serde_json::Value>, RequestError> { self.pull_r(id, 0).await }
|
||||||
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError>;
|
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, RequestError>;
|
||||||
|
|
||||||
|
|
||||||
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError>;
|
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, RequestError>;
|
||||||
|
|
||||||
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, PullError>;
|
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError>;
|
||||||
|
|
||||||
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError>;
|
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
|
||||||
async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError>;
|
async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
|
||||||
|
|
||||||
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError>;
|
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
|
||||||
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError>;
|
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
|
||||||
|
|
||||||
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError>;
|
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError>;
|
||||||
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError>;
|
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError>;
|
||||||
|
|
||||||
async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), PullError>;
|
async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError>;
|
||||||
|
|
||||||
async fn request(
|
async fn request(
|
||||||
method: reqwest::Method,
|
method: reqwest::Method,
|
||||||
|
@ -106,7 +106,7 @@ pub trait Fetcher {
|
||||||
from: &str,
|
from: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
domain: &str,
|
domain: &str,
|
||||||
) -> Result<Response, PullError> {
|
) -> Result<Response, RequestError> {
|
||||||
let host = crate::Context::server(url);
|
let host = crate::Context::server(url);
|
||||||
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
|
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
|
||||||
let path = url.replace("https://", "").replace("http://", "").replace(&host, "");
|
let path = url.replace("https://", "").replace("http://", "").replace(&host, "");
|
||||||
|
@ -147,7 +147,7 @@ pub trait Fetcher {
|
||||||
match response.error_for_status_ref() {
|
match response.error_for_status_ref() {
|
||||||
Ok(_) => Ok(response),
|
Ok(_) => Ok(response),
|
||||||
Err(e) =>
|
Err(e) =>
|
||||||
Err(PullError::Fetch(
|
Err(RequestError::Fetch(
|
||||||
e.status().unwrap_or_default(),
|
e.status().unwrap_or_default(),
|
||||||
response.text().await?,
|
response.text().await?,
|
||||||
)),
|
)),
|
||||||
|
@ -158,7 +158,7 @@ pub trait Fetcher {
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl Fetcher for crate::Context {
|
impl Fetcher for crate::Context {
|
||||||
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError> {
|
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, RequestError> {
|
||||||
tracing::debug!("fetching {id}");
|
tracing::debug!("fetching {id}");
|
||||||
// let _domain = self.fetch_domain(&crate::Context::server(id)).await?;
|
// let _domain = self.fetch_domain(&crate::Context::server(id)).await?;
|
||||||
|
|
||||||
|
@ -173,14 +173,14 @@ impl Fetcher for crate::Context {
|
||||||
let doc_id = document.id()?;
|
let doc_id = document.id()?;
|
||||||
if id != doc_id {
|
if id != doc_id {
|
||||||
if depth >= self.cfg().security.max_id_redirects {
|
if depth >= self.cfg().security.max_id_redirects {
|
||||||
return Err(PullError::TooManyRedirects);
|
return Err(RequestError::TooManyRedirects);
|
||||||
}
|
}
|
||||||
return self.pull(doc_id).await;
|
return self.pull(doc_id).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
match document.object_type()? {
|
match document.object_type()? {
|
||||||
apb::ObjectType::Collection(x) => Err(PullError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))),
|
apb::ObjectType::Collection(x) => Err(RequestError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))),
|
||||||
apb::ObjectType::Tombstone => Err(PullError::Tombstone),
|
apb::ObjectType::Tombstone => Err(RequestError::Tombstone),
|
||||||
apb::ObjectType::Activity(_) => Ok(Pull::Activity(document)),
|
apb::ObjectType::Activity(_) => Ok(Pull::Activity(document)),
|
||||||
apb::ObjectType::Actor(_) => Ok(Pull::Actor(document)),
|
apb::ObjectType::Actor(_) => Ok(Pull::Actor(document)),
|
||||||
_ => Ok(Pull::Object(document)),
|
_ => Ok(Pull::Object(document)),
|
||||||
|
@ -188,7 +188,7 @@ impl Fetcher for crate::Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError> {
|
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, RequestError> {
|
||||||
let subject = format!("acct:{user}@{host}");
|
let subject = format!("acct:{user}@{host}");
|
||||||
let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}");
|
let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}");
|
||||||
let resource = reqwest::Client::new()
|
let resource = reqwest::Client::new()
|
||||||
|
@ -220,7 +220,7 @@ impl Fetcher for crate::Context {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, PullError> {
|
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError> {
|
||||||
if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(tx).await? {
|
if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(tx).await? {
|
||||||
return Ok(x); // already in db, easy
|
return Ok(x); // already in db, easy
|
||||||
}
|
}
|
||||||
|
@ -271,7 +271,7 @@ impl Fetcher for crate::Context {
|
||||||
Ok(instance_model)
|
Ok(instance_model)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError> {
|
async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
|
||||||
let id = document.id()?.to_string();
|
let id = document.id()?.to_string();
|
||||||
|
|
||||||
let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?;
|
let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?;
|
||||||
|
@ -321,7 +321,7 @@ impl Fetcher for crate::Context {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError> {
|
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
|
||||||
if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(tx).await? {
|
if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(tx).await? {
|
||||||
return Ok(x); // already in db, easy
|
return Ok(x); // already in db, easy
|
||||||
}
|
}
|
||||||
|
@ -331,7 +331,7 @@ impl Fetcher for crate::Context {
|
||||||
self.resolve_user(document, tx).await
|
self.resolve_user(document, tx).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError> {
|
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {
|
||||||
if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(tx).await? {
|
if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(tx).await? {
|
||||||
return Ok(x); // already in db, easy
|
return Ok(x); // already in db, easy
|
||||||
}
|
}
|
||||||
|
@ -341,7 +341,7 @@ impl Fetcher for crate::Context {
|
||||||
self.resolve_activity(activity, tx).await
|
self.resolve_activity(activity, tx).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError> {
|
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {
|
||||||
let _domain = self.fetch_domain(&crate::Context::server(activity.id()?), tx).await?;
|
let _domain = self.fetch_domain(&crate::Context::server(activity.id()?), tx).await?;
|
||||||
|
|
||||||
if let Ok(activity_actor) = activity.actor().id() {
|
if let Ok(activity_actor) = activity.actor().id() {
|
||||||
|
@ -362,22 +362,22 @@ impl Fetcher for crate::Context {
|
||||||
Ok(activity_model)
|
Ok(activity_model)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), PullError> {
|
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> {
|
||||||
// crawl_replies(self, id, 0).await
|
// crawl_replies(self, id, 0).await
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
|
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||||
fetch_object_r(self, id, 0, tx).await
|
fetch_object_r(self, id, 0, tx).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
|
async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||||
resolve_object_r(self, object, 0, tx).await
|
resolve_object_r(self, object, 0, tx).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_recursion::async_recursion]
|
#[async_recursion::async_recursion]
|
||||||
async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
|
async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||||
if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(tx).await? {
|
if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(tx).await? {
|
||||||
return Ok(x); // already in db, easy
|
return Ok(x); // already in db, easy
|
||||||
}
|
}
|
||||||
|
@ -387,7 +387,7 @@ async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl Co
|
||||||
resolve_object_r(ctx, object, depth, tx).await
|
resolve_object_r(ctx, object, depth, tx).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
|
async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||||
let id = object.id()?.to_string();
|
let id = object.id()?.to_string();
|
||||||
|
|
||||||
if let Ok(oid) = object.id() {
|
if let Ok(oid) = object.id() {
|
||||||
|
@ -420,12 +420,12 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait Fetchable : Sync + Send {
|
pub trait Fetchable : Sync + Send {
|
||||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>;
|
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl Fetchable for apb::Node<serde_json::Value> {
|
impl Fetchable for apb::Node<serde_json::Value> {
|
||||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError> {
|
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError> {
|
||||||
if let apb::Node::Link(uri) = self {
|
if let apb::Node::Link(uri) = self {
|
||||||
if let Ok(href) = uri.href() {
|
if let Ok(href) = uri.href() {
|
||||||
*self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())
|
*self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())
|
||||||
|
|
|
@ -26,7 +26,7 @@ pub enum ProcessorError {
|
||||||
NormalizerError(#[from] crate::traits::normalize::NormalizerError),
|
NormalizerError(#[from] crate::traits::normalize::NormalizerError),
|
||||||
|
|
||||||
#[error("failed fetching resource: {0:?}")]
|
#[error("failed fetching resource: {0:?}")]
|
||||||
PullError(#[from] crate::traits::fetch::PullError),
|
PullError(#[from] crate::traits::fetch::RequestError),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
|
|
|
@ -16,6 +16,7 @@ mod m20240628_000001_add_followers_following_indexes;
|
||||||
mod m20240628_000002_add_credentials_activated;
|
mod m20240628_000002_add_credentials_activated;
|
||||||
mod m20240703_000001_add_audience_index;
|
mod m20240703_000001_add_audience_index;
|
||||||
mod m20240703_000002_add_image_to_objects;
|
mod m20240703_000002_add_image_to_objects;
|
||||||
|
mod m20240706_000001_add_error_to_jobs;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
|
@ -39,6 +40,7 @@ impl MigratorTrait for Migrator {
|
||||||
Box::new(m20240628_000002_add_credentials_activated::Migration),
|
Box::new(m20240628_000002_add_credentials_activated::Migration),
|
||||||
Box::new(m20240703_000001_add_audience_index::Migration),
|
Box::new(m20240703_000001_add_audience_index::Migration),
|
||||||
Box::new(m20240703_000002_add_image_to_objects::Migration),
|
Box::new(m20240703_000002_add_image_to_objects::Migration),
|
||||||
|
Box::new(m20240706_000001_add_error_to_jobs::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ pub enum Jobs {
|
||||||
Published,
|
Published,
|
||||||
NotBefore,
|
NotBefore,
|
||||||
Attempt,
|
Attempt,
|
||||||
|
Error, // added after
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
35
upub/migrations/src/m20240706_000001_add_error_to_jobs.rs
Normal file
35
upub/migrations/src/m20240706_000001_add_error_to_jobs.rs
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
use crate::m20240605_000001_add_jobs_table::Jobs;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Jobs::Table)
|
||||||
|
.add_column(ColumnDef::new(Jobs::Error).string().null())
|
||||||
|
.to_owned()
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Jobs::Table)
|
||||||
|
.drop_column(Jobs::Error)
|
||||||
|
.to_owned()
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -67,6 +67,7 @@ pub async fn post(
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
attempt: Set(0),
|
attempt: Set(0),
|
||||||
payload: Set(Some(activity)),
|
payload: Set(Some(activity)),
|
||||||
|
error: Set(None),
|
||||||
};
|
};
|
||||||
|
|
||||||
model::job::Entity::insert(job).exec(ctx.db()).await?;
|
model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||||
|
|
|
@ -79,7 +79,8 @@ pub async fn post(
|
||||||
payload: Set(Some(activity)),
|
payload: Set(Some(activity)),
|
||||||
published: Set(chrono::Utc::now()),
|
published: Set(chrono::Utc::now()),
|
||||||
not_before: Set(chrono::Utc::now()),
|
not_before: Set(chrono::Utc::now()),
|
||||||
attempt: Set(0)
|
attempt: Set(0),
|
||||||
|
error: Set(None),
|
||||||
};
|
};
|
||||||
|
|
||||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}};
|
use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}};
|
||||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
|
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
|
||||||
use httpsign::HttpSignature;
|
use httpsign::HttpSignature;
|
||||||
use upub::traits::{fetch::PullError, Fetcher};
|
use upub::traits::{fetch::RequestError, Fetcher};
|
||||||
|
|
||||||
use crate::ApiError;
|
use crate::ApiError;
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ where
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
match ctx.fetch_user(&user_id, ctx.db()).await {
|
match ctx.fetch_user(&user_id, ctx.db()).await {
|
||||||
Err(PullError::Database(x)) => return Err(PullError::Database(x).into()),
|
Err(RequestError::Database(x)) => return Err(RequestError::Database(x).into()),
|
||||||
Err(e) => tracing::debug!("could not fetch {user_id} to verify signature: {e}"),
|
Err(e) => tracing::debug!("could not fetch {user_id} to verify signature: {e}"),
|
||||||
Ok(user) => {
|
Ok(user) => {
|
||||||
let signature = http_signature.build_from_parts(parts);
|
let signature = http_signature.build_from_parts(parts);
|
||||||
|
|
|
@ -17,7 +17,7 @@ pub enum ApiError {
|
||||||
// TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut
|
// TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut
|
||||||
// helps with debugging!
|
// helps with debugging!
|
||||||
#[error("fetch error: {0:?}")]
|
#[error("fetch error: {0:?}")]
|
||||||
FetchError(#[from] upub::traits::fetch::PullError),
|
FetchError(#[from] upub::traits::fetch::RequestError),
|
||||||
|
|
||||||
// wrapper error to return arbitraty status codes
|
// wrapper error to return arbitraty status codes
|
||||||
#[error("{0}")]
|
#[error("{0}")]
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use sea_orm::EntityTrait;
|
|
||||||
use reqwest::Method;
|
use reqwest::Method;
|
||||||
|
|
||||||
use apb::{LD, ActivityMut};
|
use apb::{LD, ActivityMut};
|
||||||
|
@ -59,16 +58,11 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = Context::request(
|
Context::request(
|
||||||
Method::POST, job.target.as_deref().unwrap_or(""),
|
Method::POST, job.target.as_deref().unwrap_or(""),
|
||||||
Some(&serde_json::to_string(&payload.ld_context()).unwrap()),
|
Some(&serde_json::to_string(&payload.ld_context()).unwrap()),
|
||||||
&job.actor, &key, ctx.domain()
|
&job.actor, &key, ctx.domain()
|
||||||
).await {
|
).await?;
|
||||||
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
|
|
||||||
model::job::Entity::insert(job.clone().repeat())
|
|
||||||
.exec(ctx.db())
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
|
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
|
||||||
|
|
||||||
use upub::{model, traits::{fetch::PullError, process::ProcessorError}, Context};
|
use upub::{model, traits::{fetch::RequestError, process::ProcessorError}, Context};
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum JobError {
|
pub enum JobError {
|
||||||
|
@ -17,11 +17,11 @@ pub enum JobError {
|
||||||
#[error("malformed job: missing payload")]
|
#[error("malformed job: missing payload")]
|
||||||
MissingPayload,
|
MissingPayload,
|
||||||
|
|
||||||
#[error("no available implementation to process job")]
|
|
||||||
Unprocessable,
|
|
||||||
|
|
||||||
#[error("error processing activity: {0:?}")]
|
#[error("error processing activity: {0:?}")]
|
||||||
ProcessorError(#[from] upub::traits::process::ProcessorError),
|
ProcessorError(#[from] upub::traits::process::ProcessorError),
|
||||||
|
|
||||||
|
#[error("error delivering activity: {0}")]
|
||||||
|
DeliveryError(#[from] upub::traits::fetch::RequestError),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type JobResult<T> = Result<T, JobError>;
|
pub type JobResult<T> = Result<T, JobError>;
|
||||||
|
@ -137,16 +137,16 @@ impl JobDispatcher for Context {
|
||||||
tracing::error!("dropping job with malformed activity (missing field {f})"),
|
tracing::error!("dropping job with malformed activity (missing field {f})"),
|
||||||
Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) =>
|
Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) =>
|
||||||
tracing::info!("dropping job already processed: {}", job.activity),
|
tracing::info!("dropping job already processed: {}", job.activity),
|
||||||
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::FORBIDDEN, e)))) =>
|
Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::FORBIDDEN, e)))) =>
|
||||||
tracing::warn!("dropping job because requested resource is not accessible: {e}"),
|
tracing::warn!("dropping job because requested resource is not accessible: {e}"),
|
||||||
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::NOT_FOUND, e)))) =>
|
Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::NOT_FOUND, e)))) =>
|
||||||
tracing::warn!("dropping job because requested resource is not available: {e}"),
|
tracing::warn!("dropping job because requested resource is not available: {e}"),
|
||||||
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::GONE, e)))) =>
|
Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::GONE, e)))) =>
|
||||||
tracing::warn!("dropping job because requested resource is no longer available: {e}"),
|
tracing::warn!("dropping job because requested resource is no longer available: {e}"),
|
||||||
Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Malformed(f)))) =>
|
Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Malformed(f)))) =>
|
||||||
tracing::warn!("dropping job because requested resource could not be verified (fetch is invalid AP object: {f})"),
|
tracing::warn!("dropping job because requested resource could not be verified (fetch is invalid AP object: {f})"),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
if let JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(status, ref e))) = e {
|
if let JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(status, ref e))) = e {
|
||||||
// TODO maybe convert this in generic .is_client_error() check, but excluding 401s
|
// TODO maybe convert this in generic .is_client_error() check, but excluding 401s
|
||||||
// and 400s because we want to retry those. also maybe 406s? idk theres a lot i
|
// and 400s because we want to retry those. also maybe 406s? idk theres a lot i
|
||||||
// just want to drop lemmy.cafe jobs
|
// just want to drop lemmy.cafe jobs
|
||||||
|
@ -156,7 +156,7 @@ impl JobDispatcher for Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tracing::error!("failed processing job '{}': {e}", job.activity);
|
tracing::error!("failed processing job '{}': {e}", job.activity);
|
||||||
let active = job.clone().repeat();
|
let active = job.clone().repeat(Some(e.to_string()));
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
loop {
|
loop {
|
||||||
match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {
|
match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {
|
||||||
|
|
Loading…
Reference in a new issue