From 8251e3f550950767a8db299dcb3024229bd5fff7 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 30 May 2024 22:17:26 +0200 Subject: [PATCH] chore: fetcher rework --- src/cli/update.rs | 5 +- src/errors.rs | 12 ++ src/server/fetcher.rs | 247 ++++++++++++++++++++++++------------------ 3 files changed, 159 insertions(+), 105 deletions(-) diff --git a/src/cli/update.rs b/src/cli/update.rs index ce0faf3f..9d757895 100644 --- a/src/cli/update.rs +++ b/src/cli/update.rs @@ -16,9 +16,10 @@ pub async fn update_users(ctx: crate::server::Context, days: i64) -> crate::Resu while let Some(user) = stream.try_next().await? { if ctx.is_local(&user.id) { continue } - match ctx.pull_user(&user.id).await { + match ctx.pull(&user.id).await.map(|x| x.actor()) { Err(e) => tracing::warn!("could not update user {}: {e}", user.id), - Ok(doc) => match crate::model::actor::ActiveModel::new(&doc) { + Ok(Err(e)) => tracing::warn!("could not update user {}: {e}", user.id), + Ok(Ok(doc)) => match crate::model::actor::ActiveModel::new(&doc) { Ok(mut u) => { u.internal = Set(user.internal); u.updated = Set(chrono::Utc::now()); diff --git a/src/errors.rs b/src/errors.rs index 16aef3d0..b173ab93 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -28,6 +28,9 @@ pub enum UpubError { #[error("invalid base64 string: {0:?}")] Base64(#[from] base64::DecodeError), + #[error("type mismatch on object: expected {0:?}, found {1:?}")] + Mismatch(apb::ObjectType, apb::ObjectType), + // TODO this isn't really an error but i need to redirect from some routes so this allows me to // keep the type hints on the return type, still what the hell!!!! #[error("redirecting to {0}")] @@ -108,6 +111,15 @@ impl axum::response::IntoResponse for UpubError { "description": format!("missing required field from request: '{}'", x.0), })) ).into_response(), + UpubError::Mismatch(expected, found) => ( + axum::http::StatusCode::UNPROCESSABLE_ENTITY, + axum::Json(serde_json::json!({ + "error": "type", + "expected": expected.as_ref().to_string(), + "found": found.as_ref().to_string(), + "description": self.to_string(), + })) + ).into_response(), _ => ( StatusCode::INTERNAL_SERVER_ERROR, axum::Json(serde_json::json!({ diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 83d1adcc..82229b93 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -9,20 +9,57 @@ use crate::{errors::UpubError, model, VERSION}; use super::{addresser::Addresser, httpsign::HttpSignature, normalizer::Normalizer, Context}; +pub enum PullResult { + Actor(T), + Activity(T), + Object(T), +} + +impl PullResult { + pub fn actor(self) -> crate::Result { + match self { + Self::Actor(x) => Ok(x), + Self::Activity(x) => Err(UpubError::Mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), + Self::Object(x) => Err(UpubError::Mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))), + } + } + + pub fn activity(self) -> crate::Result { + match self { + Self::Actor(x) => Err(UpubError::Mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), + Self::Activity(x) => Ok(x), + Self::Object(x) => Err(UpubError::Mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))), + } + } + + pub fn object(self) -> crate::Result { + match self { + Self::Actor(x) => Err(UpubError::Mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))), + Self::Activity(x) => Err(UpubError::Mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))), + Self::Object(x) => Ok(x), + } + } +} + #[axum::async_trait] pub trait Fetcher { + async fn pull(&self, id: &str) -> crate::Result>; async fn webfinger(&self, user: &str, host: &str) -> crate::Result; async fn fetch_domain(&self, domain: &str) -> crate::Result; async fn fetch_user(&self, id: &str) -> crate::Result; - async fn pull_user(&self, id: &str) -> crate::Result; - - async fn fetch_object(&self, id: &str) -> crate::Result; - async fn pull_object(&self, id: &str) -> crate::Result; + async fn resolve_user(&self, actor: serde_json::Value) -> crate::Result; async fn fetch_activity(&self, id: &str) -> crate::Result; - async fn pull_activity(&self, id: &str) -> crate::Result; + async fn resolve_activity(&self, activity: serde_json::Value) -> crate::Result; + + async fn fetch_object(&self, id: &str) -> crate::Result { self.fetch_object_r(id, 0).await } + #[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> crate::Result { self.resolve_object_r(object, 0).await } + + async fn fetch_object_r(&self, id: &str, depth: i32) -> crate::Result; + async fn resolve_object_r(&self, object: serde_json::Value, depth: i32) -> crate::Result; + async fn fetch_thread(&self, id: &str) -> crate::Result<()>; @@ -85,6 +122,28 @@ pub trait Fetcher { #[axum::async_trait] impl Fetcher for Context { + async fn pull(&self, id: &str) -> crate::Result> { + let _domain = self.fetch_domain(&Context::server(id)).await?; + + let document = Self::request( + Method::GET, id, None, + &format!("https://{}", self.domain()), self.pkey(), self.domain(), + ) + .await? + .json::() + .await?; + + match document.object_type() { + None => Err(UpubError::bad_request()), + Some(apb::ObjectType::Collection(_)) => Err(UpubError::unprocessable()), + Some(apb::ObjectType::Tombstone) => Err(UpubError::not_found()), + Some(apb::ObjectType::Activity(_)) => Ok(PullResult::Activity(document)), + Some(apb::ObjectType::Actor(_)) => Ok(PullResult::Actor(document)), + _ => Ok(PullResult::Object(document)), + } + } + + async fn webfinger(&self, user: &str, host: &str) -> crate::Result { let subject = format!("acct:{user}@{host}"); let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}"); @@ -165,15 +224,39 @@ impl Fetcher for Context { Ok(instance_model) } - async fn fetch_user(&self, id: &str) -> crate::Result { - if let Some(x) = model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { - return Ok(x); // already in db, easy + async fn resolve_user(&self, mut document: serde_json::Value) -> crate::Result { + let id = document.id().ok_or_else(|| UpubError::field("id"))?.to_string(); + + // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time + if let Some(followers_url) = &document.followers().id() { + let req = Self::request( + Method::GET, followers_url, None, + &format!("https://{}", self.domain()), self.pkey(), self.domain(), + ).await; + if let Ok(res) = req { + if let Ok(user_followers) = res.json::().await { + if let Some(total) = user_followers.total_items() { + document = document.set_followers_count(Some(total)); + } + } + } } - let _domain = self.fetch_domain(&Context::server(id)).await?; + if let Some(following_url) = &document.following().id() { + let req = Self::request( + Method::GET, following_url, None, + &format!("https://{}", self.domain()), self.pkey(), self.domain(), + ).await; + if let Ok(res) = req { + if let Ok(user_following) = res.json::().await { + if let Some(total) = user_following.total_items() { + document = document.set_following_count(Some(total)); + } + } + } + } - let user_document = self.pull_user(id).await?; - let user_model = model::actor::ActiveModel::new(&user_document)?; + let user_model = model::actor::ActiveModel::new(&document)?; // TODO this may fail: while fetching, remote server may fetch our service actor. // if it does so with http signature, we will fetch that actor in background @@ -182,48 +265,21 @@ impl Fetcher for Context { // TODO fetch it back to get the internal id Ok( - model::actor::Entity::find_by_ap_id(id) + model::actor::Entity::find_by_ap_id(&id) .one(self.db()) .await? .ok_or_else(UpubError::internal_server_error)? ) } - async fn pull_user(&self, id: &str) -> crate::Result { - let mut user = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(), - ).await?.json::().await?; - - // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs - if let Some(followers_url) = &user.followers().id() { - let req = Self::request( - Method::GET, followers_url, None, - &format!("https://{}", self.domain()), self.pkey(), self.domain(), - ).await; - if let Ok(res) = req { - if let Ok(user_followers) = res.json::().await { - if let Some(total) = user_followers.total_items() { - user = user.set_followers_count(Some(total)); - } - } - } + async fn fetch_user(&self, id: &str) -> crate::Result { + if let Some(x) = model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { + return Ok(x); // already in db, easy } - if let Some(following_url) = &user.following().id() { - let req = Self::request( - Method::GET, following_url, None, - &format!("https://{}", self.domain()), self.pkey(), self.domain(), - ).await; - if let Ok(res) = req { - if let Ok(user_following) = res.json::().await { - if let Some(total) = user_following.total_items() { - user = user.set_following_count(Some(total)); - } - } - } - } + let document = self.pull(id).await?.actor()?; - Ok(user) + self.resolve_user(document).await } async fn fetch_activity(&self, id: &str) -> crate::Result { @@ -231,22 +287,13 @@ impl Fetcher for Context { return Ok(x); // already in db, easy } - let _domain = self.fetch_domain(&Context::server(id)).await?; + let activity = self.pull(id).await?.activity()?; - let activity_document = self.pull_activity(id).await?; - let activity_model = self.insert_activity(activity_document, Some(Context::server(id))).await?; - - let addressed = activity_model.addressed(); - let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(Some(activity_model.internal), None, &expanded_addresses).await?; - - Ok(activity_model) + self.resolve_activity(activity).await } - async fn pull_activity(&self, id: &str) -> crate::Result { - let activity = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(), - ).await?.json::().await?; + async fn resolve_activity(&self, activity: serde_json::Value) -> crate::Result { + let id = activity.id().ok_or_else(|| UpubError::field("id"))?.to_string(); if let Some(activity_actor) = activity.actor().id() { if let Err(e) = self.fetch_user(&activity_actor).await { @@ -260,7 +307,13 @@ impl Fetcher for Context { } } - Ok(activity) + let activity_model = self.insert_activity(activity, Some(Context::server(&id))).await?; + + let addressed = activity_model.addressed(); + let expanded_addresses = self.expand_addressing(addressed).await?; + self.address_to(Some(activity_model.internal), None, &expanded_addresses).await?; + + Ok(activity_model) } async fn fetch_thread(&self, _id: &str) -> crate::Result<()> { @@ -268,62 +321,50 @@ impl Fetcher for Context { todo!() } - async fn fetch_object(&self, id: &str) -> crate::Result { - fetch_object_inner(self, id, 0).await + async fn fetch_object_r(&self, id: &str, depth: i32) -> crate::Result { + if let Some(x) = model::object::Entity::find_by_ap_id(id).one(self.db()).await? { + return Ok(x); // already in db, easy + } + + let object = self.pull(id).await?.object()?; + + self.resolve_object_r(object, depth).await } - async fn pull_object(&self, id: &str) -> crate::Result { - Ok( - Context::request( - Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(), - ) - .await? - .json::() - .await? - ) - } -} + async fn resolve_object_r(&self, object: serde_json::Value, depth: i32) -> crate::Result { + let id = object.id().ok_or_else(|| UpubError::field("id"))?.to_string(); -#[async_recursion::async_recursion] -async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result { - if let Some(x) = model::object::Entity::find_by_ap_id(id).one(ctx.db()).await? { - return Ok(x); // already in db, easy - } - - let _domain = ctx.fetch_domain(&Context::server(id)).await?; - - let object = ctx.pull_object(id).await?; - - if let Some(oid) = object.id() { - if oid != id { - if let Some(x) = model::object::Entity::find_by_ap_id(oid).one(ctx.db()).await? { - return Ok(x); // already in db, but with id different that given url + if let Some(oid) = object.id() { + if oid != id { + if let Some(x) = model::object::Entity::find_by_ap_id(oid).one(self.db()).await? { + return Ok(x); // already in db, but with id different that given url + } } } - } - if let Some(attributed_to) = object.attributed_to().id() { - if let Err(e) = ctx.fetch_user(&attributed_to).await { - tracing::warn!("could not get actor of fetched object: {e}"); + if let Some(attributed_to) = object.attributed_to().id() { + if let Err(e) = self.fetch_user(&attributed_to).await { + tracing::warn!("could not get actor of fetched object: {e}"); + } } - } - let addressed = object.addressed(); + let addressed = object.addressed(); - if let Some(reply) = object.in_reply_to().id() { - if depth <= 16 { - fetch_object_inner(ctx, &reply, depth + 1).await?; - } else { - tracing::warn!("thread deeper than 16, giving up fetching more replies"); + if let Some(reply) = object.in_reply_to().id() { + if depth <= 16 { + self.fetch_object_r(&reply, depth + 1).await?; + } else { + tracing::warn!("thread deeper than 16, giving up fetching more replies"); + } } + + let object_model = self.insert_object(object, None).await?; + + let expanded_addresses = self.expand_addressing(addressed).await?; + self.address_to(None, Some(object_model.internal), &expanded_addresses).await?; + + Ok(object_model) } - - let object_model = ctx.insert_object(object, None).await?; - - let expanded_addresses = ctx.expand_addressing(addressed).await?; - ctx.address_to(None, Some(object_model.internal), &expanded_addresses).await?; - - Ok(object_model) } #[axum::async_trait]