1
0
Fork 0
forked from alemi/upub

feat: add server crawler

i may remove this tho, it definitely should not be arbitrarily invokable
by local users!!!
This commit is contained in:
əlemi 2024-05-03 04:43:25 +02:00
parent 7454da6525
commit 9e42d4b460
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 66 additions and 3 deletions

View file

@ -1,16 +1,21 @@
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter}; use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter};
use crate::{model, routes::activitypub::{JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; use crate::{model, routes::activitypub::{JsonLD, Pagination, TryFetch}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url};
pub async fn get( pub async fn get(
State(ctx): State<Context>, State(ctx): State<Context>,
Path(id): Path<String>, Path(id): Path<String>,
AuthIdentity(auth): AuthIdentity, AuthIdentity(auth): AuthIdentity,
Query(q): Query<TryFetch>,
) -> crate::Result<JsonLD<serde_json::Value>> { ) -> crate::Result<JsonLD<serde_json::Value>> {
let replies_id = url!(ctx, "/objects/{id}/replies"); let replies_id = url!(ctx, "/objects/{id}/replies");
let oid = ctx.uri("objects", id); let oid = ctx.uri("objects", id);
if auth.is_local() && q.fetch {
ctx.fetch_thread(&oid).await?;
}
let count = model::addressing::Entity::find_addressed(auth.my_id()) let count = model::addressing::Entity::find_addressed(auth.my_id())
.filter(auth.filter_condition()) .filter(auth.filter_condition())
.filter(model::object::Column::InReplyTo.eq(oid)) .filter(model::object::Column::InReplyTo.eq(oid))

View file

@ -1,11 +1,11 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use apb::{target::Addressed, Activity, Collection, Object}; use apb::{target::Addressed, Activity, Collection, CollectionPage, Link, Object};
use base64::Engine; use base64::Engine;
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
use crate::{model, VERSION}; use crate::{errors::UpubError, model, VERSION};
use super::{auth::HttpSignature, Context}; use super::{auth::HttpSignature, Context};
@ -20,6 +20,8 @@ pub trait Fetcher {
async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>; async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
async fn pull_activity(&self, id: &str) -> crate::Result<model::activity::Model>; async fn pull_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
async fn fetch_thread(&self, id: &str) -> crate::Result<()>;
async fn request( async fn request(
method: reqwest::Method, method: reqwest::Method,
url: &str, url: &str,
@ -165,6 +167,10 @@ impl Fetcher for Context {
Ok(activity_model) Ok(activity_model)
} }
async fn fetch_thread(&self, id: &str) -> crate::Result<()> {
crawl_replies(self, id, 0).await
}
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> { async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> {
fetch_object_inner(self, id, 0).await fetch_object_inner(self, id, 0).await
} }
@ -178,6 +184,58 @@ impl Fetcher for Context {
} }
} }
#[async_recursion::async_recursion]
async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> {
tracing::info!("crawling replies of '{id}'");
let object = Context::request(
Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
).await?.json::<serde_json::Value>().await?;
let object_model = model::object::Model::new(&object)?;
match model::object::Entity::insert(object_model.into_active_model())
.exec(ctx.db()).await
{
Ok(_) => {},
Err(sea_orm::DbErr::RecordNotInserted) => {},
Err(e) => return Err(e.into()),
}
if depth > 16 {
tracing::warn!("stopping thread crawling: too deep!");
return Ok(());
}
let mut page_url = match object.replies().get() {
Some(serde_json::Value::String(x)) => {
let replies = Context::request(
Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
).await?.json::<serde_json::Value>().await?;
replies.first().id()
},
Some(serde_json::Value::Object(x)) => {
let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
obj.first().id()
},
_ => return Ok(()),
};
while let Some(ref url) = page_url {
let replies = Context::request(
Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
).await?.json::<serde_json::Value>().await?;
for reply in replies.items() {
// TODO right now it crawls one by one, could be made in parallel but would be quite more
// abusive, so i'll keep it like this while i try it out
crawl_replies(ctx, reply.href(), depth + 1).await?;
}
page_url = replies.next().id();
}
Ok(())
}
#[async_recursion::async_recursion] #[async_recursion::async_recursion]
async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result<model::object::Model> { async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result<model::object::Model> {
if let Some(x) = model::object::Entity::find_by_id(id).one(ctx.db()).await? { if let Some(x) = model::object::Entity::find_by_id(id).one(ctx.db()).await? {