chore: centralized way to process and insert objects
it shouldn't break anything, also names are kind of weird (normalizer? insert??) but ill think about it later, after ive done the same for activities and users
This commit is contained in:
parent
f4db94db27
commit
53f9f6d3f3
6 changed files with 143 additions and 167 deletions
|
@ -1,5 +1,5 @@
|
|||
use apb::{Document, DocumentMut, Link, Object, ObjectMut};
|
||||
use sea_orm::{entity::prelude::*, Set};
|
||||
use apb::{DocumentMut, ObjectMut};
|
||||
use sea_orm::entity::prelude::*;
|
||||
|
||||
use crate::routes::activitypub::jsonld::LD;
|
||||
|
||||
|
@ -19,23 +19,6 @@ pub struct Model {
|
|||
pub created: ChronoDateTimeUtc,
|
||||
}
|
||||
|
||||
impl ActiveModel {
|
||||
// TODO receive an impl, not a specific type!
|
||||
// issue is that it's either an apb::Link or apb::Document, but Document doesnt inherit from link!
|
||||
pub fn new(document: &serde_json::Value, object: String, media_type: Option<String>) -> Result<ActiveModel, super::FieldError> {
|
||||
let media_type = media_type.unwrap_or_else(|| document.media_type().unwrap_or("link").to_string());
|
||||
Ok(ActiveModel {
|
||||
id: sea_orm::ActiveValue::NotSet,
|
||||
object: Set(object),
|
||||
url: Set(document.url().id().unwrap_or_else(|| document.href().to_string())),
|
||||
document_type: Set(document.document_type().unwrap_or(apb::DocumentType::Page)),
|
||||
media_type: Set(media_type),
|
||||
name: Set(document.name().map(|x| x.to_string())),
|
||||
created: Set(document.published().unwrap_or(chrono::Utc::now())),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Model {
|
||||
pub fn ap(self) -> serde_json::Value {
|
||||
serde_json::Value::new_object()
|
||||
|
|
|
@ -7,7 +7,7 @@ use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryF
|
|||
|
||||
use crate::{errors::UpubError, model, VERSION};
|
||||
|
||||
use super::{httpsign::HttpSignature, Context};
|
||||
use super::{httpsign::HttpSignature, normalizer::Normalizer, Context};
|
||||
|
||||
#[axum::async_trait]
|
||||
pub trait Fetcher {
|
||||
|
@ -300,19 +300,13 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res
|
|||
if let Err(e) = ctx.fetch_user(&attributed_to).await {
|
||||
tracing::warn!("could not get actor of fetched object: {e}");
|
||||
}
|
||||
model::user::Entity::update_many()
|
||||
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
|
||||
.filter(model::user::Column::Id.eq(&attributed_to))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
}
|
||||
|
||||
let addressed = object.addressed();
|
||||
let mut object_model = model::object::Model::new(&object)?;
|
||||
|
||||
if let Some(reply) = &object_model.in_reply_to {
|
||||
if let Some(reply) = object.in_reply_to().id() {
|
||||
if depth <= 16 {
|
||||
fetch_object_inner(ctx, reply, depth + 1).await?;
|
||||
fetch_object_inner(ctx, &reply, depth + 1).await?;
|
||||
model::object::Entity::update_many()
|
||||
.filter(model::object::Column::Id.eq(reply))
|
||||
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
|
||||
|
@ -323,48 +317,11 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res
|
|||
}
|
||||
}
|
||||
|
||||
// fix context also for remote posts
|
||||
// TODO this is not really appropriate because we're mirroring incorrectly remote objects, but
|
||||
// it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them
|
||||
match (&object_model.in_reply_to, &object_model.context) {
|
||||
(Some(reply_id), None) => // get context from replied object
|
||||
object_model.context = fetch_object_inner(ctx, reply_id, depth + 1).await?.context,
|
||||
(None, None) => // generate a new context
|
||||
object_model.context = Some(crate::url!(ctx, "/context/{}", uuid::Uuid::new_v4().to_string())),
|
||||
(_, Some(_)) => {}, // leave it as set by user
|
||||
}
|
||||
|
||||
for attachment in object.attachment() {
|
||||
let attachment_model = model::attachment::ActiveModel::new(&attachment, object_model.id.clone(), None)?;
|
||||
model::attachment::Entity::insert(attachment_model)
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
}
|
||||
// lemmy sends us an image field in posts, treat it like an attachment i'd say
|
||||
if let Some(img) = object.image().get() {
|
||||
// TODO lemmy doesnt tell us the media type but we use it to display the thing...
|
||||
let img_url = img.url().id().unwrap_or_default();
|
||||
let media_type = if img_url.ends_with("png") {
|
||||
Some("image/png".to_string())
|
||||
} else if img_url.ends_with("webp") {
|
||||
Some("image/webp".to_string())
|
||||
} else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") {
|
||||
Some("image/jpeg".to_string())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let attachment_model = model::attachment::ActiveModel::new(img, object_model.id.clone(), media_type)?;
|
||||
model::attachment::Entity::insert(attachment_model)
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
}
|
||||
let object_model = ctx.insert_object(object, None).await?;
|
||||
|
||||
let expanded_addresses = ctx.expand_addressing(addressed).await?;
|
||||
ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?;
|
||||
|
||||
model::object::Entity::insert(object_model.clone().into_active_model())
|
||||
.exec(ctx.db()).await?;
|
||||
|
||||
Ok(object_model)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ use apb::{target::Addressed, Activity, Base, Object};
|
|||
use reqwest::StatusCode;
|
||||
use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set};
|
||||
|
||||
use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}};
|
||||
use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}, server::normalizer::Normalizer};
|
||||
|
||||
use super::{fetcher::Fetcher, Context};
|
||||
|
||||
|
@ -20,74 +20,10 @@ impl apb::server::Inbox for Context {
|
|||
tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap());
|
||||
return Err(UpubError::unprocessable());
|
||||
};
|
||||
let mut object_model = model::object::Model::new(&object_node)?;
|
||||
let oid = object_model.id.clone();
|
||||
let uid = object_model.attributed_to.clone();
|
||||
// make sure we're allowed to edit this object
|
||||
if let Some(object_author) = &object_model.attributed_to {
|
||||
if server != Context::server(object_author) {
|
||||
return Err(UpubError::forbidden());
|
||||
}
|
||||
} else if server != Context::server(&object_model.id) {
|
||||
return Err(UpubError::forbidden());
|
||||
};
|
||||
// fix context also for remote posts
|
||||
// TODO this is not really appropriate because we're mirroring incorrectly remote objects, but
|
||||
// it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them
|
||||
match (&object_model.in_reply_to, &object_model.context) {
|
||||
(Some(reply_id), None) => // get context from replied object
|
||||
object_model.context = self.fetch_object(reply_id).await?.context,
|
||||
(None, None) => // generate a new context
|
||||
object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())),
|
||||
(_, Some(_)) => {}, // leave it as set by user
|
||||
}
|
||||
// update replies counter
|
||||
if let Some(ref in_reply_to) = object_model.in_reply_to {
|
||||
if self.fetch_object(in_reply_to).await.is_ok() {
|
||||
model::object::Entity::update_many()
|
||||
.filter(model::object::Column::Id.eq(in_reply_to))
|
||||
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?;
|
||||
model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?;
|
||||
for attachment in object_node.attachment() {
|
||||
let attachment_model = model::attachment::ActiveModel::new(&attachment, oid.clone(), None)?;
|
||||
model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
// lemmy sends us an image field in posts, treat it like an attachment i'd say
|
||||
if let Some(img) = object_node.image().get() {
|
||||
// TODO lemmy doesnt tell us the media type but we use it to display the thing...
|
||||
let img_url = img.url().id().unwrap_or_default();
|
||||
let media_type = if img_url.ends_with("png") {
|
||||
Some("image/png".to_string())
|
||||
} else if img_url.ends_with("webp") {
|
||||
Some("image/webp".to_string())
|
||||
} else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") {
|
||||
Some("image/jpeg".to_string())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let attachment_model = model::attachment::ActiveModel::new(img, oid.clone(), media_type)?;
|
||||
model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
// TODO can we even receive anonymous objects?
|
||||
if let Some(object_author) = uid {
|
||||
model::user::Entity::update_many()
|
||||
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
|
||||
.filter(model::user::Column::Id.eq(&object_author))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
let object_model = self.insert_object(object_node, Some(server)).await?;
|
||||
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
|
||||
self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?;
|
||||
tracing::info!("{} posted {}", aid, oid);
|
||||
self.address_to(Some(&aid), Some(&object_model.id), &expanded_addressing).await?;
|
||||
tracing::info!("{} posted {}", aid, object_model.id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -7,5 +7,6 @@ pub mod outbox;
|
|||
pub mod auth;
|
||||
pub mod builders;
|
||||
pub mod httpsign;
|
||||
pub mod normalizer;
|
||||
|
||||
pub use context::Context;
|
||||
|
|
126
src/server/normalizer.rs
Normal file
126
src/server/normalizer.rs
Normal file
|
@ -0,0 +1,126 @@
|
|||
use apb::{Node, Link, Base, Object, Document};
|
||||
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, Set};
|
||||
use crate::{errors::UpubError, model, server::Context};
|
||||
|
||||
use super::fetcher::Fetcher;
|
||||
|
||||
#[axum::async_trait]
|
||||
pub trait Normalizer {
|
||||
async fn insert_object(&self, obj: impl apb::Object, server: Option<String>) -> crate::Result<model::object::Model>;
|
||||
}
|
||||
|
||||
#[axum::async_trait]
|
||||
impl Normalizer for super::Context {
|
||||
async fn insert_object(&self, object_node: impl apb::Object, server: Option<String>) -> crate::Result<model::object::Model> {
|
||||
let mut object_model = model::object::Model::new(&object_node)?;
|
||||
let oid = object_model.id.clone();
|
||||
let uid = object_model.attributed_to.clone();
|
||||
if let Some(server) = server {
|
||||
// make sure we're allowed to create this object
|
||||
if let Some(object_author) = &object_model.attributed_to {
|
||||
if server != Context::server(object_author) {
|
||||
return Err(UpubError::forbidden());
|
||||
}
|
||||
} else if server != Context::server(&object_model.id) {
|
||||
return Err(UpubError::forbidden());
|
||||
};
|
||||
}
|
||||
|
||||
// make sure content only contains a safe subset of html
|
||||
if let Some(content) = object_model.content {
|
||||
object_model.content = Some(mdhtml::safe_markdown(&content));
|
||||
}
|
||||
|
||||
// fix context also for remote posts
|
||||
// TODO this is not really appropriate because we're mirroring incorrectly remote objects, but
|
||||
// it makes it SOO MUCH EASIER for us to fetch threads and stuff, so we're filling it for them
|
||||
match (&object_model.in_reply_to, &object_model.context) {
|
||||
(Some(reply_id), None) => // get context from replied object
|
||||
object_model.context = self.fetch_object(reply_id).await?.context,
|
||||
(None, None) => // generate a new context
|
||||
object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())),
|
||||
(_, Some(_)) => {}, // leave it as set by user
|
||||
}
|
||||
|
||||
// update replies counter
|
||||
if let Some(ref in_reply_to) = object_model.in_reply_to {
|
||||
if self.fetch_object(in_reply_to).await.is_ok() {
|
||||
model::object::Entity::update_many()
|
||||
.filter(model::object::Column::Id.eq(in_reply_to))
|
||||
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
// update statuses counter
|
||||
if let Some(object_author) = uid {
|
||||
model::user::Entity::update_many()
|
||||
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
|
||||
.filter(model::user::Column::Id.eq(&object_author))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
||||
model::object::Entity::insert(object_model.clone().into_active_model()).exec(self.db()).await?;
|
||||
|
||||
for attachment in object_node.attachment().flat() {
|
||||
let attachment_model = match attachment {
|
||||
Node::Empty => continue,
|
||||
Node::Array(_) => {
|
||||
tracing::warn!("ignoring array-in-array while processing attachments");
|
||||
continue
|
||||
},
|
||||
Node::Link(l) => model::attachment::ActiveModel {
|
||||
id: sea_orm::ActiveValue::NotSet,
|
||||
url: Set(l.href().to_string()),
|
||||
object: Set(oid.clone()),
|
||||
document_type: Set(apb::DocumentType::Page),
|
||||
name: Set(l.link_name().map(|x| x.to_string())),
|
||||
media_type: Set(l.link_media_type().unwrap_or("link").to_string()),
|
||||
created: Set(chrono::Utc::now()),
|
||||
},
|
||||
Node::Object(o) => model::attachment::ActiveModel {
|
||||
id: sea_orm::ActiveValue::NotSet,
|
||||
url: Set(o.url().id().unwrap_or_else(|| o.id().map(|x| x.to_string()).unwrap_or_default())),
|
||||
object: Set(oid.clone()),
|
||||
document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))),
|
||||
name: Set(o.name().map(|x| x.to_string())),
|
||||
media_type: Set(o.media_type().unwrap_or("link").to_string()),
|
||||
created: Set(o.published().unwrap_or_else(|| chrono::Utc::now())),
|
||||
},
|
||||
};
|
||||
model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
// lemmy sends us an image field in posts, treat it like an attachment i'd say
|
||||
if let Some(img) = object_node.image().get() {
|
||||
// TODO lemmy doesnt tell us the media type but we use it to display the thing...
|
||||
let img_url = img.url().id().unwrap_or_default();
|
||||
let media_type = if img_url.ends_with("png") {
|
||||
Some("image/png".to_string())
|
||||
} else if img_url.ends_with("webp") {
|
||||
Some("image/webp".to_string())
|
||||
} else if img_url.ends_with("jpeg") || img_url.ends_with("jpg") {
|
||||
Some("image/jpeg".to_string())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let attachment_model = model::attachment::ActiveModel {
|
||||
id: sea_orm::ActiveValue::NotSet,
|
||||
url: Set(img.url().id().unwrap_or_else(|| img.id().map(|x| x.to_string()).unwrap_or_default())),
|
||||
object: Set(oid.clone()),
|
||||
document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))),
|
||||
name: Set(img.name().map(|x| x.to_string())),
|
||||
media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()),
|
||||
created: Set(img.published().unwrap_or_else(|| chrono::Utc::now())),
|
||||
};
|
||||
model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(object_model)
|
||||
}
|
||||
}
|
|
@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, EntityTrait, IntoA
|
|||
|
||||
use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD};
|
||||
|
||||
use super::{fetcher::Fetcher, Context};
|
||||
use super::{fetcher::Fetcher, normalizer::Normalizer, Context};
|
||||
|
||||
|
||||
#[axum::async_trait]
|
||||
|
@ -18,27 +18,14 @@ impl apb::server::Outbox for Context {
|
|||
let oid = self.oid(raw_oid.clone());
|
||||
let aid = self.aid(uuid::Uuid::new_v4().to_string());
|
||||
let activity_targets = object.addressed();
|
||||
let mut object_model = model::object::Model::new(
|
||||
&object
|
||||
let object_model = self.insert_object(
|
||||
object
|
||||
.set_id(Some(&oid))
|
||||
.set_attributed_to(Node::link(uid.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
)?;
|
||||
if let Some(content) = object_model.content {
|
||||
object_model.content = Some(mdhtml::safe_markdown(&content));
|
||||
}
|
||||
match (&object_model.in_reply_to, &object_model.context) {
|
||||
(Some(reply_id), None) => // get context from replied object
|
||||
object_model.context = self.fetch_object(reply_id).await?.context,
|
||||
(None, None) => // generate a new context
|
||||
object_model.context = Some(crate::url!(self, "/context/{}", uuid::Uuid::new_v4().to_string())),
|
||||
(_, Some(_)) => {}, // leave it as set by user
|
||||
}
|
||||
let reply_to = object_model.in_reply_to.clone();
|
||||
|
||||
if let Some(fe_url) = &self.cfg().instance.frontend {
|
||||
object_model.url = Some(format!("{fe_url}/objects/{raw_oid}"));
|
||||
}
|
||||
.set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))),
|
||||
Some(self.base()),
|
||||
).await?;
|
||||
|
||||
let activity_model = model::activity::Model {
|
||||
id: aid.clone(),
|
||||
|
@ -53,22 +40,8 @@ impl apb::server::Outbox for Context {
|
|||
published: object_model.published,
|
||||
};
|
||||
|
||||
model::object::Entity::insert(object_model.into_active_model())
|
||||
.exec(self.db()).await?;
|
||||
model::activity::Entity::insert(activity_model.into_active_model())
|
||||
.exec(self.db()).await?;
|
||||
model::user::Entity::update_many()
|
||||
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
|
||||
.filter(model::user::Column::Id.eq(&uid))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
if let Some(reply_to) = reply_to {
|
||||
model::object::Entity::update_many()
|
||||
.filter(model::object::Column::Id.eq(reply_to))
|
||||
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
||||
self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?;
|
||||
|
||||
|
|
Loading…
Reference in a new issue