fix: insert addressings after fetching

also refactored fetcher into a trait of context
This commit is contained in:
əlemi 2024-04-18 05:25:56 +02:00
parent 59ebee71a8
commit f4252a2fbf
Signed by: alemi
GPG key ID: A4895B84D311642C
8 changed files with 57 additions and 43 deletions

View file

@ -1,6 +1,6 @@
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, QueryFilter}; use sea_orm::{ColumnTrait, QueryFilter};
use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, Context}}; use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}};
use apb::{ActivityMut, ObjectMut, BaseMut, Node}; use apb::{ActivityMut, ObjectMut, BaseMut, Node};
use super::{jsonld::LD, JsonLD, TryFetch}; use super::{jsonld::LD, JsonLD, TryFetch};
@ -40,7 +40,7 @@ pub async fn view(
{ {
Some(activity) => Ok(JsonLD(serde_json::Value::from(activity).ld_context())), Some(activity) => Ok(JsonLD(serde_json::Value::from(activity).ld_context())),
None => if auth.is_local() && query.fetch && !ctx.is_local(&aid) { None => if auth.is_local() && query.fetch && !ctx.is_local(&aid) {
Ok(JsonLD(ap_activity(ctx.fetch().activity(&aid).await?).ld_context())) Ok(JsonLD(ap_activity(ctx.fetch_activity(&aid).await?).ld_context()))
} else { } else {
Err(UpubError::not_found()) Err(UpubError::not_found())
}, },

View file

@ -2,7 +2,7 @@ use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, QueryFilter}; use sea_orm::{ColumnTrait, QueryFilter};
use apb::{ObjectMut, BaseMut, Node}; use apb::{ObjectMut, BaseMut, Node};
use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, Context}}; use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}};
use super::{jsonld::LD, JsonLD, TryFetch}; use super::{jsonld::LD, JsonLD, TryFetch};
@ -45,7 +45,7 @@ pub async fn view(
Some(EmbeddedActivity { activity: _, object: Some(object) }) => Ok(JsonLD(ap_object(object).ld_context())), Some(EmbeddedActivity { activity: _, object: Some(object) }) => Ok(JsonLD(ap_object(object).ld_context())),
Some(EmbeddedActivity { activity: _, object: None }) => Err(UpubError::not_found()), Some(EmbeddedActivity { activity: _, object: None }) => Err(UpubError::not_found()),
None => if auth.is_local() && query.fetch && !ctx.is_local(&oid) { None => if auth.is_local() && query.fetch && !ctx.is_local(&oid) {
Ok(JsonLD(ap_object(ctx.fetch().object(&oid).await?).ld_context())) Ok(JsonLD(ap_object(ctx.fetch_object(&oid).await?).ld_context()))
} else { } else {
Err(UpubError::not_found()) Err(UpubError::not_found())
}, },

View file

@ -8,7 +8,7 @@ use axum::extract::{Path, Query, State};
use sea_orm::EntityTrait; use sea_orm::EntityTrait;
use apb::{ActorMut, BaseMut, CollectionMut, DocumentMut, DocumentType, Node, ObjectMut, PublicKeyMut}; use apb::{ActorMut, BaseMut, CollectionMut, DocumentMut, DocumentType, Node, ObjectMut, PublicKeyMut};
use crate::{errors::UpubError, model::{self, user}, server::{auth::AuthIdentity, Context}, url}; use crate::{errors::UpubError, model::{self, user}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url};
use super::{jsonld::LD, JsonLD, TryFetch}; use super::{jsonld::LD, JsonLD, TryFetch};
@ -103,7 +103,7 @@ pub async fn view(
// remote user TODDO doesn't work? // remote user TODDO doesn't work?
Some((user, None)) => Ok(JsonLD(ap_user(user).ld_context())), Some((user, None)) => Ok(JsonLD(ap_user(user).ld_context())),
None => if auth.is_local() && query.fetch && !ctx.is_local(&uid) { None => if auth.is_local() && query.fetch && !ctx.is_local(&uid) {
Ok(JsonLD(ap_user(ctx.fetch().user(&uid).await?).ld_context())) Ok(JsonLD(ap_user(ctx.fetch_user(&uid).await?).ld_context()))
} else { } else {
Err(UpubError::not_found()) Err(UpubError::not_found())
}, },

View file

@ -8,6 +8,8 @@ use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
use crate::{errors::UpubError, model, server::Context}; use crate::{errors::UpubError, model, server::Context};
use super::fetcher::Fetcher;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Identity { pub enum Identity {
Anonymous, Anonymous,
@ -101,7 +103,7 @@ where
.next().ok_or(UpubError::bad_request())? .next().ok_or(UpubError::bad_request())?
.to_string(); .to_string();
match ctx.fetch().user(&user_id).await { match ctx.fetch_user(&user_id).await {
Ok(user) => match http_signature Ok(user) => match http_signature
.build_from_parts(parts) .build_from_parts(parts)
.verify(&user.public_key) .verify(&user.public_key)

View file

@ -6,7 +6,7 @@ use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilt
use crate::{model, routes::activitypub::jsonld::LD}; use crate::{model, routes::activitypub::jsonld::LD};
use super::{dispatcher::Dispatcher, fetcher::Fetcher}; use super::dispatcher::Dispatcher;
#[derive(Clone)] #[derive(Clone)]
@ -15,7 +15,6 @@ struct ContextInner {
db: DatabaseConnection, db: DatabaseConnection,
domain: String, domain: String,
protocol: String, protocol: String,
fetcher: Fetcher,
dispatcher: Dispatcher, dispatcher: Dispatcher,
// TODO keep these pre-parsed // TODO keep these pre-parsed
app: model::application::Model, app: model::application::Model,
@ -63,10 +62,8 @@ impl Context {
} }
}; };
let fetcher = Fetcher::new(db.clone(), domain.clone(), app.private_key.clone());
Ok(Context(Arc::new(ContextInner { Ok(Context(Arc::new(ContextInner {
db, domain, protocol, app, fetcher, dispatcher, db, domain, protocol, app, dispatcher,
}))) })))
} }
@ -92,10 +89,6 @@ impl Context {
} }
} }
pub fn fetch(&self) -> &Fetcher {
&self.0.fetcher
}
/// get full user id uri /// get full user id uri
pub fn uid(&self, id: String) -> String { pub fn uid(&self, id: String) -> String {
self.uri("users", id) self.uri("users", id)

View file

@ -3,7 +3,7 @@ use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, Qu
use tokio::{sync::broadcast, task::JoinHandle}; use tokio::{sync::broadcast, task::JoinHandle};
use apb::{ActivityMut, Node}; use apb::{ActivityMut, Node};
use crate::{errors::UpubError, model, routes::activitypub::{activity::ap_activity, object::ap_object}, server::fetcher::Fetcher}; use crate::{errors::UpubError, model, routes::activitypub::{activity::ap_activity, object::ap_object}, server::{fetcher::Fetcher, Context}};
pub struct Dispatcher { pub struct Dispatcher {
waker: broadcast::Sender<()>, waker: broadcast::Sender<()>,
@ -91,7 +91,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
continue; continue;
}; };
if let Err(e) = Fetcher::request( if let Err(e) = Context::request(
Method::POST, &delivery.target, Method::POST, &delivery.target,
Some(&serde_json::to_string(&payload).unwrap()), Some(&serde_json::to_string(&payload).unwrap()),
&delivery.actor, &key, &domain &delivery.actor, &key, &domain

View file

@ -1,26 +1,34 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use apb::target::Addressed;
use base64::Engine; use base64::Engine;
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::{DatabaseConnection, EntityTrait, IntoActiveModel}; use sea_orm::{EntityTrait, IntoActiveModel};
use crate::{model, VERSION}; use crate::{model, VERSION};
use super::{auth::HttpSignature, Context}; use super::{auth::HttpSignature, Context};
#[axum::async_trait]
pub trait Fetcher {
async fn request(
method: reqwest::Method,
url: &str,
payload: Option<&str>,
from: &str,
key: &str,
domain: &str,
) -> crate::Result<Response>;
pub struct Fetcher { async fn fetch_user(&self, id: &str) -> crate::Result<model::user::Model>;
db: DatabaseConnection, async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model>;
key: String, // TODO store pre-parsed async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
domain: String, // TODO merge directly with Context so we don't need to copy this
} }
impl Fetcher {
pub fn new(db: DatabaseConnection, domain: String, key: String) -> Self {
Fetcher { db, domain, key }
}
pub async fn request( #[axum::async_trait]
impl Fetcher for Context {
async fn request(
method: reqwest::Method, method: reqwest::Method,
url: &str, url: &str,
payload: Option<&str>, payload: Option<&str>,
@ -74,50 +82,61 @@ impl Fetcher {
Ok(res.error_for_status()?) Ok(res.error_for_status()?)
} }
pub async fn user(&self, id: &str) -> crate::Result<model::user::Model> { async fn fetch_user(&self, id: &str) -> crate::Result<model::user::Model> {
if let Some(x) = model::user::Entity::find_by_id(id).one(&self.db).await? { if let Some(x) = model::user::Entity::find_by_id(id).one(self.db()).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
let user = Self::request( let user = Self::request(
Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, Method::GET, id, None, &format!("https://{}", self.base()), &self.app().private_key, self.base(),
).await?.json::<serde_json::Value>().await?; ).await?.json::<serde_json::Value>().await?;
let user_model = model::user::Model::new(&user)?; let user_model = model::user::Model::new(&user)?;
model::user::Entity::insert(user_model.clone().into_active_model()) model::user::Entity::insert(user_model.clone().into_active_model())
.exec(&self.db).await?; .exec(self.db()).await?;
Ok(user_model) Ok(user_model)
} }
pub async fn activity(&self, id: &str) -> crate::Result<model::activity::Model> { async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model> {
if let Some(x) = model::activity::Entity::find_by_id(id).one(&self.db).await? { if let Some(x) = model::activity::Entity::find_by_id(id).one(self.db()).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
let activity = Self::request( let activity = Self::request(
Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, Method::GET, id, None, &format!("https://{}", self.base()), &self.app().private_key, self.base(),
).await?.json::<serde_json::Value>().await?; ).await?.json::<serde_json::Value>().await?;
let addressed = activity.addressed();
let activity_model = model::activity::Model::new(&activity)?; let activity_model = model::activity::Model::new(&activity)?;
model::activity::Entity::insert(activity_model.clone().into_active_model()) model::activity::Entity::insert(activity_model.clone().into_active_model())
.exec(&self.db).await?; .exec(self.db()).await?;
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(&activity_model.id, activity_model.object.as_deref(), &expanded_addresses).await?;
Ok(activity_model) Ok(activity_model)
} }
pub async fn object(&self, id: &str) -> crate::Result<model::object::Model> { async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> {
if let Some(x) = model::object::Entity::find_by_id(id).one(&self.db).await? { if let Some(x) = model::object::Entity::find_by_id(id).one(self.db()).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
let object = Self::request( let object = Self::request(
Method::GET, id, None, &format!("https://{}", self.domain), &self.key, &self.domain, Method::GET, id, None, &format!("https://{}", self.base()), &self.app().private_key, self.base(),
).await?.json::<serde_json::Value>().await?; ).await?.json::<serde_json::Value>().await?;
let addressed = object.addressed();
let object_model = model::object::Model::new(&object)?; let object_model = model::object::Model::new(&object)?;
model::object::Entity::insert(object_model.clone().into_active_model()) model::object::Entity::insert(object_model.clone().into_active_model())
.exec(&self.db).await?; .exec(self.db()).await?;
let expanded_addresses = self.expand_addressing(addressed).await?;
// TODO we don't know which activity created this!
self.address_to("", Some(&object_model.id), &expanded_addresses).await?;
Ok(object_model) Ok(object_model)
} }
@ -134,7 +153,7 @@ impl Fetchable for apb::Node<serde_json::Value> {
if let apb::Node::Link(uri) = self { if let apb::Node::Link(uri) = self {
let from = format!("{}{}", ctx.protocol(), ctx.base()); // TODO helper to avoid this? let from = format!("{}{}", ctx.protocol(), ctx.base()); // TODO helper to avoid this?
let pkey = &ctx.app().private_key; let pkey = &ctx.app().private_key;
*self = Fetcher::request(Method::GET, uri.href(), None, &from, pkey, ctx.base()) *self = Context::request(Method::GET, uri.href(), None, &from, pkey, ctx.base())
.await? .await?
.json::<serde_json::Value>() .json::<serde_json::Value>()
.await? .await?

View file

@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ColumnTrait, Condition, EntityTrait, IntoActiveMo
use crate::{errors::{LoggableError, UpubError}, model}; use crate::{errors::{LoggableError, UpubError}, model};
use super::Context; use super::{fetcher::Fetcher, Context};
#[axum::async_trait] #[axum::async_trait]
@ -34,7 +34,7 @@ impl apb::server::Inbox for Context {
let aid = activity.id().ok_or(UpubError::bad_request())?; let aid = activity.id().ok_or(UpubError::bad_request())?;
let uid = activity.actor().id().ok_or(UpubError::bad_request())?; let uid = activity.actor().id().ok_or(UpubError::bad_request())?;
let oid = activity.object().id().ok_or(UpubError::bad_request())?; let oid = activity.object().id().ok_or(UpubError::bad_request())?;
if let Err(e) = self.fetch().object(&oid).await { if let Err(e) = self.fetch_object(&oid).await {
tracing::warn!("failed fetching liked object: {e}"); tracing::warn!("failed fetching liked object: {e}");
} }
let like = model::like::ActiveModel { let like = model::like::ActiveModel {