feat: impl fetch_thread when fetching replies
This commit is contained in:
parent
86035c2878
commit
b8250eae6c
2 changed files with 46 additions and 61 deletions
|
@ -1,6 +1,6 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use apb::{Activity, Actor, ActorMut, Base, Collection, Object};
|
||||
use apb::{Activity, Actor, ActorMut, Base, Collection, CollectionPage, Object};
|
||||
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
|
||||
use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet, QueryFilter};
|
||||
|
||||
|
@ -401,9 +401,40 @@ impl Fetcher for crate::Context {
|
|||
Ok(activity_model)
|
||||
}
|
||||
|
||||
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> {
|
||||
// crawl_replies(self, id, 0).await
|
||||
todo!()
|
||||
async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError> {
|
||||
tracing::info!("crawling replies of '{id}'");
|
||||
let object = self.pull(id).await?.object()?;
|
||||
let replies = object.replies().resolve(self).await?;
|
||||
|
||||
let mut page;
|
||||
let mut next = replies.first();
|
||||
|
||||
loop {
|
||||
page = next.resolve(self).await?;
|
||||
|
||||
// fix for mastodon: at some point it introduces ?only_other_accounts=true and then returns a
|
||||
// collection, not a page anymore ???
|
||||
if matches!(page.object_type()?, apb::ObjectType::Collection(..)) {
|
||||
page = page.first().extract().ok_or(RequestError::Tombstone)?;
|
||||
}
|
||||
|
||||
for obj in page.items() {
|
||||
if let Err(e) = self.fetch_object(obj.id()?, tx).await {
|
||||
tracing::warn!("error fetching reply: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
for obj in page.ordered_items() {
|
||||
if let Err(e) = self.fetch_object(obj.id()?, tx).await {
|
||||
tracing::warn!("error fetching reply: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
next = page.next();
|
||||
if next.is_empty() { break };
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||
|
@ -486,55 +517,3 @@ impl Dereferenceable<serde_json::Value> for apb::Node<serde_json::Value> {
|
|||
}
|
||||
}
|
||||
}
|
||||
// #[async_recursion::async_recursion]
|
||||
// async fn crawl_replies(ctx: &crate::Context, id: &str, depth: usize) -> Result<(), PullError> {
|
||||
// tracing::info!("crawling replies of '{id}'");
|
||||
// let object = crate::Context::request(
|
||||
// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
//
|
||||
// let object_model = crate::model::object::Model::new(&object)?;
|
||||
// match crate::model::object::Entity::insert(object_model.into_active_model())
|
||||
// .exec(ctx.db()).await
|
||||
// {
|
||||
// Ok(_) => {},
|
||||
// Err(sea_orm::DbErr::RecordNotInserted) => {},
|
||||
// Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite
|
||||
// Err(e) => return Err(e.into()),
|
||||
// }
|
||||
//
|
||||
// if depth > 16 {
|
||||
// tracing::warn!("stopping thread crawling: too deep!");
|
||||
// return Ok(());
|
||||
// }
|
||||
//
|
||||
// let mut page_url = match object.replies().get() {
|
||||
// Some(serde_json::Value::String(x)) => {
|
||||
// let replies = crate::Context::request(
|
||||
// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
// replies.first().id()
|
||||
// },
|
||||
// Some(serde_json::Value::Object(x)) => {
|
||||
// let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
|
||||
// obj.first().id()
|
||||
// },
|
||||
// _ => return Ok(()),
|
||||
// };
|
||||
//
|
||||
// while let Some(ref url) = page_url {
|
||||
// let replies = crate::Context::request(
|
||||
// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
//
|
||||
// for reply in replies.items() {
|
||||
// // TODO right now it crawls one by one, could be made in parallel but would be quite more
|
||||
// // abusive, so i'll keep it like this while i try it out
|
||||
// crawl_replies(ctx, reply.href(), depth + 1).await?;
|
||||
// }
|
||||
//
|
||||
// page_url = replies.next().id();
|
||||
// }
|
||||
//
|
||||
// Ok(())
|
||||
// }
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use apb::{BaseMut, CollectionMut, LD};
|
||||
use axum::extract::{Path, Query, State};
|
||||
use sea_orm::{ColumnTrait, Condition, QueryFilter, QuerySelect, SelectColumns};
|
||||
use upub::{model, Context};
|
||||
use upub::{model, traits::Fetcher, Context};
|
||||
|
||||
use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD, AuthIdentity};
|
||||
|
||||
|
@ -9,11 +9,17 @@ pub async fn get(
|
|||
State(ctx): State<Context>,
|
||||
Path(id): Path<String>,
|
||||
AuthIdentity(auth): AuthIdentity,
|
||||
Query(_q): Query<TryFetch>,
|
||||
Query(q): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
// if auth.is_local() && q.fetch {
|
||||
// ctx.fetch_thread(&oid).await?;
|
||||
// }
|
||||
let oid = ctx.oid(&id);
|
||||
if auth.is_local() && q.fetch {
|
||||
// TODO a task should do this, not the web handler!
|
||||
// so we dont keep clients waiting and we limit
|
||||
// concurrent possible crawlers
|
||||
// however the results given immediately would
|
||||
// become inaccurate!!
|
||||
ctx.fetch_thread(&oid, ctx.db()).await?;
|
||||
}
|
||||
|
||||
let replies_ids = upub::Query::objects(auth.my_id())
|
||||
.filter(auth.filter())
|
||||
|
|
Loading…
Reference in a new issue