Compare commits

..

2 commits

2 changed files with 61 additions and 74 deletions

View file

@ -1,6 +1,6 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use apb::{Activity, Actor, ActorMut, Base, Collection, Object}; use apb::{Activity, Actor, ActorMut, Base, Collection, CollectionPage, Object};
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet, QueryFilter}; use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet, QueryFilter};
@ -401,9 +401,40 @@ impl Fetcher for crate::Context {
Ok(activity_model) Ok(activity_model)
} }
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> { async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError> {
// crawl_replies(self, id, 0).await tracing::info!("crawling replies of '{id}'");
todo!() let object = self.pull(id).await?.object()?;
let replies = object.replies().resolve(self).await?;
let mut page;
let mut next = replies.first();
loop {
page = next.resolve(self).await?;
// fix for mastodon: at some point it introduces ?only_other_accounts=true and then returns a
// collection, not a page anymore ???
if matches!(page.object_type()?, apb::ObjectType::Collection(..)) {
page = page.first().extract().ok_or(RequestError::Tombstone)?;
}
for obj in page.items() {
if let Err(e) = self.fetch_object(obj.id()?, tx).await {
tracing::warn!("error fetching reply: {e}");
}
}
for obj in page.ordered_items() {
if let Err(e) = self.fetch_object(obj.id()?, tx).await {
tracing::warn!("error fetching reply: {e}");
}
}
next = page.next();
if next.is_empty() { break };
}
Ok(())
} }
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> { async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
@ -464,75 +495,25 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
} }
#[allow(async_fn_in_trait)] #[allow(async_fn_in_trait)]
pub trait Fetchable : Sync + Send { pub trait Dereferenceable<T> : Sync + Send {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>; async fn resolve(self, ctx: &crate::Context) -> Result<T, RequestError>;
} }
impl Fetchable for apb::Node<serde_json::Value> { impl Dereferenceable<serde_json::Value> for apb::Node<serde_json::Value> {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError> { async fn resolve(self, ctx: &crate::Context) -> Result<serde_json::Value, RequestError> {
if let apb::Node::Link(uri) = self { match self {
if let Ok(href) = uri.href() { apb::Node::Link(uri) => {
*self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain()) let href = uri.href()?;
tracing::info!("dereferencing {href}");
let res = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())
.await? .await?
.json::<serde_json::Value>() .json::<serde_json::Value>()
.await? .await?;
.into(); Ok(res)
},
apb::Node::Object(x) => Ok(*x),
apb::Node::Empty => Err(RequestError::Tombstone),
apb::Node::Array(_) => Err(RequestError::Malformed(apb::FieldErr("id"))), // TODO weird!!
} }
} }
Ok(self)
} }
}
// #[async_recursion::async_recursion]
// async fn crawl_replies(ctx: &crate::Context, id: &str, depth: usize) -> Result<(), PullError> {
// tracing::info!("crawling replies of '{id}'");
// let object = crate::Context::request(
// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
//
// let object_model = crate::model::object::Model::new(&object)?;
// match crate::model::object::Entity::insert(object_model.into_active_model())
// .exec(ctx.db()).await
// {
// Ok(_) => {},
// Err(sea_orm::DbErr::RecordNotInserted) => {},
// Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite
// Err(e) => return Err(e.into()),
// }
//
// if depth > 16 {
// tracing::warn!("stopping thread crawling: too deep!");
// return Ok(());
// }
//
// let mut page_url = match object.replies().get() {
// Some(serde_json::Value::String(x)) => {
// let replies = crate::Context::request(
// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
// replies.first().id()
// },
// Some(serde_json::Value::Object(x)) => {
// let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
// obj.first().id()
// },
// _ => return Ok(()),
// };
//
// while let Some(ref url) = page_url {
// let replies = crate::Context::request(
// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
//
// for reply in replies.items() {
// // TODO right now it crawls one by one, could be made in parallel but would be quite more
// // abusive, so i'll keep it like this while i try it out
// crawl_replies(ctx, reply.href(), depth + 1).await?;
// }
//
// page_url = replies.next().id();
// }
//
// Ok(())
// }

View file

@ -1,7 +1,7 @@
use apb::{BaseMut, CollectionMut, LD}; use apb::{BaseMut, CollectionMut, LD};
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, Condition, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{ColumnTrait, Condition, QueryFilter, QuerySelect, SelectColumns};
use upub::{model, Context}; use upub::{model, traits::Fetcher, Context};
use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD, AuthIdentity}; use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD, AuthIdentity};
@ -9,11 +9,17 @@ pub async fn get(
State(ctx): State<Context>, State(ctx): State<Context>,
Path(id): Path<String>, Path(id): Path<String>,
AuthIdentity(auth): AuthIdentity, AuthIdentity(auth): AuthIdentity,
Query(_q): Query<TryFetch>, Query(q): Query<TryFetch>,
) -> crate::ApiResult<JsonLD<serde_json::Value>> { ) -> crate::ApiResult<JsonLD<serde_json::Value>> {
// if auth.is_local() && q.fetch { let oid = ctx.oid(&id);
// ctx.fetch_thread(&oid).await?; if auth.is_local() && q.fetch {
// } // TODO a task should do this, not the web handler!
// so we dont keep clients waiting and we limit
// concurrent possible crawlers
// however the results given immediately would
// become inaccurate!!
ctx.fetch_thread(&oid, ctx.db()).await?;
}
let replies_ids = upub::Query::objects(auth.my_id()) let replies_ids = upub::Query::objects(auth.my_id())
.filter(auth.filter()) .filter(auth.filter())