diff --git a/src/model/addressing.rs b/src/model/addressing.rs index 95b410b3..7c9fdf50 100644 --- a/src/model/addressing.rs +++ b/src/model/addressing.rs @@ -1,5 +1,7 @@ use apb::{ActivityMut, ObjectMut}; -use sea_orm::{entity::prelude::*, FromQueryResult, Iterable, Order, QueryOrder, QuerySelect, SelectColumns}; +use sea_orm::{entity::prelude::*, Condition, FromQueryResult, Iterable, Order, QueryOrder, QuerySelect, SelectColumns}; + +use crate::routes::activitypub::jsonld::LD; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "addressing")] @@ -59,105 +61,81 @@ impl ActiveModelBehavior for ActiveModel {} - - -#[derive(Debug)] -pub struct EmbeddedActivity { - pub activity: crate::model::activity::Model, - pub object: Option, +#[allow(clippy::large_enum_variant)] // tombstone is an outlier, not the norm! this is a beefy enum +#[derive(Debug, Clone)] +pub enum Event { + Tombstone, + StrayObject(crate::model::object::Model), + Activity(crate::model::activity::Model), + DeepActivity { + activity: crate::model::activity::Model, + object: crate::model::object::Model, + } } -impl EmbeddedActivity { - pub async fn ap_filled(self, db: &DatabaseConnection) -> crate::Result { - let a = self.activity.ap(); - match self.object { - None => Ok(a), - Some(o) => { - let attachments = o.find_related(crate::model::attachment::Entity) - .all(db) - .await? - .into_iter() - .map(|x| x.ap()) - .collect(); - Ok(a.set_object( - apb::Node::object(o.ap().set_attachment(apb::Node::array(attachments))) - )) - } + +impl Event { + pub fn id(&self) -> &str { + match self { + Event::Tombstone => "", + Event::StrayObject(x) => x.id.as_str(), + Event::Activity(x) => x.id.as_str(), + Event::DeepActivity { activity: _, object } => object.id.as_str(), + } + } + + pub fn ap(self, attachment: Option>) -> serde_json::Value { + let attachment = match attachment { + None => apb::Node::Empty, + Some(vec) => apb::Node::array( + vec.into_iter().map(|x| x.ap()).collect() + ), + }; + match self { + Event::Activity(x) => x.ap(), + Event::DeepActivity { activity, object } => + activity.ap().set_object(apb::Node::object(object.ap().set_attachment(attachment))), + Event::StrayObject(x) => serde_json::Value::new_object() + .set_activity_type(Some(apb::ActivityType::Activity)) + .set_object(apb::Node::object(x.ap().set_attachment(attachment))), + Event::Tombstone => serde_json::Value::new_object() + .set_activity_type(Some(apb::ActivityType::Activity)) + .set_object(apb::Node::object( + serde_json::Value::new_object() + .set_object_type(Some(apb::ObjectType::Tombstone)) + )), } } } -impl FromQueryResult for EmbeddedActivity { - fn from_query_result(res: &sea_orm::QueryResult, _pre: &str) -> Result { - let activity = crate::model::activity::Model::from_query_result(res, crate::model::activity::Entity.table_name())?; - let object = crate::model::object::Model::from_query_result(res, crate::model::object::Entity.table_name()).ok(); - Ok(Self { activity, object }) - } -} - -#[derive(Debug)] -pub struct WrappedObject { - pub activity: Option, - pub object: crate::model::object::Model, -} - - -impl WrappedObject { - pub async fn ap_filled(self, db: &DatabaseConnection) -> crate::Result { - let attachments = self.object.find_related(crate::model::attachment::Entity) - .all(db) - .await? - .into_iter() - .map(|x| x.ap()) - .collect(); - let o = self.object.ap() - .set_attachment(apb::Node::Array(attachments)); - match self.activity { - None => Ok(o), - Some(a) => Ok(a.ap().set_object(apb::Node::object(o))), - } - } -} - -impl FromQueryResult for WrappedObject { +impl FromQueryResult for Event { fn from_query_result(res: &sea_orm::QueryResult, _pre: &str) -> Result { let activity = crate::model::activity::Model::from_query_result(res, crate::model::activity::Entity.table_name()).ok(); - let object = crate::model::object::Model::from_query_result(res, crate::model::object::Entity.table_name())?; - Ok(Self { activity, object }) + let object = crate::model::object::Model::from_query_result(res, crate::model::object::Entity.table_name()).ok(); + match (activity, object) { + (Some(activity), Some(object)) => Ok(Self::DeepActivity { activity, object }), + (Some(activity), None) => Ok(Self::Activity(activity)), + (None, Some(object)) => Ok(Self::StrayObject(object)), + (None, None) => Ok(Self::Tombstone), + } } } - - - impl Entity { - pub fn find_activities() -> Select { + pub fn find_addressed() -> Select { let mut select = Entity::find() .distinct() .select_only() - .join(sea_orm::JoinType::InnerJoin, Relation::Activity.def()) - .join(sea_orm::JoinType::LeftJoin, crate::model::activity::Relation::Object.def()) - .order_by(crate::model::activity::Column::Published, Order::Desc); - - for col in crate::model::activity::Column::iter() { - select = select.select_column_as(col, format!("{}{}", crate::model::activity::Entity.table_name(), col.to_string())); - } - - for col in crate::model::object::Column::iter() { - select = select.select_column_as(col, format!("{}{}", crate::model::object::Entity.table_name(), col.to_string())); - } - - select - } - - pub fn find_objects() -> Select { - let mut select = Entity::find() - .distinct() - .select_only() - .join(sea_orm::JoinType::InnerJoin, Relation::Object.def()) + .join(sea_orm::JoinType::LeftJoin, Relation::Object.def()) .join(sea_orm::JoinType::LeftJoin, Relation::Activity.def()) - .order_by(crate::model::object::Column::Published, Order::Desc); + .filter( + // TODO ghetto double inner join because i want to filter out tombstones + Condition::any() + .add(crate::model::activity::Column::Id.is_not_null()) + .add(crate::model::object::Column::Id.is_not_null()) + ) + .order_by(Column::Published, Order::Desc); for col in crate::model::object::Column::iter() { select = select.select_column_as(col, format!("{}{}", crate::model::object::Entity.table_name(), col.to_string())); diff --git a/src/model/attachment.rs b/src/model/attachment.rs index 4a89634a..f3cf7eb3 100644 --- a/src/model/attachment.rs +++ b/src/model/attachment.rs @@ -3,6 +3,8 @@ use sea_orm::{entity::prelude::*, Set}; use crate::routes::activitypub::jsonld::LD; +use super::addressing::Event; + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "attachments")] pub struct Model { @@ -59,3 +61,52 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + + +#[axum::async_trait] +pub trait BatchFillable { + async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr>; +} + +#[axum::async_trait] +impl BatchFillable for &[Event] { + async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { + let objects : Vec = self + .iter() + .filter_map(|x| match x { + Event::Tombstone => None, + Event::Activity(_) => None, + Event::StrayObject(x) => Some(x.clone()), + Event::DeepActivity { activity: _, object } => Some(object.clone()), + }) + .collect(); + + let attachments = objects.load_many(Entity, db).await?; + + let mut out : std::collections::BTreeMap> = std::collections::BTreeMap::new(); + for attach in attachments.into_iter().flatten() { + if out.contains_key(&attach.object) { + out.get_mut(&attach.object).expect("contains but get failed?").push(attach); + } else { + out.insert(attach.object.clone(), vec![attach]); + } + } + + Ok(out) + } +} + +#[axum::async_trait] +impl BatchFillable for Vec { + async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { + self.as_slice().load_attachments_batch(db).await + } +} + +#[axum::async_trait] +impl BatchFillable for Event { + async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { + let x = vec![self.clone()]; // TODO wasteful clone and vec![] but ehhh convenient + x.load_attachments_batch(db).await + } +} diff --git a/src/routes/activitypub/activity.rs b/src/routes/activitypub/activity.rs index c53053dd..dbd853f3 100644 --- a/src/routes/activitypub/activity.rs +++ b/src/routes/activitypub/activity.rs @@ -1,6 +1,6 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, QueryFilter}; -use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; +use crate::{errors::UpubError, model::{self, addressing::Event, attachment::BatchFillable}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; use super::{jsonld::LD, JsonLD, TryFetch}; @@ -19,17 +19,17 @@ pub async fn view( ctx.fetch_activity(&aid).await?; } - match model::addressing::Entity::find_activities() + let row = model::addressing::Entity::find_addressed() .filter(model::activity::Column::Id.eq(&aid)) .filter(auth.filter_condition()) - .into_model::() + .into_model::() .one(ctx.db()) .await? - { - Some(activity) => Ok(JsonLD( - activity.ap_filled(ctx.db()).await?.ld_context() - )), - None => Err(UpubError::not_found()), - } + .ok_or_else(UpubError::not_found)?; + + let mut attachments = row.load_attachments_batch(ctx.db()).await?; + let attach = attachments.remove(row.id()); + + Ok(JsonLD(row.ap(attach).ld_context())) } diff --git a/src/routes/activitypub/context.rs b/src/routes/activitypub/context.rs index cb3c7d2e..382ba113 100644 --- a/src/routes/activitypub/context.rs +++ b/src/routes/activitypub/context.rs @@ -1,7 +1,7 @@ use axum::extract::{Path, Query, State}; -use sea_orm::{ColumnTrait, PaginatorTrait, QueryFilter, QuerySelect}; +use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter}; -use crate::{model::{self, addressing::WrappedObject}, routes::activitypub::{jsonld::LD, JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; +use crate::{model, routes::activitypub::{JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; pub async fn get( State(ctx): State, @@ -14,13 +14,13 @@ pub async fn get( url!(ctx, "/context/{id}") }; - let count = model::addressing::Entity::find_objects() + let count = model::addressing::Entity::find_addressed() .filter(auth.filter_condition()) .filter(model::object::Column::Context.eq(context)) .count(ctx.db()) .await?; - Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/context/{id}"), Some(count)).ld_context())) + crate::server::builders::collection(&url!(ctx, "/context/{id}"), Some(count)) } pub async fn page( @@ -29,9 +29,6 @@ pub async fn page( Query(page): Query, AuthIdentity(auth): AuthIdentity, ) -> crate::Result> { - let limit = page.batch.unwrap_or(20).min(50); - let offset = page.offset.unwrap_or(0); - let context = if id.starts_with('+') { id.replacen('+', "https://", 1).replace('@', "/") } else if id.starts_with("tag:") { @@ -40,24 +37,13 @@ pub async fn page( url!(ctx, "/context/{id}") // TODO need a better way to figure out which ones are our contexts }; - let items = model::addressing::Entity::find_objects() - .filter(auth.filter_condition()) - .filter(model::object::Column::Context.eq(context)) - .limit(limit) - .offset(offset) - .into_model::() - .all(ctx.db()) - .await?; - - let mut out = Vec::new(); - for item in items { - out.push(item.ap_filled(ctx.db()).await?); - } - - Ok(JsonLD( - ctx.ap_collection_page( - &url!(ctx, "/context/{id}/page"), - offset, limit, out, - ).ld_context() - )) + crate::server::builders::paginate( + url!(ctx, "/context/{id}/page"), + Condition::all() + .add(auth.filter_condition()) + .add(model::object::Column::Context.eq(context)), + ctx.db(), + page + ) + .await } diff --git a/src/routes/activitypub/inbox.rs b/src/routes/activitypub/inbox.rs index 0239681b..5f0aca52 100644 --- a/src/routes/activitypub/inbox.rs +++ b/src/routes/activitypub/inbox.rs @@ -1,16 +1,15 @@ use apb::{server::Inbox, Activity, ActivityType}; use axum::{extract::{Query, State}, http::StatusCode, Json}; -use sea_orm::{QueryFilter, QuerySelect}; -use crate::{errors::UpubError, model::{self, addressing::WrappedObject}, server::{auth::{AuthIdentity, Identity}, Context}, url}; +use crate::{errors::UpubError, server::{auth::{AuthIdentity, Identity}, Context}, url}; -use super::{jsonld::LD, JsonLD, Pagination}; +use super::{JsonLD, Pagination}; pub async fn get( State(ctx): State, -) -> Result, StatusCode> { - Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/inbox"), None).ld_context())) +) -> crate::Result> { + crate::server::builders::collection(&url!(ctx, "/inbox"), None) } pub async fn page( @@ -18,25 +17,13 @@ pub async fn page( AuthIdentity(auth): AuthIdentity, Query(page): Query, ) -> crate::Result> { - let limit = page.batch.unwrap_or(20).min(50); - let offset = page.offset.unwrap_or(0); - let objects = model::addressing::Entity::find_objects() - .filter(auth.filter_condition()) - .limit(limit) - .offset(offset) - .into_model::() - .all(ctx.db()) - .await?; - let mut out = Vec::new(); - for object in objects { - out.push(object.ap_filled(ctx.db()).await?); - } - Ok(JsonLD( - ctx.ap_collection_page( - &url!(ctx, "/inbox/page"), - offset, limit, out, - ).ld_context() - )) + crate::server::builders::paginate( + url!(ctx, "/inbox/page"), + auth.filter_condition(), + ctx.db(), + page + ) + .await } macro_rules! pretty_json { diff --git a/src/routes/activitypub/object/mod.rs b/src/routes/activitypub/object/mod.rs index bca2bdad..db572548 100644 --- a/src/routes/activitypub/object/mod.rs +++ b/src/routes/activitypub/object/mod.rs @@ -4,7 +4,7 @@ use apb::{BaseMut, CollectionMut, ObjectMut}; use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, ModelTrait, QueryFilter}; -use crate::{errors::UpubError, model::{self, addressing::WrappedObject}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; +use crate::{errors::UpubError, model::{self, addressing::Event}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; use super::{jsonld::LD, JsonLD, TryFetch}; @@ -23,17 +23,22 @@ pub async fn view( ctx.fetch_object(&oid).await?; } - let Some(object) = model::addressing::Entity::find_objects() + let item = model::addressing::Entity::find_addressed() .filter(model::object::Column::Id.eq(&oid)) .filter(auth.filter_condition()) - .into_model::() + .into_model::() .one(ctx.db()) .await? - else { - return Err(UpubError::not_found()); + .ok_or_else(UpubError::not_found)?; + + let object = match item { + Event::Tombstone => return Err(UpubError::not_found()), + Event::Activity(_) => return Err(UpubError::not_found()), + Event::StrayObject(x) => x, + Event::DeepActivity { activity: _, object } => object, }; - let attachments = object.object.find_related(model::attachment::Entity) + let attachments = object.find_related(model::attachment::Entity) .all(ctx.db()) .await? .into_iter() @@ -45,10 +50,10 @@ pub async fn view( .set_id(Some(&crate::url!(ctx, "/objects/{id}/replies"))) .set_collection_type(Some(apb::CollectionType::OrderedCollection)) .set_first(apb::Node::link(crate::url!(ctx, "/objects/{id}/replies/page"))) - .set_total_items(Some(object.object.comments as u64)); + .set_total_items(Some(object.comments as u64)); Ok(JsonLD( - object.object.ap() + object.ap() .set_replies(apb::Node::object(replies)) .set_attachment(apb::Node::array(attachments)) .ld_context() diff --git a/src/routes/activitypub/object/replies.rs b/src/routes/activitypub/object/replies.rs index c70c0acf..95304943 100644 --- a/src/routes/activitypub/object/replies.rs +++ b/src/routes/activitypub/object/replies.rs @@ -1,7 +1,7 @@ use axum::extract::{Path, Query, State}; -use sea_orm::{ColumnTrait, PaginatorTrait, QueryFilter, QuerySelect}; +use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter}; -use crate::{model::{self, addressing::WrappedObject}, routes::activitypub::{jsonld::LD, JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; +use crate::{model, routes::activitypub::{JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; pub async fn get( State(ctx): State, @@ -14,13 +14,13 @@ pub async fn get( ctx.oid(id.clone()) }; - let count = model::addressing::Entity::find_objects() + let count = model::addressing::Entity::find_addressed() .filter(auth.filter_condition()) .filter(model::object::Column::InReplyTo.eq(oid)) .count(ctx.db()) .await?; - Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/objects/{id}/replies"), Some(count)).ld_context())) + crate::server::builders::collection(&url!(ctx, "/objects/{id}/replies"), Some(count)) } pub async fn page( @@ -29,33 +29,19 @@ pub async fn page( Query(page): Query, AuthIdentity(auth): AuthIdentity, ) -> crate::Result> { - let limit = page.batch.unwrap_or(20).min(50); - let offset = page.offset.unwrap_or(0); - let oid = if id.starts_with('+') { format!("https://{}", id.replacen('+', "", 1).replace('@', "/")) } else { ctx.oid(id.clone()) }; - let items = model::addressing::Entity::find_objects() - .filter(auth.filter_condition()) - .filter(model::object::Column::InReplyTo.eq(oid)) - // TODO also limit to only local activities - .limit(limit) - .offset(offset) - .into_model::() - .all(ctx.db()) - .await?; - - Ok(JsonLD( - ctx.ap_collection_page( - &url!(ctx, "/objects/{id}/replies/page"), - offset, limit, - items - .into_iter() - .map(|x| x.object.ap()) - .collect() - ).ld_context() - )) + crate::server::builders::paginate( + url!(ctx, "/objects/{id}/replies/page"), + Condition::all() + .add(auth.filter_condition()) + .add(model::object::Column::InReplyTo.eq(oid)), + ctx.db(), + page + ) + .await } diff --git a/src/routes/activitypub/outbox.rs b/src/routes/activitypub/outbox.rs index 75bda87c..8bd3225c 100644 --- a/src/routes/activitypub/outbox.rs +++ b/src/routes/activitypub/outbox.rs @@ -1,10 +1,9 @@ use axum::{extract::{Query, State}, http::StatusCode, Json}; -use sea_orm::{QueryFilter, QuerySelect}; -use crate::{errors::UpubError, model::{self, addressing::WrappedObject}, routes::activitypub::{jsonld::LD, CreationResult, JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; +use crate::{errors::UpubError, routes::activitypub::{CreationResult, JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; -pub async fn get(State(ctx): State) -> Result, StatusCode> { - Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/outbox"), None).ld_context())) +pub async fn get(State(ctx): State) -> crate::Result> { + crate::server::builders::collection(&url!(ctx, "/outbox"), None) } pub async fn page( @@ -12,29 +11,13 @@ pub async fn page( Query(page): Query, AuthIdentity(auth): AuthIdentity, ) -> crate::Result> { - let limit = page.batch.unwrap_or(20).min(50); - let offset = page.offset.unwrap_or(0); - - let items = model::addressing::Entity::find_objects() - .filter(auth.filter_condition()) - // TODO also limit to only local activities - .limit(limit) - .offset(offset) - .into_model::() - .all(ctx.db()).await?; - - let mut out = Vec::new(); - for item in items { - out.push(item.ap_filled(ctx.db()).await?); - } - - Ok(JsonLD( - ctx.ap_collection_page( - &url!(ctx, "/outbox/page"), - offset, limit, - out, - ).ld_context() - )) + crate::server::builders::paginate( + url!(ctx, "/outbox/page"), + auth.filter_condition(), // TODO filter local only stuff + ctx.db(), + page, + ) + .await } pub async fn post( diff --git a/src/routes/activitypub/user/following.rs b/src/routes/activitypub/user/following.rs index 27ca37e3..1519e555 100644 --- a/src/routes/activitypub/user/following.rs +++ b/src/routes/activitypub/user/following.rs @@ -1,14 +1,14 @@ -use axum::{extract::{Path, Query, State}, http::StatusCode}; +use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{routes::activitypub::{jsonld::LD, JsonLD, Pagination}, model, server::Context, url}; +use crate::{routes::activitypub::{JsonLD, Pagination}, model, server::Context, url}; use model::relation::Column::{Following, Follower}; pub async fn get( State(ctx): State, Path(id): Path, -) -> Result, StatusCode> { +) -> crate::Result> { let follow___ = if OUTGOING { "following" } else { "followers" }; let count = model::relation::Entity::find() .filter(if OUTGOING { Follower } else { Following }.eq(ctx.uid(id.clone()))) @@ -16,44 +16,32 @@ pub async fn get( tracing::error!("failed counting {follow___} for {id}: {e}"); 0 }); - Ok(JsonLD( - ctx.ap_collection( - &url!(ctx, "/users/{id}/{follow___}"), - Some(count) - ).ld_context() - )) + + crate::server::builders::collection(&url!(ctx, "/users/{id}/{follow___}"), Some(count)) } pub async fn page( State(ctx): State, Path(id): Path, Query(page): Query, -) -> Result, StatusCode> { +) -> crate::Result> { let follow___ = if OUTGOING { "following" } else { "followers" }; let limit = page.batch.unwrap_or(20).min(50); let offset = page.offset.unwrap_or(0); - match model::relation::Entity::find() + + let following = model::relation::Entity::find() .filter(if OUTGOING { Follower } else { Following }.eq(ctx.uid(id.clone()))) .select_only() .select_column(if OUTGOING { Following } else { Follower }) .limit(limit) .offset(page.offset.unwrap_or(0)) .into_tuple::() - .all(ctx.db()).await - { - Err(e) => { - tracing::error!("error queriying {follow___} for {id}: {e}"); - Err(StatusCode::INTERNAL_SERVER_ERROR) - }, - Ok(following) => { - Ok(JsonLD( - ctx.ap_collection_page( - &url!(ctx, "/users/{id}/{follow___}/page"), - offset, - limit, - following.into_iter().map(serde_json::Value::String).collect() - ).ld_context() - )) - }, - } + .all(ctx.db()) + .await?; + + crate::server::builders::collection_page( + &url!(ctx, "/users/{id}/{follow___}/page"), + offset, limit, + following.into_iter().map(serde_json::Value::String).collect() + ) } diff --git a/src/routes/activitypub/user/inbox.rs b/src/routes/activitypub/user/inbox.rs index 9be33f1f..84e4e8a2 100644 --- a/src/routes/activitypub/user/inbox.rs +++ b/src/routes/activitypub/user/inbox.rs @@ -1,20 +1,20 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; -use sea_orm::{ColumnTrait, QueryFilter, QuerySelect}; -use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, routes::activitypub::{jsonld::LD, JsonLD, Pagination}, server::{auth::{AuthIdentity, Identity}, Context}, url}; +use sea_orm::{ColumnTrait, Condition}; +use crate::{errors::UpubError, model, routes::activitypub::{JsonLD, Pagination}, server::{auth::{AuthIdentity, Identity}, Context}, url}; pub async fn get( State(ctx): State, Path(id): Path, AuthIdentity(auth): AuthIdentity, -) -> Result, StatusCode> { +) -> crate::Result> { match auth { - Identity::Anonymous => Err(StatusCode::FORBIDDEN), - Identity::Remote(_) => Err(StatusCode::FORBIDDEN), + Identity::Anonymous => Err(StatusCode::FORBIDDEN.into()), + Identity::Remote(_) => Err(StatusCode::FORBIDDEN.into()), Identity::Local(user) => if ctx.uid(id.clone()) == user { - Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/users/{id}/inbox"), None).ld_context())) + crate::server::builders::collection(&url!(ctx, "/users/{id}/inbox"), None) } else { - Err(StatusCode::FORBIDDEN) + Err(StatusCode::FORBIDDEN.into()) }, } } @@ -25,32 +25,21 @@ pub async fn page( AuthIdentity(auth): AuthIdentity, Query(page): Query, ) -> crate::Result> { - let Identity::Local(uid) = auth else { + let Identity::Local(uid) = &auth else { // local inbox is only for local users return Err(UpubError::forbidden()); }; - if uid != ctx.uid(id.clone()) { + if uid != &ctx.uid(id.clone()) { return Err(UpubError::forbidden()); } - let limit = page.batch.unwrap_or(20).min(50); - let offset = page.offset.unwrap_or(0); - let activities = model::addressing::Entity::find_activities() - .filter(model::addressing::Column::Actor.eq(&uid)) - .offset(offset) - .limit(limit) - .into_model::() - .all(ctx.db()) - .await?; - let mut out = Vec::new(); - for activity in activities { - out.push(activity.ap_filled(ctx.db()).await?); - } - Ok(JsonLD( - ctx.ap_collection_page( - &url!(ctx, "/users/{id}/inbox/page"), - offset, limit, out, - ).ld_context() - )) + + crate::server::builders::paginate( + url!(ctx, "/users/{id}/inbox/page"), + Condition::all().add(model::addressing::Column::Actor.eq(uid)), + ctx.db(), + page, + ) + .await } pub async fn post( diff --git a/src/routes/activitypub/user/outbox.rs b/src/routes/activitypub/user/outbox.rs index 8d1d4db4..c993df1c 100644 --- a/src/routes/activitypub/user/outbox.rs +++ b/src/routes/activitypub/user/outbox.rs @@ -1,16 +1,14 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; -use sea_orm::{ColumnTrait, QueryFilter, QuerySelect}; +use sea_orm::{ColumnTrait, Condition}; use apb::{server::Outbox, AcceptType, ActivityType, Base, BaseType, ObjectType, RejectType}; -use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, routes::activitypub::{jsonld::LD, CreationResult, JsonLD, Pagination}, server::{auth::{AuthIdentity, Identity}, Context}, url}; +use crate::{errors::UpubError, model, routes::activitypub::{CreationResult, JsonLD, Pagination}, server::{auth::{AuthIdentity, Identity}, Context}, url}; pub async fn get( State(ctx): State, Path(id): Path, -) -> Result, StatusCode> { - Ok(JsonLD( - ctx.ap_collection(&url!(ctx, "/users/{id}/outbox"), None).ld_context() - )) +) -> crate::Result> { + crate::server::builders::collection(&url!(ctx, "/users/{id}/outbox"), None) } pub async fn page( @@ -24,28 +22,19 @@ pub async fn page( } else { ctx.uid(id.clone()) }; - let limit = page.batch.unwrap_or(20).min(50); - let offset = page.offset.unwrap_or(0); - - let activities = model::addressing::Entity::find_activities() - .filter(model::activity::Column::Actor.eq(&uid)) - .filter(auth.filter_condition()) - .limit(limit) - .offset(offset) - .into_model::() - .all(ctx.db()).await?; - - let mut out = Vec::new(); - for activity in activities { - out.push(activity.ap_filled(ctx.db()).await?); - } - - Ok(JsonLD( - ctx.ap_collection_page( - &url!(ctx, "/users/{id}/outbox/page"), - offset, limit, out, - ).ld_context() - )) + crate::server::builders::paginate( + url!(ctx, "/users/{id}/outbox/page"), + Condition::all() + .add(auth.filter_condition()) + .add( + Condition::any() + .add(model::activity::Column::Actor.eq(&uid)) + .add(model::object::Column::AttributedTo.eq(&uid)) + ), + ctx.db(), + page, + ) + .await } pub async fn post( diff --git a/src/routes/mastodon/accounts.rs b/src/routes/mastodon/accounts.rs index 0e47ae95..913db72b 100644 --- a/src/routes/mastodon/accounts.rs +++ b/src/routes/mastodon/accounts.rs @@ -70,7 +70,7 @@ pub async fn statuses( Query(_query): Query, ) -> Result>, StatusCode> { let uid = ctx.uid(id); - model::addressing::Entity::find_activities() + model::addressing::Entity::find_addressed() .filter(model::activity::Column::Actor.eq(uid)) .filter(auth.filter_condition()); diff --git a/src/server/builders.rs b/src/server/builders.rs new file mode 100644 index 00000000..7eac7c96 --- /dev/null +++ b/src/server/builders.rs @@ -0,0 +1,64 @@ +use apb::{BaseMut, CollectionMut, CollectionPageMut}; +use sea_orm::{Condition, DatabaseConnection, QueryFilter, QuerySelect}; + +use crate::{model::{addressing::Event, attachment::BatchFillable}, routes::activitypub::{jsonld::LD, JsonLD, Pagination}}; + +pub async fn paginate( + id: String, + filter: Condition, + db: &DatabaseConnection, + page: Pagination, +) -> crate::Result> { + let limit = page.batch.unwrap_or(20).min(50); + let offset = page.offset.unwrap_or(0); + + let items = crate::model::addressing::Entity::find_addressed() + .filter(filter) + // TODO also limit to only local activities + .limit(limit) + .offset(offset) + .into_model::() + .all(db) + .await?; + + let mut attachments = items.load_attachments_batch(db).await?; + + let items : Vec = items + .into_iter() + .map(|item| { + let attach = attachments.remove(item.id()); + item.ap(attach) + }) + .collect(); + + collection_page(&id, offset, limit, items) +} + +pub fn collection_page(id: &str, offset: u64, limit: u64, items: Vec) -> crate::Result> { + let next = if items.len() < limit as usize { + apb::Node::Empty + } else { + apb::Node::link(format!("{id}?offset={}", offset+limit)) + }; + Ok(JsonLD( + serde_json::Value::new_object() + .set_id(Some(&format!("{id}?offset={offset}"))) + .set_collection_type(Some(apb::CollectionType::OrderedCollectionPage)) + .set_part_of(apb::Node::link(id.replace("/page", ""))) + .set_ordered_items(apb::Node::array(items)) + .set_next(next) + .ld_context() + )) +} + + +pub fn collection(id: &str, total_items: Option) -> crate::Result> { + Ok(JsonLD( + serde_json::Value::new_object() + .set_id(Some(id)) + .set_collection_type(Some(apb::CollectionType::OrderedCollection)) + .set_first(apb::Node::link(format!("{id}/page"))) + .set_total_items(total_items) + .ld_context() + )) +} diff --git a/src/server/context.rs b/src/server/context.rs index 2010d394..148f1575 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -1,10 +1,9 @@ use std::sync::Arc; -use apb::{BaseMut, CollectionMut, CollectionPageMut}; use openssl::rsa::Rsa; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; -use crate::{model, routes::activitypub::jsonld::LD, server::fetcher::Fetcher}; +use crate::{model, server::fetcher::Fetcher}; use super::dispatcher::Dispatcher; @@ -221,30 +220,6 @@ impl Context { Ok(()) } - // TODO should probs not be here - pub fn ap_collection(&self, id: &str, total_items: Option) -> serde_json::Value { - serde_json::Value::new_object() - .set_id(Some(id)) - .set_collection_type(Some(apb::CollectionType::OrderedCollection)) - .set_first(apb::Node::link(format!("{id}/page"))) - .set_total_items(total_items) - } - - // TODO should probs not be here - pub fn ap_collection_page(&self, id: &str, offset: u64, limit: u64, items: Vec) -> serde_json::Value { - serde_json::Value::new_object() - .set_id(Some(&format!("{id}?offset={offset}"))) - .set_collection_type(Some(apb::CollectionType::OrderedCollectionPage)) - .set_part_of(apb::Node::link(id.replace("/page", ""))) - .set_ordered_items(apb::Node::array(items)) - .set_next( - if items.len() < limit as usize { - apb::Node::Empty - } else { - apb::Node::link(format!("{id}?offset={}", offset+limit)) - } - ) - } pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { let addressed = self.expand_addressing(activity_targets).await?; diff --git a/src/server/mod.rs b/src/server/mod.rs index 1d5985d7..f6adaf64 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -4,5 +4,6 @@ pub mod fetcher; pub mod inbox; pub mod outbox; pub mod auth; +pub mod builders; pub use context::Context;