From b910e346eacd4591f3d9da2a098de0e0d09ccce6 Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 20 May 2024 02:49:43 +0200 Subject: [PATCH] chore: centralized way to process and insert objects it shouldn't break anything, also names are kind of weird (normalizer? insert??) but ill think about it later, after ive done the same for activities and users --- src/model/attachment.rs | 21 +------ src/server/fetcher.rs | 51 ++-------------- src/server/inbox.rs | 72 ++-------------------- src/server/mod.rs | 1 + src/server/normalizer.rs | 126 +++++++++++++++++++++++++++++++++++++++ src/server/outbox.rs | 90 ++++++---------------------- 6 files changed, 155 insertions(+), 206 deletions(-) create mode 100644 src/server/normalizer.rs diff --git a/src/model/attachment.rs b/src/model/attachment.rs index 833a76ae..1a117969 100644 --- a/src/model/attachment.rs +++ b/src/model/attachment.rs @@ -1,5 +1,5 @@ -use apb::{Document, DocumentMut, Link, Object, ObjectMut}; -use sea_orm::{entity::prelude::*, Set}; +use apb::{DocumentMut, ObjectMut}; +use sea_orm::entity::prelude::*; use crate::routes::activitypub::jsonld::LD; @@ -19,23 +19,6 @@ pub struct Model { pub created: ChronoDateTimeUtc, } -impl ActiveModel { - // TODO receive an impl, not a specific type! - // issue is that it's either an apb::Link or apb::Document, but Document doesnt inherit from link! - pub fn new(document: &serde_json::Value, object: String, media_type: Option) -> Result { - let media_type = media_type.unwrap_or_else(|| document.media_type().unwrap_or("link").to_string()); - Ok(ActiveModel { - id: sea_orm::ActiveValue::NotSet, - object: Set(object), - url: Set(document.url().id().unwrap_or_else(|| document.href().to_string())), - document_type: Set(document.document_type().unwrap_or(apb::DocumentType::Page)), - media_type: Set(media_type), - name: Set(document.name().map(|x| x.to_string())), - created: Set(document.published().unwrap_or(chrono::Utc::now())), - }) - } -} - impl Model { pub fn ap(self) -> serde_json::Value { serde_json::Value::new_object() diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index c33a66f7..151a2c07 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -7,7 +7,7 @@ use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryF use crate::{errors::UpubError, model, VERSION}; -use super::{httpsign::HttpSignature, Context}; +use super::{httpsign::HttpSignature, normalizer::Normalizer, Context}; #[axum::async_trait] pub trait Fetcher { @@ -300,19 +300,13 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res if let Err(e) = ctx.fetch_user(&attributed_to).await { tracing::warn!("could not get actor of fetched object: {e}"); } - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&attributed_to)) - .exec(ctx.db()) - .await?; } let addressed = object.addressed(); - let mut object_model = model::object::Model::new(&object)?; - if let Some(reply) = &object_model.in_reply_to { + if let Some(reply) = object.in_reply_to().id() { if depth <= 16 { - fetch_object_inner(ctx, reply, depth + 1).await?; + fetch_object_inner(ctx, &reply, depth + 1).await?; model::object::Entity::update_many() .filter(model::object::Column::Id.eq(reply)) .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) @@ -323,48 +317,11 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res } } - // fix context also for remote posts - // TODO this is not really appropriate because we're mirroring incorrectly remote objects, but - // it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them - match (&object_model.in_reply_to, &object_model.context) { - (Some(reply_id), None) => // get context from replied object - object_model.context = fetch_object_inner(ctx, reply_id, depth + 1).await?.context, - (None, None) => // generate a new context - object_model.context = Some(crate::url!(ctx, "/context/{}", uuid::Uuid::new_v4().to_string())), - (_, Some(_)) => {}, // leave it as set by user - } - - for attachment in object.attachment() { - let attachment_model = model::attachment::ActiveModel::new(&attachment, object_model.id.clone(), None)?; - model::attachment::Entity::insert(attachment_model) - .exec(ctx.db()) - .await?; - } - // lemmy sends us an image field in posts, treat it like an attachment i'd say - if let Some(img) = object.image().get() { - // TODO lemmy doesnt tell us the media type but we use it to display the thing... - let img_url = img.url().id().unwrap_or_default(); - let media_type = if img_url.ends_with("png") { - Some("image/png".to_string()) - } else if img_url.ends_with("webp") { - Some("image/webp".to_string()) - } else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") { - Some("image/jpeg".to_string()) - } else { - None - }; - let attachment_model = model::attachment::ActiveModel::new(img, object_model.id.clone(), media_type)?; - model::attachment::Entity::insert(attachment_model) - .exec(ctx.db()) - .await?; - } + let object_model = ctx.insert_object(object, None).await?; let expanded_addresses = ctx.expand_addressing(addressed).await?; ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?; - model::object::Entity::insert(object_model.clone().into_active_model()) - .exec(ctx.db()).await?; - Ok(object_model) } diff --git a/src/server/inbox.rs b/src/server/inbox.rs index 10171266..1a106504 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -2,7 +2,7 @@ use apb::{target::Addressed, Activity, Base, Object}; use reqwest::StatusCode; use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set}; -use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}}; +use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}, server::normalizer::Normalizer}; use super::{fetcher::Fetcher, Context}; @@ -20,74 +20,10 @@ impl apb::server::Inbox for Context { tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); return Err(UpubError::unprocessable()); }; - let mut object_model = model::object::Model::new(&object_node)?; - let oid = object_model.id.clone(); - let uid = object_model.attributed_to.clone(); - // make sure we're allowed to edit this object - if let Some(object_author) = &object_model.attributed_to { - if server != Context::server(object_author) { - return Err(UpubError::forbidden()); - } - } else if server != Context::server(&object_model.id) { - return Err(UpubError::forbidden()); - }; - // fix context also for remote posts - // TODO this is not really appropriate because we're mirroring incorrectly remote objects, but - // it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them - match (&object_model.in_reply_to, &object_model.context) { - (Some(reply_id), None) => // get context from replied object - object_model.context = self.fetch_object(reply_id).await?.context, - (None, None) => // generate a new context - object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())), - (_, Some(_)) => {}, // leave it as set by user - } - // update replies counter - if let Some(ref in_reply_to) = object_model.in_reply_to { - if self.fetch_object(in_reply_to).await.is_ok() { - model::object::Entity::update_many() - .filter(model::object::Column::Id.eq(in_reply_to)) - .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) - .exec(self.db()) - .await?; - } - } - model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; - for attachment in object_node.attachment() { - let attachment_model = model::attachment::ActiveModel::new(&attachment, oid.clone(), None)?; - model::attachment::Entity::insert(attachment_model) - .exec(self.db()) - .await?; - } - // lemmy sends us an image field in posts, treat it like an attachment i'd say - if let Some(img) = object_node.image().get() { - // TODO lemmy doesnt tell us the media type but we use it to display the thing... - let img_url = img.url().id().unwrap_or_default(); - let media_type = if img_url.ends_with("png") { - Some("image/png".to_string()) - } else if img_url.ends_with("webp") { - Some("image/webp".to_string()) - } else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") { - Some("image/jpeg".to_string()) - } else { - None - }; - let attachment_model = model::attachment::ActiveModel::new(img, oid.clone(), media_type)?; - model::attachment::Entity::insert(attachment_model) - .exec(self.db()) - .await?; - } - // TODO can we even receive anonymous objects? - if let Some(object_author) = uid { - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&object_author)) - .exec(self.db()) - .await?; - } + let object_model = self.insert_object(object_node, Some(server)).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?; - tracing::info!("{} posted {}", aid, oid); + self.address_to(Some(&aid), Some(&object_model.id), &expanded_addressing).await?; + tracing::info!("{} posted {}", aid, object_model.id); Ok(()) } diff --git a/src/server/mod.rs b/src/server/mod.rs index b6830b36..392c2812 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,5 +7,6 @@ pub mod outbox; pub mod auth; pub mod builders; pub mod httpsign; +pub mod normalizer; pub use context::Context; diff --git a/src/server/normalizer.rs b/src/server/normalizer.rs new file mode 100644 index 00000000..fb790650 --- /dev/null +++ b/src/server/normalizer.rs @@ -0,0 +1,126 @@ +use apb::{Node, Link, Base, Object, Document}; +use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, Set}; +use crate::{errors::UpubError, model, server::Context}; + +use super::fetcher::Fetcher; + +#[axum::async_trait] +pub trait Normalizer { + async fn insert_object(&self, obj: impl apb::Object, server: Option) -> crate::Result; +} + +#[axum::async_trait] +impl Normalizer for super::Context { + async fn insert_object(&self, object_node: impl apb::Object, server: Option) -> crate::Result { + let mut object_model = model::object::Model::new(&object_node)?; + let oid = object_model.id.clone(); + let uid = object_model.attributed_to.clone(); + if let Some(server) = server { + // make sure we're allowed to create this object + if let Some(object_author) = &object_model.attributed_to { + if server != Context::server(object_author) { + return Err(UpubError::forbidden()); + } + } else if server != Context::server(&object_model.id) { + return Err(UpubError::forbidden()); + }; + } + + // make sure content only contains a safe subset of html + if let Some(content) = object_model.content { + object_model.content = Some(mdhtml::safe_markdown(&content)); + } + + // fix context also for remote posts + // TODO this is not really appropriate because we're mirroring incorrectly remote objects, but + // it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them + match (&object_model.in_reply_to, &object_model.context) { + (Some(reply_id), None) => // get context from replied object + object_model.context = self.fetch_object(reply_id).await?.context, + (None, None) => // generate a new context + object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())), + (_, Some(_)) => {}, // leave it as set by user + } + + // update replies counter + if let Some(ref in_reply_to) = object_model.in_reply_to { + if self.fetch_object(in_reply_to).await.is_ok() { + model::object::Entity::update_many() + .filter(model::object::Column::Id.eq(in_reply_to)) + .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) + .exec(self.db()) + .await?; + } + } + // update statuses counter + if let Some(object_author) = uid { + model::user::Entity::update_many() + .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) + .filter(model::user::Column::Id.eq(&object_author)) + .exec(self.db()) + .await?; + } + + model::object::Entity::insert(object_model.clone().into_active_model()).exec(self.db()).await?; + + for attachment in object_node.attachment().flat() { + let attachment_model = match attachment { + Node::Empty => continue, + Node::Array(_) => { + tracing::warn!("ignoring array-in-array while processing attachments"); + continue + }, + Node::Link(l) => model::attachment::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + url: Set(l.href().to_string()), + object: Set(oid.clone()), + document_type: Set(apb::DocumentType::Page), + name: Set(l.link_name().map(|x| x.to_string())), + media_type: Set(l.link_media_type().unwrap_or("link").to_string()), + created: Set(chrono::Utc::now()), + }, + Node::Object(o) => model::attachment::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + url: Set(o.url().id().unwrap_or_else(|| o.id().map(|x| x.to_string()).unwrap_or_default())), + object: Set(oid.clone()), + document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), + name: Set(o.name().map(|x| x.to_string())), + media_type: Set(o.media_type().unwrap_or("link").to_string()), + created: Set(o.published().unwrap_or_else(|| chrono::Utc::now())), + }, + }; + model::attachment::Entity::insert(attachment_model) + .exec(self.db()) + .await?; + } + // lemmy sends us an image field in posts, treat it like an attachment i'd say + if let Some(img) = object_node.image().get() { + // TODO lemmy doesnt tell us the media type but we use it to display the thing... + let img_url = img.url().id().unwrap_or_default(); + let media_type = if img_url.ends_with("png") { + Some("image/png".to_string()) + } else if img_url.ends_with("webp") { + Some("image/webp".to_string()) + } else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") { + Some("image/jpeg".to_string()) + } else { + None + }; + + let attachment_model = model::attachment::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + url: Set(img.url().id().unwrap_or_else(|| img.id().map(|x| x.to_string()).unwrap_or_default())), + object: Set(oid.clone()), + document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), + name: Set(img.name().map(|x| x.to_string())), + media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()), + created: Set(img.published().unwrap_or_else(|| chrono::Utc::now())), + }; + model::attachment::Entity::insert(attachment_model) + .exec(self.db()) + .await?; + } + + Ok(object_model) + } +} diff --git a/src/server/outbox.rs b/src/server/outbox.rs index 3766e75f..5251c10d 100644 --- a/src/server/outbox.rs +++ b/src/server/outbox.rs @@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, EntityTrait, IntoA use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD}; -use super::{fetcher::Fetcher, Context}; +use super::{fetcher::Fetcher, normalizer::Normalizer, Context}; #[axum::async_trait] @@ -18,27 +18,14 @@ impl apb::server::Outbox for Context { let oid = self.oid(raw_oid.clone()); let aid = self.aid(uuid::Uuid::new_v4().to_string()); let activity_targets = object.addressed(); - let mut object_model = model::object::Model::new( - &object + let object_model = self.insert_object( + object .set_id(Some(&oid)) .set_attributed_to(Node::link(uid.clone())) .set_published(Some(chrono::Utc::now())) - )?; - if let Some(content) = object_model.content { - object_model.content = Some(mdhtml::safe_markdown(&content)); - } - match (&object_model.in_reply_to, &object_model.context) { - (Some(reply_id), None) => // get context from replied object - object_model.context = self.fetch_object(reply_id).await?.context, - (None, None) => // generate a new context - object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())), - (_, Some(_)) => {}, // leave it as set by user - } - let reply_to = object_model.in_reply_to.clone(); - - if let Some(fe_url) = &self.cfg().instance.frontend { - object_model.url = Some(format!("{fe_url}/objects/{raw_oid}")); - } + .set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))), + Some(self.base()), + ).await?; let activity_model = model::activity::Model { id: aid.clone(), @@ -53,22 +40,8 @@ impl apb::server::Outbox for Context { published: object_model.published, }; - model::object::Entity::insert(object_model.into_active_model()) - .exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()) .exec(self.db()).await?; - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&uid)) - .exec(self.db()) - .await?; - if let Some(reply_to) = reply_to { - model::object::Entity::update_many() - .filter(model::object::Column::Id.eq(reply_to)) - .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) - .exec(self.db()) - .await?; - } self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; @@ -84,60 +57,33 @@ impl apb::server::Outbox for Context { let oid = self.oid(raw_oid.clone()); let aid = self.aid(uuid::Uuid::new_v4().to_string()); let activity_targets = activity.addressed(); - let mut object_model = model::object::Model::new( - &object + + self.insert_object( + object .set_id(Some(&oid)) .set_attributed_to(Node::link(uid.clone())) .set_published(Some(chrono::Utc::now())) - )?; - let mut activity_model = model::activity::Model::new( + .set_to(activity.to()) + .set_bto(activity.bto()) + .set_cc(activity.cc()) + .set_bcc(activity.bcc()), + Some(self.base()), + ).await?; + + let activity_model = model::activity::Model::new( &activity .set_id(Some(&aid)) .set_actor(Node::link(uid.clone())) .set_published(Some(chrono::Utc::now())) + .set_object(Node::link(oid.clone())) )?; - activity_model.object = Some(oid.clone()); - object_model.to = activity_model.to.clone(); - object_model.bto = activity_model.bto.clone(); - object_model.cc = activity_model.cc.clone(); - object_model.bcc = activity_model.bcc.clone(); - if let Some(content) = object_model.content { - object_model.content = Some(mdhtml::safe_markdown(&content)); - } - match (&object_model.in_reply_to, &object_model.context) { - (Some(reply_id), None) => // get context from replied object - object_model.context = self.fetch_object(reply_id).await?.context, - (None, None) => // generate a new context - object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())), - (_, Some(_)) => {}, // leave it as set by user - } - let reply_to = object_model.in_reply_to.clone(); - if let Some(fe_url) = &self.cfg().instance.frontend { - object_model.url = Some(format!("{fe_url}/objects/{raw_oid}")); - } - model::object::Entity::insert(object_model.into_active_model()) - .exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()) .exec(self.db()).await?; - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&uid)) - .exec(self.db()) - .await?; - if let Some(reply_to) = reply_to { - model::object::Entity::update_many() - .filter(model::object::Column::Id.eq(reply_to)) - .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) - .exec(self.db()) - .await?; - } self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; - Ok(aid) } - async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result { let aid = self.aid(uuid::Uuid::new_v4().to_string());