diff --git a/src/model/activity.rs b/src/model/activity.rs index df312cd5..eddc86da 100644 --- a/src/model/activity.rs +++ b/src/model/activity.rs @@ -1,7 +1,7 @@ use apb::{ActivityMut, ActivityType, BaseMut, ObjectMut}; -use sea_orm::entity::prelude::*; +use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns}; -use crate::routes::activitypub::jsonld::LD; +use crate::{model::Audience, errors::UpubError, routes::activitypub::jsonld::LD}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "activities")] @@ -14,10 +14,10 @@ pub struct Model { pub actor: String, pub object: Option, pub target: Option, - pub to: Option, - pub bto: Option, - pub cc: Option, - pub bcc: Option, + pub to: Audience, + pub bto: Audience, + pub cc: Audience, + pub bcc: Audience, pub published: ChronoDateTimeUtc, } @@ -71,11 +71,28 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} +impl Entity { + pub fn find_by_ap_id(id: &str) -> Select { + Entity::find().filter(Column::Id.eq(id)) + } + + pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result { + Entity::find() + .filter(Column::Id.eq(id)) + .select_only() + .select_column(Column::Internal) + .into_tuple::() + .one(db) + .await? + .ok_or_else(UpubError::not_found) + } +} + impl ActiveModel { pub fn new(activity: &impl apb::Activity) -> Result { Ok(ActiveModel { - id: sea_orm::ActiveValue::NotSet, - ap_id: sea_orm::ActiveValue::Set(activity.id().ok_or(super::FieldError("id"))?.to_string()), + internal: sea_orm::ActiveValue::NotSet, + id: sea_orm::ActiveValue::Set(activity.id().ok_or(super::FieldError("id"))?.to_string()), activity_type: sea_orm::ActiveValue::Set(activity.activity_type().ok_or(super::FieldError("type"))?), actor: sea_orm::ActiveValue::Set(activity.actor().id().ok_or(super::FieldError("actor"))?), object: sea_orm::ActiveValue::Set(activity.object().id()), diff --git a/src/model/actor.rs b/src/model/actor.rs index 32233dca..cd41d082 100644 --- a/src/model/actor.rs +++ b/src/model/actor.rs @@ -1,8 +1,8 @@ -use sea_orm::entity::prelude::*; +use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns}; use apb::{Actor, ActorMut, ActorType, BaseMut, DocumentMut, Endpoints, EndpointsMut, Object, ObjectMut, PublicKey, PublicKeyMut}; -use crate::routes::activitypub::jsonld::LD; +use crate::{errors::UpubError, routes::activitypub::jsonld::LD}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "actors")] @@ -133,14 +133,31 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} +impl Entity { + pub fn find_by_ap_id(id: &str) -> Select { + Entity::find().filter(Column::Id.eq(id)) + } + + pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result { + Entity::find() + .filter(Column::Id.eq(id)) + .select_only() + .select_column(Column::Internal) + .into_tuple::() + .one(db) + .await? + .ok_or_else(UpubError::not_found) + } +} + impl ActiveModel { - pub fn new(object: &impl Actor, instance: i32) -> Result { + pub fn new(object: &impl Actor) -> Result { let ap_id = object.id().ok_or(super::FieldError("id"))?.to_string(); - let (_domain, fallback_preferred_username) = split_user_id(&ap_id); + let (domain, fallback_preferred_username) = split_user_id(&ap_id); Ok(ActiveModel { - instance: sea_orm::ActiveValue::Set(instance), // TODO receiving it from outside is cheap - id: sea_orm::ActiveValue::NotSet, - ap_id: sea_orm::ActiveValue::Set(ap_id), + internal: sea_orm::ActiveValue::NotSet, + domain: sea_orm::ActiveValue::Set(domain), + id: sea_orm::ActiveValue::Set(ap_id), preferred_username: sea_orm::ActiveValue::Set(object.preferred_username().unwrap_or(&fallback_preferred_username).to_string()), actor_type: sea_orm::ActiveValue::Set(object.actor_type().ok_or(super::FieldError("type"))?), name: sea_orm::ActiveValue::Set(object.name().map(|x| x.to_string())), @@ -154,9 +171,9 @@ impl ActiveModel { following: sea_orm::ActiveValue::Set(object.following().id()), created: sea_orm::ActiveValue::Set(object.published().unwrap_or(chrono::Utc::now())), updated: sea_orm::ActiveValue::Set(chrono::Utc::now()), - following_count: sea_orm::ActiveValue::Set(object.following_count().unwrap_or(0) as i64), - followers_count: sea_orm::ActiveValue::Set(object.followers_count().unwrap_or(0) as i64), - statuses_count: sea_orm::ActiveValue::Set(object.statuses_count().unwrap_or(0) as i64), + following_count: sea_orm::ActiveValue::Set(object.following_count().unwrap_or(0) as i32), + followers_count: sea_orm::ActiveValue::Set(object.followers_count().unwrap_or(0) as i32), + statuses_count: sea_orm::ActiveValue::Set(object.statuses_count().unwrap_or(0) as i32), public_key: sea_orm::ActiveValue::Set(object.public_key().get().ok_or(super::FieldError("publicKey"))?.public_key_pem().to_string()), private_key: sea_orm::ActiveValue::Set(None), // there's no way to transport privkey over AP json, must come from DB }) diff --git a/src/model/addressing.rs b/src/model/addressing.rs index 26175bea..2eb6ecff 100644 --- a/src/model/addressing.rs +++ b/src/model/addressing.rs @@ -154,12 +154,12 @@ impl FromQueryResult for Event { impl Entity { - pub fn find_addressed(uid: Option<&str>) -> Select { + pub fn find_addressed(uid: Option) -> Select { let mut select = Entity::find() .distinct() .select_only() - .join(sea_orm::JoinType::LeftJoin, Relation::Object.def()) - .join(sea_orm::JoinType::LeftJoin, Relation::Activity.def()) + .join(sea_orm::JoinType::LeftJoin, Relation::Objects.def()) + .join(sea_orm::JoinType::LeftJoin, Relation::Activities.def()) .filter( // TODO ghetto double inner join because i want to filter out tombstones Condition::any() @@ -169,12 +169,11 @@ impl Entity { .order_by(Column::Published, Order::Desc); if let Some(uid) = uid { - let uid = uid.to_string(); select = select .join( sea_orm::JoinType::LeftJoin, - crate::model::object::Relation::Like.def() - .on_condition(move |_l, _r| crate::model::like::Column::Actor.eq(uid.clone()).into_condition()), + crate::model::object::Relation::Likes.def() + .on_condition(move |_l, _r| crate::model::like::Column::Actor.eq(uid).into_condition()), ) .select_column_as(crate::model::like::Column::Actor, format!("{}{}", crate::model::like::Entity.table_name(), crate::model::like::Column::Actor.to_string())); } diff --git a/src/model/attachment.rs b/src/model/attachment.rs index 30930f4e..93c6ea46 100644 --- a/src/model/attachment.rs +++ b/src/model/attachment.rs @@ -1,4 +1,4 @@ -use apb::{DocumentMut, ObjectMut}; +use apb::{DocumentMut, DocumentType, ObjectMut}; use sea_orm::entity::prelude::*; use crate::routes::activitypub::jsonld::LD; @@ -13,7 +13,7 @@ pub struct Model { #[sea_orm(unique)] pub url: String, pub object: i64, - pub document_type: String, + pub document_type: DocumentType, pub name: Option, pub media_type: String, pub created: ChronoDateTimeUtc, @@ -52,12 +52,12 @@ impl Model { #[axum::async_trait] pub trait BatchFillable { - async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr>; + async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr>; } #[axum::async_trait] impl BatchFillable for &[Event] { - async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { + async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { let objects : Vec = self .iter() .filter_map(|x| match x { @@ -70,12 +70,12 @@ impl BatchFillable for &[Event] { let attachments = objects.load_many(Entity, db).await?; - let mut out : std::collections::BTreeMap> = std::collections::BTreeMap::new(); + let mut out : std::collections::BTreeMap> = std::collections::BTreeMap::new(); for attach in attachments.into_iter().flatten() { if out.contains_key(&attach.object) { out.get_mut(&attach.object).expect("contains but get failed?").push(attach); } else { - out.insert(attach.object.clone(), vec![attach]); + out.insert(attach.object, vec![attach]); } } @@ -85,14 +85,14 @@ impl BatchFillable for &[Event] { #[axum::async_trait] impl BatchFillable for Vec { - async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { + async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { self.as_slice().load_attachments_batch(db).await } } #[axum::async_trait] impl BatchFillable for Event { - async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { + async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { let x = vec![self.clone()]; // TODO wasteful clone and vec![] but ehhh convenient x.load_attachments_batch(db).await } diff --git a/src/model/instance.rs b/src/model/instance.rs index fe63ac40..83c8c6e9 100644 --- a/src/model/instance.rs +++ b/src/model/instance.rs @@ -1,4 +1,6 @@ -use sea_orm::entity::prelude::*; +use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns}; + +use crate::errors::UpubError; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "instances")] @@ -39,3 +41,20 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl Entity { + pub fn find_by_domain(domain: &str) -> Select { + Entity::find().filter(Column::Domain.eq(domain)) + } + + pub async fn domain_to_internal(domain: &str, db: &DatabaseConnection) -> crate::Result { + Entity::find() + .filter(Column::Domain.eq(domain)) + .select_only() + .select_column(Column::Internal) + .into_tuple::() + .one(db) + .await? + .ok_or_else(UpubError::not_found) + } +} diff --git a/src/model/object.rs b/src/model/object.rs index bb1c9f63..bbaab0e4 100644 --- a/src/model/object.rs +++ b/src/model/object.rs @@ -1,7 +1,7 @@ -use apb::{BaseMut, Collection, CollectionMut, ObjectMut}; -use sea_orm::entity::prelude::*; +use apb::{BaseMut, Collection, CollectionMut, ObjectMut, ObjectType}; +use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns}; -use crate::routes::activitypub::jsonld::LD; +use crate::{errors::UpubError, routes::activitypub::jsonld::LD}; use super::Audience; @@ -12,7 +12,7 @@ pub struct Model { pub internal: i64, #[sea_orm(unique)] pub id: String, - pub object_type: String, + pub object_type: ObjectType, pub attributed_to: Option, pub name: Option, pub summary: Option, @@ -24,10 +24,10 @@ pub struct Model { pub announces: i32, pub replies: i32, pub context: Option, - pub to: Option, - pub bto: Option, - pub cc: Option, - pub bcc: Option, + pub to: Audience, + pub bto: Audience, + pub cc: Audience, + pub bcc: Audience, pub published: ChronoDateTimeUtc, pub updated: ChronoDateTimeUtc, } @@ -122,11 +122,28 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} +impl Entity { + pub fn find_by_ap_id(id: &str) -> Select { + Entity::find().filter(Column::Id.eq(id)) + } + + pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result { + Entity::find() + .filter(Column::Id.eq(id)) + .select_only() + .select_column(Column::Internal) + .into_tuple::() + .one(db) + .await? + .ok_or_else(UpubError::not_found) + } +} + impl ActiveModel { pub fn new(object: &impl apb::Object) -> Result { Ok(ActiveModel { - id: sea_orm::ActiveValue::NotSet, - ap_id: sea_orm::ActiveValue::Set(object.id().ok_or(super::FieldError("id"))?.to_string()), + internal: sea_orm::ActiveValue::NotSet, + id: sea_orm::ActiveValue::Set(object.id().ok_or(super::FieldError("id"))?.to_string()), object_type: sea_orm::ActiveValue::Set(object.object_type().ok_or(super::FieldError("type"))?), attributed_to: sea_orm::ActiveValue::Set(object.attributed_to().id()), name: sea_orm::ActiveValue::Set(object.name().map(|x| x.to_string())), @@ -135,14 +152,14 @@ impl ActiveModel { context: sea_orm::ActiveValue::Set(object.context().id()), in_reply_to: sea_orm::ActiveValue::Set(object.in_reply_to().id()), published: sea_orm::ActiveValue::Set(object.published().ok_or(super::FieldError("published"))?), - updated: sea_orm::ActiveValue::Set(object.updated()), + updated: sea_orm::ActiveValue::Set(object.updated().unwrap_or_else(chrono::Utc::now)), url: sea_orm::ActiveValue::Set(object.url().id()), replies: sea_orm::ActiveValue::Set(object.replies().get() - .map_or(0, |x| x.total_items().unwrap_or(0)) as i64), + .map_or(0, |x| x.total_items().unwrap_or(0)) as i32), likes: sea_orm::ActiveValue::Set(object.likes().get() - .map_or(0, |x| x.total_items().unwrap_or(0)) as i64), + .map_or(0, |x| x.total_items().unwrap_or(0)) as i32), announces: sea_orm::ActiveValue::Set(object.shares().get() - .map_or(0, |x| x.total_items().unwrap_or(0)) as i64), + .map_or(0, |x| x.total_items().unwrap_or(0)) as i32), to: sea_orm::ActiveValue::Set(object.to().into()), bto: sea_orm::ActiveValue::Set(object.bto().into()), cc: sea_orm::ActiveValue::Set(object.cc().into()), @@ -176,7 +193,7 @@ impl Model { .set_shares(apb::Node::object( serde_json::Value::new_object() .set_collection_type(Some(apb::CollectionType::OrderedCollection)) - .set_total_items(Some(self.shares as u64)) + .set_total_items(Some(self.announces as u64)) )) .set_likes(apb::Node::object( serde_json::Value::new_object() @@ -186,7 +203,7 @@ impl Model { .set_replies(apb::Node::object( serde_json::Value::new_object() .set_collection_type(Some(apb::CollectionType::OrderedCollection)) - .set_total_items(Some(self.comments as u64)) + .set_total_items(Some(self.replies as u64)) )) } } diff --git a/src/routes/activitypub/auth.rs b/src/routes/activitypub/auth.rs index acb01b52..c1928f00 100644 --- a/src/routes/activitypub/auth.rs +++ b/src/routes/activitypub/auth.rs @@ -25,7 +25,7 @@ pub async fn login( // TODO salt the pwd match model::credential::Entity::find() .filter(Condition::all() - .add(model::credential::Column::Email.eq(login.email)) + .add(model::credential::Column::Login.eq(login.email)) .add(model::credential::Column::Password.eq(sha256::digest(login.password))) ) .one(ctx.db()) @@ -41,8 +41,9 @@ pub async fn login( let expires = chrono::Utc::now() + std::time::Duration::from_secs(3600 * 6); model::session::Entity::insert( model::session::ActiveModel { - id: sea_orm::ActiveValue::Set(token.clone()), - actor: sea_orm::ActiveValue::Set(x.id.clone()), + internal: sea_orm::ActiveValue::NotSet, + secret: sea_orm::ActiveValue::Set(token.clone()), + actor: sea_orm::ActiveValue::Set(x.actor.clone()), expires: sea_orm::ActiveValue::Set(expires), } ) @@ -50,7 +51,7 @@ pub async fn login( .await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(AuthSuccess { token, expires, - user: x.id + user: x.actor })) }, None => Err(UpubError::unauthorized()), diff --git a/src/routes/activitypub/object/mod.rs b/src/routes/activitypub/object/mod.rs index 59a4fc97..445dd29c 100644 --- a/src/routes/activitypub/object/mod.rs +++ b/src/routes/activitypub/object/mod.rs @@ -2,7 +2,7 @@ pub mod replies; use apb::{CollectionMut, ObjectMut}; use axum::extract::{Path, Query, State}; -use sea_orm::{ColumnTrait, EntityTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns}; +use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns}; use crate::{errors::UpubError, model::{self, addressing::Event}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}}; @@ -62,7 +62,7 @@ pub async fn view( // .set_id(Some(&crate::url!(ctx, "/objects/{id}/replies"))) // .set_first(apb::Node::link(crate::url!(ctx, "/objects/{id}/replies/page"))) .set_collection_type(Some(apb::CollectionType::Collection)) - .set_total_items(Some(object.comments as u64)) + .set_total_items(Some(object.replies as u64)) .set_items(apb::Node::links(replies_ids)) ); } diff --git a/src/routes/activitypub/user/mod.rs b/src/routes/activitypub/user/mod.rs index 5819f242..a8076bfd 100644 --- a/src/routes/activitypub/user/mod.rs +++ b/src/routes/activitypub/user/mod.rs @@ -8,7 +8,7 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use apb::{ActorMut, EndpointsMut, Node}; -use crate::{errors::UpubError, model::{self, user}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url}; +use crate::{errors::UpubError, model, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url}; use super::{jsonld::LD, JsonLD, TryFetch}; @@ -61,7 +61,7 @@ pub async fn view( }, }; - match user::Entity::find_by_id(&uid) + match model::actor::Entity::find_by_ap_id(&uid) .find_also_related(model::config::Entity) .one(ctx.db()).await? { diff --git a/src/routes/activitypub/well_known.rs b/src/routes/activitypub/well_known.rs index 732f6908..5b725094 100644 --- a/src/routes/activitypub/well_known.rs +++ b/src/routes/activitypub/well_known.rs @@ -35,7 +35,7 @@ pub async fn nodeinfo_discovery(State(ctx): State) -> Json, Path(version): Path) -> Result, StatusCode> { // TODO it's unsustainable to count these every time, especially comments since it's a complex // filter! keep these numbers caches somewhere, maybe db, so that we can just look them up - let total_users = model::user::Entity::find().count(ctx.db()).await.ok(); + let total_users = model::actor::Entity::find().count(ctx.db()).await.ok(); let total_posts = None; let total_comments = None; let (software, version) = match version.as_str() { @@ -103,54 +103,33 @@ pub async fn webfinger(State(ctx): State, Query(query): Query ) -> Result, StatusCode> { - match model::user::Entity::find_by_id(ctx.uid(&id)) + match model::actor::Entity::find_by_ap_id(&ctx.uid(&id)) .find_also_related(model::config::Entity) .one(ctx.db()) .await diff --git a/src/server/admin.rs b/src/server/admin.rs index 0ee1dd74..c5f70e9e 100644 --- a/src/server/admin.rs +++ b/src/server/admin.rs @@ -1,4 +1,4 @@ -use sea_orm::{EntityTrait, IntoActiveModel}; +use sea_orm::{ActiveValue::{Set, NotSet}, EntityTrait}; #[axum::async_trait] pub trait Administrable { @@ -28,52 +28,56 @@ impl Administrable for super::Context { let ap_id = self.uid(&username); let db = self.db(); let domain = self.domain().to_string(); - let user_model = crate::model::user::Model { - id: ap_id.clone(), - name: display_name, - domain, summary, - preferred_username: username.clone(), - following: None, - following_count: 0, - followers: None, - followers_count: 0, - statuses_count: 0, - icon: avatar_url, - image: banner_url, - inbox: None, - shared_inbox: None, - outbox: None, - actor_type: apb::ActorType::Person, - created: chrono::Utc::now(), - updated: chrono::Utc::now(), - private_key: Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string()), - public_key: std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string(), + let user_model = crate::model::actor::Model { + internal: NotSet, + id: Set(ap_id.clone()), + name: Set(display_name), + domain: Set(domain), + summary: Set(summary), + preferred_username: Set(username.clone()), + following: Set(None), + following_count: Set(0), + followers: Set(None), + followers_count: Set(0), + statuses_count: Set(0), + icon: Set(avatar_url), + image: Set(banner_url), + inbox: Set(None), + shared_inbox: Set(None), + outbox: Set(None), + actor_type: Set(apb::ActorType::Person), + created: Set(chrono::Utc::now()), + updated: Set(chrono::Utc::now()), + private_key: Set(Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string())), + public_key: Set(std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string()), }; - crate::model::user::Entity::insert(user_model.into_active_model()) + crate::model::actor::Entity::insert(user_model) .exec(db) .await?; - let config_model = crate::model::config::Model { - id: ap_id.clone(), - accept_follow_requests: true, - show_followers_count: true, - show_following_count: true, - show_followers: false, - show_following: false, + let config_model = crate::model::config::ActiveModel { + internal: NotSet, + actor: Set(ap_id.clone()), + accept_follow_requests: Set(true), + show_followers_count: Set(true), + show_following_count: Set(true), + show_followers: Set(false), + show_following: Set(false), }; - crate::model::config::Entity::insert(config_model.into_active_model()) + crate::model::config::Entity::insert(config_model) .exec(db) .await?; - let credentials_model = crate::model::credential::Model { - id: ap_id, - email: username, - password, + let credentials_model = crate::model::credential::ActiveModel { + internal: NotSet, + actor: Set(ap_id), + login: Set(username), + password: Set(password), }; - crate::model::credential::Entity::insert(credentials_model.into_active_model()) + crate::model::credential::Entity::insert(credentials_model) .exec(db) .await?; diff --git a/src/server/dispatcher.rs b/src/server/dispatcher.rs index 4a545505..f48f5a69 100644 --- a/src/server/dispatcher.rs +++ b/src/server/dispatcher.rs @@ -54,7 +54,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker }; let del_row = model::delivery::ActiveModel { - id: sea_orm::ActiveValue::Set(delivery.id), + internal: sea_orm::ActiveValue::Set(delivery.internal), ..Default::default() }; let del = model::delivery::Entity::delete(del_row) @@ -72,7 +72,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker tracing::info!("delivering {} to {}", delivery.activity, delivery.target); - let payload = match model::activity::Entity::find_by_id(&delivery.activity) + let payload = match model::activity::Entity::find_by_ap_id(&delivery.activity) .find_also_related(model::object::Entity) .one(db) .await? // TODO probably should not fail here and at least re-insert the delivery @@ -99,24 +99,19 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker }, }; - let key = if delivery.actor == format!("https://{domain}") { - let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find() - .one(db).await? - else { - tracing::error!("no private key configured for application"); - continue; - }; - key - } else { - let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor) - .one(db).await? - else { - tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor); - continue; - }; - key + let Some(actor) = model::actor::Entity::find_by_ap_id(&delivery.actor) + .one(db) + .await? + else { + tracing::error!("abandoning delivery of {} from non existant actor: {}", delivery.activity, delivery.actor); + continue; }; + let Some(key) = actor.private_key + else { + tracing::error!("abandoning delivery of {} from actor without private key: {}", delivery.activity, delivery.actor); + continue; + }; if let Err(e) = Context::request( Method::POST, &delivery.target, @@ -125,7 +120,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker ).await { tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target); let new_delivery = model::delivery::ActiveModel { - id: sea_orm::ActiveValue::NotSet, + internal: sea_orm::ActiveValue::NotSet, not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()), actor: sea_orm::ActiveValue::Set(delivery.actor), target: sea_orm::ActiveValue::Set(delivery.target), diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 44830ffd..9abeb881 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -1,9 +1,9 @@ use std::collections::BTreeMap; -use apb::{target::Addressed, Activity, Base, Collection, CollectionPage, Link, Object}; +use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Link, Object}; use base64::Engine; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; -use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; +use sea_orm::EntityTrait; use crate::{errors::UpubError, model, VERSION}; @@ -13,14 +13,14 @@ use super::{httpsign::HttpSignature, normalizer::Normalizer, Context}; pub trait Fetcher { async fn webfinger(&self, user: &str, host: &str) -> crate::Result; - async fn fetch_user(&self, id: &str) -> crate::Result; - async fn pull_user(&self, id: &str) -> crate::Result; + async fn fetch_user(&self, id: &str) -> crate::Result; + async fn pull_user(&self, id: &str) -> crate::Result; async fn fetch_object(&self, id: &str) -> crate::Result; - async fn pull_object(&self, id: &str) -> crate::Result; + async fn pull_object(&self, id: &str) -> crate::Result; async fn fetch_activity(&self, id: &str) -> crate::Result; - async fn pull_activity(&self, id: &str) -> crate::Result; + async fn pull_activity(&self, id: &str) -> crate::Result; async fn fetch_thread(&self, id: &str) -> crate::Result<()>; @@ -115,30 +115,35 @@ impl Fetcher for Context { } - async fn fetch_user(&self, id: &str) -> crate::Result { - if let Some(x) = model::user::Entity::find_by_id(id).one(self.db()).await? { + async fn fetch_user(&self, id: &str) -> crate::Result { + if let Some(x) = model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } - let user_model = self.pull_user(id).await?; + let user_document = self.pull_user(id).await?; + let user_model = model::actor::ActiveModel::new(&user_document)?; // TODO this may fail: while fetching, remote server may fetch our service actor. // if it does so with http signature, we will fetch that actor in background // meaning that, once we reach here, it's already inserted and returns an UNIQUE error - model::user::Entity::insert(user_model.clone().into_active_model()) - .exec(self.db()).await?; - - Ok(user_model) + model::actor::Entity::insert(user_model).exec(self.db()).await?; + + // TODO fetch it back to get the internal id + Ok( + model::actor::Entity::find_by_ap_id(id) + .one(self.db()) + .await? + .ok_or_else(UpubError::internal_server_error)? + ) } - async fn pull_user(&self, id: &str) -> crate::Result { - let user = Self::request( + async fn pull_user(&self, id: &str) -> crate::Result { + let mut user = Self::request( Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), ).await?.json::().await?; - let mut user_model = model::user::Model::new(&user)?; // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs - if let Some(followers_url) = &user_model.followers { + if let Some(followers_url) = &user.followers().id() { let req = Self::request( Method::GET, followers_url, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), @@ -146,13 +151,13 @@ impl Fetcher for Context { if let Ok(res) = req { if let Ok(user_followers) = res.json::().await { if let Some(total) = user_followers.total_items() { - user_model.followers_count = total as i64; + user = user.set_followers_count(Some(total)); } } } } - if let Some(following_url) = &user_model.following { + if let Some(following_url) = &user.following().id() { let req = Self::request( Method::GET, following_url, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), @@ -160,33 +165,38 @@ impl Fetcher for Context { if let Ok(res) = req { if let Ok(user_following) = res.json::().await { if let Some(total) = user_following.total_items() { - user_model.following_count = total as i64; + user = user.set_following_count(Some(total)); } } } } - Ok(user_model) + Ok(user) } async fn fetch_activity(&self, id: &str) -> crate::Result { - if let Some(x) = model::activity::Entity::find_by_id(id).one(self.db()).await? { + if let Some(x) = model::activity::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } - let activity_model = self.pull_activity(id).await?; + let activity_document = self.pull_activity(id).await?; + let activity_model = model::activity::ActiveModel::new(&activity_document)?; - model::activity::Entity::insert(activity_model.clone().into_active_model()) + model::activity::Entity::insert(activity_model) .exec(self.db()).await?; - let addressed = activity_model.addressed(); - let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?; + // TODO fetch it back to get the internal id + let activity = model::activity::Entity::find_by_ap_id(id) + .one(self.db()).await?.ok_or_else(UpubError::internal_server_error)?; - Ok(activity_model) + let addressed = activity.addressed(); + let expanded_addresses = self.expand_addressing(addressed).await?; + self.address_to(Some(&activity.id), None, &expanded_addresses).await?; + + Ok(activity) } - async fn pull_activity(&self, id: &str) -> crate::Result { + async fn pull_activity(&self, id: &str) -> crate::Result { let activity = Self::request( Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), ).await?.json::().await?; @@ -203,84 +213,33 @@ impl Fetcher for Context { } } - let activity_model = model::activity::Model::new(&activity)?; - - Ok(activity_model) + Ok(activity) } async fn fetch_thread(&self, id: &str) -> crate::Result<()> { - crawl_replies(self, id, 0).await + // crawl_replies(self, id, 0).await + todo!() } async fn fetch_object(&self, id: &str) -> crate::Result { fetch_object_inner(self, id, 0).await } - async fn pull_object(&self, id: &str) -> crate::Result { - let object = Context::request( - Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), - ).await?.json::().await?; - - Ok(model::object::Model::new(&object)?) + async fn pull_object(&self, id: &str) -> crate::Result { + Ok( + Context::request( + Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + ) + .await? + .json::() + .await? + ) } } -#[async_recursion::async_recursion] -async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> { - tracing::info!("crawling replies of '{id}'"); - let object = Context::request( - Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), - ).await?.json::().await?; - - let object_model = model::object::Model::new(&object)?; - match model::object::Entity::insert(object_model.into_active_model()) - .exec(ctx.db()).await - { - Ok(_) => {}, - Err(sea_orm::DbErr::RecordNotInserted) => {}, - Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite - Err(e) => return Err(e.into()), - } - - if depth > 16 { - tracing::warn!("stopping thread crawling: too deep!"); - return Ok(()); - } - - let mut page_url = match object.replies().get() { - Some(serde_json::Value::String(x)) => { - let replies = Context::request( - Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), - ).await?.json::().await?; - replies.first().id() - }, - Some(serde_json::Value::Object(x)) => { - let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO! - obj.first().id() - }, - _ => return Ok(()), - }; - - while let Some(ref url) = page_url { - let replies = Context::request( - Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), - ).await?.json::().await?; - - for reply in replies.items() { - // TODO right now it crawls one by one, could be made in parallel but would be quite more - // abusive, so i'll keep it like this while i try it out - crawl_replies(ctx, reply.href(), depth + 1).await?; - } - - page_url = replies.next().id(); - } - - Ok(()) -} - #[async_recursion::async_recursion] async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result { - if let Some(x) = model::object::Entity::find_by_id(id).one(ctx.db()).await? { + if let Some(x) = model::object::Entity::find_by_ap_id(id).one(ctx.db()).await? { return Ok(x); // already in db, easy } @@ -290,7 +249,7 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res if let Some(oid) = object.id() { if oid != id { - if let Some(x) = model::object::Entity::find_by_id(oid).one(ctx.db()).await? { + if let Some(x) = model::object::Entity::find_by_ap_id(oid).one(ctx.db()).await? { return Ok(x); // already in db, but with id different that given url } } @@ -341,3 +300,56 @@ impl Fetchable for apb::Node { Ok(self) } } + +// #[async_recursion::async_recursion] +// async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> { +// tracing::info!("crawling replies of '{id}'"); +// let object = Context::request( +// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), +// ).await?.json::().await?; +// +// let object_model = model::object::Model::new(&object)?; +// match model::object::Entity::insert(object_model.into_active_model()) +// .exec(ctx.db()).await +// { +// Ok(_) => {}, +// Err(sea_orm::DbErr::RecordNotInserted) => {}, +// Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite +// Err(e) => return Err(e.into()), +// } +// +// if depth > 16 { +// tracing::warn!("stopping thread crawling: too deep!"); +// return Ok(()); +// } +// +// let mut page_url = match object.replies().get() { +// Some(serde_json::Value::String(x)) => { +// let replies = Context::request( +// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), +// ).await?.json::().await?; +// replies.first().id() +// }, +// Some(serde_json::Value::Object(x)) => { +// let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO! +// obj.first().id() +// }, +// _ => return Ok(()), +// }; +// +// while let Some(ref url) = page_url { +// let replies = Context::request( +// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), +// ).await?.json::().await?; +// +// for reply in replies.items() { +// // TODO right now it crawls one by one, could be made in parallel but would be quite more +// // abusive, so i'll keep it like this while i try it out +// crawl_replies(ctx, reply.href(), depth + 1).await?; +// } +// +// page_url = replies.next().id(); +// } +// +// Ok(()) +// } diff --git a/src/server/normalizer.rs b/src/server/normalizer.rs index f4fe2dba..a4d2fde6 100644 --- a/src/server/normalizer.rs +++ b/src/server/normalizer.rs @@ -1,5 +1,5 @@ use apb::{Node, Base, Object, Document}; -use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, Set}; +use sea_orm::{sea_query::Expr, ActiveValue::Set, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; use crate::{errors::UpubError, model, server::Context}; use super::fetcher::Fetcher; @@ -12,23 +12,23 @@ pub trait Normalizer { #[axum::async_trait] impl Normalizer for super::Context { async fn insert_object(&self, object_node: impl apb::Object, server: Option) -> crate::Result { - let mut object_model = model::object::Model::new(&object_node)?; - let oid = object_model.id.clone(); - let uid = object_model.attributed_to.clone(); + let oid = object_node.id().ok_or_else(UpubError::bad_request)?.to_string(); + let uid = object_node.attributed_to().id(); + let mut object_model = model::object::ActiveModel::new(&object_node)?; if let Some(server) = server { // make sure we're allowed to create this object - if let Some(object_author) = &object_model.attributed_to { + if let Set(Some(object_author)) = &object_model.attributed_to { if server != Context::server(object_author) { return Err(UpubError::forbidden()); } - } else if server != Context::server(&object_model.id) { + } else if server != Context::server(&oid) { return Err(UpubError::forbidden()); }; } // make sure content only contains a safe subset of html - if let Some(content) = object_model.content { - object_model.content = Some(mdhtml::safe_html(&content)); + if let Set(Some(content)) = object_model.content { + object_model.content = Set(Some(mdhtml::safe_html(&content))); } // fix context for remote posts @@ -37,33 +37,34 @@ impl Normalizer for super::Context { // > btw! also if any link is broken or we get rate limited, the whole insertion fails which is // > kind of dumb. there should be a job system so this can be done in waves. or maybe there's // > some whole other way to do this?? im thinking but misskey aaaa!! TODO - if let Some(ref reply) = object_model.in_reply_to { - if let Some(o) = model::object::Entity::find_by_id(reply).one(self.db()).await? { - object_model.context = o.context; + if let Set(Some(ref reply)) = object_model.in_reply_to { + if let Some(o) = model::object::Entity::find_by_ap_id(reply).one(self.db()).await? { + object_model.context = Set(o.context); } else { - object_model.context = None; // TODO to be filled by some other task + object_model.context = Set(None); // TODO to be filled by some other task } } else { - object_model.context = Some(object_model.id.clone()); + object_model.context = Set(Some(oid.clone())); } model::object::Entity::insert(object_model.clone().into_active_model()).exec(self.db()).await?; + let object = model::object::Entity::find_by_ap_id(&oid).one(self.db()).await?.ok_or_else(UpubError::internal_server_error)?; // update replies counter - if let Some(ref in_reply_to) = object_model.in_reply_to { + if let Set(Some(ref in_reply_to)) = object_model.in_reply_to { if self.fetch_object(in_reply_to).await.is_ok() { model::object::Entity::update_many() .filter(model::object::Column::Id.eq(in_reply_to)) - .col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1)) + .col_expr(model::object::Column::Replies, Expr::col(model::object::Column::Replies).add(1)) .exec(self.db()) .await?; } } // update statuses counter if let Some(object_author) = uid { - model::user::Entity::update_many() - .col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1)) - .filter(model::user::Column::Id.eq(&object_author)) + model::actor::Entity::update_many() + .col_expr(model::actor::Column::StatusesCount, Expr::col(model::actor::Column::StatusesCount).add(1)) + .filter(model::actor::Column::Id.eq(&object_author)) .exec(self.db()) .await?; } @@ -76,18 +77,18 @@ impl Normalizer for super::Context { continue }, Node::Link(l) => model::attachment::ActiveModel { - id: sea_orm::ActiveValue::NotSet, + internal: sea_orm::ActiveValue::NotSet, url: Set(l.href().to_string()), - object: Set(oid.clone()), + object: Set(object.internal), document_type: Set(apb::DocumentType::Page), name: Set(l.link_name().map(|x| x.to_string())), media_type: Set(l.link_media_type().unwrap_or("link").to_string()), created: Set(chrono::Utc::now()), }, Node::Object(o) => model::attachment::ActiveModel { - id: sea_orm::ActiveValue::NotSet, + internal: sea_orm::ActiveValue::NotSet, url: Set(o.url().id().unwrap_or_else(|| o.id().map(|x| x.to_string()).unwrap_or_default())), - object: Set(oid.clone()), + object: Set(object.internal), document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), name: Set(o.name().map(|x| x.to_string())), media_type: Set(o.media_type().unwrap_or("link").to_string()), @@ -113,9 +114,9 @@ impl Normalizer for super::Context { }; let attachment_model = model::attachment::ActiveModel { - id: sea_orm::ActiveValue::NotSet, + internal: sea_orm::ActiveValue::NotSet, url: Set(img.url().id().unwrap_or_else(|| img.id().map(|x| x.to_string()).unwrap_or_default())), - object: Set(oid.clone()), + object: Set(object.internal), document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), name: Set(img.name().map(|x| x.to_string())), media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()), @@ -126,6 +127,6 @@ impl Normalizer for super::Context { .await?; } - Ok(object_model) + Ok(object) } }