forked from alemi/upub
feat: transactions!!
quite ugly because i have to pass it everywhere as argument but should work i think, and also transactions now!!
This commit is contained in:
parent
797837f2a1
commit
3123c8c1e0
13 changed files with 136 additions and 534 deletions
|
@ -1,4 +1,4 @@
|
|||
use sea_orm::EntityTrait;
|
||||
use sea_orm::{EntityTrait, TransactionTrait};
|
||||
use upub::traits::{fetch::{Fetchable, PullError}, Normalizer};
|
||||
|
||||
pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> {
|
||||
|
@ -7,27 +7,30 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), Pu
|
|||
let mut node = apb::Node::link(uri.to_string());
|
||||
node.fetch(&ctx).await?;
|
||||
|
||||
|
||||
let obj = node.extract().expect("node still empty after fetch?");
|
||||
|
||||
println!("{}", serde_json::to_string_pretty(&obj).unwrap());
|
||||
|
||||
if save {
|
||||
let tx = ctx.db().begin().await?;
|
||||
match obj.base_type() {
|
||||
Ok(apb::BaseType::Object(apb::ObjectType::Actor(_))) => {
|
||||
upub::model::actor::Entity::insert(
|
||||
upub::AP::actor_q(&obj).unwrap()
|
||||
).exec(ctx.db()).await.unwrap();
|
||||
).exec(&tx).await.unwrap();
|
||||
},
|
||||
Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
|
||||
ctx.insert_activity(obj).await.unwrap();
|
||||
ctx.insert_activity(obj, &tx).await.unwrap();
|
||||
},
|
||||
Ok(apb::BaseType::Object(apb::ObjectType::Note)) => {
|
||||
ctx.insert_object(obj).await.unwrap();
|
||||
ctx.insert_object(obj, &tx).await.unwrap();
|
||||
},
|
||||
Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t),
|
||||
Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"),
|
||||
Err(_) => tracing::error!("no type on object"),
|
||||
}
|
||||
tx.commit().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder};
|
||||
use upub::traits::Addresser;
|
||||
|
||||
pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> Result<(), sea_orm::DbErr> {
|
||||
let aid = ctx.aid(&uuid::Uuid::new_v4().to_string());
|
||||
|
@ -34,7 +33,8 @@ pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> Result<()
|
|||
upub::model::activity::Entity::insert(activity_model)
|
||||
.exec(ctx.db()).await?;
|
||||
|
||||
ctx.dispatch(ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await?;
|
||||
// TODO!!!
|
||||
// ctx.dispatch(ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,19 +1,21 @@
|
|||
use sea_orm::ConnectionTrait;
|
||||
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait AnyQuery {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr>;
|
||||
async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<T : sea_orm::EntityTrait> AnyQuery for sea_orm::Select<T> {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr> {
|
||||
async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr> {
|
||||
Ok(self.one(db).await?.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<T : sea_orm::SelectorTrait + Send> AnyQuery for sea_orm::Selector<T> {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr> {
|
||||
async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr> {
|
||||
Ok(self.one(db).await?.is_some())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ impl Entity {
|
|||
Entity::find().filter(Column::Id.eq(id))
|
||||
}
|
||||
|
||||
pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result<Option<i64>, DbErr> {
|
||||
pub async fn ap_to_internal(id: &str, db: &impl ConnectionTrait) -> Result<Option<i64>, DbErr> {
|
||||
Entity::find()
|
||||
.filter(Column::Id.eq(id))
|
||||
.select_only()
|
||||
|
|
|
@ -139,7 +139,7 @@ impl Entity {
|
|||
Entity::delete_many().filter(Column::Id.eq(id))
|
||||
}
|
||||
|
||||
pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result<Option<i64>, DbErr> {
|
||||
pub async fn ap_to_internal(id: &str, db: &impl ConnectionTrait) -> Result<Option<i64>, DbErr> {
|
||||
Entity::find()
|
||||
.filter(Column::Id.eq(id))
|
||||
.select_only()
|
||||
|
|
|
@ -46,7 +46,7 @@ impl Entity {
|
|||
Entity::find().filter(Column::Domain.eq(domain))
|
||||
}
|
||||
|
||||
pub async fn domain_to_internal(domain: &str, db: &DatabaseConnection) -> Result<Option<i64>, DbErr> {
|
||||
pub async fn domain_to_internal(domain: &str, db: &impl ConnectionTrait) -> Result<Option<i64>, DbErr> {
|
||||
Entity::find()
|
||||
.filter(Column::Domain.eq(domain))
|
||||
.select_only()
|
||||
|
|
|
@ -129,7 +129,7 @@ impl Entity {
|
|||
Entity::delete_many().filter(Column::Id.eq(id))
|
||||
}
|
||||
|
||||
pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> Result<Option<i64>, DbErr> {
|
||||
pub async fn ap_to_internal(id: &str, db: &impl ConnectionTrait) -> Result<Option<i64>, DbErr> {
|
||||
Entity::find()
|
||||
.filter(Column::Id.eq(id))
|
||||
.select_only()
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
|
||||
use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseTransaction, DbErr, EntityTrait};
|
||||
|
||||
use crate::traits::fetch::Fetcher;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Addresser {
|
||||
async fn expand_addressing(&self, targets: Vec<String>) -> Result<Vec<String>, DbErr>;
|
||||
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String]) -> Result<(), DbErr>;
|
||||
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr>;
|
||||
//#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"]
|
||||
async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> Result<(), DbErr>;
|
||||
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr>;
|
||||
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
@ -34,7 +32,7 @@ impl Addresser for crate::Context {
|
|||
Ok(out)
|
||||
}
|
||||
|
||||
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String]) -> Result<(), DbErr> {
|
||||
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr> {
|
||||
// TODO address_to became kind of expensive, with these two selects right away and then another
|
||||
// select for each target we're addressing to... can this be improved??
|
||||
let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false };
|
||||
|
@ -48,8 +46,8 @@ impl Addresser for crate::Context {
|
|||
{
|
||||
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
|
||||
match (
|
||||
crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), self.db()).await?,
|
||||
crate::model::actor::Entity::ap_to_internal(target, self.db()).await?,
|
||||
crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), tx).await?,
|
||||
crate::model::actor::Entity::ap_to_internal(target, tx).await?,
|
||||
) {
|
||||
(Some(server), Some(actor)) => (Some(server), Some(actor)),
|
||||
(None, _) => { tracing::error!("failed resolving domain of {target}"); continue; },
|
||||
|
@ -70,14 +68,14 @@ impl Addresser for crate::Context {
|
|||
|
||||
if !addressing.is_empty() {
|
||||
crate::model::addressing::Entity::insert_many(addressing)
|
||||
.exec(self.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> {
|
||||
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String], tx: &DatabaseTransaction) -> Result<(), DbErr> {
|
||||
let mut deliveries = Vec::new();
|
||||
for target in targets.iter()
|
||||
.filter(|to| !to.is_empty())
|
||||
|
@ -108,7 +106,7 @@ impl Addresser for crate::Context {
|
|||
|
||||
if !deliveries.is_empty() {
|
||||
crate::model::job::Entity::insert_many(deliveries)
|
||||
.exec(self.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
|
@ -117,23 +115,4 @@ impl Addresser for crate::Context {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"]
|
||||
async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> Result<(), DbErr> {
|
||||
let addressed = self.expand_addressing(activity_targets).await?;
|
||||
let internal_aid = crate::model::activity::Entity::ap_to_internal(aid, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(aid.to_string()))?;
|
||||
let internal_oid = if let Some(o) = oid {
|
||||
Some(
|
||||
crate::model::object::Entity::ap_to_internal(o, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(o.to_string()))?
|
||||
)
|
||||
} else { None };
|
||||
self.address_to(Some(internal_aid), internal_oid, &addressed).await?;
|
||||
self.deliver_to(aid, uid, &addressed).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::collections::BTreeMap;
|
|||
|
||||
use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object};
|
||||
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
|
||||
use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet};
|
||||
use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet, TransactionTrait};
|
||||
|
||||
use crate::traits::normalize::AP;
|
||||
|
||||
|
@ -355,11 +355,15 @@ impl Fetcher for crate::Context {
|
|||
}
|
||||
}
|
||||
|
||||
let activity_model = self.insert_activity(activity).await?;
|
||||
let tx = self.db().begin().await?;
|
||||
|
||||
let activity_model = self.insert_activity(activity, &tx).await?;
|
||||
|
||||
let addressed = activity_model.addressed();
|
||||
let expanded_addresses = self.expand_addressing(addressed).await?;
|
||||
self.address_to(Some(activity_model.internal), None, &expanded_addresses).await?;
|
||||
self.address_to(Some(activity_model.internal), None, &expanded_addresses, &tx).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(activity_model)
|
||||
}
|
||||
|
@ -406,10 +410,14 @@ impl Fetcher for crate::Context {
|
|||
}
|
||||
}
|
||||
|
||||
let object_model = self.insert_object(object).await?;
|
||||
let tx = self.db().begin().await?;
|
||||
|
||||
let object_model = self.insert_object(object, &tx).await?;
|
||||
|
||||
let expanded_addresses = self.expand_addressing(addressed).await?;
|
||||
self.address_to(None, Some(object_model.internal), &expanded_addresses).await?;
|
||||
self.address_to(None, Some(object_model.internal), &expanded_addresses, &tx).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(object_model)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use apb::{field::OptionalString, Collection, Document, Endpoints, Node, Object, PublicKey};
|
||||
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter};
|
||||
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, DatabaseTransaction, DbErr, EntityTrait, IntoActiveModel, QueryFilter};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum NormalizerError {
|
||||
|
@ -12,14 +12,14 @@ pub enum NormalizerError {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Normalizer {
|
||||
async fn insert_object(&self, obj: impl apb::Object) -> Result<crate::model::object::Model, NormalizerError>;
|
||||
async fn insert_activity(&self, act: impl apb::Activity) -> Result<crate::model::activity::Model, NormalizerError>;
|
||||
async fn insert_object(&self, obj: impl apb::Object, tx: &DatabaseTransaction) -> Result<crate::model::object::Model, NormalizerError>;
|
||||
async fn insert_activity(&self, act: impl apb::Activity, tx: &DatabaseTransaction) -> Result<crate::model::activity::Model, NormalizerError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Normalizer for crate::Context {
|
||||
|
||||
async fn insert_object(&self, object: impl apb::Object) -> Result<crate::model::object::Model, NormalizerError> {
|
||||
async fn insert_object(&self, object: impl apb::Object, tx: &DatabaseTransaction) -> Result<crate::model::object::Model, NormalizerError> {
|
||||
let oid = object.id()?.to_string();
|
||||
let uid = object.attributed_to().id().str();
|
||||
let t = object.object_type()?;
|
||||
|
@ -45,7 +45,7 @@ impl Normalizer for crate::Context {
|
|||
// > kind of dumb. there should be a job system so this can be done in waves. or maybe there's
|
||||
// > some whole other way to do this?? im thinking but misskey aaaa!! TODO
|
||||
if let Set(Some(ref reply)) = object_active_model.in_reply_to {
|
||||
if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? {
|
||||
if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(tx).await? {
|
||||
object_active_model.context = Set(o.context);
|
||||
} else {
|
||||
object_active_model.context = Set(None); // TODO to be filled by some other task
|
||||
|
@ -54,9 +54,9 @@ impl Normalizer for crate::Context {
|
|||
object_active_model.context = Set(Some(oid.clone()));
|
||||
}
|
||||
|
||||
crate::model::object::Entity::insert(object_active_model).exec(self.db()).await?;
|
||||
crate::model::object::Entity::insert(object_active_model).exec(tx).await?;
|
||||
let object_model = crate::model::object::Entity::find_by_ap_id(&oid)
|
||||
.one(self.db())
|
||||
.one(tx)
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(oid.to_string()))?;
|
||||
|
||||
|
@ -65,7 +65,7 @@ impl Normalizer for crate::Context {
|
|||
crate::model::object::Entity::update_many()
|
||||
.filter(crate::model::object::Column::Id.eq(in_reply_to))
|
||||
.col_expr(crate::model::object::Column::Replies, Expr::col(crate::model::object::Column::Replies).add(1))
|
||||
.exec(self.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
}
|
||||
// update statuses counter
|
||||
|
@ -73,7 +73,7 @@ impl Normalizer for crate::Context {
|
|||
crate::model::actor::Entity::update_many()
|
||||
.col_expr(crate::model::actor::Column::StatusesCount, Expr::col(crate::model::actor::Column::StatusesCount).add(1))
|
||||
.filter(crate::model::actor::Column::Id.eq(&object_author))
|
||||
.exec(self.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
|
@ -97,7 +97,7 @@ impl Normalizer for crate::Context {
|
|||
AP::attachment_q(o.as_document()?, object_model.internal)?,
|
||||
};
|
||||
crate::model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
}
|
||||
// lemmy sends us an image field in posts, treat it like an attachment i'd say
|
||||
|
@ -124,23 +124,23 @@ impl Normalizer for crate::Context {
|
|||
}
|
||||
|
||||
crate::model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(object_model)
|
||||
}
|
||||
|
||||
async fn insert_activity(&self, activity: impl apb::Activity) -> Result<crate::model::activity::Model, NormalizerError> {
|
||||
async fn insert_activity(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<crate::model::activity::Model, NormalizerError> {
|
||||
let mut activity_model = AP::activity(&activity)?;
|
||||
|
||||
let mut active_model = activity_model.clone().into_active_model();
|
||||
active_model.internal = NotSet;
|
||||
crate::model::activity::Entity::insert(active_model)
|
||||
.exec(self.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
|
||||
let internal = crate::model::activity::Entity::ap_to_internal(&activity_model.id, self.db())
|
||||
let internal = crate::model::activity::Entity::ap_to_internal(&activity_model.id, tx)
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?;
|
||||
activity_model.internal = internal;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use apb::{target::Addressed, Activity, Base, Object};
|
||||
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
|
||||
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait};
|
||||
use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
@ -31,30 +31,30 @@ pub enum ProcessorError {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Processor {
|
||||
async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError>;
|
||||
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Processor for crate::Context {
|
||||
async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
// TODO we could process Links and bare Objects maybe, but probably out of AP spec?
|
||||
match activity.activity_type()? {
|
||||
// TODO emojireacts are NOT likes, but let's process them like ones for now maybe?
|
||||
apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(like(self, activity).await?),
|
||||
apb::ActivityType::Create => Ok(create(self, activity).await?),
|
||||
apb::ActivityType::Follow => Ok(follow(self, activity).await?),
|
||||
apb::ActivityType::Announce => Ok(announce(self, activity).await?),
|
||||
apb::ActivityType::Accept(_) => Ok(accept(self, activity).await?),
|
||||
apb::ActivityType::Reject(_) => Ok(reject(self, activity).await?),
|
||||
apb::ActivityType::Undo => Ok(undo(self, activity).await?),
|
||||
apb::ActivityType::Delete => Ok(delete(self, activity).await?),
|
||||
apb::ActivityType::Update => Ok(update(self, activity).await?),
|
||||
apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(like(self, activity, tx).await?),
|
||||
apb::ActivityType::Create => Ok(create(self, activity, tx).await?),
|
||||
apb::ActivityType::Follow => Ok(follow(self, activity, tx).await?),
|
||||
apb::ActivityType::Announce => Ok(announce(self, activity, tx).await?),
|
||||
apb::ActivityType::Accept(_) => Ok(accept(self, activity, tx).await?),
|
||||
apb::ActivityType::Reject(_) => Ok(reject(self, activity, tx).await?),
|
||||
apb::ActivityType::Undo => Ok(undo(self, activity, tx).await?),
|
||||
apb::ActivityType::Delete => Ok(delete(self, activity, tx).await?),
|
||||
apb::ActivityType::Update => Ok(update(self, activity, tx).await?),
|
||||
_ => Err(ProcessorError::Unprocessable),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
let Some(object_node) = activity.object().extract() else {
|
||||
// TODO we could process non-embedded activities or arrays but im lazy rn
|
||||
tracing::error!("refusing to process activity without embedded object");
|
||||
|
@ -65,30 +65,30 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
tracing::warn!("failed fetching replies for received object: {e}");
|
||||
}
|
||||
}
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let object_model = ctx.insert_object(object_node).await?;
|
||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||
let object_model = ctx.insert_object(object_node, tx).await?;
|
||||
let expanded_addressing = ctx.expand_addressing(object_model.addressed()).await?;
|
||||
ctx.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing).await?;
|
||||
ctx.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing, tx).await?;
|
||||
tracing::info!("{} posted {}", activity_model.actor, object_model.id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let object_uri = activity.object().id()?.to_string();
|
||||
let published = activity.published().unwrap_or_else(|_|chrono::Utc::now());
|
||||
let obj = ctx.fetch_object(&object_uri).await?;
|
||||
if crate::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal)
|
||||
.any(ctx.db())
|
||||
.any(tx)
|
||||
.await?
|
||||
{
|
||||
return Err(ProcessorError::AlreadyProcessed);
|
||||
}
|
||||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||
|
||||
let like = crate::model::like::ActiveModel {
|
||||
internal: NotSet,
|
||||
|
@ -97,11 +97,11 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
activity: Set(activity_model.internal),
|
||||
published: Set(published),
|
||||
};
|
||||
crate::model::like::Entity::insert(like).exec(ctx.db()).await?;
|
||||
crate::model::like::Entity::insert(like).exec(tx).await?;
|
||||
crate::model::object::Entity::update_many()
|
||||
.col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).add(1))
|
||||
.filter(crate::model::object::Column::Internal.eq(obj.internal))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
|
||||
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?;
|
||||
|
@ -111,24 +111,24 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
.select_only()
|
||||
.select_column(crate::model::object::Column::AttributedTo)
|
||||
.into_tuple::<String>()
|
||||
.one(ctx.db())
|
||||
.one(tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?
|
||||
);
|
||||
}
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
|
||||
tracing::info!("{} liked {}", uid, obj.id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
let source_actor = activity.actor().id()?.to_string();
|
||||
let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, ctx.db())
|
||||
let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let target_actor = activity.object().id()?.to_string();
|
||||
let usr = ctx.fetch_user(&target_actor).await?;
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||
let relation_model = crate::model::relation::ActiveModel {
|
||||
internal: NotSet,
|
||||
accept: Set(None),
|
||||
|
@ -137,22 +137,22 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
following: Set(usr.internal),
|
||||
};
|
||||
crate::model::relation::Entity::insert(relation_model)
|
||||
.exec(ctx.db()).await?;
|
||||
.exec(tx).await?;
|
||||
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?;
|
||||
if !expanded_addressing.contains(&target_actor) {
|
||||
expanded_addressing.push(target_actor);
|
||||
}
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
|
||||
tracing::info!("{} wants to follow {}", source_actor, usr.id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
// TODO what about TentativeAccept
|
||||
let target_actor = activity.actor().id()?.to_string();
|
||||
let follow_request_id = activity.object().id()?.to_string();
|
||||
let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
.one(ctx.db())
|
||||
.one(tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
||||
|
@ -160,7 +160,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
return Err(ProcessorError::Unauthorized);
|
||||
}
|
||||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||
|
||||
crate::model::actor::Entity::update_many()
|
||||
.col_expr(
|
||||
|
@ -168,7 +168,7 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
Expr::col(crate::model::actor::Column::FollowingCount).add(1)
|
||||
)
|
||||
.filter(crate::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
crate::model::actor::Entity::update_many()
|
||||
.col_expr(
|
||||
|
@ -176,13 +176,13 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
Expr::col(crate::model::actor::Column::FollowersCount).add(1)
|
||||
)
|
||||
.filter(crate::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
|
||||
crate::model::relation::Entity::update_many()
|
||||
.col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal)))
|
||||
.filter(crate::model::relation::Column::Activity.eq(follow_activity.internal))
|
||||
.exec(ctx.db()).await?;
|
||||
.exec(tx).await?;
|
||||
|
||||
tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor);
|
||||
|
||||
|
@ -190,16 +190,16 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
if !expanded_addressing.contains(&follow_activity.actor) {
|
||||
expanded_addressing.push(follow_activity.actor);
|
||||
}
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
// TODO what about TentativeReject?
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let follow_request_id = activity.object().id()?.to_string();
|
||||
let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
.one(ctx.db())
|
||||
.one(tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
||||
|
@ -207,11 +207,11 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
return Err(ProcessorError::Unauthorized);
|
||||
}
|
||||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||
|
||||
crate::model::relation::Entity::delete_many()
|
||||
.filter(crate::model::relation::Column::Activity.eq(activity_model.internal))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
|
||||
tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor);
|
||||
|
@ -221,19 +221,19 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
expanded_addressing.push(follow_activity.actor);
|
||||
}
|
||||
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn delete(_ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
let oid = activity.object().id()?.to_string();
|
||||
crate::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users");
|
||||
crate::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects");
|
||||
crate::model::actor::Entity::delete_by_ap_id(&oid).exec(tx).await.info_failed("failed deleting from users");
|
||||
crate::model::object::Entity::delete_by_ap_id(&oid).exec(tx).await.info_failed("failed deleting from objects");
|
||||
tracing::debug!("deleted '{oid}'");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn update(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let Some(object_node) = activity.object().extract() else {
|
||||
tracing::error!("refusing to process activity without embedded object");
|
||||
|
@ -241,29 +241,29 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
};
|
||||
let oid = object_node.id()?.to_string();
|
||||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||
|
||||
match object_node.object_type()? {
|
||||
apb::ObjectType::Actor(_) => {
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, ctx.db())
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let mut actor_model = crate::AP::actor_q(object_node.as_actor()?)?;
|
||||
actor_model.internal = Set(internal_uid);
|
||||
actor_model.updated = Set(chrono::Utc::now());
|
||||
crate::model::actor::Entity::update(actor_model)
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
},
|
||||
apb::ObjectType::Note => {
|
||||
let internal_oid = crate::model::object::Entity::ap_to_internal(&oid, ctx.db())
|
||||
let internal_oid = crate::model::object::Entity::ap_to_internal(&oid, tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let mut object_model = crate::AP::object_q(&object_node)?;
|
||||
object_model.internal = Set(internal_oid);
|
||||
object_model.updated = Set(chrono::Utc::now());
|
||||
crate::model::object::Entity::update(object_model)
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
},
|
||||
t => tracing::warn!("no side effects implemented for update type {t:?}"),
|
||||
|
@ -271,11 +271,11 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
|
||||
tracing::info!("{} updated {}", uid, oid);
|
||||
let expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
// TODO in theory we could work with just object_id but right now only accept embedded
|
||||
let undone_activity = activity.object().extract().ok_or(apb::FieldErr("object"))?;
|
||||
|
@ -287,18 +287,18 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
|
||||
let undone_activity_target = undone_activity.as_activity()?.object().id()?.to_string();
|
||||
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
||||
let activity_type = activity.activity_type()?;
|
||||
let targets = ctx.expand_addressing(activity.addressed()).await?;
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &targets).await?;
|
||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &targets, tx).await?;
|
||||
|
||||
match activity_type {
|
||||
apb::ActivityType::Like => {
|
||||
let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
crate::model::like::Entity::delete_many()
|
||||
|
@ -307,32 +307,32 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
.add(crate::model::like::Column::Actor.eq(internal_uid))
|
||||
.add(crate::model::like::Column::Object.eq(internal_oid))
|
||||
)
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
crate::model::object::Entity::update_many()
|
||||
.filter(crate::model::object::Column::Internal.eq(internal_oid))
|
||||
.col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).sub(1))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
},
|
||||
apb::ActivityType::Follow => {
|
||||
let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
crate::model::relation::Entity::delete_many()
|
||||
.filter(crate::model::relation::Column::Follower.eq(internal_uid))
|
||||
.filter(crate::model::relation::Column::Following.eq(internal_uid_following))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
crate::model::actor::Entity::update_many()
|
||||
.filter(crate::model::actor::Column::Internal.eq(internal_uid))
|
||||
.col_expr(crate::model::actor::Column::FollowingCount, Expr::col(crate::model::actor::Column::FollowingCount).sub(1))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
crate::model::actor::Entity::update_many()
|
||||
.filter(crate::model::actor::Column::Internal.eq(internal_uid_following))
|
||||
.col_expr(crate::model::actor::Column::FollowersCount, Expr::col(crate::model::actor::Column::FollowersCount).sub(1))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
},
|
||||
t => {
|
||||
|
@ -344,10 +344,10 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let actor = ctx.fetch_user(&uid).await?;
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, tx)
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let announced_id = activity.object().id()?.to_string();
|
||||
|
@ -363,7 +363,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res
|
|||
None => {
|
||||
match ctx.pull(&announced_id).await? {
|
||||
// if we receive a remote activity, process it directly
|
||||
Pull::Activity(x) => return ctx.process(x).await,
|
||||
Pull::Activity(x) => return ctx.process(x, tx).await,
|
||||
// actors are not processable at all
|
||||
Pull::Actor(_) => return Err(ProcessorError::Unprocessable),
|
||||
// objects are processed down below, make a mock Internal::Object(internal)
|
||||
|
@ -378,10 +378,10 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res
|
|||
crate::context::Internal::Activity(_) => Err(ProcessorError::AlreadyProcessed), // ???
|
||||
crate::context::Internal::Object(internal) => {
|
||||
let object_model = model::object::Entity::find_by_id(internal)
|
||||
.one(ctx.db())
|
||||
.one(tx)
|
||||
.await?
|
||||
.ok_or_else(|| sea_orm::DbErr::RecordNotFound(internal.to_string()))?;
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let activity_model = ctx.insert_activity(activity, tx).await?;
|
||||
|
||||
// relays send us objects as Announce, but we don't really want to count those towards the
|
||||
// total shares count of an object, so just fetch the object and be done with it
|
||||
|
@ -398,13 +398,13 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res
|
|||
};
|
||||
|
||||
let expanded_addressing = ctx.expand_addressing(addressed).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing, tx).await?;
|
||||
crate::model::announce::Entity::insert(share)
|
||||
.exec(ctx.db()).await?;
|
||||
.exec(tx).await?;
|
||||
crate::model::object::Entity::update_many()
|
||||
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
|
||||
.filter(crate::model::object::Column::Internal.eq(object_model.internal))
|
||||
.exec(ctx.db())
|
||||
.exec(tx)
|
||||
.await?;
|
||||
|
||||
tracing::info!("{} shared {}", activity_model.actor, announced_id);
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use sea_orm::TransactionTrait;
|
||||
use upub::traits::Processor;
|
||||
|
||||
|
||||
|
@ -12,5 +13,9 @@ pub async fn process(ctx: upub::Context, job: &upub::model::job::Model) -> crate
|
|||
return Ok(());
|
||||
};
|
||||
|
||||
Ok(ctx.process(activity).await?)
|
||||
let tx = ctx.db().begin().await?;
|
||||
ctx.process(activity, &tx).await?;
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut};
|
||||
use sea_orm::{EntityTrait, QueryFilter, QuerySelect, SelectColumns, ColumnTrait};
|
||||
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns, TransactionTrait};
|
||||
use upub::{model, traits::{Addresser, Processor}, Context};
|
||||
|
||||
|
||||
|
@ -7,6 +7,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
|
|||
let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?;
|
||||
let mut activity : serde_json::Value = serde_json::from_str(payload)?;
|
||||
let mut t = activity.object_type()?;
|
||||
let tx = ctx.db().begin().await?;
|
||||
|
||||
if matches!(t, apb::ObjectType::Note) {
|
||||
activity = apb::new()
|
||||
|
@ -60,407 +61,11 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
|
|||
// TODO we expand addressing twice, ugghhhhh
|
||||
let targets = ctx.expand_addressing(activity.addressed()).await?;
|
||||
|
||||
ctx.process(activity).await?;
|
||||
ctx.process(activity, &tx).await?;
|
||||
|
||||
ctx.deliver_to(&job.activity, &job.actor, &targets).await?;
|
||||
ctx.deliver_to(&job.activity, &job.actor, &targets, &tx).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
#[axum::async_trait]
|
||||
impl apb::server::Outbox for Context {
|
||||
type Error = UpubError;
|
||||
type Object = serde_json::Value;
|
||||
type Activity = serde_json::Value;
|
||||
|
||||
async fn create_note(&self, uid: String, object: serde_json::Value) -> crate::Result<String> {
|
||||
self.create(
|
||||
uid,
|
||||
apb::new()
|
||||
.set_activity_type(Some(apb::ActivityType::Create))
|
||||
.set_to(object.to())
|
||||
.set_bto(object.bto())
|
||||
.set_cc(object.cc())
|
||||
.set_bcc(object.bcc())
|
||||
.set_object(Node::object(object))
|
||||
).await
|
||||
}
|
||||
|
||||
async fn create(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let Some(object) = activity.object().extract() else {
|
||||
return Err(UpubError::bad_request());
|
||||
};
|
||||
|
||||
let raw_oid = uuid::Uuid::new_v4().to_string();
|
||||
let oid = self.oid(&raw_oid);
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let activity_targets = activity.addressed();
|
||||
|
||||
if let Some(reply) = object.in_reply_to().id() {
|
||||
self.fetch_object(&reply).await?;
|
||||
}
|
||||
|
||||
self.insert_object(
|
||||
object
|
||||
.set_id(Some(&oid))
|
||||
.set_attributed_to(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
.set_content(content.as_deref())
|
||||
.set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))),
|
||||
Some(self.domain().to_string()),
|
||||
).await?;
|
||||
|
||||
self.insert_activity(
|
||||
activity
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_object(Node::link(oid.clone()))
|
||||
.set_published(Some(chrono::Utc::now())),
|
||||
Some(self.domain().to_string()),
|
||||
).await?;
|
||||
|
||||
self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?;
|
||||
Ok(aid)
|
||||
}
|
||||
|
||||
async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let activity_targets = activity.addressed();
|
||||
let oid = activity.object().id().ok_or_else(UpubError::bad_request)?;
|
||||
let obj_model = self.fetch_object(&oid).await?;
|
||||
|
||||
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
|
||||
|
||||
if model::like::Entity::find_by_uid_oid(internal_uid, obj_model.internal)
|
||||
.any(self.db())
|
||||
.await?
|
||||
{
|
||||
return Err(UpubError::not_modified());
|
||||
}
|
||||
|
||||
let activity_model = self.insert_activity(
|
||||
activity
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now())),
|
||||
Some(self.domain().to_string()),
|
||||
).await?;
|
||||
|
||||
self.process_like(internal_uid, obj_model.internal, activity_model.internal, chrono::Utc::now()).await?;
|
||||
|
||||
self.dispatch(&uid, activity_targets, &aid, None).await?;
|
||||
|
||||
Ok(aid)
|
||||
}
|
||||
|
||||
async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let activity_targets = activity.addressed();
|
||||
let target = activity.object().id().ok_or_else(UpubError::bad_request)?;
|
||||
|
||||
let activity_model = model::activity::ActiveModel::new(
|
||||
&activity
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
)?;
|
||||
|
||||
let follower_internal = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
|
||||
let following_internal = model::actor::Entity::ap_to_internal(&target, self.db()).await?;
|
||||
|
||||
model::activity::Entity::insert(activity_model)
|
||||
.exec(self.db()).await?;
|
||||
|
||||
let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
|
||||
|
||||
let relation_model = model::relation::ActiveModel {
|
||||
internal: NotSet,
|
||||
follower: Set(follower_internal),
|
||||
following: Set(following_internal),
|
||||
activity: Set(internal_aid),
|
||||
accept: Set(None),
|
||||
};
|
||||
|
||||
model::relation::Entity::insert(relation_model)
|
||||
.exec(self.db()).await?;
|
||||
|
||||
self.dispatch(&uid, activity_targets, &aid, None).await?;
|
||||
|
||||
Ok(aid)
|
||||
}
|
||||
|
||||
async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let activity_targets = activity.addressed();
|
||||
let accepted_id = activity.object().id().ok_or_else(UpubError::bad_request)?;
|
||||
let accepted_activity = model::activity::Entity::find_by_ap_id(&accepted_id)
|
||||
.one(self.db()).await?
|
||||
.ok_or_else(UpubError::not_found)?;
|
||||
|
||||
if accepted_activity.activity_type != apb::ActivityType::Follow {
|
||||
return Err(UpubError::bad_request());
|
||||
}
|
||||
if uid != accepted_activity.object.ok_or_else(UpubError::bad_request)? {
|
||||
return Err(UpubError::forbidden());
|
||||
}
|
||||
|
||||
let activity_model = model::activity::ActiveModel::new(
|
||||
&activity
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
)?;
|
||||
model::activity::Entity::insert(activity_model.into_active_model())
|
||||
.exec(self.db()).await?;
|
||||
|
||||
let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
|
||||
|
||||
match accepted_activity.activity_type {
|
||||
apb::ActivityType::Follow => {
|
||||
model::actor::Entity::update_many()
|
||||
.col_expr(
|
||||
model::actor::Column::FollowersCount,
|
||||
Expr::col(model::actor::Column::FollowersCount).add(1)
|
||||
)
|
||||
.filter(model::actor::Column::Id.eq(&uid))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
model::relation::Entity::update_many()
|
||||
.filter(model::relation::Column::Activity.eq(accepted_activity.internal))
|
||||
.col_expr(model::relation::Column::Accept, Expr::value(Some(internal_aid)))
|
||||
.exec(self.db()).await?;
|
||||
},
|
||||
t => tracing::error!("no side effects implemented for accepting {t:?}"),
|
||||
}
|
||||
|
||||
self.dispatch(&uid, activity_targets, &aid, None).await?;
|
||||
|
||||
Ok(aid)
|
||||
}
|
||||
|
||||
async fn reject(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let activity_targets = activity.addressed();
|
||||
let rejected_id = activity.object().id().ok_or_else(UpubError::bad_request)?;
|
||||
let rejected_activity = model::activity::Entity::find_by_ap_id(&rejected_id)
|
||||
.one(self.db()).await?
|
||||
.ok_or_else(UpubError::not_found)?;
|
||||
|
||||
if rejected_activity.activity_type != apb::ActivityType::Follow {
|
||||
return Err(UpubError::bad_request());
|
||||
}
|
||||
if uid != rejected_activity.object.ok_or_else(UpubError::bad_request)? {
|
||||
return Err(UpubError::forbidden());
|
||||
}
|
||||
|
||||
let activity_model = model::activity::ActiveModel::new(
|
||||
&activity
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
)?;
|
||||
model::activity::Entity::insert(activity_model)
|
||||
.exec(self.db()).await?;
|
||||
|
||||
let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
|
||||
|
||||
model::relation::Entity::delete_many()
|
||||
.filter(model::relation::Column::Activity.eq(internal_aid))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
|
||||
self.dispatch(&uid, activity_targets, &aid, None).await?;
|
||||
|
||||
Ok(aid)
|
||||
}
|
||||
|
||||
async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
|
||||
let old_aid = activity.object().id().ok_or_else(UpubError::bad_request)?;
|
||||
let old_activity = model::activity::Entity::find_by_ap_id(&old_aid)
|
||||
.one(self.db())
|
||||
.await?
|
||||
.ok_or_else(UpubError::not_found)?;
|
||||
if old_activity.actor != uid {
|
||||
return Err(UpubError::forbidden());
|
||||
}
|
||||
|
||||
let activity_model = self.insert_activity(
|
||||
activity.clone()
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now())),
|
||||
Some(self.domain().to_string())
|
||||
).await?;
|
||||
|
||||
let targets = self.expand_addressing(activity.addressed()).await?;
|
||||
self.process_undo(internal_uid, activity).await?;
|
||||
|
||||
self.address_to(Some(activity_model.internal), None, &targets).await?;
|
||||
self.deliver_to(&activity_model.id, &uid, &targets).await?;
|
||||
|
||||
Ok(aid)
|
||||
}
|
||||
|
||||
async fn delete(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let oid = activity.object().id().ok_or_else(UpubError::bad_request)?;
|
||||
|
||||
let object = model::object::Entity::find_by_ap_id(&oid)
|
||||
.one(self.db())
|
||||
.await?
|
||||
.ok_or_else(UpubError::not_found)?;
|
||||
|
||||
if uid != object.attributed_to.ok_or_else(UpubError::forbidden)? {
|
||||
// can't change objects of others, and objects from noone count as others
|
||||
return Err(UpubError::forbidden());
|
||||
}
|
||||
|
||||
let addressed = activity.addressed();
|
||||
let activity_model = model::activity::ActiveModel::new(
|
||||
&activity
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
)?;
|
||||
|
||||
model::activity::Entity::insert(activity_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
|
||||
model::object::Entity::delete_by_ap_id(&oid)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
|
||||
self.dispatch(&uid, addressed, &aid, None).await?;
|
||||
|
||||
Ok(aid)
|
||||
}
|
||||
|
||||
async fn update(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let object_node = activity.object().extract().ok_or_else(UpubError::bad_request)?;
|
||||
let addressed = activity.addressed();
|
||||
let target = object_node.id().ok_or_else(UpubError::bad_request)?.to_string();
|
||||
|
||||
let activity_model = model::activity::ActiveModel::new(
|
||||
&activity
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
)?;
|
||||
|
||||
model::activity::Entity::insert(activity_model)
|
||||
.exec(self.db()).await?;
|
||||
|
||||
match object_node.object_type() {
|
||||
Some(apb::ObjectType::Actor(_)) => {
|
||||
let old_actor_model = model::actor::Entity::find_by_ap_id(&target)
|
||||
.one(self.db())
|
||||
.await?
|
||||
.ok_or_else(UpubError::not_found)?;
|
||||
|
||||
if old_actor_model.id != uid {
|
||||
// can't change user fields of others
|
||||
return Err(UpubError::forbidden());
|
||||
}
|
||||
|
||||
let mut new_actor_model = model::actor::ActiveModel {
|
||||
internal: Unchanged(old_actor_model.internal),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if let Some(name) = object_node.name() {
|
||||
new_actor_model.name = Set(Some(name.to_string()));
|
||||
}
|
||||
if let Some(summary) = object_node.summary() {
|
||||
new_actor_model.summary = Set(Some(summary.to_string()));
|
||||
}
|
||||
if let Some(image) = object_node.image().id() {
|
||||
new_actor_model.image = Set(Some(image));
|
||||
}
|
||||
if let Some(icon) = object_node.icon().id() {
|
||||
new_actor_model.icon = Set(Some(icon));
|
||||
}
|
||||
new_actor_model.updated = Set(chrono::Utc::now());
|
||||
|
||||
model::actor::Entity::update(new_actor_model)
|
||||
.exec(self.db()).await?;
|
||||
},
|
||||
Some(apb::ObjectType::Note) => {
|
||||
let old_object_model = model::object::Entity::find_by_ap_id(&target)
|
||||
.one(self.db())
|
||||
.await?
|
||||
.ok_or_else(UpubError::not_found)?;
|
||||
|
||||
if uid != old_object_model.attributed_to.ok_or_else(UpubError::forbidden)? {
|
||||
// can't change objects of others
|
||||
return Err(UpubError::forbidden());
|
||||
}
|
||||
|
||||
let mut new_object_model = model::object::ActiveModel {
|
||||
internal: Unchanged(old_object_model.internal),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if let Some(name) = object_node.name() {
|
||||
new_object_model.name = Set(Some(name.to_string()));
|
||||
}
|
||||
if let Some(summary) = object_node.summary() {
|
||||
new_object_model.summary = Set(Some(summary.to_string()));
|
||||
}
|
||||
if let Some(content) = object_node.content() {
|
||||
new_object_model.content = Set(Some(content.to_string()));
|
||||
}
|
||||
new_object_model.updated = Set(chrono::Utc::now());
|
||||
|
||||
model::object::Entity::update(new_object_model)
|
||||
.exec(self.db()).await?;
|
||||
},
|
||||
_ => return Err(UpubError::Status(StatusCode::NOT_IMPLEMENTED)),
|
||||
}
|
||||
|
||||
self.dispatch(&uid, addressed, &aid, None).await?;
|
||||
|
||||
Ok(aid)
|
||||
}
|
||||
|
||||
async fn announce(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
|
||||
let aid = self.aid(&uuid::Uuid::new_v4().to_string());
|
||||
let activity_targets = activity.addressed();
|
||||
let oid = activity.object().id().ok_or_else(UpubError::bad_request)?;
|
||||
let obj = self.fetch_object(&oid).await?;
|
||||
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
|
||||
|
||||
let activity_model = model::activity::ActiveModel::new(
|
||||
&activity
|
||||
.set_id(Some(&aid))
|
||||
.set_actor(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
)?;
|
||||
|
||||
let share_model = model::announce::ActiveModel {
|
||||
internal: NotSet,
|
||||
actor: Set(internal_uid),
|
||||
object: Set(obj.internal),
|
||||
published: Set(chrono::Utc::now()),
|
||||
};
|
||||
model::activity::Entity::insert(activity_model)
|
||||
.exec(self.db()).await?;
|
||||
model::announce::Entity::insert(share_model).exec(self.db()).await?;
|
||||
model::object::Entity::update_many()
|
||||
.col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1))
|
||||
.filter(model::object::Column::Internal.eq(obj.internal))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
|
||||
self.dispatch(&uid, activity_targets, &aid, None).await?;
|
||||
|
||||
Ok(aid)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
|
Loading…
Reference in a new issue