From f4252a2fbf8a67cb7dbf3bf5ce0aca67b6a3142d Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 18 Apr 2024 05:25:56 +0200 Subject: [PATCH] fix: insert addressings after fetching also refactored fetcher into a trait of context --- src/routes/activitypub/activity.rs | 4 +- src/routes/activitypub/object.rs | 4 +- src/routes/activitypub/user/mod.rs | 4 +- src/server/auth.rs | 4 +- src/server/context.rs | 11 +---- src/server/dispatcher.rs | 4 +- src/server/fetcher.rs | 65 +++++++++++++++++++----------- src/server/inbox.rs | 4 +- 8 files changed, 57 insertions(+), 43 deletions(-) diff --git a/src/routes/activitypub/activity.rs b/src/routes/activitypub/activity.rs index a66b053..271239b 100644 --- a/src/routes/activitypub/activity.rs +++ b/src/routes/activitypub/activity.rs @@ -1,6 +1,6 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, QueryFilter}; -use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, Context}}; +use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; use apb::{ActivityMut, ObjectMut, BaseMut, Node}; use super::{jsonld::LD, JsonLD, TryFetch}; @@ -40,7 +40,7 @@ pub async fn view( { Some(activity) => Ok(JsonLD(serde_json::Value::from(activity).ld_context())), None => if auth.is_local() && query.fetch && !ctx.is_local(&aid) { - Ok(JsonLD(ap_activity(ctx.fetch().activity(&aid).await?).ld_context())) + Ok(JsonLD(ap_activity(ctx.fetch_activity(&aid).await?).ld_context())) } else { Err(UpubError::not_found()) }, diff --git a/src/routes/activitypub/object.rs b/src/routes/activitypub/object.rs index 6555542..dd8de76 100644 --- a/src/routes/activitypub/object.rs +++ b/src/routes/activitypub/object.rs @@ -2,7 +2,7 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, QueryFilter}; use apb::{ObjectMut, BaseMut, Node}; -use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, Context}}; +use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; use super::{jsonld::LD, JsonLD, TryFetch}; @@ -45,7 +45,7 @@ pub async fn view( Some(EmbeddedActivity { activity: _, object: Some(object) }) => Ok(JsonLD(ap_object(object).ld_context())), Some(EmbeddedActivity { activity: _, object: None }) => Err(UpubError::not_found()), None => if auth.is_local() && query.fetch && !ctx.is_local(&oid) { - Ok(JsonLD(ap_object(ctx.fetch().object(&oid).await?).ld_context())) + Ok(JsonLD(ap_object(ctx.fetch_object(&oid).await?).ld_context())) } else { Err(UpubError::not_found()) }, diff --git a/src/routes/activitypub/user/mod.rs b/src/routes/activitypub/user/mod.rs index b2aa438..dec01ee 100644 --- a/src/routes/activitypub/user/mod.rs +++ b/src/routes/activitypub/user/mod.rs @@ -8,7 +8,7 @@ use axum::extract::{Path, Query, State}; use sea_orm::EntityTrait; use apb::{ActorMut, BaseMut, CollectionMut, DocumentMut, DocumentType, Node, ObjectMut, PublicKeyMut}; -use crate::{errors::UpubError, model::{self, user}, server::{auth::AuthIdentity, Context}, url}; +use crate::{errors::UpubError, model::{self, user}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url}; use super::{jsonld::LD, JsonLD, TryFetch}; @@ -103,7 +103,7 @@ pub async fn view( // remote user TODDO doesn't work? Some((user, None)) => Ok(JsonLD(ap_user(user).ld_context())), None => if auth.is_local() && query.fetch && !ctx.is_local(&uid) { - Ok(JsonLD(ap_user(ctx.fetch().user(&uid).await?).ld_context())) + Ok(JsonLD(ap_user(ctx.fetch_user(&uid).await?).ld_context())) } else { Err(UpubError::not_found()) }, diff --git a/src/server/auth.rs b/src/server/auth.rs index 5132939..8e2176e 100644 --- a/src/server/auth.rs +++ b/src/server/auth.rs @@ -8,6 +8,8 @@ use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter}; use crate::{errors::UpubError, model, server::Context}; +use super::fetcher::Fetcher; + #[derive(Debug, Clone)] pub enum Identity { Anonymous, @@ -101,7 +103,7 @@ where .next().ok_or(UpubError::bad_request())? .to_string(); - match ctx.fetch().user(&user_id).await { + match ctx.fetch_user(&user_id).await { Ok(user) => match http_signature .build_from_parts(parts) .verify(&user.public_key) diff --git a/src/server/context.rs b/src/server/context.rs index d7c7bf9..3e0ffba 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -6,7 +6,7 @@ use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilt use crate::{model, routes::activitypub::jsonld::LD}; -use super::{dispatcher::Dispatcher, fetcher::Fetcher}; +use super::dispatcher::Dispatcher; #[derive(Clone)] @@ -15,7 +15,6 @@ struct ContextInner { db: DatabaseConnection, domain: String, protocol: String, - fetcher: Fetcher, dispatcher: Dispatcher, // TODO keep these pre-parsed app: model::application::Model, @@ -63,10 +62,8 @@ impl Context { } }; - let fetcher = Fetcher::new(db.clone(), domain.clone(), app.private_key.clone()); - Ok(Context(Arc::new(ContextInner { - db, domain, protocol, app, fetcher, dispatcher, + db, domain, protocol, app, dispatcher, }))) } @@ -92,10 +89,6 @@ impl Context { } } - pub fn fetch(&self) -> &Fetcher { - &self.0.fetcher - } - /// get full user id uri pub fn uid(&self, id: String) -> String { self.uri("users", id) diff --git a/src/server/dispatcher.rs b/src/server/dispatcher.rs index b922ec0..dc60a1d 100644 --- a/src/server/dispatcher.rs +++ b/src/server/dispatcher.rs @@ -3,7 +3,7 @@ use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, Qu use tokio::{sync::broadcast, task::JoinHandle}; use apb::{ActivityMut, Node}; -use crate::{errors::UpubError, model, routes::activitypub::{activity::ap_activity, object::ap_object}, server::fetcher::Fetcher}; +use crate::{errors::UpubError, model, routes::activitypub::{activity::ap_activity, object::ap_object}, server::{fetcher::Fetcher, Context}}; pub struct Dispatcher { waker: broadcast::Sender<()>, @@ -91,7 +91,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut continue; }; - if let Err(e) = Fetcher::request( + if let Err(e) = Context::request( Method::POST, &delivery.target, Some(&serde_json::to_string(&payload).unwrap()), &delivery.actor, &key, &domain diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index bf960d6..039dac9 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -1,26 +1,34 @@ use std::collections::BTreeMap; +use apb::target::Addressed; use base64::Engine; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; -use sea_orm::{DatabaseConnection, EntityTrait, IntoActiveModel}; +use sea_orm::{EntityTrait, IntoActiveModel}; use crate::{model, VERSION}; use super::{auth::HttpSignature, Context}; +#[axum::async_trait] +pub trait Fetcher { + async fn request( + method: reqwest::Method, + url: &str, + payload: Option<&str>, + from: &str, + key: &str, + domain: &str, + ) -> crate::Result; -pub struct Fetcher { - db: DatabaseConnection, - key: String, // TODO store pre-parsed - domain: String, // TODO merge directly with Context so we don't need to copy this + async fn fetch_user(&self, id: &str) -> crate::Result; + async fn fetch_object(&self, id: &str) -> crate::Result; + async fn fetch_activity(&self, id: &str) -> crate::Result; } -impl Fetcher { - pub fn new(db: DatabaseConnection, domain: String, key: String) -> Self { - Fetcher { db, domain, key } - } - pub async fn request( +#[axum::async_trait] +impl Fetcher for Context { + async fn request( method: reqwest::Method, url: &str, payload: Option<&str>, @@ -74,50 +82,61 @@ impl Fetcher { Ok(res.error_for_status()?) } - pub async fn user(&self, id: &str) -> crate::Result { - if let Some(x) = model::user::Entity::find_by_id(id).one(&self.db).await? { + async fn fetch_user(&self, id: &str) -> crate::Result { + if let Some(x) = model::user::Entity::find_by_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } let user = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, + Method::GET, id, None, &format!("https://{}", self.base()), &self.app().private_key, self.base(), ).await?.json::().await?; let user_model = model::user::Model::new(&user)?; model::user::Entity::insert(user_model.clone().into_active_model()) - .exec(&self.db).await?; + .exec(self.db()).await?; Ok(user_model) } - pub async fn activity(&self, id: &str) -> crate::Result { - if let Some(x) = model::activity::Entity::find_by_id(id).one(&self.db).await? { + async fn fetch_activity(&self, id: &str) -> crate::Result { + if let Some(x) = model::activity::Entity::find_by_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } let activity = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, + Method::GET, id, None, &format!("https://{}", self.base()), &self.app().private_key, self.base(), ).await?.json::().await?; + + let addressed = activity.addressed(); let activity_model = model::activity::Model::new(&activity)?; model::activity::Entity::insert(activity_model.clone().into_active_model()) - .exec(&self.db).await?; + .exec(self.db()).await?; + + let expanded_addresses = self.expand_addressing(addressed).await?; + self.address_to(&activity_model.id, activity_model.object.as_deref(), &expanded_addresses).await?; Ok(activity_model) } - pub async fn object(&self, id: &str) -> crate::Result { - if let Some(x) = model::object::Entity::find_by_id(id).one(&self.db).await? { + async fn fetch_object(&self, id: &str) -> crate::Result { + if let Some(x) = model::object::Entity::find_by_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } let object = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, + Method::GET, id, None, &format!("https://{}", self.base()), &self.app().private_key, self.base(), ).await?.json::().await?; + + let addressed = object.addressed(); let object_model = model::object::Model::new(&object)?; model::object::Entity::insert(object_model.clone().into_active_model()) - .exec(&self.db).await?; + .exec(self.db()).await?; + + let expanded_addresses = self.expand_addressing(addressed).await?; + // TODO we don't know which activity created this! + self.address_to("", Some(&object_model.id), &expanded_addresses).await?; Ok(object_model) } @@ -134,7 +153,7 @@ impl Fetchable for apb::Node { if let apb::Node::Link(uri) = self { let from = format!("{}{}", ctx.protocol(), ctx.base()); // TODO helper to avoid this? let pkey = &ctx.app().private_key; - *self = Fetcher::request(Method::GET, uri.href(), None, &from, pkey, ctx.base()) + *self = Context::request(Method::GET, uri.href(), None, &from, pkey, ctx.base()) .await? .json::() .await? diff --git a/src/server/inbox.rs b/src/server/inbox.rs index 9424485..ea5e664 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ColumnTrait, Condition, EntityTrait, IntoActiveMo use crate::{errors::{LoggableError, UpubError}, model}; -use super::Context; +use super::{fetcher::Fetcher, Context}; #[axum::async_trait] @@ -34,7 +34,7 @@ impl apb::server::Inbox for Context { let aid = activity.id().ok_or(UpubError::bad_request())?; let uid = activity.actor().id().ok_or(UpubError::bad_request())?; let oid = activity.object().id().ok_or(UpubError::bad_request())?; - if let Err(e) = self.fetch().object(&oid).await { + if let Err(e) = self.fetch_object(&oid).await { tracing::warn!("failed fetching liked object: {e}"); } let like = model::like::ActiveModel {