From e68332bc31db07a8689a7cadf92b9fe684bc3485 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 28 Mar 2024 04:52:17 +0100 Subject: [PATCH] chore: refactor collections with utils, moved stuff --- src/activitypub/inbox.rs | 78 +++++++++++++++---------------- src/activitypub/mod.rs | 14 +++++- src/activitypub/user/following.rs | 22 ++++----- src/activitypub/user/inbox.rs | 39 ++++++---------- src/activitypub/user/outbox.rs | 50 +++++++------------- src/errors.rs | 8 ++++ src/main.rs | 2 + src/server.rs | 20 +++++++- 8 files changed, 118 insertions(+), 115 deletions(-) diff --git a/src/activitypub/inbox.rs b/src/activitypub/inbox.rs index 5bd029a..fe2f037 100644 --- a/src/activitypub/inbox.rs +++ b/src/activitypub/inbox.rs @@ -1,48 +1,46 @@ use axum::{extract::{Query, State}, http::StatusCode}; -use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter, QueryOrder, QuerySelect}; +use sea_orm::{ColumnTrait, Condition, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect}; -use crate::{activitystream::{object::collection::{page::CollectionPageMut, CollectionMut, CollectionType}, BaseMut, Node}, model, server::Context, url}; +use crate::{activitystream::Node, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; use super::{activity::ap_activity, jsonld::LD, JsonLD, Pagination, PUBLIC_TARGET}; -pub async fn get(State(ctx) : State, Query(page): Query) -> Result, StatusCode> { - let limit = page.batch.unwrap_or(20).min(100); - let offset = page.offset.unwrap_or(0); - if let Some(true) = page.page { - match model::addressing::Entity::find() - .filter(Condition::all().add(model::addressing::Column::Actor.eq(PUBLIC_TARGET))) - .order_by(model::addressing::Column::Published, sea_orm::Order::Desc) - .find_also_related(model::activity::Entity) // TODO join also with objects - .limit(limit) - .offset(offset) - .all(ctx.db()) - .await - { - Ok(x) => Ok(JsonLD(serde_json::Value::new_object() - .set_id(Some(&url!(ctx, "/inbox"))) - .set_collection_type(Some(CollectionType::OrderedCollection)) - .set_part_of(Node::link(url!(ctx, "/inbox"))) - .set_next(Node::link(url!(ctx, "/inbox?page=true&offset={}", offset+limit))) - .set_ordered_items(Node::array( - x.into_iter() - .filter_map(|(_, a)| Some(ap_activity(a?))) - .collect() - )) - .ld_context() - )), - Err(e) => { - tracing::error!("failed paginating global outbox: {e}"); - Err(StatusCode::INTERNAL_SERVER_ERROR) - }, - } - } else { - Ok(JsonLD(serde_json::Value::new_object() - .set_id(Some(&url!(ctx, "/inbox"))) - .set_collection_type(Some(CollectionType::OrderedCollection)) - .set_first(Node::link(url!(ctx, "/inbox?page=true"))) - .ld_context() - )) - } +pub async fn get( + State(ctx): State, +) -> Result, StatusCode> { + Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/inbox"), None).ld_context())) } +pub async fn page( + State(ctx): State, + AuthIdentity(auth): AuthIdentity, + Query(page): Query, +) -> Result, UpubError> { + let limit = page.batch.unwrap_or(20).min(50); + let offset = page.offset.unwrap_or(0); + let mut condition = Condition::any() + .add(model::addressing::Column::Actor.eq(PUBLIC_TARGET)); + if let Identity::Local(user) = auth { + condition = condition + .add(model::addressing::Column::Actor.eq(user)); + } + let activities = model::addressing::Entity::find() + .filter(condition) + .order_by(model::addressing::Column::Published, Order::Asc) + .find_also_related(model::activity::Entity) + .limit(limit) + .offset(offset) + .all(ctx.db()) + .await?; + Ok(JsonLD( + ctx.ap_collection_page( + &url!(ctx, "/inbox/page"), + offset, limit, + activities + .into_iter() + .filter_map(|(_, a)| Some(Node::object(ap_activity(a?)))) + .collect::>>() + ).ld_context() + )) +} diff --git a/src/activitypub/mod.rs b/src/activitypub/mod.rs index aa520ef..2e85fb1 100644 --- a/src/activitypub/mod.rs +++ b/src/activitypub/mod.rs @@ -8,7 +8,7 @@ pub mod well_known; pub mod jsonld; pub use jsonld::JsonLD; -use axum::{extract::State, http::StatusCode, Json}; +use axum::{extract::State, http::StatusCode, response::IntoResponse, Json}; use rand::Rng; use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter}; @@ -39,11 +39,21 @@ pub fn domain(domain: &str) -> String { #[derive(Debug, serde::Deserialize)] // TODO i don't really like how pleroma/mastodon do it actually, maybe change this? pub struct Pagination { - pub page: Option, pub offset: Option, pub batch: Option, } +pub struct CreationResult(pub String); +impl IntoResponse for CreationResult { + fn into_response(self) -> axum::response::Response { + ( + StatusCode::CREATED, + [("Location", self.0.as_str())] + ) + .into_response() + } +} + pub async fn view(State(ctx): State) -> Result, StatusCode> { Ok(Json( serde_json::Value::new_object() diff --git a/src/activitypub/user/following.rs b/src/activitypub/user/following.rs index d7a224d..1bba997 100644 --- a/src/activitypub/user/following.rs +++ b/src/activitypub/user/following.rs @@ -17,12 +17,10 @@ pub async fn get( 0 }); Ok(JsonLD( - serde_json::Value::new_object() - .set_id(Some(&format!("{}/users/{id}/{follow___}", ctx.base()))) - .set_collection_type(Some(CollectionType::OrderedCollection)) - .set_total_items(Some(count)) - .set_first(Node::link(format!("{}/users/{id}/{follow___}/page", ctx.base()))) - .ld_context() + ctx.ap_collection( + &url!(ctx, "/users/{id}/{follow___}"), + Some(count) + ).ld_context() )) } @@ -47,12 +45,12 @@ pub async fn page( }, Ok(following) => { Ok(JsonLD( - serde_json::Value::new_object() - .set_collection_type(Some(CollectionType::OrderedCollectionPage)) - .set_part_of(Node::link(url!(ctx, "/users/{id}/{follow___}"))) - .set_next(Node::link(url!(ctx, "/users/{id}/{follow___}/page?offset={}", offset+limit))) - .set_ordered_items(Node::array(following.into_iter().map(|x| x.following).collect())) - .ld_context() + ctx.ap_collection_page( + &url!(ctx, "/users/{id}/{follow___}"), + offset, + limit, + following.into_iter().map(|x| Node::link(x.following)).collect() + ).ld_context() )) }, } diff --git a/src/activitypub/user/inbox.rs b/src/activitypub/user/inbox.rs index d465540..cf2d715 100644 --- a/src/activitypub/user/inbox.rs +++ b/src/activitypub/user/inbox.rs @@ -12,12 +12,7 @@ pub async fn get( Identity::Anonymous => Err(StatusCode::FORBIDDEN), Identity::Remote(_) => Err(StatusCode::FORBIDDEN), Identity::Local(user) => if ctx.uid(id.clone()) == user { - Ok(JsonLD(serde_json::Value::new_object() - .set_id(Some(&url!(ctx, "/users/{id}/inbox"))) - .set_collection_type(Some(CollectionType::OrderedCollection)) - .set_first(Node::link(url!(ctx, "/users/{id}/inbox/page"))) - .ld_context() - )) + Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/users/{id}/inbox"), None).ld_context())) } else { Err(StatusCode::FORBIDDEN) }, @@ -29,19 +24,16 @@ pub async fn page( Path(id): Path, AuthIdentity(auth): AuthIdentity, Query(page): Query, -) -> Result, StatusCode> { +) -> crate::Result> { let uid = ctx.uid(id.clone()); match auth { - Identity::Anonymous => Err(StatusCode::FORBIDDEN), - Identity::Remote(_) => Err(StatusCode::FORBIDDEN), + Identity::Anonymous => Err(StatusCode::FORBIDDEN.into()), + Identity::Remote(_) => Err(StatusCode::FORBIDDEN.into()), Identity::Local(user) => if uid == user { let limit = page.batch.unwrap_or(20).min(50); let offset = page.offset.unwrap_or(0); match model::addressing::Entity::find() - .filter(Condition::any() - .add(model::addressing::Column::Actor.eq(PUBLIC_TARGET)) - .add(model::addressing::Column::Actor.eq(uid)) - ) + .filter(Condition::all().add(model::addressing::Column::Actor.eq(uid))) .order_by(model::addressing::Column::Published, Order::Asc) .find_also_related(model::activity::Entity) .limit(limit) @@ -50,27 +42,24 @@ pub async fn page( .await { Ok(activities) => { - Ok(JsonLD(serde_json::Value::new_object() - .set_id(Some(&url!(ctx, "/users/{id}/inbox/page?offset={offset}"))) - .set_collection_type(Some(CollectionType::OrderedCollectionPage)) - .set_part_of(Node::link(url!(ctx, "/users/{id}/inbox"))) - .set_next(Node::link(url!(ctx, "/users/{id}/inbox/page?offset={}", offset+limit))) - .set_ordered_items(Node::array( + Ok(JsonLD( + ctx.ap_collection_page( + &url!(ctx, "/users/{id}/inbox/page"), + offset, limit, activities .into_iter() - .filter_map(|(_, a)| Some(ap_activity(a?))) - .collect::>() - )) - .ld_context() + .filter_map(|(_, a)| Some(Node::object(ap_activity(a?)))) + .collect::>>() + ).ld_context() )) }, Err(e) => { tracing::error!("failed paginating user inbox for {id}: {e}"); - Err(StatusCode::INTERNAL_SERVER_ERROR) + Err(StatusCode::INTERNAL_SERVER_ERROR.into()) }, } } else { - Err(StatusCode::FORBIDDEN) + Err(StatusCode::FORBIDDEN.into()) }, } } diff --git a/src/activitypub/user/outbox.rs b/src/activitypub/user/outbox.rs index fc6334f..bacf9ab 100644 --- a/src/activitypub/user/outbox.rs +++ b/src/activitypub/user/outbox.rs @@ -1,32 +1,17 @@ -use axum::{extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, Json}; +use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; use sea_orm::{EntityTrait, IntoActiveModel, Order, QueryOrder, QuerySelect, Set}; -use crate::{activitypub::{jsonld::LD, JsonLD, Pagination}, activitystream::{object::{activity::{accept::AcceptType, Activity, ActivityMut, ActivityType}, collection::{page::CollectionPageMut, CollectionMut, CollectionType}, Addressed, ObjectMut}, Base, BaseMut, BaseType, Node, ObjectType}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; +use crate::{activitypub::{jsonld::LD, CreationResult, JsonLD, Pagination}, activitystream::{object::{activity::{accept::AcceptType, Activity, ActivityMut, ActivityType}, collection::{page::CollectionPageMut, CollectionMut, CollectionType}, Addressed, ObjectMut}, Base, BaseMut, BaseType, Node, ObjectType}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; pub async fn get( State(ctx): State, Path(id): Path, ) -> Result, StatusCode> { Ok(JsonLD( - serde_json::Value::new_object() - .set_id(Some(&url!(ctx, "/users/{id}/outbox"))) - .set_collection_type(Some(CollectionType::OrderedCollection)) - .set_first(Node::link(url!(ctx, "/users/{id}/outbox/page"))) - .ld_context() + ctx.ap_collection(&url!(ctx, "/users/{id}/outbox"), None).ld_context() )) } -pub struct CreationResult(pub String); -impl IntoResponse for CreationResult { - fn into_response(self) -> axum::response::Response { - ( - StatusCode::CREATED, - [("Location", self.0.as_str())] - ) - .into_response() - } -} - pub async fn page( State(ctx): State, Path(id): Path, @@ -57,26 +42,23 @@ pub async fn page( Err(_e) => Err(StatusCode::INTERNAL_SERVER_ERROR), Ok(items) => { Ok(JsonLD( - serde_json::Value::new_object() - // TODO set id, calculate uri from given args - .set_id(Some(&url!(ctx, "/users/{id}/outbox/page?offset={offset}"))) - .set_collection_type(Some(CollectionType::OrderedCollectionPage)) - .set_part_of(Node::link(url!(ctx, "/users/{id}/outbox"))) - .set_next(Node::link(url!(ctx, "/users/{id}/outbox/page?offset={}", limit+offset))) - .set_ordered_items(Node::array( - items - .into_iter() - .map(|(a, o)| { - let oid = a.object.clone(); + ctx.ap_collection_page( + &url!(ctx, "/users/{id}/outbox/page"), + offset, limit, + items + .into_iter() + .map(|(a, o)| { + let oid = a.object.clone(); + Node::object( super::super::activity::ap_activity(a) .set_object(match o { Some(o) => Node::object(super::super::object::ap_object(o)), - None => Node::maybe_link(oid), + None => Node::maybe_link(oid), }) - }) - .collect() - )) - .ld_context() + ) + }) + .collect() + ).ld_context() )) }, } diff --git a/src/errors.rs b/src/errors.rs index ebc1911..1ffd980 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -16,6 +16,14 @@ pub enum UpubError { Reqwest(#[from] reqwest::Error), } +impl UpubError { + pub fn code(code: axum::http::StatusCode) -> Self { + UpubError::Status(code) + } +} + +pub type UpubResult = Result; + impl From for UpubError { fn from(value: axum::http::StatusCode) -> Self { UpubError::Status(value) diff --git a/src/main.rs b/src/main.rs index 6603c9a..eb4db15 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,8 @@ use clap::{Parser, Subcommand}; use sea_orm::{ConnectOptions, Database, EntityTrait, IntoActiveModel}; use sea_orm_migration::MigratorTrait; +pub use errors::UpubResult as Result; + use crate::activitystream::{BaseType, ObjectType}; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/src/server.rs b/src/server.rs index 560b47d..71fe6f9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,7 +3,7 @@ use std::{str::Utf8Error, sync::Arc}; use openssl::rsa::Rsa; use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; -use crate::{activitypub::PUBLIC_TARGET, dispatcher::Dispatcher, fetcher::Fetcher, model}; +use crate::{activitypub::{jsonld::LD, PUBLIC_TARGET}, activitystream::{object::{activity::ActivityMut, collection::{page::CollectionPageMut, CollectionMut, CollectionType}}, Base, BaseMut, Node, Object}, dispatcher::Dispatcher, fetcher::Fetcher, model}; #[derive(Clone)] pub struct Context(Arc); @@ -206,5 +206,21 @@ impl Context { Ok(()) } -} + pub fn ap_collection(&self, id: &str, total_items: Option) -> serde_json::Value { + serde_json::Value::new_object() + .set_id(Some(id)) + .set_collection_type(Some(CollectionType::OrderedCollection)) + .set_first(Node::link(format!("{id}/page"))) + .set_total_items(total_items) + } + + pub fn ap_collection_page(&self, id: &str, offset: u64, limit: u64, items: Vec>) -> serde_json::Value { + serde_json::Value::new_object() + .set_id(Some(&format!("{id}?offset={offset}"))) + .set_collection_type(Some(CollectionType::OrderedCollectionPage)) + .set_part_of(Node::link(id.replace("/page", ""))) + .set_next(Node::link(format!("{id}?offset={}", offset+limit))) + .set_ordered_items(Node::Array(items)) + } +}