chore: centralized way to process and insert objects

it shouldn't break anything, also names are kind of weird (normalizer?
insert??) but ill think about it later, after ive done the same for
activities and users
This commit is contained in:
əlemi 2024-05-20 02:49:43 +02:00
parent f4db94db27
commit b910e346ea
Signed by: alemi
GPG key ID: A4895B84D311642C
6 changed files with 155 additions and 206 deletions

View file

@ -1,5 +1,5 @@
use apb::{Document, DocumentMut, Link, Object, ObjectMut}; use apb::{DocumentMut, ObjectMut};
use sea_orm::{entity::prelude::*, Set}; use sea_orm::entity::prelude::*;
use crate::routes::activitypub::jsonld::LD; use crate::routes::activitypub::jsonld::LD;
@ -19,23 +19,6 @@ pub struct Model {
pub created: ChronoDateTimeUtc, pub created: ChronoDateTimeUtc,
} }
impl ActiveModel {
// TODO receive an impl, not a specific type!
// issue is that it's either an apb::Link or apb::Document, but Document doesnt inherit from link!
pub fn new(document: &serde_json::Value, object: String, media_type: Option<String>) -> Result<ActiveModel, super::FieldError> {
let media_type = media_type.unwrap_or_else(|| document.media_type().unwrap_or("link").to_string());
Ok(ActiveModel {
id: sea_orm::ActiveValue::NotSet,
object: Set(object),
url: Set(document.url().id().unwrap_or_else(|| document.href().to_string())),
document_type: Set(document.document_type().unwrap_or(apb::DocumentType::Page)),
media_type: Set(media_type),
name: Set(document.name().map(|x| x.to_string())),
created: Set(document.published().unwrap_or(chrono::Utc::now())),
})
}
}
impl Model { impl Model {
pub fn ap(self) -> serde_json::Value { pub fn ap(self) -> serde_json::Value {
serde_json::Value::new_object() serde_json::Value::new_object()

View file

@ -7,7 +7,7 @@ use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryF
use crate::{errors::UpubError, model, VERSION}; use crate::{errors::UpubError, model, VERSION};
use super::{httpsign::HttpSignature, Context}; use super::{httpsign::HttpSignature, normalizer::Normalizer, Context};
#[axum::async_trait] #[axum::async_trait]
pub trait Fetcher { pub trait Fetcher {
@ -300,19 +300,13 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res
if let Err(e) = ctx.fetch_user(&attributed_to).await { if let Err(e) = ctx.fetch_user(&attributed_to).await {
tracing::warn!("could not get actor of fetched object: {e}"); tracing::warn!("could not get actor of fetched object: {e}");
} }
model::user::Entity::update_many()
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
.filter(model::user::Column::Id.eq(&attributed_to))
.exec(ctx.db())
.await?;
} }
let addressed = object.addressed(); let addressed = object.addressed();
let mut object_model = model::object::Model::new(&object)?;
if let Some(reply) = &object_model.in_reply_to { if let Some(reply) = object.in_reply_to().id() {
if depth <= 16 { if depth <= 16 {
fetch_object_inner(ctx, reply, depth + 1).await?; fetch_object_inner(ctx, &reply, depth + 1).await?;
model::object::Entity::update_many() model::object::Entity::update_many()
.filter(model::object::Column::Id.eq(reply)) .filter(model::object::Column::Id.eq(reply))
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
@ -323,48 +317,11 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res
} }
} }
// fix context also for remote posts let object_model = ctx.insert_object(object, None).await?;
// TODO this is not really appropriate because we're mirroring incorrectly remote objects, but
// it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them
match (&object_model.in_reply_to, &object_model.context) {
(Some(reply_id), None) => // get context from replied object
object_model.context = fetch_object_inner(ctx, reply_id, depth + 1).await?.context,
(None, None) => // generate a new context
object_model.context = Some(crate::url!(ctx, "/context/{}", uuid::Uuid::new_v4().to_string())),
(_, Some(_)) => {}, // leave it as set by user
}
for attachment in object.attachment() {
let attachment_model = model::attachment::ActiveModel::new(&attachment, object_model.id.clone(), None)?;
model::attachment::Entity::insert(attachment_model)
.exec(ctx.db())
.await?;
}
// lemmy sends us an image field in posts, treat it like an attachment i'd say
if let Some(img) = object.image().get() {
// TODO lemmy doesnt tell us the media type but we use it to display the thing...
let img_url = img.url().id().unwrap_or_default();
let media_type = if img_url.ends_with("png") {
Some("image/png".to_string())
} else if img_url.ends_with("webp") {
Some("image/webp".to_string())
} else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") {
Some("image/jpeg".to_string())
} else {
None
};
let attachment_model = model::attachment::ActiveModel::new(img, object_model.id.clone(), media_type)?;
model::attachment::Entity::insert(attachment_model)
.exec(ctx.db())
.await?;
}
let expanded_addresses = ctx.expand_addressing(addressed).await?; let expanded_addresses = ctx.expand_addressing(addressed).await?;
ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?; ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?;
model::object::Entity::insert(object_model.clone().into_active_model())
.exec(ctx.db()).await?;
Ok(object_model) Ok(object_model)
} }

View file

@ -2,7 +2,7 @@ use apb::{target::Addressed, Activity, Base, Object};
use reqwest::StatusCode; use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set}; use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set};
use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}}; use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}, server::normalizer::Normalizer};
use super::{fetcher::Fetcher, Context}; use super::{fetcher::Fetcher, Context};
@ -20,74 +20,10 @@ impl apb::server::Inbox for Context {
tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap());
return Err(UpubError::unprocessable()); return Err(UpubError::unprocessable());
}; };
let mut object_model = model::object::Model::new(&object_node)?; let object_model = self.insert_object(object_node, Some(server)).await?;
let oid = object_model.id.clone();
let uid = object_model.attributed_to.clone();
// make sure we're allowed to edit this object
if let Some(object_author) = &object_model.attributed_to {
if server != Context::server(object_author) {
return Err(UpubError::forbidden());
}
} else if server != Context::server(&object_model.id) {
return Err(UpubError::forbidden());
};
// fix context also for remote posts
// TODO this is not really appropriate because we're mirroring incorrectly remote objects, but
// it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them
match (&object_model.in_reply_to, &object_model.context) {
(Some(reply_id), None) => // get context from replied object
object_model.context = self.fetch_object(reply_id).await?.context,
(None, None) => // generate a new context
object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())),
(_, Some(_)) => {}, // leave it as set by user
}
// update replies counter
if let Some(ref in_reply_to) = object_model.in_reply_to {
if self.fetch_object(in_reply_to).await.is_ok() {
model::object::Entity::update_many()
.filter(model::object::Column::Id.eq(in_reply_to))
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
.exec(self.db())
.await?;
}
}
model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?;
for attachment in object_node.attachment() {
let attachment_model = model::attachment::ActiveModel::new(&attachment, oid.clone(), None)?;
model::attachment::Entity::insert(attachment_model)
.exec(self.db())
.await?;
}
// lemmy sends us an image field in posts, treat it like an attachment i'd say
if let Some(img) = object_node.image().get() {
// TODO lemmy doesnt tell us the media type but we use it to display the thing...
let img_url = img.url().id().unwrap_or_default();
let media_type = if img_url.ends_with("png") {
Some("image/png".to_string())
} else if img_url.ends_with("webp") {
Some("image/webp".to_string())
} else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") {
Some("image/jpeg".to_string())
} else {
None
};
let attachment_model = model::attachment::ActiveModel::new(img, oid.clone(), media_type)?;
model::attachment::Entity::insert(attachment_model)
.exec(self.db())
.await?;
}
// TODO can we even receive anonymous objects?
if let Some(object_author) = uid {
model::user::Entity::update_many()
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
.filter(model::user::Column::Id.eq(&object_author))
.exec(self.db())
.await?;
}
let expanded_addressing = self.expand_addressing(activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?; self.address_to(Some(&aid), Some(&object_model.id), &expanded_addressing).await?;
tracing::info!("{} posted {}", aid, oid); tracing::info!("{} posted {}", aid, object_model.id);
Ok(()) Ok(())
} }

View file

@ -7,5 +7,6 @@ pub mod outbox;
pub mod auth; pub mod auth;
pub mod builders; pub mod builders;
pub mod httpsign; pub mod httpsign;
pub mod normalizer;
pub use context::Context; pub use context::Context;

126
src/server/normalizer.rs Normal file
View file

@ -0,0 +1,126 @@
use apb::{Node, Link, Base, Object, Document};
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, Set};
use crate::{errors::UpubError, model, server::Context};
use super::fetcher::Fetcher;
#[axum::async_trait]
pub trait Normalizer {
async fn insert_object(&self, obj: impl apb::Object, server: Option<String>) -> crate::Result<model::object::Model>;
}
#[axum::async_trait]
impl Normalizer for super::Context {
async fn insert_object(&self, object_node: impl apb::Object, server: Option<String>) -> crate::Result<model::object::Model> {
let mut object_model = model::object::Model::new(&object_node)?;
let oid = object_model.id.clone();
let uid = object_model.attributed_to.clone();
if let Some(server) = server {
// make sure we're allowed to create this object
if let Some(object_author) = &object_model.attributed_to {
if server != Context::server(object_author) {
return Err(UpubError::forbidden());
}
} else if server != Context::server(&object_model.id) {
return Err(UpubError::forbidden());
};
}
// make sure content only contains a safe subset of html
if let Some(content) = object_model.content {
object_model.content = Some(mdhtml::safe_markdown(&content));
}
// fix context also for remote posts
// TODO this is not really appropriate because we're mirroring incorrectly remote objects, but
// it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them
match (&object_model.in_reply_to, &object_model.context) {
(Some(reply_id), None) => // get context from replied object
object_model.context = self.fetch_object(reply_id).await?.context,
(None, None) => // generate a new context
object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())),
(_, Some(_)) => {}, // leave it as set by user
}
// update replies counter
if let Some(ref in_reply_to) = object_model.in_reply_to {
if self.fetch_object(in_reply_to).await.is_ok() {
model::object::Entity::update_many()
.filter(model::object::Column::Id.eq(in_reply_to))
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
.exec(self.db())
.await?;
}
}
// update statuses counter
if let Some(object_author) = uid {
model::user::Entity::update_many()
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
.filter(model::user::Column::Id.eq(&object_author))
.exec(self.db())
.await?;
}
model::object::Entity::insert(object_model.clone().into_active_model()).exec(self.db()).await?;
for attachment in object_node.attachment().flat() {
let attachment_model = match attachment {
Node::Empty => continue,
Node::Array(_) => {
tracing::warn!("ignoring array-in-array while processing attachments");
continue
},
Node::Link(l) => model::attachment::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
url: Set(l.href().to_string()),
object: Set(oid.clone()),
document_type: Set(apb::DocumentType::Page),
name: Set(l.link_name().map(|x| x.to_string())),
media_type: Set(l.link_media_type().unwrap_or("link").to_string()),
created: Set(chrono::Utc::now()),
},
Node::Object(o) => model::attachment::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
url: Set(o.url().id().unwrap_or_else(|| o.id().map(|x| x.to_string()).unwrap_or_default())),
object: Set(oid.clone()),
document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))),
name: Set(o.name().map(|x| x.to_string())),
media_type: Set(o.media_type().unwrap_or("link").to_string()),
created: Set(o.published().unwrap_or_else(|| chrono::Utc::now())),
},
};
model::attachment::Entity::insert(attachment_model)
.exec(self.db())
.await?;
}
// lemmy sends us an image field in posts, treat it like an attachment i'd say
if let Some(img) = object_node.image().get() {
// TODO lemmy doesnt tell us the media type but we use it to display the thing...
let img_url = img.url().id().unwrap_or_default();
let media_type = if img_url.ends_with("png") {
Some("image/png".to_string())
} else if img_url.ends_with("webp") {
Some("image/webp".to_string())
} else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") {
Some("image/jpeg".to_string())
} else {
None
};
let attachment_model = model::attachment::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
url: Set(img.url().id().unwrap_or_else(|| img.id().map(|x| x.to_string()).unwrap_or_default())),
object: Set(oid.clone()),
document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))),
name: Set(img.name().map(|x| x.to_string())),
media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()),
created: Set(img.published().unwrap_or_else(|| chrono::Utc::now())),
};
model::attachment::Entity::insert(attachment_model)
.exec(self.db())
.await?;
}
Ok(object_model)
}
}

View file

@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, EntityTrait, IntoA
use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD}; use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD};
use super::{fetcher::Fetcher, Context}; use super::{fetcher::Fetcher, normalizer::Normalizer, Context};
#[axum::async_trait] #[axum::async_trait]
@ -18,27 +18,14 @@ impl apb::server::Outbox for Context {
let oid = self.oid(raw_oid.clone()); let oid = self.oid(raw_oid.clone());
let aid = self.aid(uuid::Uuid::new_v4().to_string()); let aid = self.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = object.addressed(); let activity_targets = object.addressed();
let mut object_model = model::object::Model::new( let object_model = self.insert_object(
&object object
.set_id(Some(&oid)) .set_id(Some(&oid))
.set_attributed_to(Node::link(uid.clone())) .set_attributed_to(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now())) .set_published(Some(chrono::Utc::now()))
)?; .set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))),
if let Some(content) = object_model.content { Some(self.base()),
object_model.content = Some(mdhtml::safe_markdown(&content)); ).await?;
}
match (&object_model.in_reply_to, &object_model.context) {
(Some(reply_id), None) => // get context from replied object
object_model.context = self.fetch_object(reply_id).await?.context,
(None, None) => // generate a new context
object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())),
(_, Some(_)) => {}, // leave it as set by user
}
let reply_to = object_model.in_reply_to.clone();
if let Some(fe_url) = &self.cfg().instance.frontend {
object_model.url = Some(format!("{fe_url}/objects/{raw_oid}"));
}
let activity_model = model::activity::Model { let activity_model = model::activity::Model {
id: aid.clone(), id: aid.clone(),
@ -53,22 +40,8 @@ impl apb::server::Outbox for Context {
published: object_model.published, published: object_model.published,
}; };
model::object::Entity::insert(object_model.into_active_model())
.exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model()) model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?; .exec(self.db()).await?;
model::user::Entity::update_many()
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
.filter(model::user::Column::Id.eq(&uid))
.exec(self.db())
.await?;
if let Some(reply_to) = reply_to {
model::object::Entity::update_many()
.filter(model::object::Column::Id.eq(reply_to))
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
.exec(self.db())
.await?;
}
self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?;
@ -84,60 +57,33 @@ impl apb::server::Outbox for Context {
let oid = self.oid(raw_oid.clone()); let oid = self.oid(raw_oid.clone());
let aid = self.aid(uuid::Uuid::new_v4().to_string()); let aid = self.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed(); let activity_targets = activity.addressed();
let mut object_model = model::object::Model::new(
&object self.insert_object(
object
.set_id(Some(&oid)) .set_id(Some(&oid))
.set_attributed_to(Node::link(uid.clone())) .set_attributed_to(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now())) .set_published(Some(chrono::Utc::now()))
)?; .set_to(activity.to())
let mut activity_model = model::activity::Model::new( .set_bto(activity.bto())
.set_cc(activity.cc())
.set_bcc(activity.bcc()),
Some(self.base()),
).await?;
let activity_model = model::activity::Model::new(
&activity &activity
.set_id(Some(&aid)) .set_id(Some(&aid))
.set_actor(Node::link(uid.clone())) .set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now())) .set_published(Some(chrono::Utc::now()))
.set_object(Node::link(oid.clone()))
)?; )?;
activity_model.object = Some(oid.clone());
object_model.to = activity_model.to.clone();
object_model.bto = activity_model.bto.clone();
object_model.cc = activity_model.cc.clone();
object_model.bcc = activity_model.bcc.clone();
if let Some(content) = object_model.content {
object_model.content = Some(mdhtml::safe_markdown(&content));
}
match (&object_model.in_reply_to, &object_model.context) {
(Some(reply_id), None) => // get context from replied object
object_model.context = self.fetch_object(reply_id).await?.context,
(None, None) => // generate a new context
object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())),
(_, Some(_)) => {}, // leave it as set by user
}
let reply_to = object_model.in_reply_to.clone();
if let Some(fe_url) = &self.cfg().instance.frontend {
object_model.url = Some(format!("{fe_url}/objects/{raw_oid}"));
}
model::object::Entity::insert(object_model.into_active_model())
.exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model()) model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?; .exec(self.db()).await?;
model::user::Entity::update_many()
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
.filter(model::user::Column::Id.eq(&uid))
.exec(self.db())
.await?;
if let Some(reply_to) = reply_to {
model::object::Entity::update_many()
.filter(model::object::Column::Id.eq(reply_to))
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
.exec(self.db())
.await?;
}
self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?;
Ok(aid) Ok(aid)
} }
async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> { async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(uuid::Uuid::new_v4().to_string()); let aid = self.aid(uuid::Uuid::new_v4().to_string());