Compare commits
2 commits
7d14bccdee
...
b8250eae6c
Author | SHA1 | Date | |
---|---|---|---|
b8250eae6c | |||
86035c2878 |
2 changed files with 61 additions and 74 deletions
|
@ -1,6 +1,6 @@
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use apb::{Activity, Actor, ActorMut, Base, Collection, Object};
|
use apb::{Activity, Actor, ActorMut, Base, Collection, CollectionPage, Object};
|
||||||
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
|
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
|
||||||
use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet, QueryFilter};
|
use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet, QueryFilter};
|
||||||
|
|
||||||
|
@ -401,9 +401,40 @@ impl Fetcher for crate::Context {
|
||||||
Ok(activity_model)
|
Ok(activity_model)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), RequestError> {
|
async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), RequestError> {
|
||||||
// crawl_replies(self, id, 0).await
|
tracing::info!("crawling replies of '{id}'");
|
||||||
todo!()
|
let object = self.pull(id).await?.object()?;
|
||||||
|
let replies = object.replies().resolve(self).await?;
|
||||||
|
|
||||||
|
let mut page;
|
||||||
|
let mut next = replies.first();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
page = next.resolve(self).await?;
|
||||||
|
|
||||||
|
// fix for mastodon: at some point it introduces ?only_other_accounts=true and then returns a
|
||||||
|
// collection, not a page anymore ???
|
||||||
|
if matches!(page.object_type()?, apb::ObjectType::Collection(..)) {
|
||||||
|
page = page.first().extract().ok_or(RequestError::Tombstone)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
for obj in page.items() {
|
||||||
|
if let Err(e) = self.fetch_object(obj.id()?, tx).await {
|
||||||
|
tracing::warn!("error fetching reply: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for obj in page.ordered_items() {
|
||||||
|
if let Err(e) = self.fetch_object(obj.id()?, tx).await {
|
||||||
|
tracing::warn!("error fetching reply: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
next = page.next();
|
||||||
|
if next.is_empty() { break };
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, RequestError> {
|
||||||
|
@ -464,75 +495,25 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(async_fn_in_trait)]
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait Fetchable : Sync + Send {
|
pub trait Dereferenceable<T> : Sync + Send {
|
||||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
|
async fn resolve(self, ctx: &crate::Context) -> Result<T, RequestError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Fetchable for apb::Node<serde_json::Value> {
|
impl Dereferenceable<serde_json::Value> for apb::Node<serde_json::Value> {
|
||||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError> {
|
async fn resolve(self, ctx: &crate::Context) -> Result<serde_json::Value, RequestError> {
|
||||||
if let apb::Node::Link(uri) = self {
|
match self {
|
||||||
if let Ok(href) = uri.href() {
|
apb::Node::Link(uri) => {
|
||||||
*self = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())
|
let href = uri.href()?;
|
||||||
|
tracing::info!("dereferencing {href}");
|
||||||
|
let res = crate::Context::request(Method::GET, href, None, ctx.base(), ctx.pkey(), ctx.domain())
|
||||||
.await?
|
.await?
|
||||||
.json::<serde_json::Value>()
|
.json::<serde_json::Value>()
|
||||||
.await?
|
.await?;
|
||||||
.into();
|
Ok(res)
|
||||||
|
},
|
||||||
|
apb::Node::Object(x) => Ok(*x),
|
||||||
|
apb::Node::Empty => Err(RequestError::Tombstone),
|
||||||
|
apb::Node::Array(_) => Err(RequestError::Malformed(apb::FieldErr("id"))), // TODO weird!!
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(self)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// #[async_recursion::async_recursion]
|
|
||||||
// async fn crawl_replies(ctx: &crate::Context, id: &str, depth: usize) -> Result<(), PullError> {
|
|
||||||
// tracing::info!("crawling replies of '{id}'");
|
|
||||||
// let object = crate::Context::request(
|
|
||||||
// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
|
||||||
// ).await?.json::<serde_json::Value>().await?;
|
|
||||||
//
|
|
||||||
// let object_model = crate::model::object::Model::new(&object)?;
|
|
||||||
// match crate::model::object::Entity::insert(object_model.into_active_model())
|
|
||||||
// .exec(ctx.db()).await
|
|
||||||
// {
|
|
||||||
// Ok(_) => {},
|
|
||||||
// Err(sea_orm::DbErr::RecordNotInserted) => {},
|
|
||||||
// Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite
|
|
||||||
// Err(e) => return Err(e.into()),
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if depth > 16 {
|
|
||||||
// tracing::warn!("stopping thread crawling: too deep!");
|
|
||||||
// return Ok(());
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// let mut page_url = match object.replies().get() {
|
|
||||||
// Some(serde_json::Value::String(x)) => {
|
|
||||||
// let replies = crate::Context::request(
|
|
||||||
// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
|
||||||
// ).await?.json::<serde_json::Value>().await?;
|
|
||||||
// replies.first().id()
|
|
||||||
// },
|
|
||||||
// Some(serde_json::Value::Object(x)) => {
|
|
||||||
// let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
|
|
||||||
// obj.first().id()
|
|
||||||
// },
|
|
||||||
// _ => return Ok(()),
|
|
||||||
// };
|
|
||||||
//
|
|
||||||
// while let Some(ref url) = page_url {
|
|
||||||
// let replies = crate::Context::request(
|
|
||||||
// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
|
||||||
// ).await?.json::<serde_json::Value>().await?;
|
|
||||||
//
|
|
||||||
// for reply in replies.items() {
|
|
||||||
// // TODO right now it crawls one by one, could be made in parallel but would be quite more
|
|
||||||
// // abusive, so i'll keep it like this while i try it out
|
|
||||||
// crawl_replies(ctx, reply.href(), depth + 1).await?;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// page_url = replies.next().id();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// Ok(())
|
|
||||||
// }
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use apb::{BaseMut, CollectionMut, LD};
|
use apb::{BaseMut, CollectionMut, LD};
|
||||||
use axum::extract::{Path, Query, State};
|
use axum::extract::{Path, Query, State};
|
||||||
use sea_orm::{ColumnTrait, Condition, QueryFilter, QuerySelect, SelectColumns};
|
use sea_orm::{ColumnTrait, Condition, QueryFilter, QuerySelect, SelectColumns};
|
||||||
use upub::{model, Context};
|
use upub::{model, traits::Fetcher, Context};
|
||||||
|
|
||||||
use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD, AuthIdentity};
|
use crate::{activitypub::{Pagination, TryFetch}, builders::JsonLD, AuthIdentity};
|
||||||
|
|
||||||
|
@ -9,11 +9,17 @@ pub async fn get(
|
||||||
State(ctx): State<Context>,
|
State(ctx): State<Context>,
|
||||||
Path(id): Path<String>,
|
Path(id): Path<String>,
|
||||||
AuthIdentity(auth): AuthIdentity,
|
AuthIdentity(auth): AuthIdentity,
|
||||||
Query(_q): Query<TryFetch>,
|
Query(q): Query<TryFetch>,
|
||||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||||
// if auth.is_local() && q.fetch {
|
let oid = ctx.oid(&id);
|
||||||
// ctx.fetch_thread(&oid).await?;
|
if auth.is_local() && q.fetch {
|
||||||
// }
|
// TODO a task should do this, not the web handler!
|
||||||
|
// so we dont keep clients waiting and we limit
|
||||||
|
// concurrent possible crawlers
|
||||||
|
// however the results given immediately would
|
||||||
|
// become inaccurate!!
|
||||||
|
ctx.fetch_thread(&oid, ctx.db()).await?;
|
||||||
|
}
|
||||||
|
|
||||||
let replies_ids = upub::Query::objects(auth.my_id())
|
let replies_ids = upub::Query::objects(auth.my_id())
|
||||||
.filter(auth.filter())
|
.filter(auth.filter())
|
||||||
|
|
Loading…
Reference in a new issue