diff --git a/src/routes/activitypub/object/replies.rs b/src/routes/activitypub/object/replies.rs index 8f69b9b..35d08cc 100644 --- a/src/routes/activitypub/object/replies.rs +++ b/src/routes/activitypub/object/replies.rs @@ -1,16 +1,21 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter}; -use crate::{model, routes::activitypub::{JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; +use crate::{model, routes::activitypub::{JsonLD, Pagination, TryFetch}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url}; pub async fn get( State(ctx): State, Path(id): Path, AuthIdentity(auth): AuthIdentity, + Query(q): Query, ) -> crate::Result> { let replies_id = url!(ctx, "/objects/{id}/replies"); let oid = ctx.uri("objects", id); + if auth.is_local() && q.fetch { + ctx.fetch_thread(&oid).await?; + } + let count = model::addressing::Entity::find_addressed(auth.my_id()) .filter(auth.filter_condition()) .filter(model::object::Column::InReplyTo.eq(oid)) diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 07cad64..21af542 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -1,11 +1,11 @@ use std::collections::BTreeMap; -use apb::{target::Addressed, Activity, Collection, Object}; +use apb::{target::Addressed, Activity, Collection, CollectionPage, Link, Object}; use base64::Engine; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; -use crate::{model, VERSION}; +use crate::{errors::UpubError, model, VERSION}; use super::{auth::HttpSignature, Context}; @@ -20,6 +20,8 @@ pub trait Fetcher { async fn fetch_activity(&self, id: &str) -> crate::Result; async fn pull_activity(&self, id: &str) -> crate::Result; + async fn fetch_thread(&self, id: &str) -> crate::Result<()>; + async fn request( method: reqwest::Method, url: &str, @@ -165,6 +167,10 @@ impl Fetcher for Context { Ok(activity_model) } + async fn fetch_thread(&self, id: &str) -> crate::Result<()> { + crawl_replies(self, id, 0).await + } + async fn fetch_object(&self, id: &str) -> crate::Result { fetch_object_inner(self, id, 0).await } @@ -178,6 +184,58 @@ impl Fetcher for Context { } } +#[async_recursion::async_recursion] +async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> { + tracing::info!("crawling replies of '{id}'"); + let object = Context::request( + Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), + ).await?.json::().await?; + + let object_model = model::object::Model::new(&object)?; + match model::object::Entity::insert(object_model.into_active_model()) + .exec(ctx.db()).await + { + Ok(_) => {}, + Err(sea_orm::DbErr::RecordNotInserted) => {}, + Err(e) => return Err(e.into()), + } + + if depth > 16 { + tracing::warn!("stopping thread crawling: too deep!"); + return Ok(()); + } + + let mut page_url = match object.replies().get() { + Some(serde_json::Value::String(x)) => { + let replies = Context::request( + Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), + ).await?.json::().await?; + replies.first().id() + }, + Some(serde_json::Value::Object(x)) => { + let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO! + obj.first().id() + }, + _ => return Ok(()), + }; + + while let Some(ref url) = page_url { + let replies = Context::request( + Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), + ).await?.json::().await?; + + for reply in replies.items() { + // TODO right now it crawls one by one, could be made in parallel but would be quite more + // abusive, so i'll keep it like this while i try it out + crawl_replies(ctx, reply.href(), depth + 1).await?; + } + + page_url = replies.next().id(); + } + + Ok(()) +} + #[async_recursion::async_recursion] async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result { if let Some(x) = model::object::Entity::find_by_id(id).one(ctx.db()).await? {