From 3123c8c1e0229e3f15a366781445aa9419e03285 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 6 Jun 2024 04:15:27 +0200 Subject: [PATCH] feat: transactions!! quite ugly because i have to pass it everywhere as argument but should work i think, and also transactions now!! --- upub/cli/src/fetch.rs | 11 +- upub/cli/src/relay.rs | 4 +- upub/core/src/ext.rs | 8 +- upub/core/src/model/activity.rs | 2 +- upub/core/src/model/actor.rs | 2 +- upub/core/src/model/instance.rs | 2 +- upub/core/src/model/object.rs | 2 +- upub/core/src/traits/address.rs | 39 +-- upub/core/src/traits/fetch.rs | 18 +- upub/core/src/traits/normalize.rs | 28 +- upub/core/src/traits/process.rs | 140 +++++----- upub/worker/src/inbound.rs | 7 +- upub/worker/src/local.rs | 407 +----------------------------- 13 files changed, 136 insertions(+), 534 deletions(-) diff --git a/upub/cli/src/fetch.rs b/upub/cli/src/fetch.rs index 5f89fe2e..7679a3d5 100644 --- a/upub/cli/src/fetch.rs +++ b/upub/cli/src/fetch.rs @@ -1,4 +1,4 @@ -use sea_orm::EntityTrait; +use sea_orm::{EntityTrait, TransactionTrait}; use upub::traits::{fetch::{Fetchable, PullError}, Normalizer}; pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> { @@ -7,27 +7,30 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), Pu let mut node = apb::Node::link(uri.to_string()); node.fetch(&ctx).await?; + let obj = node.extract().expect("node still empty after fetch?"); println!("{}", serde_json::to_string_pretty(&obj).unwrap()); if save { + let tx = ctx.db().begin().await?; match obj.base_type() { Ok(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { upub::model::actor::Entity::insert( upub::AP::actor_q(&obj).unwrap() - ).exec(ctx.db()).await.unwrap(); + ).exec(&tx).await.unwrap(); }, Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { - ctx.insert_activity(obj).await.unwrap(); + ctx.insert_activity(obj, &tx).await.unwrap(); }, Ok(apb::BaseType::Object(apb::ObjectType::Note)) => { - ctx.insert_object(obj).await.unwrap(); + ctx.insert_object(obj, &tx).await.unwrap(); }, Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), Err(_) => tracing::error!("no type on object"), } + tx.commit().await?; } Ok(()) diff --git a/upub/cli/src/relay.rs b/upub/cli/src/relay.rs index 30eb4ca9..ad510922 100644 --- a/upub/cli/src/relay.rs +++ b/upub/cli/src/relay.rs @@ -1,5 +1,4 @@ use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder}; -use upub::traits::Addresser; pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> Result<(), sea_orm::DbErr> { let aid = ctx.aid(&uuid::Uuid::new_v4().to_string()); @@ -34,7 +33,8 @@ pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> Result<() upub::model::activity::Entity::insert(activity_model) .exec(ctx.db()).await?; - ctx.dispatch(ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await?; + // TODO!!! + // ctx.dispatch(ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await?; Ok(()) } diff --git a/upub/core/src/ext.rs b/upub/core/src/ext.rs index 1ec194b6..bd2f78f7 100644 --- a/upub/core/src/ext.rs +++ b/upub/core/src/ext.rs @@ -1,19 +1,21 @@ +use sea_orm::ConnectionTrait; + #[async_trait::async_trait] pub trait AnyQuery { - async fn any(self, db: &sea_orm::DatabaseConnection) -> Result; + async fn any(self, db: &impl ConnectionTrait) -> Result; } #[async_trait::async_trait] impl AnyQuery for sea_orm::Select { - async fn any(self, db: &sea_orm::DatabaseConnection) -> Result { + async fn any(self, db: &impl ConnectionTrait) -> Result { Ok(self.one(db).await?.is_some()) } } #[async_trait::async_trait] impl AnyQuery for sea_orm::Selector { - async fn any(self, db: &sea_orm::DatabaseConnection) -> Result { + async fn any(self, db: &impl ConnectionTrait) -> Result { Ok(self.one(db).await?.is_some()) } } diff --git a/upub/core/src/model/activity.rs b/upub/core/src/model/activity.rs index 5d773b42..0f4f36d9 100644 --- a/upub/core/src/model/activity.rs +++ b/upub/core/src/model/activity.rs @@ -68,7 +68,7 @@ impl Entity { Entity::find().filter(Column::Id.eq(id)) } - pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result, DbErr> { + pub async fn ap_to_internal(id: &str, db: &impl ConnectionTrait) -> Result, DbErr> { Entity::find() .filter(Column::Id.eq(id)) .select_only() diff --git a/upub/core/src/model/actor.rs b/upub/core/src/model/actor.rs index bf7692db..234b9b98 100644 --- a/upub/core/src/model/actor.rs +++ b/upub/core/src/model/actor.rs @@ -139,7 +139,7 @@ impl Entity { Entity::delete_many().filter(Column::Id.eq(id)) } - pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result, DbErr> { + pub async fn ap_to_internal(id: &str, db: &impl ConnectionTrait) -> Result, DbErr> { Entity::find() .filter(Column::Id.eq(id)) .select_only() diff --git a/upub/core/src/model/instance.rs b/upub/core/src/model/instance.rs index c87b5bd8..b11fa95a 100644 --- a/upub/core/src/model/instance.rs +++ b/upub/core/src/model/instance.rs @@ -46,7 +46,7 @@ impl Entity { Entity::find().filter(Column::Domain.eq(domain)) } - pub async fn domain_to_internal(domain: &str, db: &DatabaseConnection) -> Result, DbErr> { + pub async fn domain_to_internal(domain: &str, db: &impl ConnectionTrait) -> Result, DbErr> { Entity::find() .filter(Column::Domain.eq(domain)) .select_only() diff --git a/upub/core/src/model/object.rs b/upub/core/src/model/object.rs index a574a136..80848705 100644 --- a/upub/core/src/model/object.rs +++ b/upub/core/src/model/object.rs @@ -129,7 +129,7 @@ impl Entity { Entity::delete_many().filter(Column::Id.eq(id)) } - pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result, DbErr> { + pub async fn ap_to_internal(id: &str, db: &impl ConnectionTrait) -> Result, DbErr> { Entity::find() .filter(Column::Id.eq(id)) .select_only() diff --git a/upub/core/src/traits/address.rs b/upub/core/src/traits/address.rs index aff190df..52292dc7 100644 --- a/upub/core/src/traits/address.rs +++ b/upub/core/src/traits/address.rs @@ -1,14 +1,12 @@ -use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait}; +use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseTransaction, DbErr, EntityTrait}; use crate::traits::fetch::Fetcher; #[async_trait::async_trait] pub trait Addresser { async fn expand_addressing(&self, targets: Vec) -> Result, DbErr>; - async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> Result<(), DbErr>; - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr>; - //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] - async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> Result<(), DbErr>; + async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr>; + async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr>; } #[async_trait::async_trait] @@ -34,7 +32,7 @@ impl Addresser for crate::Context { Ok(out) } - async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> Result<(), DbErr> { + async fn address_to(&self, aid: Option, oid: Option, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr> { // TODO address_to became kind of expensive, with these two selects right away and then another // select for each target we're addressing to... can this be improved?? let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false }; @@ -48,8 +46,8 @@ impl Addresser for crate::Context { { let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { match ( - crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), self.db()).await?, - crate::model::actor::Entity::ap_to_internal(target, self.db()).await?, + crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), tx).await?, + crate::model::actor::Entity::ap_to_internal(target, tx).await?, ) { (Some(server), Some(actor)) => (Some(server), Some(actor)), (None, _) => { tracing::error!("failed resolving domain of {target}"); continue; }, @@ -70,14 +68,14 @@ impl Addresser for crate::Context { if !addressing.is_empty() { crate::model::addressing::Entity::insert_many(addressing) - .exec(self.db()) + .exec(tx) .await?; } Ok(()) } - async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> { + async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr> { let mut deliveries = Vec::new(); for target in targets.iter() .filter(|to| !to.is_empty()) @@ -108,7 +106,7 @@ impl Addresser for crate::Context { if !deliveries.is_empty() { crate::model::job::Entity::insert_many(deliveries) - .exec(self.db()) + .exec(tx) .await?; } @@ -117,23 +115,4 @@ impl Addresser for crate::Context { Ok(()) } - - //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] - async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> Result<(), DbErr> { - let addressed = self.expand_addressing(activity_targets).await?; - let internal_aid = crate::model::activity::Entity::ap_to_internal(aid, self.db()) - .await? - .ok_or_else(|| DbErr::RecordNotFound(aid.to_string()))?; - let internal_oid = if let Some(o) = oid { - Some( - crate::model::object::Entity::ap_to_internal(o, self.db()) - .await? - .ok_or_else(|| DbErr::RecordNotFound(o.to_string()))? - ) - } else { None }; - self.address_to(Some(internal_aid), internal_oid, &addressed).await?; - self.deliver_to(aid, uid, &addressed).await?; - Ok(()) - } - } diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index d59d7fa5..c3773a2f 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object}; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; -use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet}; +use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet, TransactionTrait}; use crate::traits::normalize::AP; @@ -355,11 +355,15 @@ impl Fetcher for crate::Context { } } - let activity_model = self.insert_activity(activity).await?; + let tx = self.db().begin().await?; + + let activity_model = self.insert_activity(activity, &tx).await?; let addressed = activity_model.addressed(); let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(Some(activity_model.internal), None, &expanded_addresses).await?; + self.address_to(Some(activity_model.internal), None, &expanded_addresses, &tx).await?; + + tx.commit().await?; Ok(activity_model) } @@ -406,10 +410,14 @@ impl Fetcher for crate::Context { } } - let object_model = self.insert_object(object).await?; + let tx = self.db().begin().await?; + + let object_model = self.insert_object(object, &tx).await?; let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(None, Some(object_model.internal), &expanded_addresses).await?; + self.address_to(None, Some(object_model.internal), &expanded_addresses, &tx).await?; + + tx.commit().await?; Ok(object_model) } diff --git a/upub/core/src/traits/normalize.rs b/upub/core/src/traits/normalize.rs index 6db5e970..137fbbfb 100644 --- a/upub/core/src/traits/normalize.rs +++ b/upub/core/src/traits/normalize.rs @@ -1,5 +1,5 @@ use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey}; -use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, DatabaseTransaction, DbErr, EntityTrait, IntoActiveModel, QueryFilter}; #[derive(Debug, thiserror::Error)] pub enum NormalizerError { @@ -12,14 +12,14 @@ pub enum NormalizerError { #[async_trait::async_trait] pub trait Normalizer { - async fn insert_object(&self, obj: impl apb::Object) -> Result; - async fn insert_activity(&self, act: impl apb::Activity) -> Result; + async fn insert_object(&self, obj: impl apb::Object, tx: &DatabaseTransaction) -> Result; + async fn insert_activity(&self, act: impl apb::Activity, tx: &DatabaseTransaction) -> Result; } #[async_trait::async_trait] impl Normalizer for crate::Context { - async fn insert_object(&self, object: impl apb::Object) -> Result { + async fn insert_object(&self, object: impl apb::Object, tx: &DatabaseTransaction) -> Result { let oid = object.id()?.to_string(); let uid = object.attributed_to().id().str(); let t = object.object_type()?; @@ -45,7 +45,7 @@ impl Normalizer for crate::Context { // > kind of dumb. there should be a job system so this can be done in waves. or maybe there's // > some whole other way to do this?? im thinking but misskey aaaa!! TODO if let Set(Some(ref reply)) = object_active_model.in_reply_to { - if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? { + if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(tx).await? { object_active_model.context = Set(o.context); } else { object_active_model.context = Set(None); // TODO to be filled by some other task @@ -54,9 +54,9 @@ impl Normalizer for crate::Context { object_active_model.context = Set(Some(oid.clone())); } - crate::model::object::Entity::insert(object_active_model).exec(self.db()).await?; + crate::model::object::Entity::insert(object_active_model).exec(tx).await?; let object_model = crate::model::object::Entity::find_by_ap_id(&oid) - .one(self.db()) + .one(tx) .await? .ok_or_else(|| DbErr::RecordNotFound(oid.to_string()))?; @@ -65,7 +65,7 @@ impl Normalizer for crate::Context { crate::model::object::Entity::update_many() .filter(crate::model::object::Column::Id.eq(in_reply_to)) .col_expr(crate::model::object::Column::Replies, Expr::col(crate::model::object::Column::Replies).add(1)) - .exec(self.db()) + .exec(tx) .await?; } // update statuses counter @@ -73,7 +73,7 @@ impl Normalizer for crate::Context { crate::model::actor::Entity::update_many() .col_expr(crate::model::actor::Column::StatusesCount, Expr::col(crate::model::actor::Column::StatusesCount).add(1)) .filter(crate::model::actor::Column::Id.eq(&object_author)) - .exec(self.db()) + .exec(tx) .await?; } @@ -97,7 +97,7 @@ impl Normalizer for crate::Context { AP::attachment_q(o.as_document()?, object_model.internal)?, }; crate::model::attachment::Entity::insert(attachment_model) - .exec(self.db()) + .exec(tx) .await?; } // lemmy sends us an image field in posts, treat it like an attachment i'd say @@ -124,23 +124,23 @@ impl Normalizer for crate::Context { } crate::model::attachment::Entity::insert(attachment_model) - .exec(self.db()) + .exec(tx) .await?; } Ok(object_model) } - async fn insert_activity(&self, activity: impl apb::Activity) -> Result { + async fn insert_activity(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result { let mut activity_model = AP::activity(&activity)?; let mut active_model = activity_model.clone().into_active_model(); active_model.internal = NotSet; crate::model::activity::Entity::insert(active_model) - .exec(self.db()) + .exec(tx) .await?; - let internal = crate::model::activity::Entity::ap_to_internal(&activity_model.id, self.db()) + let internal = crate::model::activity::Entity::ap_to_internal(&activity_model.id, tx) .await? .ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?; activity_model.internal = internal; diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index b7867fa1..517d2ad7 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -1,5 +1,5 @@ use apb::{target::Addressed, Activity, Base, Object}; -use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait}; use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}}; #[derive(Debug, thiserror::Error)] @@ -31,30 +31,30 @@ pub enum ProcessorError { #[async_trait::async_trait] pub trait Processor { - async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError>; + async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError>; } #[async_trait::async_trait] impl Processor for crate::Context { - async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError> { + async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { // TODO we could process Links and bare Objects maybe, but probably out of AP spec? match activity.activity_type()? { // TODO emojireacts are NOT likes, but let's process them like ones for now maybe? - apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(like(self, activity).await?), - apb::ActivityType::Create => Ok(create(self, activity).await?), - apb::ActivityType::Follow => Ok(follow(self, activity).await?), - apb::ActivityType::Announce => Ok(announce(self, activity).await?), - apb::ActivityType::Accept(_) => Ok(accept(self, activity).await?), - apb::ActivityType::Reject(_) => Ok(reject(self, activity).await?), - apb::ActivityType::Undo => Ok(undo(self, activity).await?), - apb::ActivityType::Delete => Ok(delete(self, activity).await?), - apb::ActivityType::Update => Ok(update(self, activity).await?), + apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(like(self, activity, tx).await?), + apb::ActivityType::Create => Ok(create(self, activity, tx).await?), + apb::ActivityType::Follow => Ok(follow(self, activity, tx).await?), + apb::ActivityType::Announce => Ok(announce(self, activity, tx).await?), + apb::ActivityType::Accept(_) => Ok(accept(self, activity, tx).await?), + apb::ActivityType::Reject(_) => Ok(reject(self, activity, tx).await?), + apb::ActivityType::Undo => Ok(undo(self, activity, tx).await?), + apb::ActivityType::Delete => Ok(delete(self, activity, tx).await?), + apb::ActivityType::Update => Ok(update(self, activity, tx).await?), _ => Err(ProcessorError::Unprocessable), } } } -pub async fn create(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let Some(object_node) = activity.object().extract() else { // TODO we could process non-embedded activities or arrays but im lazy rn tracing::error!("refusing to process activity without embedded object"); @@ -65,30 +65,30 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity) -> Resul tracing::warn!("failed fetching replies for received object: {e}"); } } - let activity_model = ctx.insert_activity(activity).await?; - let object_model = ctx.insert_object(object_node).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + let object_model = ctx.insert_object(object_node, tx).await?; let expanded_addressing = ctx.expand_addressing(object_model.addressed()).await?; - ctx.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing).await?; + ctx.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing, tx).await?; tracing::info!("{} posted {}", activity_model.actor, object_model.id); Ok(()) } -pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); - let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx) .await? .ok_or(ProcessorError::Incomplete)?; let object_uri = activity.object().id()?.to_string(); let published = activity.published().unwrap_or_else(|_|chrono::Utc::now()); let obj = ctx.fetch_object(&object_uri).await?; if crate::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal) - .any(ctx.db()) + .any(tx) .await? { return Err(ProcessorError::AlreadyProcessed); } - let activity_model = ctx.insert_activity(activity).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; let like = crate::model::like::ActiveModel { internal: NotSet, @@ -97,11 +97,11 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result< activity: Set(activity_model.internal), published: Set(published), }; - crate::model::like::Entity::insert(like).exec(ctx.db()).await?; + crate::model::like::Entity::insert(like).exec(tx).await?; crate::model::object::Entity::update_many() .col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).add(1)) .filter(crate::model::object::Column::Internal.eq(obj.internal)) - .exec(ctx.db()) + .exec(tx) .await?; let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; @@ -111,24 +111,24 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result< .select_only() .select_column(crate::model::object::Column::AttributedTo) .into_tuple::() - .one(ctx.db()) + .one(tx) .await? .ok_or(ProcessorError::Incomplete)? ); } - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; tracing::info!("{} liked {}", uid, obj.id); Ok(()) } -pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let source_actor = activity.actor().id()?.to_string(); - let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, ctx.db()) + let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, tx) .await? .ok_or(ProcessorError::Incomplete)?; let target_actor = activity.object().id()?.to_string(); let usr = ctx.fetch_user(&target_actor).await?; - let activity_model = ctx.insert_activity(activity).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; let relation_model = crate::model::relation::ActiveModel { internal: NotSet, accept: Set(None), @@ -137,22 +137,22 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity) -> Resul following: Set(usr.internal), }; crate::model::relation::Entity::insert(relation_model) - .exec(ctx.db()).await?; + .exec(tx).await?; let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; if !expanded_addressing.contains(&target_actor) { expanded_addressing.push(target_actor); } - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; tracing::info!("{} wants to follow {}", source_actor, usr.id); Ok(()) } -pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { // TODO what about TentativeAccept let target_actor = activity.actor().id()?.to_string(); let follow_request_id = activity.object().id()?.to_string(); let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id) - .one(ctx.db()) + .one(tx) .await? .ok_or(ProcessorError::Incomplete)?; @@ -160,7 +160,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul return Err(ProcessorError::Unauthorized); } - let activity_model = ctx.insert_activity(activity).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; crate::model::actor::Entity::update_many() .col_expr( @@ -168,7 +168,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul Expr::col(crate::model::actor::Column::FollowingCount).add(1) ) .filter(crate::model::actor::Column::Id.eq(&follow_activity.actor)) - .exec(ctx.db()) + .exec(tx) .await?; crate::model::actor::Entity::update_many() .col_expr( @@ -176,13 +176,13 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul Expr::col(crate::model::actor::Column::FollowersCount).add(1) ) .filter(crate::model::actor::Column::Id.eq(&follow_activity.actor)) - .exec(ctx.db()) + .exec(tx) .await?; crate::model::relation::Entity::update_many() .col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal))) .filter(crate::model::relation::Column::Activity.eq(follow_activity.internal)) - .exec(ctx.db()).await?; + .exec(tx).await?; tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor); @@ -190,16 +190,16 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul if !expanded_addressing.contains(&follow_activity.actor) { expanded_addressing.push(follow_activity.actor); } - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; Ok(()) } -pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { // TODO what about TentativeReject? let uid = activity.actor().id()?.to_string(); let follow_request_id = activity.object().id()?.to_string(); let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id) - .one(ctx.db()) + .one(tx) .await? .ok_or(ProcessorError::Incomplete)?; @@ -207,11 +207,11 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Resul return Err(ProcessorError::Unauthorized); } - let activity_model = ctx.insert_activity(activity).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; crate::model::relation::Entity::delete_many() .filter(crate::model::relation::Column::Activity.eq(activity_model.internal)) - .exec(ctx.db()) + .exec(tx) .await?; tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor); @@ -221,19 +221,19 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Resul expanded_addressing.push(follow_activity.actor); } - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; Ok(()) } -pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn delete(_ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let oid = activity.object().id()?.to_string(); - crate::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users"); - crate::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects"); + crate::model::actor::Entity::delete_by_ap_id(&oid).exec(tx).await.info_failed("failed deleting from users"); + crate::model::object::Entity::delete_by_ap_id(&oid).exec(tx).await.info_failed("failed deleting from objects"); tracing::debug!("deleted '{oid}'"); Ok(()) } -pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); let Some(object_node) = activity.object().extract() else { tracing::error!("refusing to process activity without embedded object"); @@ -241,29 +241,29 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Resul }; let oid = object_node.id()?.to_string(); - let activity_model = ctx.insert_activity(activity).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; match object_node.object_type()? { apb::ObjectType::Actor(_) => { - let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, ctx.db()) + let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, tx) .await? .ok_or(ProcessorError::Incomplete)?; let mut actor_model = crate::AP::actor_q(object_node.as_actor()?)?; actor_model.internal = Set(internal_uid); actor_model.updated = Set(chrono::Utc::now()); crate::model::actor::Entity::update(actor_model) - .exec(ctx.db()) + .exec(tx) .await?; }, apb::ObjectType::Note => { - let internal_oid = crate::model::object::Entity::ap_to_internal(&oid, ctx.db()) + let internal_oid = crate::model::object::Entity::ap_to_internal(&oid, tx) .await? .ok_or(ProcessorError::Incomplete)?; let mut object_model = crate::AP::object_q(&object_node)?; object_model.internal = Set(internal_oid); object_model.updated = Set(chrono::Utc::now()); crate::model::object::Entity::update(object_model) - .exec(ctx.db()) + .exec(tx) .await?; }, t => tracing::warn!("no side effects implemented for update type {t:?}"), @@ -271,11 +271,11 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Resul tracing::info!("{} updated {}", uid, oid); let expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; Ok(()) } -pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); // TODO in theory we could work with just object_id but right now only accept embedded let undone_activity = activity.object().extract().ok_or(apb::FieldErr("object"))?; @@ -287,18 +287,18 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result< let undone_activity_target = undone_activity.as_activity()?.object().id()?.to_string(); - let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx) .await? .ok_or(ProcessorError::Incomplete)?; let activity_type = activity.activity_type()?; let targets = ctx.expand_addressing(activity.addressed()).await?; - let activity_model = ctx.insert_activity(activity).await?; - ctx.address_to(Some(activity_model.internal), None, &targets).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; + ctx.address_to(Some(activity_model.internal), None, &targets, tx).await?; match activity_type { apb::ActivityType::Like => { - let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db()) + let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, tx) .await? .ok_or(ProcessorError::Incomplete)?; crate::model::like::Entity::delete_many() @@ -307,32 +307,32 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result< .add(crate::model::like::Column::Actor.eq(internal_uid)) .add(crate::model::like::Column::Object.eq(internal_oid)) ) - .exec(ctx.db()) + .exec(tx) .await?; crate::model::object::Entity::update_many() .filter(crate::model::object::Column::Internal.eq(internal_oid)) .col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).sub(1)) - .exec(ctx.db()) + .exec(tx) .await?; }, apb::ActivityType::Follow => { - let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db()) + let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, tx) .await? .ok_or(ProcessorError::Incomplete)?; crate::model::relation::Entity::delete_many() .filter(crate::model::relation::Column::Follower.eq(internal_uid)) .filter(crate::model::relation::Column::Following.eq(internal_uid_following)) - .exec(ctx.db()) + .exec(tx) .await?; crate::model::actor::Entity::update_many() .filter(crate::model::actor::Column::Internal.eq(internal_uid)) .col_expr(crate::model::actor::Column::FollowingCount, Expr::col(crate::model::actor::Column::FollowingCount).sub(1)) - .exec(ctx.db()) + .exec(tx) .await?; crate::model::actor::Entity::update_many() .filter(crate::model::actor::Column::Internal.eq(internal_uid_following)) .col_expr(crate::model::actor::Column::FollowersCount, Expr::col(crate::model::actor::Column::FollowersCount).sub(1)) - .exec(ctx.db()) + .exec(tx) .await?; }, t => { @@ -344,10 +344,10 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result< Ok(()) } -pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); let actor = ctx.fetch_user(&uid).await?; - let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx) .await? .ok_or(ProcessorError::Incomplete)?; let announced_id = activity.object().id()?.to_string(); @@ -363,7 +363,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res None => { match ctx.pull(&announced_id).await? { // if we receive a remote activity, process it directly - Pull::Activity(x) => return ctx.process(x).await, + Pull::Activity(x) => return ctx.process(x, tx).await, // actors are not processable at all Pull::Actor(_) => return Err(ProcessorError::Unprocessable), // objects are processed down below, make a mock Internal::Object(internal) @@ -378,10 +378,10 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res crate::context::Internal::Activity(_) => Err(ProcessorError::AlreadyProcessed), // ??? crate::context::Internal::Object(internal) => { let object_model = model::object::Entity::find_by_id(internal) - .one(ctx.db()) + .one(tx) .await? .ok_or_else(|| sea_orm::DbErr::RecordNotFound(internal.to_string()))?; - let activity_model = ctx.insert_activity(activity).await?; + let activity_model = ctx.insert_activity(activity, tx).await?; // relays send us objects as Announce, but we don't really want to count those towards the // total shares count of an object, so just fetch the object and be done with it @@ -398,13 +398,13 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res }; let expanded_addressing = ctx.expand_addressing(addressed).await?; - ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?; crate::model::announce::Entity::insert(share) - .exec(ctx.db()).await?; + .exec(tx).await?; crate::model::object::Entity::update_many() .col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1)) .filter(crate::model::object::Column::Internal.eq(object_model.internal)) - .exec(ctx.db()) + .exec(tx) .await?; tracing::info!("{} shared {}", activity_model.actor, announced_id); diff --git a/upub/worker/src/inbound.rs b/upub/worker/src/inbound.rs index c1a8533b..9431706e 100644 --- a/upub/worker/src/inbound.rs +++ b/upub/worker/src/inbound.rs @@ -1,3 +1,4 @@ +use sea_orm::TransactionTrait; use upub::traits::Processor; @@ -12,5 +13,9 @@ pub async fn process(ctx: upub::Context, job: &upub::model::job::Model) -> crate return Ok(()); }; - Ok(ctx.process(activity).await?) + let tx = ctx.db().begin().await?; + ctx.process(activity, &tx).await?; + tx.commit().await?; + + Ok(()) } diff --git a/upub/worker/src/local.rs b/upub/worker/src/local.rs index 18eb1d69..f9cad61a 100644 --- a/upub/worker/src/local.rs +++ b/upub/worker/src/local.rs @@ -1,5 +1,5 @@ use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut}; -use sea_orm::{EntityTrait, QueryFilter, QuerySelect, SelectColumns, ColumnTrait}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait}; use upub::{model, traits::{Addresser, Processor}, Context}; @@ -7,6 +7,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult< let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?; let mut activity : serde_json::Value = serde_json::from_str(payload)?; let mut t = activity.object_type()?; + let tx = ctx.db().begin().await?; if matches!(t, apb::ObjectType::Note) { activity = apb::new() @@ -60,407 +61,11 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult< // TODO we expand addressing twice, ugghhhhh let targets = ctx.expand_addressing(activity.addressed()).await?; - ctx.process(activity).await?; + ctx.process(activity, &tx).await?; - ctx.deliver_to(&job.activity, &job.actor, &targets).await?; + ctx.deliver_to(&job.activity, &job.actor, &targets, &tx).await?; + + tx.commit().await?; Ok(()) } - -/* - -#[axum::async_trait] -impl apb::server::Outbox for Context { - type Error = UpubError; - type Object = serde_json::Value; - type Activity = serde_json::Value; - - async fn create_note(&self, uid: String, object: serde_json::Value) -> crate::Result { - self.create( - uid, - apb::new() - .set_activity_type(Some(apb::ActivityType::Create)) - .set_to(object.to()) - .set_bto(object.bto()) - .set_cc(object.cc()) - .set_bcc(object.bcc()) - .set_object(Node::object(object)) - ).await - } - - async fn create(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let Some(object) = activity.object().extract() else { - return Err(UpubError::bad_request()); - }; - - let raw_oid = uuid::Uuid::new_v4().to_string(); - let oid = self.oid(&raw_oid); - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - - if let Some(reply) = object.in_reply_to().id() { - self.fetch_object(&reply).await?; - } - - self.insert_object( - object - .set_id(Some(&oid)) - .set_attributed_to(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - .set_content(content.as_deref()) - .set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))), - Some(self.domain().to_string()), - ).await?; - - self.insert_activity( - activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_object(Node::link(oid.clone())) - .set_published(Some(chrono::Utc::now())), - Some(self.domain().to_string()), - ).await?; - - self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; - Ok(aid) - } - - async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let oid = activity.object().id().ok_or_else(UpubError::bad_request)?; - let obj_model = self.fetch_object(&oid).await?; - - let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; - - if model::like::Entity::find_by_uid_oid(internal_uid, obj_model.internal) - .any(self.db()) - .await? - { - return Err(UpubError::not_modified()); - } - - let activity_model = self.insert_activity( - activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())), - Some(self.domain().to_string()), - ).await?; - - self.process_like(internal_uid, obj_model.internal, activity_model.internal, chrono::Utc::now()).await?; - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } - - async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let target = activity.object().id().ok_or_else(UpubError::bad_request)?; - - let activity_model = model::activity::ActiveModel::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - - let follower_internal = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; - let following_internal = model::actor::Entity::ap_to_internal(&target, self.db()).await?; - - model::activity::Entity::insert(activity_model) - .exec(self.db()).await?; - - let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; - - let relation_model = model::relation::ActiveModel { - internal: NotSet, - follower: Set(follower_internal), - following: Set(following_internal), - activity: Set(internal_aid), - accept: Set(None), - }; - - model::relation::Entity::insert(relation_model) - .exec(self.db()).await?; - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } - - async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let accepted_id = activity.object().id().ok_or_else(UpubError::bad_request)?; - let accepted_activity = model::activity::Entity::find_by_ap_id(&accepted_id) - .one(self.db()).await? - .ok_or_else(UpubError::not_found)?; - - if accepted_activity.activity_type != apb::ActivityType::Follow { - return Err(UpubError::bad_request()); - } - if uid != accepted_activity.object.ok_or_else(UpubError::bad_request)? { - return Err(UpubError::forbidden()); - } - - let activity_model = model::activity::ActiveModel::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()).await?; - - let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; - - match accepted_activity.activity_type { - apb::ActivityType::Follow => { - model::actor::Entity::update_many() - .col_expr( - model::actor::Column::FollowersCount, - Expr::col(model::actor::Column::FollowersCount).add(1) - ) - .filter(model::actor::Column::Id.eq(&uid)) - .exec(self.db()) - .await?; - model::relation::Entity::update_many() - .filter(model::relation::Column::Activity.eq(accepted_activity.internal)) - .col_expr(model::relation::Column::Accept, Expr::value(Some(internal_aid))) - .exec(self.db()).await?; - }, - t => tracing::error!("no side effects implemented for accepting {t:?}"), - } - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } - - async fn reject(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let rejected_id = activity.object().id().ok_or_else(UpubError::bad_request)?; - let rejected_activity = model::activity::Entity::find_by_ap_id(&rejected_id) - .one(self.db()).await? - .ok_or_else(UpubError::not_found)?; - - if rejected_activity.activity_type != apb::ActivityType::Follow { - return Err(UpubError::bad_request()); - } - if uid != rejected_activity.object.ok_or_else(UpubError::bad_request)? { - return Err(UpubError::forbidden()); - } - - let activity_model = model::activity::ActiveModel::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model) - .exec(self.db()).await?; - - let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; - - model::relation::Entity::delete_many() - .filter(model::relation::Column::Activity.eq(internal_aid)) - .exec(self.db()) - .await?; - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } - - async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; - let old_aid = activity.object().id().ok_or_else(UpubError::bad_request)?; - let old_activity = model::activity::Entity::find_by_ap_id(&old_aid) - .one(self.db()) - .await? - .ok_or_else(UpubError::not_found)?; - if old_activity.actor != uid { - return Err(UpubError::forbidden()); - } - - let activity_model = self.insert_activity( - activity.clone() - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())), - Some(self.domain().to_string()) - ).await?; - - let targets = self.expand_addressing(activity.addressed()).await?; - self.process_undo(internal_uid, activity).await?; - - self.address_to(Some(activity_model.internal), None, &targets).await?; - self.deliver_to(&activity_model.id, &uid, &targets).await?; - - Ok(aid) - } - - async fn delete(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let oid = activity.object().id().ok_or_else(UpubError::bad_request)?; - - let object = model::object::Entity::find_by_ap_id(&oid) - .one(self.db()) - .await? - .ok_or_else(UpubError::not_found)?; - - if uid != object.attributed_to.ok_or_else(UpubError::forbidden)? { - // can't change objects of others, and objects from noone count as others - return Err(UpubError::forbidden()); - } - - let addressed = activity.addressed(); - let activity_model = model::activity::ActiveModel::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - - model::activity::Entity::insert(activity_model) - .exec(self.db()) - .await?; - - model::object::Entity::delete_by_ap_id(&oid) - .exec(self.db()) - .await?; - - self.dispatch(&uid, addressed, &aid, None).await?; - - Ok(aid) - } - - async fn update(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let object_node = activity.object().extract().ok_or_else(UpubError::bad_request)?; - let addressed = activity.addressed(); - let target = object_node.id().ok_or_else(UpubError::bad_request)?.to_string(); - - let activity_model = model::activity::ActiveModel::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - - model::activity::Entity::insert(activity_model) - .exec(self.db()).await?; - - match object_node.object_type() { - Some(apb::ObjectType::Actor(_)) => { - let old_actor_model = model::actor::Entity::find_by_ap_id(&target) - .one(self.db()) - .await? - .ok_or_else(UpubError::not_found)?; - - if old_actor_model.id != uid { - // can't change user fields of others - return Err(UpubError::forbidden()); - } - - let mut new_actor_model = model::actor::ActiveModel { - internal: Unchanged(old_actor_model.internal), - ..Default::default() - }; - - if let Some(name) = object_node.name() { - new_actor_model.name = Set(Some(name.to_string())); - } - if let Some(summary) = object_node.summary() { - new_actor_model.summary = Set(Some(summary.to_string())); - } - if let Some(image) = object_node.image().id() { - new_actor_model.image = Set(Some(image)); - } - if let Some(icon) = object_node.icon().id() { - new_actor_model.icon = Set(Some(icon)); - } - new_actor_model.updated = Set(chrono::Utc::now()); - - model::actor::Entity::update(new_actor_model) - .exec(self.db()).await?; - }, - Some(apb::ObjectType::Note) => { - let old_object_model = model::object::Entity::find_by_ap_id(&target) - .one(self.db()) - .await? - .ok_or_else(UpubError::not_found)?; - - if uid != old_object_model.attributed_to.ok_or_else(UpubError::forbidden)? { - // can't change objects of others - return Err(UpubError::forbidden()); - } - - let mut new_object_model = model::object::ActiveModel { - internal: Unchanged(old_object_model.internal), - ..Default::default() - }; - - if let Some(name) = object_node.name() { - new_object_model.name = Set(Some(name.to_string())); - } - if let Some(summary) = object_node.summary() { - new_object_model.summary = Set(Some(summary.to_string())); - } - if let Some(content) = object_node.content() { - new_object_model.content = Set(Some(content.to_string())); - } - new_object_model.updated = Set(chrono::Utc::now()); - - model::object::Entity::update(new_object_model) - .exec(self.db()).await?; - }, - _ => return Err(UpubError::Status(StatusCode::NOT_IMPLEMENTED)), - } - - self.dispatch(&uid, addressed, &aid, None).await?; - - Ok(aid) - } - - async fn announce(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(&uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let oid = activity.object().id().ok_or_else(UpubError::bad_request)?; - let obj = self.fetch_object(&oid).await?; - let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; - - let activity_model = model::activity::ActiveModel::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - - let share_model = model::announce::ActiveModel { - internal: NotSet, - actor: Set(internal_uid), - object: Set(obj.internal), - published: Set(chrono::Utc::now()), - }; - model::activity::Entity::insert(activity_model) - .exec(self.db()).await?; - model::announce::Entity::insert(share_model).exec(self.db()).await?; - model::object::Entity::update_many() - .col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1)) - .filter(model::object::Column::Internal.eq(obj.internal)) - .exec(self.db()) - .await?; - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } -} - -*/