Compare commits

...

2 commits

2 changed files with 61 additions and 74 deletions

View file

@@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use apb::{Activity, Actor, ActorMut, Base, Collection, Object};
use apb::{Activity, Actor, ActorMut, Base, Collection, CollectionPage, Object};
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet, QueryFilter};
@@ -401,9 +401,40 @@ impl Fetcher for crate::Context {
Ok(activity_model)
}
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> {
// crawl_replies(self, id, 0).await
todo!()
async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError> {
	// Hard cap on how many reply pages we will walk. Relying on
	// `next.is_empty()` alone is not a termination guarantee: a broken or
	// malicious server can hand out `next` links forever (the previous
	// crawler guarded against this with a depth limit of 16).
	const MAX_PAGES: usize = 64;

	tracing::info!("crawling replies of '{id}'");
	let object = self.pull(id).await?.object()?;
	let replies = object.replies().resolve(self).await?;
	let mut page;
	let mut next = replies.first();
	let mut pages_walked = 0usize;
	loop {
		if pages_walked >= MAX_PAGES {
			tracing::warn!("stopping thread crawl of '{id}': too many pages");
			break;
		}
		pages_walked += 1;
		page = next.resolve(self).await?;
		// fix for mastodon: at some point it introduces ?only_other_accounts=true and then returns a
		// collection, not a page anymore ???
		if matches!(page.object_type()?, apb::ObjectType::Collection(..)) {
			page = page.first().extract().ok_or(RequestError::Tombstone)?;
		}
		// A page may carry its entries in `items`, `orderedItems`, or both;
		// iterate both, and only warn (don't abort) on a failed fetch so one
		// bad reply does not kill the whole crawl.
		for obj in page.items() {
			if let Err(e) = self.fetch_object(obj.id()?, tx).await {
				tracing::warn!("error fetching reply: {e}");
			}
		}
		for obj in page.ordered_items() {
			if let Err(e) = self.fetch_object(obj.id()?, tx).await {
				tracing::warn!("error fetching reply: {e}");
			}
		}
		next = page.next();
		if next.is_empty() { break }
	}
	Ok(())
}
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
@@ -464,75 +495,25 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
}
#[allow(async_fn_in_trait)]
pub trait Fetchable : Sync + Send {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
/// A node that can be dereferenced ("resolved") into a concrete `T`,
/// performing a remote fetch through `ctx` when only a link is held.
/// Consumes `self`; failures surface as [`RequestError`].
pub trait Dereferenceable<T> : Sync + Send {
/// Resolve this value into a `T`. Implementations fetch over the network
/// when given a bare reference, unwrap an already-embedded object as-is,
/// and error out on empty nodes (NOTE(review): per the visible impl for
/// `apb::Node<serde_json::Value>`; other impls may differ — confirm).
async fn resolve(self, ctx: &crate::Context) -> Result<T, RequestError>;
}
impl Fetchable for apb::Node<serde_json::Value> {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError> {
if let apb::Node::Link(uri) = self {
if let Ok(href) = uri.href() {
*self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())
impl Dereferenceable<serde_json::Value> for apb::Node<serde_json::Value> {
async fn resolve(self, ctx: &crate::Context) -> Result<serde_json::Value, RequestError> {
match self {
apb::Node::Link(uri) => {
let href = uri.href()?;
tracing::info!("dereferencing {href}");
let res = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())
.await?
.json::<serde_json::Value>()
.await?
.into();
.await?;
Ok(res)
},
apb::Node::Object(x) => Ok(*x),
apb::Node::Empty => Err(RequestError::Tombstone),
apb::Node::Array(_) => Err(RequestError::Malformed(apb::FieldErr("id"))), // TODO weird!!
}
}
Ok(self)
}
}
// #[async_recursion::async_recursion]
// async fn crawl_replies(ctx: &crate::Context, id: &str, depth: usize) -> Result<(), PullError> {
// tracing::info!("crawling replies of '{id}'");
// let object = crate::Context::request(
// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
//
// let object_model = crate::model::object::Model::new(&object)?;
// match crate::model::object::Entity::insert(object_model.into_active_model())
// .exec(ctx.db()).await
// {
// Ok(_) => {},
// Err(sea_orm::DbErr::RecordNotInserted) => {},
// Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite
// Err(e) => return Err(e.into()),
// }
//
// if depth > 16 {
// tracing::warn!("stopping thread crawling: too deep!");
// return Ok(());
// }
//
// let mut page_url = match object.replies().get() {
// Some(serde_json::Value::String(x)) => {
// let replies = crate::Context::request(
// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
// replies.first().id()
// },
// Some(serde_json::Value::Object(x)) => {
// let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
// obj.first().id()
// },
// _ => return Ok(()),
// };
//
// while let Some(ref url) = page_url {
// let replies = crate::Context::request(
// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
//
// for reply in replies.items() {
// // TODO right now it crawls one by one, could be made in parallel but would be quite more
// // abusive, so i'll keep it like this while i try it out
// crawl_replies(ctx, reply.href(), depth + 1).await?;
// }
//
// page_url = replies.next().id();
// }
//
// Ok(())
// }

View file

@@ -1,7 +1,7 @@
use apb::{BaseMut, CollectionMut, LD};
use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, Condition, QueryFilter, QuerySelect, SelectColumns};
use upub::{model, Context};
use upub::{model, traits::Fetcher, Context};
use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD, AuthIdentity};
@@ -9,11 +9,17 @@ pub async fn get(
State(ctx): State<Context>,
Path(id): Path<String>,
AuthIdentity(auth): AuthIdentity,
Query(_q): Query<TryFetch>,
Query(q): Query<TryFetch>,
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
// if auth.is_local() && q.fetch {
// ctx.fetch_thread(&oid).await?;
// }
let oid = ctx.oid(&id);
if auth.is_local() && q.fetch {
// TODO a task should do this, not the web handler!
// so we dont keep clients waiting and we limit
// concurrent possible crawlers
// however the results given immediately would
// become inaccurate!!
ctx.fetch_thread(&oid, ctx.db()).await?;
}
let replies_ids = upub::Query::objects(auth.my_id())
.filter(auth.filter())