chore: fetcher rework

This commit is contained in:
əlemi 2024-05-30 22:17:26 +02:00
parent e636afd283
commit 8251e3f550
Signed by: alemi
GPG key ID: A4895B84D311642C
3 changed files with 159 additions and 105 deletions

View file

@ -16,9 +16,10 @@ pub async fn update_users(ctx: crate::server::Context, days: i64) -> crate::Resu
while let Some(user) = stream.try_next().await? { while let Some(user) = stream.try_next().await? {
if ctx.is_local(&user.id) { continue } if ctx.is_local(&user.id) { continue }
match ctx.pull_user(&user.id).await { match ctx.pull(&user.id).await.map(|x| x.actor()) {
Err(e) => tracing::warn!("could not update user {}: {e}", user.id), Err(e) => tracing::warn!("could not update user {}: {e}", user.id),
Ok(doc) => match crate::model::actor::ActiveModel::new(&doc) { Ok(Err(e)) => tracing::warn!("could not update user {}: {e}", user.id),
Ok(Ok(doc)) => match crate::model::actor::ActiveModel::new(&doc) {
Ok(mut u) => { Ok(mut u) => {
u.internal = Set(user.internal); u.internal = Set(user.internal);
u.updated = Set(chrono::Utc::now()); u.updated = Set(chrono::Utc::now());

View file

@ -28,6 +28,9 @@ pub enum UpubError {
#[error("invalid base64 string: {0:?}")] #[error("invalid base64 string: {0:?}")]
Base64(#[from] base64::DecodeError), Base64(#[from] base64::DecodeError),
#[error("type mismatch on object: expected {0:?}, found {1:?}")]
Mismatch(apb::ObjectType, apb::ObjectType),
// TODO this isn't really an error but i need to redirect from some routes so this allows me to // TODO this isn't really an error but i need to redirect from some routes so this allows me to
// keep the type hints on the return type, still what the hell!!!! // keep the type hints on the return type, still what the hell!!!!
#[error("redirecting to {0}")] #[error("redirecting to {0}")]
@ -108,6 +111,15 @@ impl axum::response::IntoResponse for UpubError {
"description": format!("missing required field from request: '{}'", x.0), "description": format!("missing required field from request: '{}'", x.0),
})) }))
).into_response(), ).into_response(),
UpubError::Mismatch(expected, found) => (
axum::http::StatusCode::UNPROCESSABLE_ENTITY,
axum::Json(serde_json::json!({
"error": "type",
"expected": expected.as_ref().to_string(),
"found": found.as_ref().to_string(),
"description": self.to_string(),
}))
).into_response(),
_ => ( _ => (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
axum::Json(serde_json::json!({ axum::Json(serde_json::json!({

View file

@ -9,20 +9,57 @@ use crate::{errors::UpubError, model, VERSION};
use super::{addresser::Addresser, httpsign::HttpSignature, normalizer::Normalizer, Context}; use super::{addresser::Addresser, httpsign::HttpSignature, normalizer::Normalizer, Context};
pub enum PullResult<T> {
Actor(T),
Activity(T),
Object(T),
}
impl PullResult<serde_json::Value> {
pub fn actor(self) -> crate::Result<serde_json::Value> {
match self {
Self::Actor(x) => Ok(x),
Self::Activity(x) => Err(UpubError::Mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
Self::Object(x) => Err(UpubError::Mismatch(apb::ObjectType::Actor(apb::ActorType::Person), x.object_type().unwrap_or(apb::ObjectType::Object))),
}
}
pub fn activity(self) -> crate::Result<serde_json::Value> {
match self {
Self::Actor(x) => Err(UpubError::Mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
Self::Activity(x) => Ok(x),
Self::Object(x) => Err(UpubError::Mismatch(apb::ObjectType::Activity(apb::ActivityType::Activity), x.object_type().unwrap_or(apb::ObjectType::Object))),
}
}
pub fn object(self) -> crate::Result<serde_json::Value> {
match self {
Self::Actor(x) => Err(UpubError::Mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Actor(apb::ActorType::Person)))),
Self::Activity(x) => Err(UpubError::Mismatch(apb::ObjectType::Object, x.object_type().unwrap_or(apb::ObjectType::Activity(apb::ActivityType::Activity)))),
Self::Object(x) => Ok(x),
}
}
}
#[axum::async_trait] #[axum::async_trait]
pub trait Fetcher { pub trait Fetcher {
async fn pull(&self, id: &str) -> crate::Result<PullResult<serde_json::Value>>;
async fn webfinger(&self, user: &str, host: &str) -> crate::Result<String>; async fn webfinger(&self, user: &str, host: &str) -> crate::Result<String>;
async fn fetch_domain(&self, domain: &str) -> crate::Result<model::instance::Model>; async fn fetch_domain(&self, domain: &str) -> crate::Result<model::instance::Model>;
async fn fetch_user(&self, id: &str) -> crate::Result<model::actor::Model>; async fn fetch_user(&self, id: &str) -> crate::Result<model::actor::Model>;
async fn pull_user(&self, id: &str) -> crate::Result<serde_json::Value>; async fn resolve_user(&self, actor: serde_json::Value) -> crate::Result<model::actor::Model>;
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model>;
async fn pull_object(&self, id: &str) -> crate::Result<serde_json::Value>;
async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>; async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
async fn pull_activity(&self, id: &str) -> crate::Result<serde_json::Value>; async fn resolve_activity(&self, activity: serde_json::Value) -> crate::Result<model::activity::Model>;
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> { self.fetch_object_r(id, 0).await }
#[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> crate::Result<model::object::Model> { self.resolve_object_r(object, 0).await }
async fn fetch_object_r(&self, id: &str, depth: i32) -> crate::Result<model::object::Model>;
async fn resolve_object_r(&self, object: serde_json::Value, depth: i32) -> crate::Result<model::object::Model>;
async fn fetch_thread(&self, id: &str) -> crate::Result<()>; async fn fetch_thread(&self, id: &str) -> crate::Result<()>;
@ -85,6 +122,28 @@ pub trait Fetcher {
#[axum::async_trait] #[axum::async_trait]
impl Fetcher for Context { impl Fetcher for Context {
async fn pull(&self, id: &str) -> crate::Result<PullResult<serde_json::Value>> {
let _domain = self.fetch_domain(&Context::server(id)).await?;
let document = Self::request(
Method::GET, id, None,
&format!("https://{}", self.domain()), self.pkey(), self.domain(),
)
.await?
.json::<serde_json::Value>()
.await?;
match document.object_type() {
None => Err(UpubError::bad_request()),
Some(apb::ObjectType::Collection(_)) => Err(UpubError::unprocessable()),
Some(apb::ObjectType::Tombstone) => Err(UpubError::not_found()),
Some(apb::ObjectType::Activity(_)) => Ok(PullResult::Activity(document)),
Some(apb::ObjectType::Actor(_)) => Ok(PullResult::Actor(document)),
_ => Ok(PullResult::Object(document)),
}
}
async fn webfinger(&self, user: &str, host: &str) -> crate::Result<String> { async fn webfinger(&self, user: &str, host: &str) -> crate::Result<String> {
let subject = format!("acct:{user}@{host}"); let subject = format!("acct:{user}@{host}");
let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}"); let webfinger_uri = format!("https://{host}/.well-known/webfinger?resource={subject}");
@ -165,15 +224,39 @@ impl Fetcher for Context {
Ok(instance_model) Ok(instance_model)
} }
async fn fetch_user(&self, id: &str) -> crate::Result<model::actor::Model> { async fn resolve_user(&self, mut document: serde_json::Value) -> crate::Result<model::actor::Model> {
if let Some(x) = model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { let id = document.id().ok_or_else(|| UpubError::field("id"))?.to_string();
return Ok(x); // already in db, easy
// TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time
if let Some(followers_url) = &document.followers().id() {
let req = Self::request(
Method::GET, followers_url, None,
&format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await;
if let Ok(res) = req {
if let Ok(user_followers) = res.json::<serde_json::Value>().await {
if let Some(total) = user_followers.total_items() {
document = document.set_followers_count(Some(total));
}
}
}
} }
let _domain = self.fetch_domain(&Context::server(id)).await?; if let Some(following_url) = &document.following().id() {
let req = Self::request(
Method::GET, following_url, None,
&format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await;
if let Ok(res) = req {
if let Ok(user_following) = res.json::<serde_json::Value>().await {
if let Some(total) = user_following.total_items() {
document = document.set_following_count(Some(total));
}
}
}
}
let user_document = self.pull_user(id).await?; let user_model = model::actor::ActiveModel::new(&document)?;
let user_model = model::actor::ActiveModel::new(&user_document)?;
// TODO this may fail: while fetching, remote server may fetch our service actor. // TODO this may fail: while fetching, remote server may fetch our service actor.
// if it does so with http signature, we will fetch that actor in background // if it does so with http signature, we will fetch that actor in background
@ -182,48 +265,21 @@ impl Fetcher for Context {
// TODO fetch it back to get the internal id // TODO fetch it back to get the internal id
Ok( Ok(
model::actor::Entity::find_by_ap_id(id) model::actor::Entity::find_by_ap_id(&id)
.one(self.db()) .one(self.db())
.await? .await?
.ok_or_else(UpubError::internal_server_error)? .ok_or_else(UpubError::internal_server_error)?
) )
} }
async fn pull_user(&self, id: &str) -> crate::Result<serde_json::Value> { async fn fetch_user(&self, id: &str) -> crate::Result<model::actor::Model> {
let mut user = Self::request( if let Some(x) = model::actor::Entity::find_by_ap_id(id).one(self.db()).await? {
Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(), return Ok(x); // already in db, easy
).await?.json::<serde_json::Value>().await?;
// TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs
if let Some(followers_url) = &user.followers().id() {
let req = Self::request(
Method::GET, followers_url, None,
&format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await;
if let Ok(res) = req {
if let Ok(user_followers) = res.json::<serde_json::Value>().await {
if let Some(total) = user_followers.total_items() {
user = user.set_followers_count(Some(total));
}
}
}
} }
if let Some(following_url) = &user.following().id() { let document = self.pull(id).await?.actor()?;
let req = Self::request(
Method::GET, following_url, None,
&format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await;
if let Ok(res) = req {
if let Ok(user_following) = res.json::<serde_json::Value>().await {
if let Some(total) = user_following.total_items() {
user = user.set_following_count(Some(total));
}
}
}
}
Ok(user) self.resolve_user(document).await
} }
async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model> { async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model> {
@ -231,22 +287,13 @@ impl Fetcher for Context {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
let _domain = self.fetch_domain(&Context::server(id)).await?; let activity = self.pull(id).await?.activity()?;
let activity_document = self.pull_activity(id).await?; self.resolve_activity(activity).await
let activity_model = self.insert_activity(activity_document, Some(Context::server(id))).await?;
let addressed = activity_model.addressed();
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(Some(activity_model.internal), None, &expanded_addresses).await?;
Ok(activity_model)
} }
async fn pull_activity(&self, id: &str) -> crate::Result<serde_json::Value> { async fn resolve_activity(&self, activity: serde_json::Value) -> crate::Result<model::activity::Model> {
let activity = Self::request( let id = activity.id().ok_or_else(|| UpubError::field("id"))?.to_string();
Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await?.json::<serde_json::Value>().await?;
if let Some(activity_actor) = activity.actor().id() { if let Some(activity_actor) = activity.actor().id() {
if let Err(e) = self.fetch_user(&activity_actor).await { if let Err(e) = self.fetch_user(&activity_actor).await {
@ -260,7 +307,13 @@ impl Fetcher for Context {
} }
} }
Ok(activity) let activity_model = self.insert_activity(activity, Some(Context::server(&id))).await?;
let addressed = activity_model.addressed();
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(Some(activity_model.internal), None, &expanded_addresses).await?;
Ok(activity_model)
} }
async fn fetch_thread(&self, _id: &str) -> crate::Result<()> { async fn fetch_thread(&self, _id: &str) -> crate::Result<()> {
@ -268,62 +321,50 @@ impl Fetcher for Context {
todo!() todo!()
} }
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> { async fn fetch_object_r(&self, id: &str, depth: i32) -> crate::Result<model::object::Model> {
fetch_object_inner(self, id, 0).await if let Some(x) = model::object::Entity::find_by_ap_id(id).one(self.db()).await? {
return Ok(x); // already in db, easy
}
let object = self.pull(id).await?.object()?;
self.resolve_object_r(object, depth).await
} }
async fn pull_object(&self, id: &str) -> crate::Result<serde_json::Value> { async fn resolve_object_r(&self, object: serde_json::Value, depth: i32) -> crate::Result<model::object::Model> {
Ok( let id = object.id().ok_or_else(|| UpubError::field("id"))?.to_string();
Context::request(
Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(),
)
.await?
.json::<serde_json::Value>()
.await?
)
}
}
#[async_recursion::async_recursion] if let Some(oid) = object.id() {
async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result<model::object::Model> { if oid != id {
if let Some(x) = model::object::Entity::find_by_ap_id(id).one(ctx.db()).await? { if let Some(x) = model::object::Entity::find_by_ap_id(oid).one(self.db()).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, but with id different that given url
} }
let _domain = ctx.fetch_domain(&Context::server(id)).await?;
let object = ctx.pull_object(id).await?;
if let Some(oid) = object.id() {
if oid != id {
if let Some(x) = model::object::Entity::find_by_ap_id(oid).one(ctx.db()).await? {
return Ok(x); // already in db, but with id different that given url
} }
} }
}
if let Some(attributed_to) = object.attributed_to().id() { if let Some(attributed_to) = object.attributed_to().id() {
if let Err(e) = ctx.fetch_user(&attributed_to).await { if let Err(e) = self.fetch_user(&attributed_to).await {
tracing::warn!("could not get actor of fetched object: {e}"); tracing::warn!("could not get actor of fetched object: {e}");
}
} }
}
let addressed = object.addressed(); let addressed = object.addressed();
if let Some(reply) = object.in_reply_to().id() { if let Some(reply) = object.in_reply_to().id() {
if depth <= 16 { if depth <= 16 {
fetch_object_inner(ctx, &reply, depth + 1).await?; self.fetch_object_r(&reply, depth + 1).await?;
} else { } else {
tracing::warn!("thread deeper than 16, giving up fetching more replies"); tracing::warn!("thread deeper than 16, giving up fetching more replies");
}
} }
let object_model = self.insert_object(object, None).await?;
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(None, Some(object_model.internal), &expanded_addresses).await?;
Ok(object_model)
} }
let object_model = ctx.insert_object(object, None).await?;
let expanded_addresses = ctx.expand_addressing(addressed).await?;
ctx.address_to(None, Some(object_model.internal), &expanded_addresses).await?;
Ok(object_model)
} }
#[axum::async_trait] #[axum::async_trait]