diff --git a/upub/cli/src/fetch.rs b/upub/cli/src/fetch.rs index 6769097a..eeb2e3ee 100644 --- a/upub/cli/src/fetch.rs +++ b/upub/cli/src/fetch.rs @@ -1,7 +1,7 @@ use sea_orm::{EntityTrait, TransactionTrait}; -use upub::traits::{fetch::{Fetchable, PullError}, Addresser, Normalizer}; +use upub::traits::{fetch::{Fetchable, RequestError}, Addresser, Normalizer}; -pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> { +pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), RequestError> { use apb::Base; let mut node = apb::Node::link(uri.to_string()); diff --git a/upub/cli/src/relay.rs b/upub/cli/src/relay.rs index 6dd4e402..40bfc62e 100644 --- a/upub/cli/src/relay.rs +++ b/upub/cli/src/relay.rs @@ -1,6 +1,6 @@ use apb::{ActivityMut, BaseMut, ObjectMut}; use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait, QueryFilter, ColumnTrait}; -use upub::traits::{fetch::PullError, Fetcher}; +use upub::traits::{fetch::RequestError, Fetcher}; #[derive(Debug, Clone, clap::Subcommand)] /// available actions to take on relays @@ -29,7 +29,7 @@ pub enum RelayCommand { }, } -pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullError> { +pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), RequestError> { let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db()) .await? .ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?; diff --git a/upub/cli/src/thread.rs b/upub/cli/src/thread.rs index b22ddd0d..8ed847b7 100644 --- a/upub/cli/src/thread.rs +++ b/upub/cli/src/thread.rs @@ -1,7 +1,7 @@ use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; -use upub::traits::{fetch::PullError, Fetcher}; +use upub::traits::{fetch::RequestError, Fetcher}; -pub async fn thread(ctx: upub::Context) -> Result<(), PullError> { +pub async fn thread(ctx: upub::Context) -> Result<(), RequestError> { use futures::TryStreamExt; let db = ctx.db(); diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index 815e8d68..0406b92b 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -17,7 +17,7 @@ pub enum Pull { } #[derive(Debug, thiserror::Error)] -pub enum PullError { +pub enum RequestError { #[error("dereferenced resource ({0:?}) doesn't match requested type ({1:?})")] Mismatch(apb::ObjectType, apb::ObjectType), @@ -46,33 +46,33 @@ pub enum PullError { HttpSignature(#[from] httpsign::HttpSignatureError), } -impl PullError { +impl RequestError { fn mismatch(expected: apb::ObjectType, found: apb::ObjectType) -> Self { - PullError::Mismatch(expected, found) + RequestError::Mismatch(expected, found) } } impl Pull { - pub fn actor(self) -> Result { + pub fn actor(self) -> Result { match self { Self::Actor(x) => Ok(x), - Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), - Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))), + Self::Activity(x) => Err(RequestError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), + Self::Object(x) => Err(RequestError::mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))), } } - pub fn activity(self) -> Result { + pub fn activity(self) -> Result { match self { - Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), + Self::Actor(x) => Err(RequestError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), Self::Activity(x) => Ok(x), - Self::Object(x) => Err(PullError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))), + Self::Object(x) => Err(RequestError::mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))), } } - pub fn object(self) -> Result { + pub fn object(self) -> Result { match self { - Self::Actor(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), - Self::Activity(x) => Err(PullError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), + Self::Actor(x) => Err(RequestError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), + Self::Activity(x) => Err(RequestError::mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), Self::Object(x) => Ok(x), } } @@ -80,24 +80,24 @@ impl Pull { #[async_trait::async_trait] pub trait Fetcher { - async fn pull(&self, id: &str) -> Result, PullError> { self.pull_r(id, 0).await } - async fn pull_r(&self, id: &str, depth: u32) -> Result, PullError>; + async fn pull(&self, id: &str) -> Result, RequestError> { self.pull_r(id, 0).await } + async fn pull_r(&self, id: &str, depth: u32) -> Result, RequestError>; - async fn webfinger(&self, user: &str, host: &str) -> Result, PullError>; + async fn webfinger(&self, user: &str, host: &str) -> Result, RequestError>; - async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result; + async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result; - async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result; - async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result; + async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result; + async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result; - async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result; - async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result; + async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result; + async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result; - async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result; - async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result; + async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result; + async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result; - async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), PullError>; + async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError>; async fn request( method: reqwest::Method, @@ -106,7 +106,7 @@ pub trait Fetcher { from: &str, key: &str, domain: &str, - ) -> Result { + ) -> Result { let host = crate::Context::server(url); let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT" let path = url.replace("https://", "").replace("http://", "").replace(&host, ""); @@ -147,7 +147,7 @@ pub trait Fetcher { match response.error_for_status_ref() { Ok(_) => Ok(response), Err(e) => - Err(PullError::Fetch( + Err(RequestError::Fetch( e.status().unwrap_or_default(), response.text().await?, )), @@ -158,7 +158,7 @@ pub trait Fetcher { #[async_trait::async_trait] impl Fetcher for crate::Context { - async fn pull_r(&self, id: &str, depth: u32) -> Result, PullError> { + async fn pull_r(&self, id: &str, depth: u32) -> Result, RequestError> { tracing::debug!("fetching {id}"); // let _domain = self.fetch_domain(&crate::Context::server(id)).await?; @@ -173,14 +173,14 @@ impl Fetcher for crate::Context { let doc_id = document.id()?; if id != doc_id { if depth >= self.cfg().security.max_id_redirects { - return Err(PullError::TooManyRedirects); + return Err(RequestError::TooManyRedirects); } return self.pull(doc_id).await; } match document.object_type()? { - apb::ObjectType::Collection(x) => Err(PullError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))), - apb::ObjectType::Tombstone => Err(PullError::Tombstone), + apb::ObjectType::Collection(x) => Err(RequestError::mismatch(apb::ObjectType::Object, apb::ObjectType::Collection(x))), + apb::ObjectType::Tombstone => Err(RequestError::Tombstone), apb::ObjectType::Activity(_) => Ok(Pull::Activity(document)), apb::ObjectType::Actor(_) => Ok(Pull::Actor(document)), _ => Ok(Pull::Object(document)), @@ -188,7 +188,7 @@ impl Fetcher for crate::Context { } - async fn webfinger(&self, user: &str, host: &str) -> Result, PullError> { + async fn webfinger(&self, user: &str, host: &str) -> Result, RequestError> { let subject = format!("acct:{user}@{host}"); let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}"); let resource = reqwest::Client::new() @@ -220,7 +220,7 @@ impl Fetcher for crate::Context { Ok(None) } - async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result { + async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result { if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(tx).await? { return Ok(x); // already in db, easy } @@ -271,7 +271,7 @@ impl Fetcher for crate::Context { Ok(instance_model) } - async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result { + async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result { let id = document.id()?.to_string(); let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?; @@ -321,7 +321,7 @@ impl Fetcher for crate::Context { ) } - async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result { + async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result { if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(tx).await? { return Ok(x); // already in db, easy } @@ -331,7 +331,7 @@ impl Fetcher for crate::Context { self.resolve_user(document, tx).await } - async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result { + async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result { if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(tx).await? { return Ok(x); // already in db, easy } @@ -341,7 +341,7 @@ impl Fetcher for crate::Context { self.resolve_activity(activity, tx).await } - async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result { + async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result { let _domain = self.fetch_domain(&crate::Context::server(activity.id()?), tx).await?; if let Ok(activity_actor) = activity.actor().id() { @@ -362,22 +362,22 @@ impl Fetcher for crate::Context { Ok(activity_model) } - async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), PullError> { + async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> { // crawl_replies(self, id, 0).await todo!() } - async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result { + async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result { fetch_object_r(self, id, 0, tx).await } - async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result { + async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result { resolve_object_r(self, object, 0, tx).await } } #[async_recursion::async_recursion] -async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result { +async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result { if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(tx).await? { return Ok(x); // already in db, easy } @@ -387,7 +387,7 @@ async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl Co resolve_object_r(ctx, object, depth, tx).await } -async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result { +async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result { let id = object.id()?.to_string(); if let Ok(oid) = object.id() { @@ -420,12 +420,12 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth #[async_trait::async_trait] pub trait Fetchable : Sync + Send { - async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>; + async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>; } #[async_trait::async_trait] impl Fetchable for apb::Node { - async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError> { + async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError> { if let apb::Node::Link(uri) = self { if let Ok(href) = uri.href() { *self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain()) diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 5756c901..3cf12ab4 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -26,7 +26,7 @@ pub enum ProcessorError { NormalizerError(#[from] crate::traits::normalize::NormalizerError), #[error("failed fetching resource: {0:?}")] - PullError(#[from] crate::traits::fetch::PullError), + PullError(#[from] crate::traits::fetch::RequestError), } #[async_trait::async_trait] diff --git a/upub/routes/src/auth.rs b/upub/routes/src/auth.rs index 8a3a4023..23995207 100644 --- a/upub/routes/src/auth.rs +++ b/upub/routes/src/auth.rs @@ -1,7 +1,7 @@ use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}}; use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter}; use httpsign::HttpSignature; -use upub::traits::{fetch::PullError, Fetcher}; +use upub::traits::{fetch::RequestError, Fetcher}; use crate::ApiError; @@ -118,7 +118,7 @@ where .to_string(); match ctx.fetch_user(&user_id, ctx.db()).await { - Err(PullError::Database(x)) => return Err(PullError::Database(x).into()), + Err(RequestError::Database(x)) => return Err(RequestError::Database(x).into()), Err(e) => tracing::debug!("could not fetch {user_id} to verify signature: {e}"), Ok(user) => { let signature = http_signature.build_from_parts(parts); diff --git a/upub/routes/src/error.rs b/upub/routes/src/error.rs index a9970072..9b1f2221 100644 --- a/upub/routes/src/error.rs +++ b/upub/routes/src/error.rs @@ -17,7 +17,7 @@ pub enum ApiError { // TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut // helps with debugging! #[error("fetch error: {0:?}")] - FetchError(#[from] upub::traits::fetch::PullError), + FetchError(#[from] upub::traits::fetch::RequestError), // wrapper error to return arbitraty status codes #[error("{0}")] diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs index ff01f698..8805de63 100644 --- a/upub/worker/src/dispatcher.rs +++ b/upub/worker/src/dispatcher.rs @@ -1,7 +1,7 @@ use reqwest::StatusCode; use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder}; -use upub::{model, traits::{fetch::PullError, process::ProcessorError}, Context}; +use upub::{model, traits::{fetch::RequestError, process::ProcessorError}, Context}; #[derive(Debug, thiserror::Error)] pub enum JobError { @@ -137,16 +137,16 @@ impl JobDispatcher for Context { tracing::error!("dropping job with malformed activity (missing field {f})"), Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) => tracing::info!("dropping job already processed: {}", job.activity), - Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::FORBIDDEN, e)))) => + Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::FORBIDDEN, e)))) => tracing::warn!("dropping job because requested resource is not accessible: {e}"), - Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::NOT_FOUND, e)))) => + Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::NOT_FOUND, e)))) => tracing::warn!("dropping job because requested resource is not available: {e}"), - Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(StatusCode::GONE, e)))) => + Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::GONE, e)))) => tracing::warn!("dropping job because requested resource is no longer available: {e}"), - Err(JobError::ProcessorError(ProcessorError::PullError(PullError::Malformed(f)))) => + Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Malformed(f)))) => tracing::warn!("dropping job because requested resource could not be verified (fetch is invalid AP object: {f})"), Err(e) => { - if let JobError::ProcessorError(ProcessorError::PullError(PullError::Fetch(status, ref e))) = e { + if let JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(status, ref e))) = e { // TODO maybe convert this in generic .is_client_error() check, but excluding 401s // and 400s because we want to retry those. also maybe 406s? idk theres a lot i // just want to drop lemmy.cafe jobs