diff --git a/src/model/attachment.rs b/src/model/attachment.rs index 833a76a..1a11796 100644 --- a/src/model/attachment.rs +++ b/src/model/attachment.rs @@ -1,5 +1,5 @@ -use apb::{Document, DocumentMut, Link, Object, ObjectMut}; -use sea_orm::{entity::prelude::*, Set}; +use apb::{DocumentMut, ObjectMut}; +use sea_orm::entity::prelude::*; use crate::routes::activitypub::jsonld::LD; @@ -19,23 +19,6 @@ pub struct Model { pub created: ChronoDateTimeUtc, } -impl ActiveModel { - // TODO receive an impl, not a specific type! - // issue is that it's either an apb::Link or apb::Document, but Document doesnt inherit from link! - pub fn new(document: &serde_json::Value, object: String, media_type: Option) -> Result { - let media_type = media_type.unwrap_or_else(|| document.media_type().unwrap_or("link").to_string()); - Ok(ActiveModel { - id: sea_orm::ActiveValue::NotSet, - object: Set(object), - url: Set(document.url().id().unwrap_or_else(|| document.href().to_string())), - document_type: Set(document.document_type().unwrap_or(apb::DocumentType::Page)), - media_type: Set(media_type), - name: Set(document.name().map(|x| x.to_string())), - created: Set(document.published().unwrap_or(chrono::Utc::now())), - }) - } -} - impl Model { pub fn ap(self) -> serde_json::Value { serde_json::Value::new_object() diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index c33a66f..151a2c0 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -7,7 +7,7 @@ use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryF use crate::{errors::UpubError, model, VERSION}; -use super::{httpsign::HttpSignature, Context}; +use super::{httpsign::HttpSignature, normalizer::Normalizer, Context}; #[axum::async_trait] pub trait Fetcher { @@ -300,19 +300,13 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res if let Err(e) = ctx.fetch_user(&attributed_to).await { tracing::warn!("could not get actor of fetched object: {e}"); } - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&attributed_to)) - .exec(ctx.db()) - .await?; } let addressed = object.addressed(); - let mut object_model = model::object::Model::new(&object)?; - if let Some(reply) = &object_model.in_reply_to { + if let Some(reply) = object.in_reply_to().id() { if depth <= 16 { - fetch_object_inner(ctx, reply, depth + 1).await?; + fetch_object_inner(ctx, &reply, depth + 1).await?; model::object::Entity::update_many() .filter(model::object::Column::Id.eq(reply)) .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) @@ -323,48 +317,11 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res } } - // fix context also for remote posts - // TODO this is not really appropriate because we're mirroring incorrectly remote objects, but - // it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them - match (&object_model.in_reply_to, &object_model.context) { - (Some(reply_id), None) => // get context from replied object - object_model.context = fetch_object_inner(ctx, reply_id, depth + 1).await?.context, - (None, None) => // generate a new context - object_model.context = Some(crate::url!(ctx, "/context/{}", uuid::Uuid::new_v4().to_string())), - (_, Some(_)) => {}, // leave it as set by user - } - - for attachment in object.attachment() { - let attachment_model = model::attachment::ActiveModel::new(&attachment, object_model.id.clone(), None)?; - model::attachment::Entity::insert(attachment_model) - .exec(ctx.db()) - .await?; - } - // lemmy sends us an image field in posts, treat it like an attachment i'd say - if let Some(img) = object.image().get() { - // TODO lemmy doesnt tell us the media type but we use it to display the thing... - let img_url = img.url().id().unwrap_or_default(); - let media_type = if img_url.ends_with("png") { - Some("image/png".to_string()) - } else if img_url.ends_with("webp") { - Some("image/webp".to_string()) - } else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") { - Some("image/jpeg".to_string()) - } else { - None - }; - let attachment_model = model::attachment::ActiveModel::new(img, object_model.id.clone(), media_type)?; - model::attachment::Entity::insert(attachment_model) - .exec(ctx.db()) - .await?; - } + let object_model = ctx.insert_object(object, None).await?; let expanded_addresses = ctx.expand_addressing(addressed).await?; ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?; - model::object::Entity::insert(object_model.clone().into_active_model()) - .exec(ctx.db()).await?; - Ok(object_model) } diff --git a/src/server/inbox.rs b/src/server/inbox.rs index 1017126..1a10650 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -2,7 +2,7 @@ use apb::{target::Addressed, Activity, Base, Object}; use reqwest::StatusCode; use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set}; -use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}}; +use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}, server::normalizer::Normalizer}; use super::{fetcher::Fetcher, Context}; @@ -20,74 +20,10 @@ impl apb::server::Inbox for Context { tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); return Err(UpubError::unprocessable()); }; - let mut object_model = model::object::Model::new(&object_node)?; - let oid = object_model.id.clone(); - let uid = object_model.attributed_to.clone(); - // make sure we're allowed to edit this object - if let Some(object_author) = &object_model.attributed_to { - if server != Context::server(object_author) { - return Err(UpubError::forbidden()); - } - } else if server != Context::server(&object_model.id) { - return Err(UpubError::forbidden()); - }; - // fix context also for remote posts - // TODO this is not really appropriate because we're mirroring incorrectly remote objects, but - // it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them - match (&object_model.in_reply_to, &object_model.context) { - (Some(reply_id), None) => // get context from replied object - object_model.context = self.fetch_object(reply_id).await?.context, - (None, None) => // generate a new context - object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())), - (_, Some(_)) => {}, // leave it as set by user - } - // update replies counter - if let Some(ref in_reply_to) = object_model.in_reply_to { - if self.fetch_object(in_reply_to).await.is_ok() { - model::object::Entity::update_many() - .filter(model::object::Column::Id.eq(in_reply_to)) - .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) - .exec(self.db()) - .await?; - } - } - model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; - for attachment in object_node.attachment() { - let attachment_model = model::attachment::ActiveModel::new(&attachment, oid.clone(), None)?; - model::attachment::Entity::insert(attachment_model) - .exec(self.db()) - .await?; - } - // lemmy sends us an image field in posts, treat it like an attachment i'd say - if let Some(img) = object_node.image().get() { - // TODO lemmy doesnt tell us the media type but we use it to display the thing... - let img_url = img.url().id().unwrap_or_default(); - let media_type = if img_url.ends_with("png") { - Some("image/png".to_string()) - } else if img_url.ends_with("webp") { - Some("image/webp".to_string()) - } else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") { - Some("image/jpeg".to_string()) - } else { - None - }; - let attachment_model = model::attachment::ActiveModel::new(img, oid.clone(), media_type)?; - model::attachment::Entity::insert(attachment_model) - .exec(self.db()) - .await?; - } - // TODO can we even receive anonymous objects? - if let Some(object_author) = uid { - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&object_author)) - .exec(self.db()) - .await?; - } + let object_model = self.insert_object(object_node, Some(server)).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?; - tracing::info!("{} posted {}", aid, oid); + self.address_to(Some(&aid), Some(&object_model.id), &expanded_addressing).await?; + tracing::info!("{} posted {}", aid, object_model.id); Ok(()) } diff --git a/src/server/mod.rs b/src/server/mod.rs index b6830b3..392c281 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,5 +7,6 @@ pub mod outbox; pub mod auth; pub mod builders; pub mod httpsign; +pub mod normalizer; pub use context::Context; diff --git a/src/server/normalizer.rs b/src/server/normalizer.rs new file mode 100644 index 0000000..fb79065 --- /dev/null +++ b/src/server/normalizer.rs @@ -0,0 +1,126 @@ +use apb::{Node, Link, Base, Object, Document}; +use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, Set}; +use crate::{errors::UpubError, model, server::Context}; + +use super::fetcher::Fetcher; + +#[axum::async_trait] +pub trait Normalizer { + async fn insert_object(&self, obj: impl apb::Object, server: Option) -> crate::Result; +} + +#[axum::async_trait] +impl Normalizer for super::Context { + async fn insert_object(&self, object_node: impl apb::Object, server: Option) -> crate::Result { + let mut object_model = model::object::Model::new(&object_node)?; + let oid = object_model.id.clone(); + let uid = object_model.attributed_to.clone(); + if let Some(server) = server { + // make sure we're allowed to create this object + if let Some(object_author) = &object_model.attributed_to { + if server != Context::server(object_author) { + return Err(UpubError::forbidden()); + } + } else if server != Context::server(&object_model.id) { + return Err(UpubError::forbidden()); + }; + } + + // make sure content only contains a safe subset of html + if let Some(content) = object_model.content { + object_model.content = Some(mdhtml::safe_markdown(&content)); + } + + // fix context also for remote posts + // TODO this is not really appropriate because we're mirroring incorrectly remote objects, but + // it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them + match (&object_model.in_reply_to, &object_model.context) { + (Some(reply_id), None) => // get context from replied object + object_model.context = self.fetch_object(reply_id).await?.context, + (None, None) => // generate a new context + object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())), + (_, Some(_)) => {}, // leave it as set by user + } + + // update replies counter + if let Some(ref in_reply_to) = object_model.in_reply_to { + if self.fetch_object(in_reply_to).await.is_ok() { + model::object::Entity::update_many() + .filter(model::object::Column::Id.eq(in_reply_to)) + .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) + .exec(self.db()) + .await?; + } + } + // update statuses counter + if let Some(object_author) = uid { + model::user::Entity::update_many() + .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) + .filter(model::user::Column::Id.eq(&object_author)) + .exec(self.db()) + .await?; + } + + model::object::Entity::insert(object_model.clone().into_active_model()).exec(self.db()).await?; + + for attachment in object_node.attachment().flat() { + let attachment_model = match attachment { + Node::Empty => continue, + Node::Array(_) => { + tracing::warn!("ignoring array-in-array while processing attachments"); + continue + }, + Node::Link(l) => model::attachment::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + url: Set(l.href().to_string()), + object: Set(oid.clone()), + document_type: Set(apb::DocumentType::Page), + name: Set(l.link_name().map(|x| x.to_string())), + media_type: Set(l.link_media_type().unwrap_or("link").to_string()), + created: Set(chrono::Utc::now()), + }, + Node::Object(o) => model::attachment::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + url: Set(o.url().id().unwrap_or_else(|| o.id().map(|x| x.to_string()).unwrap_or_default())), + object: Set(oid.clone()), + document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), + name: Set(o.name().map(|x| x.to_string())), + media_type: Set(o.media_type().unwrap_or("link").to_string()), + created: Set(o.published().unwrap_or_else(|| chrono::Utc::now())), + }, + }; + model::attachment::Entity::insert(attachment_model) + .exec(self.db()) + .await?; + } + // lemmy sends us an image field in posts, treat it like an attachment i'd say + if let Some(img) = object_node.image().get() { + // TODO lemmy doesnt tell us the media type but we use it to display the thing... + let img_url = img.url().id().unwrap_or_default(); + let media_type = if img_url.ends_with("png") { + Some("image/png".to_string()) + } else if img_url.ends_with("webp") { + Some("image/webp".to_string()) + } else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") { + Some("image/jpeg".to_string()) + } else { + None + }; + + let attachment_model = model::attachment::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + url: Set(img.url().id().unwrap_or_else(|| img.id().map(|x| x.to_string()).unwrap_or_default())), + object: Set(oid.clone()), + document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), + name: Set(img.name().map(|x| x.to_string())), + media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()), + created: Set(img.published().unwrap_or_else(|| chrono::Utc::now())), + }; + model::attachment::Entity::insert(attachment_model) + .exec(self.db()) + .await?; + } + + Ok(object_model) + } +} diff --git a/src/server/outbox.rs b/src/server/outbox.rs index 3766e75..5251c10 100644 --- a/src/server/outbox.rs +++ b/src/server/outbox.rs @@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, EntityTrait, IntoA use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD}; -use super::{fetcher::Fetcher, Context}; +use super::{fetcher::Fetcher, normalizer::Normalizer, Context}; #[axum::async_trait] @@ -18,27 +18,14 @@ impl apb::server::Outbox for Context { let oid = self.oid(raw_oid.clone()); let aid = self.aid(uuid::Uuid::new_v4().to_string()); let activity_targets = object.addressed(); - let mut object_model = model::object::Model::new( - &object + let object_model = self.insert_object( + object .set_id(Some(&oid)) .set_attributed_to(Node::link(uid.clone())) .set_published(Some(chrono::Utc::now())) - )?; - if let Some(content) = object_model.content { - object_model.content = Some(mdhtml::safe_markdown(&content)); - } - match (&object_model.in_reply_to, &object_model.context) { - (Some(reply_id), None) => // get context from replied object - object_model.context = self.fetch_object(reply_id).await?.context, - (None, None) => // generate a new context - object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())), - (_, Some(_)) => {}, // leave it as set by user - } - let reply_to = object_model.in_reply_to.clone(); - - if let Some(fe_url) = &self.cfg().instance.frontend { - object_model.url = Some(format!("{fe_url}/objects/{raw_oid}")); - } + .set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))), + Some(self.base()), + ).await?; let activity_model = model::activity::Model { id: aid.clone(), @@ -53,22 +40,8 @@ impl apb::server::Outbox for Context { published: object_model.published, }; - model::object::Entity::insert(object_model.into_active_model()) - .exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()) .exec(self.db()).await?; - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&uid)) - .exec(self.db()) - .await?; - if let Some(reply_to) = reply_to { - model::object::Entity::update_many() - .filter(model::object::Column::Id.eq(reply_to)) - .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) - .exec(self.db()) - .await?; - } self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; @@ -84,60 +57,33 @@ impl apb::server::Outbox for Context { let oid = self.oid(raw_oid.clone()); let aid = self.aid(uuid::Uuid::new_v4().to_string()); let activity_targets = activity.addressed(); - let mut object_model = model::object::Model::new( - &object + + self.insert_object( + object .set_id(Some(&oid)) .set_attributed_to(Node::link(uid.clone())) .set_published(Some(chrono::Utc::now())) - )?; - let mut activity_model = model::activity::Model::new( + .set_to(activity.to()) + .set_bto(activity.bto()) + .set_cc(activity.cc()) + .set_bcc(activity.bcc()), + Some(self.base()), + ).await?; + + let activity_model = model::activity::Model::new( &activity .set_id(Some(&aid)) .set_actor(Node::link(uid.clone())) .set_published(Some(chrono::Utc::now())) + .set_object(Node::link(oid.clone())) )?; - activity_model.object = Some(oid.clone()); - object_model.to = activity_model.to.clone(); - object_model.bto = activity_model.bto.clone(); - object_model.cc = activity_model.cc.clone(); - object_model.bcc = activity_model.bcc.clone(); - if let Some(content) = object_model.content { - object_model.content = Some(mdhtml::safe_markdown(&content)); - } - match (&object_model.in_reply_to, &object_model.context) { - (Some(reply_id), None) => // get context from replied object - object_model.context = self.fetch_object(reply_id).await?.context, - (None, None) => // generate a new context - object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())), - (_, Some(_)) => {}, // leave it as set by user - } - let reply_to = object_model.in_reply_to.clone(); - if let Some(fe_url) = &self.cfg().instance.frontend { - object_model.url = Some(format!("{fe_url}/objects/{raw_oid}")); - } - model::object::Entity::insert(object_model.into_active_model()) - .exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()) .exec(self.db()).await?; - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&uid)) - .exec(self.db()) - .await?; - if let Some(reply_to) = reply_to { - model::object::Entity::update_many() - .filter(model::object::Column::Id.eq(reply_to)) - .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) - .exec(self.db()) - .await?; - } self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; - Ok(aid) } - async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result { let aid = self.aid(uuid::Uuid::new_v4().to_string());