feat: fetch accepts ConnectionTrait, better tx use

basically nothing wants a transaction anymore, so that quick stuff can
pass a DatabaseConnection and be done, while longer stuff can start a
transaction and provide that. i think this will solve deadlocks with
transactions in sqlite?
This commit is contained in:
əlemi 2024-06-06 07:10:23 +02:00
parent 45bbc34dba
commit 1ce89aa6f9
Signed by: alemi
GPG key ID: A4895B84D311642C
10 changed files with 111 additions and 104 deletions

1
Cargo.lock generated
View file

@ -4693,6 +4693,7 @@ name = "upub"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"apb", "apb",
"async-recursion",
"async-trait", "async-trait",
"base64 0.22.1", "base64 0.22.1",
"chrono", "chrono",

View file

@ -12,6 +12,7 @@ readme = "README.md"
[dependencies] [dependencies]
thiserror = "1" thiserror = "1"
async-recursion = "1.1"
async-trait = "0.1" async-trait = "0.1"
sha256 = "1.5" sha256 = "1.5"
openssl = "0.10" # TODO handle pubkeys with a smaller crate openssl = "0.10" # TODO handle pubkeys with a smaller crate

View file

@ -1,12 +1,12 @@
use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseTransaction, DbErr, EntityTrait}; use sea_orm::{ActiveValue::{NotSet, Set}, ConnectionTrait, DbErr, EntityTrait};
use crate::traits::fetch::Fetcher; use crate::traits::fetch::Fetcher;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Addresser { pub trait Addresser {
async fn expand_addressing(&self, targets: Vec<String>) -> Result<Vec<String>, DbErr>; async fn expand_addressing(&self, targets: Vec<String>) -> Result<Vec<String>, DbErr>;
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr>; async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>;
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr>; async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -32,7 +32,7 @@ impl Addresser for crate::Context {
Ok(out) Ok(out)
} }
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr> { async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr> {
// TODO address_to became kind of expensive, with these two selects right away and then another // TODO address_to became kind of expensive, with these two selects right away and then another
// select for each target we're addressing to... can this be improved?? // select for each target we're addressing to... can this be improved??
let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false }; let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false };
@ -75,7 +75,7 @@ impl Addresser for crate::Context {
Ok(()) Ok(())
} }
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr> { async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &impl ConnectionTrait) -> Result<(), DbErr> {
let mut deliveries = Vec::new(); let mut deliveries = Vec::new();
for target in targets.iter() for target in targets.iter()
.filter(|to| !to.is_empty()) .filter(|to| !to.is_empty())
@ -83,7 +83,7 @@ impl Addresser for crate::Context {
.filter(|to| to != &apb::target::PUBLIC) .filter(|to| to != &apb::target::PUBLIC)
{ {
// TODO fetch concurrently // TODO fetch concurrently
match self.fetch_user(target).await { match self.fetch_user(target, tx).await {
Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
crate::model::job::ActiveModel { crate::model::job::ActiveModel {
internal: sea_orm::ActiveValue::NotSet, internal: sea_orm::ActiveValue::NotSet,

View file

@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object}; use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object};
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet, TransactionTrait}; use sea_orm::{ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, NotSet};
use crate::traits::normalize::AP; use crate::traits::normalize::AP;
@ -86,22 +86,18 @@ pub trait Fetcher {
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError>; async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError>;
async fn fetch_domain(&self, domain: &str) -> Result<crate::model::instance::Model, PullError>; async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, PullError>;
async fn fetch_user(&self, id: &str) -> Result<crate::model::actor::Model, PullError>; async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError>;
async fn resolve_user(&self, actor: serde_json::Value) -> Result<crate::model::actor::Model, PullError>; async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError>;
async fn fetch_activity(&self, id: &str) -> Result<crate::model::activity::Model, PullError>; async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError>;
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<crate::model::activity::Model, PullError>; async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError>;
async fn fetch_object(&self, id: &str) -> Result<crate::model::object::Model, PullError> { self.fetch_object_r(id, 0).await } async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError>;
async fn resolve_object(&self, object: serde_json::Value) -> Result<crate::model::object::Model, PullError> { self.resolve_object_r(object, 0).await } async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError>;
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<crate::model::object::Model, PullError>; async fn fetch_thread(&self, id: &str, tx: &impl ConnectionTrait) -> Result<(), PullError>;
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<crate::model::object::Model, PullError>;
async fn fetch_thread(&self, id: &str) -> Result<(), PullError>;
async fn request( async fn request(
method: reqwest::Method, method: reqwest::Method,
@ -163,7 +159,7 @@ pub trait Fetcher {
#[async_trait::async_trait] #[async_trait::async_trait]
impl Fetcher for crate::Context { impl Fetcher for crate::Context {
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError> { async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError> {
let _domain = self.fetch_domain(&crate::Context::server(id)).await?; // let _domain = self.fetch_domain(&crate::Context::server(id)).await?;
let document = Self::request( let document = Self::request(
Method::GET, id, None, Method::GET, id, None,
@ -223,8 +219,8 @@ impl Fetcher for crate::Context {
Ok(None) Ok(None)
} }
async fn fetch_domain(&self, domain: &str) -> Result<crate::model::instance::Model, PullError> { async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, PullError> {
if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(self.db()).await? { if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(tx).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
@ -265,8 +261,8 @@ impl Fetcher for crate::Context {
let mut active_model = instance_model.clone().into_active_model(); let mut active_model = instance_model.clone().into_active_model();
active_model.internal = NotSet; active_model.internal = NotSet;
crate::model::instance::Entity::insert(active_model).exec(self.db()).await?; crate::model::instance::Entity::insert(active_model).exec(tx).await?;
let internal = crate::model::instance::Entity::domain_to_internal(domain, self.db()) let internal = crate::model::instance::Entity::domain_to_internal(domain, tx)
.await? .await?
.ok_or_else(|| DbErr::RecordNotFound(domain.to_string()))?; .ok_or_else(|| DbErr::RecordNotFound(domain.to_string()))?;
instance_model.internal = internal; instance_model.internal = internal;
@ -274,9 +270,11 @@ impl Fetcher for crate::Context {
Ok(instance_model) Ok(instance_model)
} }
async fn resolve_user(&self, mut document: serde_json::Value) -> Result<crate::model::actor::Model, PullError> { async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError> {
let id = document.id()?.to_string(); let id = document.id()?.to_string();
let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?;
// TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time
if let Ok(followers_url) = document.followers().id() { if let Ok(followers_url) = document.followers().id() {
let req = Self::request( let req = Self::request(
@ -311,118 +309,121 @@ impl Fetcher for crate::Context {
// TODO this may fail: while fetching, remote server may fetch our service actor. // TODO this may fail: while fetching, remote server may fetch our service actor.
// if it does so with http signature, we will fetch that actor in background // if it does so with http signature, we will fetch that actor in background
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error // meaning that, once we reach here, it's already inserted and returns an UNIQUE error
crate::model::actor::Entity::insert(user_model).exec(self.db()).await?; crate::model::actor::Entity::insert(user_model).exec(tx).await?;
// TODO fetch it back to get the internal id // TODO fetch it back to get the internal id
Ok( Ok(
crate::model::actor::Entity::find_by_ap_id(&id) crate::model::actor::Entity::find_by_ap_id(&id)
.one(self.db()) .one(tx)
.await? .await?
.ok_or_else(|| DbErr::RecordNotFound(id.to_string()))? .ok_or_else(|| DbErr::RecordNotFound(id.to_string()))?
) )
} }
async fn fetch_user(&self, id: &str) -> Result<crate::model::actor::Model, PullError> { async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, PullError> {
if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(tx).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
let document = self.pull(id).await?.actor()?; let document = self.pull(id).await?.actor()?;
self.resolve_user(document).await self.resolve_user(document, tx).await
} }
async fn fetch_activity(&self, id: &str) -> Result<crate::model::activity::Model, PullError> { async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError> {
if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? { if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(tx).await? {
return Ok(x); // already in db, easy return Ok(x); // already in db, easy
} }
let activity = self.pull(id).await?.activity()?; let activity = self.pull(id).await?.activity()?;
self.resolve_activity(activity).await self.resolve_activity(activity, tx).await
} }
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<crate::model::activity::Model, PullError> { async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, PullError> {
let _domain = self.fetch_domain(&crate::Context::server(activity.id()?), tx).await?;
if let Ok(activity_actor) = activity.actor().id() { if let Ok(activity_actor) = activity.actor().id() {
if let Err(e) = self.fetch_user(activity_actor).await { if let Err(e) = self.fetch_user(activity_actor, tx).await {
tracing::warn!("could not get actor of fetched activity: {e}"); tracing::warn!("could not get actor of fetched activity: {e}");
} }
} }
if let Ok(activity_object) = activity.object().id() { if let Ok(activity_object) = activity.object().id() {
if let Err(e) = self.fetch_object(activity_object).await { if let Err(e) = self.fetch_object(activity_object, tx).await {
tracing::warn!("could not get object of fetched activity: {e}"); tracing::warn!("could not get object of fetched activity: {e}");
} }
} }
let tx = self.db().begin().await?; let activity_model = self.insert_activity(activity, tx).await?;
let activity_model = self.insert_activity(activity, &tx).await?;
let addressed = activity_model.addressed(); let addressed = activity_model.addressed();
let expanded_addresses = self.expand_addressing(addressed).await?; let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(Some(activity_model.internal), None, &expanded_addresses, &tx).await?; self.address_to(Some(activity_model.internal), None, &expanded_addresses, tx).await?;
tx.commit().await?;
Ok(activity_model) Ok(activity_model)
} }
async fn fetch_thread(&self, _id: &str) -> Result<(), PullError> { async fn fetch_thread(&self, _id: &str, _tx: &impl ConnectionTrait) -> Result<(), PullError> {
// crawl_replies(self, id, 0).await // crawl_replies(self, id, 0).await
todo!() todo!()
} }
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<crate::model::object::Model, PullError> { async fn fetch_object(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(self.db()).await? { fetch_object_r(self, id, 0, tx).await
return Ok(x); // already in db, easy
}
let object = self.pull(id).await?.object()?;
self.resolve_object_r(object, depth).await
} }
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<crate::model::object::Model, PullError> { async fn resolve_object(&self, object: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
let id = object.id()?.to_string(); resolve_object_r(self, object, 0, tx).await
if let Ok(oid) = object.id() {
if oid != id {
if let Some(x) = crate::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? {
return Ok(x); // already in db, but with id different that given url
}
}
}
if let Ok(attributed_to) = object.attributed_to().id() {
if let Err(e) = self.fetch_user(attributed_to).await {
tracing::warn!("could not get actor of fetched object: {e}");
}
}
let addressed = object.addressed();
if let Ok(reply) = object.in_reply_to().id() {
if depth <= self.cfg().security.thread_crawl_depth {
self.fetch_object_r(reply, depth + 1).await?;
} else {
tracing::warn!("thread deeper than {}, giving up fetching more replies", self.cfg().security.thread_crawl_depth);
}
}
let tx = self.db().begin().await?;
let object_model = self.insert_object(object, &tx).await?;
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(None, Some(object_model.internal), &expanded_addresses, &tx).await?;
tx.commit().await?;
Ok(object_model)
} }
} }
#[async_recursion::async_recursion]
async fn fetch_object_r(ctx: &crate::Context, id: &str, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(tx).await? {
return Ok(x); // already in db, easy
}
let object = ctx.pull(id).await?.object()?;
resolve_object_r(ctx, object, depth, tx).await
}
async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth: u32, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, PullError> {
let id = object.id()?.to_string();
if let Ok(oid) = object.id() {
if oid != id {
if let Some(x) = crate::model::object::Entity::find_by_ap_id(oid).one(tx).await? {
return Ok(x); // already in db, but with id different that given url
}
}
}
if let Ok(attributed_to) = object.attributed_to().id() {
if let Err(e) = ctx.fetch_user(attributed_to, tx).await {
tracing::warn!("could not get actor of fetched object: {e}");
}
}
let addressed = object.addressed();
if let Ok(reply) = object.in_reply_to().id() {
if depth <= ctx.cfg().security.thread_crawl_depth {
fetch_object_r(ctx, reply, depth + 1, tx).await?;
} else {
tracing::warn!("thread deeper than {}, giving up fetching more replies", ctx.cfg().security.thread_crawl_depth);
}
}
let object_model = ctx.insert_object(object, tx).await?;
let expanded_addresses = ctx.expand_addressing(addressed).await?;
ctx.address_to(None, Some(object_model.internal), &expanded_addresses, tx).await?;
Ok(object_model)
}
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Fetchable : Sync + Send { pub trait Fetchable : Sync + Send {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>; async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>;

View file

@ -1,5 +1,5 @@
use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey};
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, DatabaseTransaction, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum NormalizerError { pub enum NormalizerError {
@ -12,14 +12,14 @@ pub enum NormalizerError {
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Normalizer { pub trait Normalizer {
async fn insert_object(&self, obj: impl apb::Object, tx: &DatabaseTransaction) -> Result<crate::model::object::Model, NormalizerError>; async fn insert_object(&self, obj: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError>;
async fn insert_activity(&self, act: impl apb::Activity, tx: &DatabaseTransaction) -> Result<crate::model::activity::Model, NormalizerError>; async fn insert_activity(&self, act: impl apb::Activity, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, NormalizerError>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl Normalizer for crate::Context { impl Normalizer for crate::Context {
async fn insert_object(&self, object: impl apb::Object, tx: &DatabaseTransaction) -> Result<crate::model::object::Model, NormalizerError> { async fn insert_object(&self, object: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError> {
let oid = object.id()?.to_string(); let oid = object.id()?.to_string();
let uid = object.attributed_to().id().str(); let uid = object.attributed_to().id().str();
let t = object.object_type()?; let t = object.object_type()?;
@ -131,7 +131,7 @@ impl Normalizer for crate::Context {
Ok(object_model) Ok(object_model)
} }
async fn insert_activity(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<crate::model::activity::Model, NormalizerError> { async fn insert_activity(&self, activity: impl apb::Activity, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, NormalizerError> {
let mut activity_model = AP::activity(&activity)?; let mut activity_model = AP::activity(&activity)?;
let mut active_model = activity_model.clone().into_active_model(); let mut active_model = activity_model.clone().into_active_model();

View file

@ -61,7 +61,7 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
return Err(ProcessorError::Unprocessable); return Err(ProcessorError::Unprocessable);
}; };
if let Ok(reply) = object_node.in_reply_to().id() { if let Ok(reply) = object_node.in_reply_to().id() {
if let Err(e) = ctx.fetch_object(reply).await { if let Err(e) = ctx.fetch_object(reply, tx).await {
tracing::warn!("failed fetching replies for received object: {e}"); tracing::warn!("failed fetching replies for received object: {e}");
} }
} }
@ -80,7 +80,7 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
.ok_or(ProcessorError::Incomplete)?; .ok_or(ProcessorError::Incomplete)?;
let object_uri = activity.object().id()?.to_string(); let object_uri = activity.object().id()?.to_string();
let published = activity.published().unwrap_or_else(|_|chrono::Utc::now()); let published = activity.published().unwrap_or_else(|_|chrono::Utc::now());
let obj = ctx.fetch_object(&object_uri).await?; let obj = ctx.fetch_object(&object_uri, tx).await?;
if crate::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal) if crate::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal)
.any(tx) .any(tx)
.await? .await?
@ -127,7 +127,7 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
.await? .await?
.ok_or(ProcessorError::Incomplete)?; .ok_or(ProcessorError::Incomplete)?;
let target_actor = activity.object().id()?.to_string(); let target_actor = activity.object().id()?.to_string();
let usr = ctx.fetch_user(&target_actor).await?; let usr = ctx.fetch_user(&target_actor, tx).await?;
let activity_model = ctx.insert_activity(activity, tx).await?; let activity_model = ctx.insert_activity(activity, tx).await?;
let relation_model = crate::model::relation::ActiveModel { let relation_model = crate::model::relation::ActiveModel {
internal: NotSet, internal: NotSet,
@ -346,7 +346,7 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let uid = activity.actor().id()?.to_string(); let uid = activity.actor().id()?.to_string();
let actor = ctx.fetch_user(&uid).await?; let actor = ctx.fetch_user(&uid, tx).await?;
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx) let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx)
.await? .await?
.ok_or(ProcessorError::Incomplete)?; .ok_or(ProcessorError::Incomplete)?;
@ -369,7 +369,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D
// objects are processed down below, make a mock Internal::Object(internal) // objects are processed down below, make a mock Internal::Object(internal)
Pull::Object(x) => Pull::Object(x) =>
crate::context::Internal::Object( crate::context::Internal::Object(
ctx.resolve_object(x).await?.internal ctx.resolve_object(x, tx).await?.internal
), ),
} }
} }

View file

@ -1,5 +1,5 @@
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, QueryFilter}; use sea_orm::{ColumnTrait, QueryFilter, TransactionTrait};
use upub::{model::{self, addressing::Event, attachment::BatchFillable}, traits::Fetcher, Context}; use upub::{model::{self, addressing::Event, attachment::BatchFillable}, traits::Fetcher, Context};
use apb::LD; use apb::LD;
@ -15,7 +15,9 @@ pub async fn view(
) -> crate::ApiResult<JsonLD<serde_json::Value>> { ) -> crate::ApiResult<JsonLD<serde_json::Value>> {
let aid = ctx.aid(&id); let aid = ctx.aid(&id);
if auth.is_local() && query.fetch && !ctx.is_local(&aid) { if auth.is_local() && query.fetch && !ctx.is_local(&aid) {
let obj = ctx.fetch_activity(&aid).await?; let tx = ctx.db().begin().await?;
let obj = ctx.fetch_activity(&aid, &tx).await?;
tx.commit().await?;
if obj.id != aid { if obj.id != aid {
return Err(crate::ApiError::Redirect(obj.id)); return Err(crate::ApiError::Redirect(obj.id));
} }

View file

@ -2,7 +2,7 @@ pub mod replies;
use apb::{CollectionMut, ObjectMut, LD}; use apb::{CollectionMut, ObjectMut, LD};
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait};
use upub::{model::{self, addressing::Event}, traits::Fetcher, Context}; use upub::{model::{self, addressing::Event}, traits::Fetcher, Context};
use crate::{builders::JsonLD, AuthIdentity}; use crate::{builders::JsonLD, AuthIdentity};
@ -17,7 +17,9 @@ pub async fn view(
) -> crate::ApiResult<JsonLD<serde_json::Value>> { ) -> crate::ApiResult<JsonLD<serde_json::Value>> {
let oid = ctx.oid(&id); let oid = ctx.oid(&id);
if auth.is_local() && query.fetch && !ctx.is_local(&oid) { if auth.is_local() && query.fetch && !ctx.is_local(&oid) {
let obj = ctx.fetch_object(&oid).await?; let tx = ctx.db().begin().await?;
let obj = ctx.fetch_object(&oid, &tx).await?;
tx.commit().await?;
// some implementations serve statuses on different urls than their AP id // some implementations serve statuses on different urls than their AP id
if obj.id != oid { if obj.id != oid {
return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id)))); return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id))));

View file

@ -30,7 +30,7 @@ pub async fn view(
} }
} }
if query.fetch && !ctx.is_local(&uid) { if query.fetch && !ctx.is_local(&uid) {
ctx.fetch_user(&uid).await?; ctx.fetch_user(&uid, ctx.db()).await?;
} }
} }
let internal_uid = model::actor::Entity::ap_to_internal(&uid, ctx.db()) let internal_uid = model::actor::Entity::ap_to_internal(&uid, ctx.db())

View file

@ -120,7 +120,7 @@ where
.next().ok_or(ApiError::bad_request())? .next().ok_or(ApiError::bad_request())?
.to_string(); .to_string();
match ctx.fetch_user(&user_id).await { match ctx.fetch_user(&user_id, ctx.db()).await {
Err(e) => tracing::warn!("failed resolving http signature actor: {e}"), Err(e) => tracing::warn!("failed resolving http signature actor: {e}"),
Ok(user) => match http_signature Ok(user) => match http_signature
.build_from_parts(parts) .build_from_parts(parts)