Compare commits
No commits in common. "3a6e63244860056d5408574f96b41716922637b9" and "17c1765295463c7177e9591b03921e7edc1015b7" have entirely different histories.
3a6e632448
...
17c1765295
3 changed files with 36 additions and 93 deletions
|
@ -73,12 +73,6 @@ pub struct SecurityConfig {
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub allow_login_refresh: bool,
|
pub allow_login_refresh: bool,
|
||||||
|
|
||||||
#[serde_inline_default(2)]
|
|
||||||
pub max_id_redirects: u32,
|
|
||||||
|
|
||||||
#[serde_inline_default(20)]
|
|
||||||
pub thread_crawl_depth: u32,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -9,21 +9,12 @@ use crate::{errors::UpubError, model, VERSION};
|
||||||
|
|
||||||
use super::{addresser::Addresser, httpsign::HttpSignature, normalizer::Normalizer, Context};
|
use super::{addresser::Addresser, httpsign::HttpSignature, normalizer::Normalizer, Context};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum PullResult<T> {
|
pub enum PullResult<T> {
|
||||||
Actor(T),
|
Actor(T),
|
||||||
Activity(T),
|
Activity(T),
|
||||||
Object(T),
|
Object(T),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> PullResult<T> {
|
|
||||||
pub fn inner(self) -> T {
|
|
||||||
match self {
|
|
||||||
Self::Actor(x) | Self::Activity(x) | Self::Object(x) => x
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PullResult<serde_json::Value> {
|
impl PullResult<serde_json::Value> {
|
||||||
pub fn actor(self) -> crate::Result<serde_json::Value> {
|
pub fn actor(self) -> crate::Result<serde_json::Value> {
|
||||||
match self {
|
match self {
|
||||||
|
@ -48,23 +39,11 @@ impl PullResult<serde_json::Value> {
|
||||||
Self::Object(x) => Ok(x),
|
Self::Object(x) => Ok(x),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn resolve(self, ctx: &(impl Fetcher + Sync)) -> crate::Result<()> {
|
|
||||||
match self {
|
|
||||||
Self::Actor(x) => { ctx.resolve_user(x).await?; },
|
|
||||||
Self::Object(x) => { ctx.resolve_object(x).await?; },
|
|
||||||
Self::Activity(x) => { ctx.resolve_activity(x).await?; },
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[axum::async_trait]
|
#[axum::async_trait]
|
||||||
pub trait Fetcher {
|
pub trait Fetcher {
|
||||||
async fn pull(&self, id: &str) -> crate::Result<PullResult<serde_json::Value>> { self.pull_r(id, 0).await }
|
async fn pull(&self, id: &str) -> crate::Result<PullResult<serde_json::Value>>;
|
||||||
async fn pull_r(&self, id: &str, depth: i32) -> crate::Result<PullResult<serde_json::Value>>;
|
|
||||||
|
|
||||||
|
|
||||||
async fn webfinger(&self, user: &str, host: &str) -> crate::Result<String>;
|
async fn webfinger(&self, user: &str, host: &str) -> crate::Result<String>;
|
||||||
|
|
||||||
async fn fetch_domain(&self, domain: &str) -> crate::Result<model::instance::Model>;
|
async fn fetch_domain(&self, domain: &str) -> crate::Result<model::instance::Model>;
|
||||||
|
@ -78,8 +57,8 @@ pub trait Fetcher {
|
||||||
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> { self.fetch_object_r(id, 0).await }
|
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> { self.fetch_object_r(id, 0).await }
|
||||||
#[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> crate::Result<model::object::Model> { self.resolve_object_r(object, 0).await }
|
#[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> crate::Result<model::object::Model> { self.resolve_object_r(object, 0).await }
|
||||||
|
|
||||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> crate::Result<model::object::Model>;
|
async fn fetch_object_r(&self, id: &str, depth: i32) -> crate::Result<model::object::Model>;
|
||||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> crate::Result<model::object::Model>;
|
async fn resolve_object_r(&self, object: serde_json::Value, depth: i32) -> crate::Result<model::object::Model>;
|
||||||
|
|
||||||
|
|
||||||
async fn fetch_thread(&self, id: &str) -> crate::Result<()>;
|
async fn fetch_thread(&self, id: &str) -> crate::Result<()>;
|
||||||
|
@ -143,7 +122,7 @@ pub trait Fetcher {
|
||||||
|
|
||||||
#[axum::async_trait]
|
#[axum::async_trait]
|
||||||
impl Fetcher for Context {
|
impl Fetcher for Context {
|
||||||
async fn pull_r(&self, id: &str, depth: u32) -> crate::Result<PullResult<serde_json::Value>> {
|
async fn pull(&self, id: &str) -> crate::Result<PullResult<serde_json::Value>> {
|
||||||
let _domain = self.fetch_domain(&Context::server(id)).await?;
|
let _domain = self.fetch_domain(&Context::server(id)).await?;
|
||||||
|
|
||||||
let document = Self::request(
|
let document = Self::request(
|
||||||
|
@ -154,14 +133,6 @@ impl Fetcher for Context {
|
||||||
.json::<serde_json::Value>()
|
.json::<serde_json::Value>()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let doc_id = document.id().ok_or_else(|| UpubError::field("id"))?;
|
|
||||||
if id != doc_id {
|
|
||||||
if depth >= self.cfg().security.max_id_redirects {
|
|
||||||
return Err(UpubError::unprocessable());
|
|
||||||
}
|
|
||||||
return self.pull(doc_id).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
match document.object_type() {
|
match document.object_type() {
|
||||||
None => Err(UpubError::bad_request()),
|
None => Err(UpubError::bad_request()),
|
||||||
Some(apb::ObjectType::Collection(_)) => Err(UpubError::unprocessable()),
|
Some(apb::ObjectType::Collection(_)) => Err(UpubError::unprocessable()),
|
||||||
|
@ -350,7 +321,7 @@ impl Fetcher for Context {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> crate::Result<model::object::Model> {
|
async fn fetch_object_r(&self, id: &str, depth: i32) -> crate::Result<model::object::Model> {
|
||||||
if let Some(x) = model::object::Entity::find_by_ap_id(id).one(self.db()).await? {
|
if let Some(x) = model::object::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||||
return Ok(x); // already in db, easy
|
return Ok(x); // already in db, easy
|
||||||
}
|
}
|
||||||
|
@ -360,7 +331,7 @@ impl Fetcher for Context {
|
||||||
self.resolve_object_r(object, depth).await
|
self.resolve_object_r(object, depth).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> crate::Result<model::object::Model> {
|
async fn resolve_object_r(&self, object: serde_json::Value, depth: i32) -> crate::Result<model::object::Model> {
|
||||||
let id = object.id().ok_or_else(|| UpubError::field("id"))?.to_string();
|
let id = object.id().ok_or_else(|| UpubError::field("id"))?.to_string();
|
||||||
|
|
||||||
if let Some(oid) = object.id() {
|
if let Some(oid) = object.id() {
|
||||||
|
@ -380,10 +351,10 @@ impl Fetcher for Context {
|
||||||
let addressed = object.addressed();
|
let addressed = object.addressed();
|
||||||
|
|
||||||
if let Some(reply) = object.in_reply_to().id() {
|
if let Some(reply) = object.in_reply_to().id() {
|
||||||
if depth <= self.cfg().security.thread_crawl_depth {
|
if depth <= 16 {
|
||||||
self.fetch_object_r(&reply, depth + 1).await?;
|
self.fetch_object_r(&reply, depth + 1).await?;
|
||||||
} else {
|
} else {
|
||||||
tracing::warn!("thread deeper than {}, giving up fetching more replies", self.cfg().security.thread_crawl_depth);
|
tracing::warn!("thread deeper than 16, giving up fetching more replies");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
use apb::{target::Addressed, Activity, Base, Object};
|
use apb::{target::Addressed, Activity, Base, Object};
|
||||||
use reqwest::StatusCode;
|
|
||||||
use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
|
use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
|
||||||
|
|
||||||
use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}};
|
use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}};
|
||||||
|
|
||||||
use super::{fetcher::{Fetcher, PullResult}, side_effects::SideEffects, Context};
|
use super::{fetcher::Fetcher, Context, side_effects::SideEffects};
|
||||||
|
|
||||||
|
|
||||||
#[axum::async_trait]
|
#[axum::async_trait]
|
||||||
|
@ -262,55 +261,34 @@ impl apb::server::Inbox for Context {
|
||||||
let actor = self.fetch_user(&uid).await?;
|
let actor = self.fetch_user(&uid).await?;
|
||||||
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
|
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
|
||||||
let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?;
|
let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?;
|
||||||
|
let activity_model = self.insert_activity(activity.clone(), Some(server)).await?;
|
||||||
match self.pull(&announced_id).await? {
|
|
||||||
PullResult::Actor(_) => Err(UpubError::unprocessable()),
|
|
||||||
PullResult::Object(object) => {
|
|
||||||
let object_model = self.resolve_object(object).await?;
|
|
||||||
let activity_model = self.insert_activity(activity.clone(), Some(server.clone())).await?;
|
|
||||||
|
|
||||||
// relays send us objects as Announce, but we don't really want to count those towards the
|
let announced = self.fetch_object(&announced_id).await?;
|
||||||
// total shares count of an object, so just fetch the object and be done with it
|
// relays send us activities as Announce, but we don't really want to count those towards the
|
||||||
if !matches!(actor.actor_type, apb::ActorType::Person) {
|
// total shares count of an object, so just fetch the object and be done with it
|
||||||
tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id);
|
if matches!(actor.actor_type, apb::ActorType::Person) {
|
||||||
return Ok(())
|
tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id);
|
||||||
}
|
return Ok(())
|
||||||
|
|
||||||
let share = model::announce::ActiveModel {
|
|
||||||
internal: NotSet,
|
|
||||||
actor: Set(internal_uid),
|
|
||||||
object: Set(object_model.internal),
|
|
||||||
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
|
|
||||||
};
|
|
||||||
|
|
||||||
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
|
|
||||||
self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
|
||||||
model::announce::Entity::insert(share)
|
|
||||||
.exec(self.db()).await?;
|
|
||||||
model::object::Entity::update_many()
|
|
||||||
.col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1))
|
|
||||||
.filter(model::object::Column::Internal.eq(object_model.internal))
|
|
||||||
.exec(self.db())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
tracing::info!("{} shared {}", activity_model.actor, announced_id);
|
|
||||||
Ok(())
|
|
||||||
},
|
|
||||||
PullResult::Activity(activity) => {
|
|
||||||
// groups update all members of other things that happen inside, process those
|
|
||||||
let server = Context::server(activity.id().unwrap_or_default());
|
|
||||||
match activity.activity_type().ok_or_else(UpubError::bad_request)? {
|
|
||||||
apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(self.like(server, activity).await?),
|
|
||||||
apb::ActivityType::Create => Ok(self.create(server, activity).await?),
|
|
||||||
apb::ActivityType::Undo => Ok(self.undo(server, activity).await?),
|
|
||||||
apb::ActivityType::Delete => Ok(self.delete(server, activity).await?),
|
|
||||||
apb::ActivityType::Update => Ok(self.update(server, activity).await?),
|
|
||||||
x => {
|
|
||||||
tracing::warn!("ignoring unhandled announced activity of type {x:?}");
|
|
||||||
Err(StatusCode::NOT_IMPLEMENTED.into())
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let share = model::announce::ActiveModel {
|
||||||
|
internal: NotSet,
|
||||||
|
actor: Set(internal_uid),
|
||||||
|
object: Set(announced.internal),
|
||||||
|
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
|
||||||
|
};
|
||||||
|
|
||||||
|
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
|
||||||
|
self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||||
|
model::announce::Entity::insert(share)
|
||||||
|
.exec(self.db()).await?;
|
||||||
|
model::object::Entity::update_many()
|
||||||
|
.col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1))
|
||||||
|
.filter(model::object::Column::Internal.eq(announced.internal))
|
||||||
|
.exec(self.db())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
tracing::info!("{} shared {}", activity_model.actor, announced.id);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue