From b8250eae6c155053406216d18218aa59b88175e8 Mon Sep 17 00:00:00 2001
From: alemi
Date: Sun, 10 Nov 2024 21:40:07 +0100
Subject: [PATCH] feat: impl fetch_thread when fetching replies

---
 upub/core/src/traits/fetch.rs                 | 91 +++++------------
 upub/routes/src/activitypub/object/replies.rs | 16 +++-
 2 files changed, 46 insertions(+), 61 deletions(-)

diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs
index c4a4b40..9081bc6 100644
--- a/upub/core/src/traits/fetch.rs
+++ b/upub/core/src/traits/fetch.rs
@@ -1,6 +1,6 @@
 use std::collections::BTreeMap;
 
-use apb::{Activity, Actor, ActorMut, Base, Collection, Object};
+use apb::{Activity, Actor, ActorMut, Base, Collection, CollectionPage, Object};
 use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
 use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet, QueryFilter};
 
@@ -401,9 +401,40 @@ impl Fetcher for crate::Context {
 		Ok(activity_model)
 	}
 
-	async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> {
-		// crawl_replies(self, id, 0).await
-		todo!()
+	async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError> {
+		tracing::info!("crawling replies of '{id}'");
+		let object = self.pull(id).await?.object()?;
+		let replies = object.replies().resolve(self).await?;
+
+		let mut page;
+		let mut next = replies.first();
+
+		loop {
+			page = next.resolve(self).await?;
+
+			// fix for mastodon: at some point it introduces ?only_other_accounts=true and then returns a
+			// collection, not a page anymore ???
+			if matches!(page.object_type()?, apb::ObjectType::Collection(..)) {
+				page = page.first().extract().ok_or(RequestError::Tombstone)?;
+			}
+
+			for obj in page.items() {
+				if let Err(e) = self.fetch_object(obj.id()?, tx).await {
+					tracing::warn!("error fetching reply: {e}");
+				}
+			}
+
+			for obj in page.ordered_items() {
+				if let Err(e) = self.fetch_object(obj.id()?, tx).await {
+					tracing::warn!("error fetching reply: {e}");
+				}
+			}
+
+			next = page.next();
+			if next.is_empty() { break };
+		}
+
+		Ok(())
 	}
 
 	async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
@@ -486,55 +517,3 @@ impl Dereferenceable<serde_json::Value> for apb::Node<serde_json::Value> {
 		}
 	}
 }
-// #[async_recursion::async_recursion]
-// async fn crawl_replies(ctx: &crate::Context, id: &str, depth: usize) -> Result<(), PullError> {
-// 	tracing::info!("crawling replies of '{id}'");
-// 	let object = crate::Context::request(
-// 		Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
-// 	).await?.json::<serde_json::Value>().await?;
-//
-// 	let object_model = crate::model::object::Model::new(&object)?;
-// 	match crate::model::object::Entity::insert(object_model.into_active_model())
-// 		.exec(ctx.db()).await
-// 	{
-// 		Ok(_) => {},
-// 		Err(sea_orm::DbErr::RecordNotInserted) => {},
-// 		Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite
-// 		Err(e) => return Err(e.into()),
-// 	}
-//
-// 	if depth > 16 {
-// 		tracing::warn!("stopping thread crawling: too deep!");
-// 		return Ok(());
-// 	}
-//
-// 	let mut page_url = match object.replies().get() {
-// 		Some(serde_json::Value::String(x)) => {
-// 			let replies = crate::Context::request(
-// 				Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
-// 			).await?.json::<serde_json::Value>().await?;
-// 			replies.first().id()
-// 		},
-// 		Some(serde_json::Value::Object(x)) => {
-// 			let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
-// 			obj.first().id()
-// 		},
-// 		_ => return Ok(()),
-// 	};
-//
-// 	while let Some(ref url) = page_url {
-// 		let replies = crate::Context::request(
-// 			Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
-// 		).await?.json::<serde_json::Value>().await?;
-//
-// 		for reply in replies.items() {
-// 			// TODO right now it crawls one by one, could be made in parallel but would be quite more
-// 			// abusive, so i'll keep it like this while i try it out
-// 			crawl_replies(ctx, reply.href(), depth + 1).await?;
-// 		}
-//
-// 		page_url = replies.next().id();
-// 	}
-//
-// 	Ok(())
-// }
diff --git a/upub/routes/src/activitypub/object/replies.rs b/upub/routes/src/activitypub/object/replies.rs
index 7a415ed..425df01 100644
--- a/upub/routes/src/activitypub/object/replies.rs
+++ b/upub/routes/src/activitypub/object/replies.rs
@@ -1,7 +1,7 @@
 use apb::{BaseMut, CollectionMut, LD};
 use axum::extract::{Path, Query, State};
 use sea_orm::{ColumnTrait, Condition, QueryFilter, QuerySelect, SelectColumns};
-use upub::{model, Context};
+use upub::{model, traits::Fetcher, Context};
 
 use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD, AuthIdentity};
 
@@ -9,11 +9,17 @@ pub async fn get(
 	State(ctx): State<Context>,
 	Path(id): Path<String>,
 	AuthIdentity(auth): AuthIdentity,
-	Query(_q): Query<TryFetch>,
+	Query(q): Query<TryFetch>,
 ) -> crate::ApiResult<JsonLD<serde_json::Value>> {
-	// if auth.is_local() && q.fetch {
-	// 	ctx.fetch_thread(&oid).await?;
-	// }
+	let oid = ctx.oid(&id);
+	if auth.is_local() && q.fetch {
+		// TODO a task should do this, not the web handler!
+		//      so we dont keep clients waiting and we limit
+		//      concurrent possible crawlers
+		//      however the results given immediately would
+		//      become inaccurate!!
+		ctx.fetch_thread(&oid, ctx.db()).await?;
+	}
 
 	let replies_ids = upub::Query::objects(auth.my_id())
 		.filter(auth.filter())
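
The TODO in the replies handler above points at a possible follow-up: run the crawl in a background task so the HTTP response is not held up by remote fetches. Below is a minimal sketch of that idea, not part of this patch, assuming upub's Context can be cloned cheaply into a 'static task and that a bare tokio::spawn is an acceptable stand-in for a proper job queue; the other names are the ones already used in the handler.

	if auth.is_local() && q.fetch {
		// sketch: crawl in the background instead of awaiting in the handler;
		// the collection returned right now may still miss freshly fetched replies
		let ctx = ctx.clone(); // assumes Context: Clone (shared inner state)
		let oid = oid.clone();
		tokio::spawn(async move {
			if let Err(e) = ctx.fetch_thread(&oid, ctx.db()).await {
				tracing::warn!("background crawl of thread {oid} failed: {e}");
			}
		});
	}

A later request for the same replies collection would then see whatever the crawler has stored so far, which is the inaccuracy the original comment warns about.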