1
0
Fork 0
forked from alemi/upub

fix: big refactor of timelines querying

should not be more reliable, consistent and all around less buggy, but
it may actually break some things so let's find out
This commit is contained in:
əlemi 2024-04-24 02:31:35 +02:00
parent 165bf19f8c
commit 768081c251
Signed by: alemi
GPG key ID: A4895B84D311642C
15 changed files with 298 additions and 316 deletions

View file

@ -1,5 +1,7 @@
use apb::{ActivityMut, ObjectMut}; use apb::{ActivityMut, ObjectMut};
use sea_orm::{entity::prelude::*, FromQueryResult, Iterable, Order, QueryOrder, QuerySelect, SelectColumns}; use sea_orm::{entity::prelude::*, Condition, FromQueryResult, Iterable, Order, QueryOrder, QuerySelect, SelectColumns};
use crate::routes::activitypub::jsonld::LD;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "addressing")] #[sea_orm(table_name = "addressing")]
@ -59,105 +61,81 @@ impl ActiveModelBehavior for ActiveModel {}
#[allow(clippy::large_enum_variant)] // tombstone is an outlier, not the norm! this is a beefy enum
#[derive(Debug, Clone)]
#[derive(Debug)] pub enum Event {
pub struct EmbeddedActivity { Tombstone,
pub activity: crate::model::activity::Model, StrayObject(crate::model::object::Model),
pub object: Option<crate::model::object::Model>, Activity(crate::model::activity::Model),
DeepActivity {
activity: crate::model::activity::Model,
object: crate::model::object::Model,
}
} }
impl EmbeddedActivity {
pub async fn ap_filled(self, db: &DatabaseConnection) -> crate::Result<serde_json::Value> { impl Event {
let a = self.activity.ap(); pub fn id(&self) -> &str {
match self.object { match self {
None => Ok(a), Event::Tombstone => "",
Some(o) => { Event::StrayObject(x) => x.id.as_str(),
let attachments = o.find_related(crate::model::attachment::Entity) Event::Activity(x) => x.id.as_str(),
.all(db) Event::DeepActivity { activity: _, object } => object.id.as_str(),
.await?
.into_iter()
.map(|x| x.ap())
.collect();
Ok(a.set_object(
apb::Node::object(o.ap().set_attachment(apb::Node::array(attachments)))
))
} }
} }
pub fn ap(self, attachment: Option<Vec<crate::model::attachment::Model>>) -> serde_json::Value {
let attachment = match attachment {
None => apb::Node::Empty,
Some(vec) => apb::Node::array(
vec.into_iter().map(|x| x.ap()).collect()
),
};
match self {
Event::Activity(x) => x.ap(),
Event::DeepActivity { activity, object } =>
activity.ap().set_object(apb::Node::object(object.ap().set_attachment(attachment))),
Event::StrayObject(x) => serde_json::Value::new_object()
.set_activity_type(Some(apb::ActivityType::Activity))
.set_object(apb::Node::object(x.ap().set_attachment(attachment))),
Event::Tombstone => serde_json::Value::new_object()
.set_activity_type(Some(apb::ActivityType::Activity))
.set_object(apb::Node::object(
serde_json::Value::new_object()
.set_object_type(Some(apb::ObjectType::Tombstone))
)),
}
} }
} }
impl FromQueryResult for EmbeddedActivity { impl FromQueryResult for Event {
fn from_query_result(res: &sea_orm::QueryResult, _pre: &str) -> Result<Self, sea_orm::DbErr> {
let activity = crate::model::activity::Model::from_query_result(res, crate::model::activity::Entity.table_name())?;
let object = crate::model::object::Model::from_query_result(res, crate::model::object::Entity.table_name()).ok();
Ok(Self { activity, object })
}
}
#[derive(Debug)]
pub struct WrappedObject {
pub activity: Option<crate::model::activity::Model>,
pub object: crate::model::object::Model,
}
impl WrappedObject {
pub async fn ap_filled(self, db: &DatabaseConnection) -> crate::Result<serde_json::Value> {
let attachments = self.object.find_related(crate::model::attachment::Entity)
.all(db)
.await?
.into_iter()
.map(|x| x.ap())
.collect();
let o = self.object.ap()
.set_attachment(apb::Node::Array(attachments));
match self.activity {
None => Ok(o),
Some(a) => Ok(a.ap().set_object(apb::Node::object(o))),
}
}
}
impl FromQueryResult for WrappedObject {
fn from_query_result(res: &sea_orm::QueryResult, _pre: &str) -> Result<Self, sea_orm::DbErr> { fn from_query_result(res: &sea_orm::QueryResult, _pre: &str) -> Result<Self, sea_orm::DbErr> {
let activity = crate::model::activity::Model::from_query_result(res, crate::model::activity::Entity.table_name()).ok(); let activity = crate::model::activity::Model::from_query_result(res, crate::model::activity::Entity.table_name()).ok();
let object = crate::model::object::Model::from_query_result(res, crate::model::object::Entity.table_name())?; let object = crate::model::object::Model::from_query_result(res, crate::model::object::Entity.table_name()).ok();
Ok(Self { activity, object }) match (activity, object) {
(Some(activity), Some(object)) => Ok(Self::DeepActivity { activity, object }),
(Some(activity), None) => Ok(Self::Activity(activity)),
(None, Some(object)) => Ok(Self::StrayObject(object)),
(None, None) => Ok(Self::Tombstone),
}
} }
} }
impl Entity { impl Entity {
pub fn find_activities() -> Select<Entity> { pub fn find_addressed() -> Select<Entity> {
let mut select = Entity::find() let mut select = Entity::find()
.distinct() .distinct()
.select_only() .select_only()
.join(sea_orm::JoinType::InnerJoin, Relation::Activity.def()) .join(sea_orm::JoinType::LeftJoin, Relation::Object.def())
.join(sea_orm::JoinType::LeftJoin, crate::model::activity::Relation::Object.def())
.order_by(crate::model::activity::Column::Published, Order::Desc);
for col in crate::model::activity::Column::iter() {
select = select.select_column_as(col, format!("{}{}", crate::model::activity::Entity.table_name(), col.to_string()));
}
for col in crate::model::object::Column::iter() {
select = select.select_column_as(col, format!("{}{}", crate::model::object::Entity.table_name(), col.to_string()));
}
select
}
pub fn find_objects() -> Select<Entity> {
let mut select = Entity::find()
.distinct()
.select_only()
.join(sea_orm::JoinType::InnerJoin, Relation::Object.def())
.join(sea_orm::JoinType::LeftJoin, Relation::Activity.def()) .join(sea_orm::JoinType::LeftJoin, Relation::Activity.def())
.order_by(crate::model::object::Column::Published, Order::Desc); .filter(
// TODO ghetto double inner join because i want to filter out tombstones
Condition::any()
.add(crate::model::activity::Column::Id.is_not_null())
.add(crate::model::object::Column::Id.is_not_null())
)
.order_by(Column::Published, Order::Desc);
for col in crate::model::object::Column::iter() { for col in crate::model::object::Column::iter() {
select = select.select_column_as(col, format!("{}{}", crate::model::object::Entity.table_name(), col.to_string())); select = select.select_column_as(col, format!("{}{}", crate::model::object::Entity.table_name(), col.to_string()));

View file

@ -3,6 +3,8 @@ use sea_orm::{entity::prelude::*, Set};
use crate::routes::activitypub::jsonld::LD; use crate::routes::activitypub::jsonld::LD;
use super::addressing::Event;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "attachments")] #[sea_orm(table_name = "attachments")]
pub struct Model { pub struct Model {
@ -59,3 +61,52 @@ impl Related<super::object::Entity> for Entity {
} }
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}
#[axum::async_trait]
pub trait BatchFillable {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<String, Vec<Model>>, DbErr>;
}
#[axum::async_trait]
impl BatchFillable for &[Event] {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<String, Vec<Model>>, DbErr> {
let objects : Vec<crate::model::object::Model> = self
.iter()
.filter_map(|x| match x {
Event::Tombstone => None,
Event::Activity(_) => None,
Event::StrayObject(x) => Some(x.clone()),
Event::DeepActivity { activity: _, object } => Some(object.clone()),
})
.collect();
let attachments = objects.load_many(Entity, db).await?;
let mut out : std::collections::BTreeMap<String, Vec<Model>> = std::collections::BTreeMap::new();
for attach in attachments.into_iter().flatten() {
if out.contains_key(&attach.object) {
out.get_mut(&attach.object).expect("contains but get failed?").push(attach);
} else {
out.insert(attach.object.clone(), vec![attach]);
}
}
Ok(out)
}
}
#[axum::async_trait]
impl BatchFillable for Vec<Event> {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<String, Vec<Model>>, DbErr> {
self.as_slice().load_attachments_batch(db).await
}
}
#[axum::async_trait]
impl BatchFillable for Event {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<String, Vec<Model>>, DbErr> {
let x = vec![self.clone()]; // TODO wasteful clone and vec![] but ehhh convenient
x.load_attachments_batch(db).await
}
}

View file

@ -1,6 +1,6 @@
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, QueryFilter}; use sea_orm::{ColumnTrait, QueryFilter};
use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; use crate::{errors::UpubError, model::{self, addressing::Event, attachment::BatchFillable}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}};
use super::{jsonld::LD, JsonLD, TryFetch}; use super::{jsonld::LD, JsonLD, TryFetch};
@ -19,17 +19,17 @@ pub async fn view(
ctx.fetch_activity(&aid).await?; ctx.fetch_activity(&aid).await?;
} }
match model::addressing::Entity::find_activities() let row = model::addressing::Entity::find_addressed()
.filter(model::activity::Column::Id.eq(&aid)) .filter(model::activity::Column::Id.eq(&aid))
.filter(auth.filter_condition()) .filter(auth.filter_condition())
.into_model::<EmbeddedActivity>() .into_model::<Event>()
.one(ctx.db()) .one(ctx.db())
.await? .await?
{ .ok_or_else(UpubError::not_found)?;
Some(activity) => Ok(JsonLD(
activity.ap_filled(ctx.db()).await?.ld_context() let mut attachments = row.load_attachments_batch(ctx.db()).await?;
)), let attach = attachments.remove(row.id());
None => Err(UpubError::not_found()),
} Ok(JsonLD(row.ap(attach).ld_context()))
} }

View file

@ -1,7 +1,7 @@
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, PaginatorTrait, QueryFilter, QuerySelect}; use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter};
use crate::{model::{self, addressing::WrappedObject}, routes::activitypub::{jsonld::LD, JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; use crate::{model, routes::activitypub::{JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url};
pub async fn get( pub async fn get(
State(ctx): State<Context>, State(ctx): State<Context>,
@ -14,13 +14,13 @@ pub async fn get(
url!(ctx, "/context/{id}") url!(ctx, "/context/{id}")
}; };
let count = model::addressing::Entity::find_objects() let count = model::addressing::Entity::find_addressed()
.filter(auth.filter_condition()) .filter(auth.filter_condition())
.filter(model::object::Column::Context.eq(context)) .filter(model::object::Column::Context.eq(context))
.count(ctx.db()) .count(ctx.db())
.await?; .await?;
Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/context/{id}"), Some(count)).ld_context())) crate::server::builders::collection(&url!(ctx, "/context/{id}"), Some(count))
} }
pub async fn page( pub async fn page(
@ -29,9 +29,6 @@ pub async fn page(
Query(page): Query<Pagination>, Query(page): Query<Pagination>,
AuthIdentity(auth): AuthIdentity, AuthIdentity(auth): AuthIdentity,
) -> crate::Result<JsonLD<serde_json::Value>> { ) -> crate::Result<JsonLD<serde_json::Value>> {
let limit = page.batch.unwrap_or(20).min(50);
let offset = page.offset.unwrap_or(0);
let context = if id.starts_with('+') { let context = if id.starts_with('+') {
id.replacen('+', "https://", 1).replace('@', "/") id.replacen('+', "https://", 1).replace('@', "/")
} else if id.starts_with("tag:") { } else if id.starts_with("tag:") {
@ -40,24 +37,13 @@ pub async fn page(
url!(ctx, "/context/{id}") // TODO need a better way to figure out which ones are our contexts url!(ctx, "/context/{id}") // TODO need a better way to figure out which ones are our contexts
}; };
let items = model::addressing::Entity::find_objects() crate::server::builders::paginate(
.filter(auth.filter_condition()) url!(ctx, "/context/{id}/page"),
.filter(model::object::Column::Context.eq(context)) Condition::all()
.limit(limit) .add(auth.filter_condition())
.offset(offset) .add(model::object::Column::Context.eq(context)),
.into_model::<WrappedObject>() ctx.db(),
.all(ctx.db()) page
.await?; )
.await
let mut out = Vec::new();
for item in items {
out.push(item.ap_filled(ctx.db()).await?);
}
Ok(JsonLD(
ctx.ap_collection_page(
&url!(ctx, "/context/{id}/page"),
offset, limit, out,
).ld_context()
))
} }

View file

@ -1,16 +1,15 @@
use apb::{server::Inbox, Activity, ActivityType}; use apb::{server::Inbox, Activity, ActivityType};
use axum::{extract::{Query, State}, http::StatusCode, Json}; use axum::{extract::{Query, State}, http::StatusCode, Json};
use sea_orm::{QueryFilter, QuerySelect};
use crate::{errors::UpubError, model::{self, addressing::WrappedObject}, server::{auth::{AuthIdentity, Identity}, Context}, url}; use crate::{errors::UpubError, server::{auth::{AuthIdentity, Identity}, Context}, url};
use super::{jsonld::LD, JsonLD, Pagination}; use super::{JsonLD, Pagination};
pub async fn get( pub async fn get(
State(ctx): State<Context>, State(ctx): State<Context>,
) -> Result<JsonLD<serde_json::Value>, StatusCode> { ) -> crate::Result<JsonLD<serde_json::Value>> {
Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/inbox"), None).ld_context())) crate::server::builders::collection(&url!(ctx, "/inbox"), None)
} }
pub async fn page( pub async fn page(
@ -18,25 +17,13 @@ pub async fn page(
AuthIdentity(auth): AuthIdentity, AuthIdentity(auth): AuthIdentity,
Query(page): Query<Pagination>, Query(page): Query<Pagination>,
) -> crate::Result<JsonLD<serde_json::Value>> { ) -> crate::Result<JsonLD<serde_json::Value>> {
let limit = page.batch.unwrap_or(20).min(50); crate::server::builders::paginate(
let offset = page.offset.unwrap_or(0); url!(ctx, "/inbox/page"),
let objects = model::addressing::Entity::find_objects() auth.filter_condition(),
.filter(auth.filter_condition()) ctx.db(),
.limit(limit) page
.offset(offset) )
.into_model::<WrappedObject>() .await
.all(ctx.db())
.await?;
let mut out = Vec::new();
for object in objects {
out.push(object.ap_filled(ctx.db()).await?);
}
Ok(JsonLD(
ctx.ap_collection_page(
&url!(ctx, "/inbox/page"),
offset, limit, out,
).ld_context()
))
} }
macro_rules! pretty_json { macro_rules! pretty_json {

View file

@ -4,7 +4,7 @@ use apb::{BaseMut, CollectionMut, ObjectMut};
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, ModelTrait, QueryFilter}; use sea_orm::{ColumnTrait, ModelTrait, QueryFilter};
use crate::{errors::UpubError, model::{self, addressing::WrappedObject}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; use crate::{errors::UpubError, model::{self, addressing::Event}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}};
use super::{jsonld::LD, JsonLD, TryFetch}; use super::{jsonld::LD, JsonLD, TryFetch};
@ -23,17 +23,22 @@ pub async fn view(
ctx.fetch_object(&oid).await?; ctx.fetch_object(&oid).await?;
} }
let Some(object) = model::addressing::Entity::find_objects() let item = model::addressing::Entity::find_addressed()
.filter(model::object::Column::Id.eq(&oid)) .filter(model::object::Column::Id.eq(&oid))
.filter(auth.filter_condition()) .filter(auth.filter_condition())
.into_model::<WrappedObject>() .into_model::<Event>()
.one(ctx.db()) .one(ctx.db())
.await? .await?
else { .ok_or_else(UpubError::not_found)?;
return Err(UpubError::not_found());
let object = match item {
Event::Tombstone => return Err(UpubError::not_found()),
Event::Activity(_) => return Err(UpubError::not_found()),
Event::StrayObject(x) => x,
Event::DeepActivity { activity: _, object } => object,
}; };
let attachments = object.object.find_related(model::attachment::Entity) let attachments = object.find_related(model::attachment::Entity)
.all(ctx.db()) .all(ctx.db())
.await? .await?
.into_iter() .into_iter()
@ -45,10 +50,10 @@ pub async fn view(
.set_id(Some(&crate::url!(ctx, "/objects/{id}/replies"))) .set_id(Some(&crate::url!(ctx, "/objects/{id}/replies")))
.set_collection_type(Some(apb::CollectionType::OrderedCollection)) .set_collection_type(Some(apb::CollectionType::OrderedCollection))
.set_first(apb::Node::link(crate::url!(ctx, "/objects/{id}/replies/page"))) .set_first(apb::Node::link(crate::url!(ctx, "/objects/{id}/replies/page")))
.set_total_items(Some(object.object.comments as u64)); .set_total_items(Some(object.comments as u64));
Ok(JsonLD( Ok(JsonLD(
object.object.ap() object.ap()
.set_replies(apb::Node::object(replies)) .set_replies(apb::Node::object(replies))
.set_attachment(apb::Node::array(attachments)) .set_attachment(apb::Node::array(attachments))
.ld_context() .ld_context()

View file

@ -1,7 +1,7 @@
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, PaginatorTrait, QueryFilter, QuerySelect}; use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter};
use crate::{model::{self, addressing::WrappedObject}, routes::activitypub::{jsonld::LD, JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; use crate::{model, routes::activitypub::{JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url};
pub async fn get( pub async fn get(
State(ctx): State<Context>, State(ctx): State<Context>,
@ -14,13 +14,13 @@ pub async fn get(
ctx.oid(id.clone()) ctx.oid(id.clone())
}; };
let count = model::addressing::Entity::find_objects() let count = model::addressing::Entity::find_addressed()
.filter(auth.filter_condition()) .filter(auth.filter_condition())
.filter(model::object::Column::InReplyTo.eq(oid)) .filter(model::object::Column::InReplyTo.eq(oid))
.count(ctx.db()) .count(ctx.db())
.await?; .await?;
Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/objects/{id}/replies"), Some(count)).ld_context())) crate::server::builders::collection(&url!(ctx, "/objects/{id}/replies"), Some(count))
} }
pub async fn page( pub async fn page(
@ -29,33 +29,19 @@ pub async fn page(
Query(page): Query<Pagination>, Query(page): Query<Pagination>,
AuthIdentity(auth): AuthIdentity, AuthIdentity(auth): AuthIdentity,
) -> crate::Result<JsonLD<serde_json::Value>> { ) -> crate::Result<JsonLD<serde_json::Value>> {
let limit = page.batch.unwrap_or(20).min(50);
let offset = page.offset.unwrap_or(0);
let oid = if id.starts_with('+') { let oid = if id.starts_with('+') {
format!("https://{}", id.replacen('+', "", 1).replace('@', "/")) format!("https://{}", id.replacen('+', "", 1).replace('@', "/"))
} else { } else {
ctx.oid(id.clone()) ctx.oid(id.clone())
}; };
let items = model::addressing::Entity::find_objects() crate::server::builders::paginate(
.filter(auth.filter_condition()) url!(ctx, "/objects/{id}/replies/page"),
.filter(model::object::Column::InReplyTo.eq(oid)) Condition::all()
// TODO also limit to only local activities .add(auth.filter_condition())
.limit(limit) .add(model::object::Column::InReplyTo.eq(oid)),
.offset(offset) ctx.db(),
.into_model::<WrappedObject>() page
.all(ctx.db()) )
.await?; .await
Ok(JsonLD(
ctx.ap_collection_page(
&url!(ctx, "/objects/{id}/replies/page"),
offset, limit,
items
.into_iter()
.map(|x| x.object.ap())
.collect()
).ld_context()
))
} }

View file

@ -1,10 +1,9 @@
use axum::{extract::{Query, State}, http::StatusCode, Json}; use axum::{extract::{Query, State}, http::StatusCode, Json};
use sea_orm::{QueryFilter, QuerySelect};
use crate::{errors::UpubError, model::{self, addressing::WrappedObject}, routes::activitypub::{jsonld::LD, CreationResult, JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url}; use crate::{errors::UpubError, routes::activitypub::{CreationResult, JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url};
pub async fn get(State(ctx): State<Context>) -> Result<JsonLD<serde_json::Value>, StatusCode> { pub async fn get(State(ctx): State<Context>) -> crate::Result<JsonLD<serde_json::Value>> {
Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/outbox"), None).ld_context())) crate::server::builders::collection(&url!(ctx, "/outbox"), None)
} }
pub async fn page( pub async fn page(
@ -12,29 +11,13 @@ pub async fn page(
Query(page): Query<Pagination>, Query(page): Query<Pagination>,
AuthIdentity(auth): AuthIdentity, AuthIdentity(auth): AuthIdentity,
) -> crate::Result<JsonLD<serde_json::Value>> { ) -> crate::Result<JsonLD<serde_json::Value>> {
let limit = page.batch.unwrap_or(20).min(50); crate::server::builders::paginate(
let offset = page.offset.unwrap_or(0); url!(ctx, "/outbox/page"),
auth.filter_condition(), // TODO filter local only stuff
let items = model::addressing::Entity::find_objects() ctx.db(),
.filter(auth.filter_condition()) page,
// TODO also limit to only local activities )
.limit(limit) .await
.offset(offset)
.into_model::<WrappedObject>()
.all(ctx.db()).await?;
let mut out = Vec::new();
for item in items {
out.push(item.ap_filled(ctx.db()).await?);
}
Ok(JsonLD(
ctx.ap_collection_page(
&url!(ctx, "/outbox/page"),
offset, limit,
out,
).ld_context()
))
} }
pub async fn post( pub async fn post(

View file

@ -1,14 +1,14 @@
use axum::{extract::{Path, Query, State}, http::StatusCode}; use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QuerySelect, SelectColumns};
use crate::{routes::activitypub::{jsonld::LD, JsonLD, Pagination}, model, server::Context, url}; use crate::{routes::activitypub::{JsonLD, Pagination}, model, server::Context, url};
use model::relation::Column::{Following, Follower}; use model::relation::Column::{Following, Follower};
pub async fn get<const OUTGOING: bool>( pub async fn get<const OUTGOING: bool>(
State(ctx): State<Context>, State(ctx): State<Context>,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<JsonLD<serde_json::Value>, StatusCode> { ) -> crate::Result<JsonLD<serde_json::Value>> {
let follow___ = if OUTGOING { "following" } else { "followers" }; let follow___ = if OUTGOING { "following" } else { "followers" };
let count = model::relation::Entity::find() let count = model::relation::Entity::find()
.filter(if OUTGOING { Follower } else { Following }.eq(ctx.uid(id.clone()))) .filter(if OUTGOING { Follower } else { Following }.eq(ctx.uid(id.clone())))
@ -16,44 +16,32 @@ pub async fn get<const OUTGOING: bool>(
tracing::error!("failed counting {follow___} for {id}: {e}"); tracing::error!("failed counting {follow___} for {id}: {e}");
0 0
}); });
Ok(JsonLD(
ctx.ap_collection( crate::server::builders::collection(&url!(ctx, "/users/{id}/{follow___}"), Some(count))
&url!(ctx, "/users/{id}/{follow___}"),
Some(count)
).ld_context()
))
} }
pub async fn page<const OUTGOING: bool>( pub async fn page<const OUTGOING: bool>(
State(ctx): State<Context>, State(ctx): State<Context>,
Path(id): Path<String>, Path(id): Path<String>,
Query(page): Query<Pagination>, Query(page): Query<Pagination>,
) -> Result<JsonLD<serde_json::Value>, StatusCode> { ) -> crate::Result<JsonLD<serde_json::Value>> {
let follow___ = if OUTGOING { "following" } else { "followers" }; let follow___ = if OUTGOING { "following" } else { "followers" };
let limit = page.batch.unwrap_or(20).min(50); let limit = page.batch.unwrap_or(20).min(50);
let offset = page.offset.unwrap_or(0); let offset = page.offset.unwrap_or(0);
match model::relation::Entity::find()
let following = model::relation::Entity::find()
.filter(if OUTGOING { Follower } else { Following }.eq(ctx.uid(id.clone()))) .filter(if OUTGOING { Follower } else { Following }.eq(ctx.uid(id.clone())))
.select_only() .select_only()
.select_column(if OUTGOING { Following } else { Follower }) .select_column(if OUTGOING { Following } else { Follower })
.limit(limit) .limit(limit)
.offset(page.offset.unwrap_or(0)) .offset(page.offset.unwrap_or(0))
.into_tuple::<String>() .into_tuple::<String>()
.all(ctx.db()).await .all(ctx.db())
{ .await?;
Err(e) => {
tracing::error!("error queriying {follow___} for {id}: {e}"); crate::server::builders::collection_page(
Err(StatusCode::INTERNAL_SERVER_ERROR)
},
Ok(following) => {
Ok(JsonLD(
ctx.ap_collection_page(
&url!(ctx, "/users/{id}/{follow___}/page"), &url!(ctx, "/users/{id}/{follow___}/page"),
offset, offset, limit,
limit,
following.into_iter().map(serde_json::Value::String).collect() following.into_iter().map(serde_json::Value::String).collect()
).ld_context() )
))
},
}
} }

View file

@ -1,20 +1,20 @@
use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; use axum::{extract::{Path, Query, State}, http::StatusCode, Json};
use sea_orm::{ColumnTrait, QueryFilter, QuerySelect}; use sea_orm::{ColumnTrait, Condition};
use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, routes::activitypub::{jsonld::LD, JsonLD, Pagination}, server::{auth::{AuthIdentity, Identity}, Context}, url}; use crate::{errors::UpubError, model, routes::activitypub::{JsonLD, Pagination}, server::{auth::{AuthIdentity, Identity}, Context}, url};
pub async fn get( pub async fn get(
State(ctx): State<Context>, State(ctx): State<Context>,
Path(id): Path<String>, Path(id): Path<String>,
AuthIdentity(auth): AuthIdentity, AuthIdentity(auth): AuthIdentity,
) -> Result<JsonLD<serde_json::Value>, StatusCode> { ) -> crate::Result<JsonLD<serde_json::Value>> {
match auth { match auth {
Identity::Anonymous => Err(StatusCode::FORBIDDEN), Identity::Anonymous => Err(StatusCode::FORBIDDEN.into()),
Identity::Remote(_) => Err(StatusCode::FORBIDDEN), Identity::Remote(_) => Err(StatusCode::FORBIDDEN.into()),
Identity::Local(user) => if ctx.uid(id.clone()) == user { Identity::Local(user) => if ctx.uid(id.clone()) == user {
Ok(JsonLD(ctx.ap_collection(&url!(ctx, "/users/{id}/inbox"), None).ld_context())) crate::server::builders::collection(&url!(ctx, "/users/{id}/inbox"), None)
} else { } else {
Err(StatusCode::FORBIDDEN) Err(StatusCode::FORBIDDEN.into())
}, },
} }
} }
@ -25,32 +25,21 @@ pub async fn page(
AuthIdentity(auth): AuthIdentity, AuthIdentity(auth): AuthIdentity,
Query(page): Query<Pagination>, Query(page): Query<Pagination>,
) -> crate::Result<JsonLD<serde_json::Value>> { ) -> crate::Result<JsonLD<serde_json::Value>> {
let Identity::Local(uid) = auth else { let Identity::Local(uid) = &auth else {
// local inbox is only for local users // local inbox is only for local users
return Err(UpubError::forbidden()); return Err(UpubError::forbidden());
}; };
if uid != ctx.uid(id.clone()) { if uid != &ctx.uid(id.clone()) {
return Err(UpubError::forbidden()); return Err(UpubError::forbidden());
} }
let limit = page.batch.unwrap_or(20).min(50);
let offset = page.offset.unwrap_or(0); crate::server::builders::paginate(
let activities = model::addressing::Entity::find_activities() url!(ctx, "/users/{id}/inbox/page"),
.filter(model::addressing::Column::Actor.eq(&uid)) Condition::all().add(model::addressing::Column::Actor.eq(uid)),
.offset(offset) ctx.db(),
.limit(limit) page,
.into_model::<EmbeddedActivity>() )
.all(ctx.db()) .await
.await?;
let mut out = Vec::new();
for activity in activities {
out.push(activity.ap_filled(ctx.db()).await?);
}
Ok(JsonLD(
ctx.ap_collection_page(
&url!(ctx, "/users/{id}/inbox/page"),
offset, limit, out,
).ld_context()
))
} }
pub async fn post( pub async fn post(

View file

@ -1,16 +1,14 @@
use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; use axum::{extract::{Path, Query, State}, http::StatusCode, Json};
use sea_orm::{ColumnTrait, QueryFilter, QuerySelect}; use sea_orm::{ColumnTrait, Condition};
use apb::{server::Outbox, AcceptType, ActivityType, Base, BaseType, ObjectType, RejectType}; use apb::{server::Outbox, AcceptType, ActivityType, Base, BaseType, ObjectType, RejectType};
use crate::{errors::UpubError, model::{self, addressing::EmbeddedActivity}, routes::activitypub::{jsonld::LD, CreationResult, JsonLD, Pagination}, server::{auth::{AuthIdentity, Identity}, Context}, url}; use crate::{errors::UpubError, model, routes::activitypub::{CreationResult, JsonLD, Pagination}, server::{auth::{AuthIdentity, Identity}, Context}, url};
pub async fn get( pub async fn get(
State(ctx): State<Context>, State(ctx): State<Context>,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<JsonLD<serde_json::Value>, StatusCode> { ) -> crate::Result<JsonLD<serde_json::Value>> {
Ok(JsonLD( crate::server::builders::collection(&url!(ctx, "/users/{id}/outbox"), None)
ctx.ap_collection(&url!(ctx, "/users/{id}/outbox"), None).ld_context()
))
} }
pub async fn page( pub async fn page(
@ -24,28 +22,19 @@ pub async fn page(
} else { } else {
ctx.uid(id.clone()) ctx.uid(id.clone())
}; };
let limit = page.batch.unwrap_or(20).min(50); crate::server::builders::paginate(
let offset = page.offset.unwrap_or(0); url!(ctx, "/users/{id}/outbox/page"),
Condition::all()
let activities = model::addressing::Entity::find_activities() .add(auth.filter_condition())
.filter(model::activity::Column::Actor.eq(&uid)) .add(
.filter(auth.filter_condition()) Condition::any()
.limit(limit) .add(model::activity::Column::Actor.eq(&uid))
.offset(offset) .add(model::object::Column::AttributedTo.eq(&uid))
.into_model::<EmbeddedActivity>() ),
.all(ctx.db()).await?; ctx.db(),
page,
let mut out = Vec::new(); )
for activity in activities { .await
out.push(activity.ap_filled(ctx.db()).await?);
}
Ok(JsonLD(
ctx.ap_collection_page(
&url!(ctx, "/users/{id}/outbox/page"),
offset, limit, out,
).ld_context()
))
} }
pub async fn post( pub async fn post(

View file

@ -70,7 +70,7 @@ pub async fn statuses(
Query(_query): Query<StatusesQuery>, Query(_query): Query<StatusesQuery>,
) -> Result<Json<Vec<Status>>, StatusCode> { ) -> Result<Json<Vec<Status>>, StatusCode> {
let uid = ctx.uid(id); let uid = ctx.uid(id);
model::addressing::Entity::find_activities() model::addressing::Entity::find_addressed()
.filter(model::activity::Column::Actor.eq(uid)) .filter(model::activity::Column::Actor.eq(uid))
.filter(auth.filter_condition()); .filter(auth.filter_condition());

64
src/server/builders.rs Normal file
View file

@ -0,0 +1,64 @@
use apb::{BaseMut, CollectionMut, CollectionPageMut};
use sea_orm::{Condition, DatabaseConnection, QueryFilter, QuerySelect};
use crate::{model::{addressing::Event, attachment::BatchFillable}, routes::activitypub::{jsonld::LD, JsonLD, Pagination}};
pub async fn paginate(
id: String,
filter: Condition,
db: &DatabaseConnection,
page: Pagination,
) -> crate::Result<JsonLD<serde_json::Value>> {
let limit = page.batch.unwrap_or(20).min(50);
let offset = page.offset.unwrap_or(0);
let items = crate::model::addressing::Entity::find_addressed()
.filter(filter)
// TODO also limit to only local activities
.limit(limit)
.offset(offset)
.into_model::<Event>()
.all(db)
.await?;
let mut attachments = items.load_attachments_batch(db).await?;
let items : Vec<serde_json::Value> = items
.into_iter()
.map(|item| {
let attach = attachments.remove(item.id());
item.ap(attach)
})
.collect();
collection_page(&id, offset, limit, items)
}
pub fn collection_page(id: &str, offset: u64, limit: u64, items: Vec<serde_json::Value>) -> crate::Result<JsonLD<serde_json::Value>> {
let next = if items.len() < limit as usize {
apb::Node::Empty
} else {
apb::Node::link(format!("{id}?offset={}", offset+limit))
};
Ok(JsonLD(
serde_json::Value::new_object()
.set_id(Some(&format!("{id}?offset={offset}")))
.set_collection_type(Some(apb::CollectionType::OrderedCollectionPage))
.set_part_of(apb::Node::link(id.replace("/page", "")))
.set_ordered_items(apb::Node::array(items))
.set_next(next)
.ld_context()
))
}
pub fn collection(id: &str, total_items: Option<u64>) -> crate::Result<JsonLD<serde_json::Value>> {
Ok(JsonLD(
serde_json::Value::new_object()
.set_id(Some(id))
.set_collection_type(Some(apb::CollectionType::OrderedCollection))
.set_first(apb::Node::link(format!("{id}/page")))
.set_total_items(total_items)
.ld_context()
))
}

View file

@ -1,10 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use apb::{BaseMut, CollectionMut, CollectionPageMut};
use openssl::rsa::Rsa; use openssl::rsa::Rsa;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set};
use crate::{model, routes::activitypub::jsonld::LD, server::fetcher::Fetcher}; use crate::{model, server::fetcher::Fetcher};
use super::dispatcher::Dispatcher; use super::dispatcher::Dispatcher;
@ -221,30 +220,6 @@ impl Context {
Ok(()) Ok(())
} }
// TODO should probs not be here
pub fn ap_collection(&self, id: &str, total_items: Option<u64>) -> serde_json::Value {
serde_json::Value::new_object()
.set_id(Some(id))
.set_collection_type(Some(apb::CollectionType::OrderedCollection))
.set_first(apb::Node::link(format!("{id}/page")))
.set_total_items(total_items)
}
// TODO should probs not be here
pub fn ap_collection_page(&self, id: &str, offset: u64, limit: u64, items: Vec<serde_json::Value>) -> serde_json::Value {
serde_json::Value::new_object()
.set_id(Some(&format!("{id}?offset={offset}")))
.set_collection_type(Some(apb::CollectionType::OrderedCollectionPage))
.set_part_of(apb::Node::link(id.replace("/page", "")))
.set_ordered_items(apb::Node::array(items))
.set_next(
if items.len() < limit as usize {
apb::Node::Empty
} else {
apb::Node::link(format!("{id}?offset={}", offset+limit))
}
)
}
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> { pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
let addressed = self.expand_addressing(activity_targets).await?; let addressed = self.expand_addressing(activity_targets).await?;

View file

@ -4,5 +4,6 @@ pub mod fetcher;
pub mod inbox; pub mod inbox;
pub mod outbox; pub mod outbox;
pub mod auth; pub mod auth;
pub mod builders;
pub use context::Context; pub use context::Context;