From cef064765a91babc626e99314cf523958dc7bd38 Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 25 May 2024 03:39:57 +0200 Subject: [PATCH] chore: initial conversion work, in_reply_to as str --- ...524_000004_create_addressing_deliveries.rs | 4 +- src/model/activity.rs | 6 + src/model/actor.rs | 15 +- src/model/instance.rs | 6 + src/model/object.rs | 8 +- src/model/relation.rs | 22 +- src/model/session.rs | 6 + src/routes/activitypub/activity.rs | 4 +- src/routes/activitypub/auth.rs | 11 +- src/routes/activitypub/object/mod.rs | 6 +- src/routes/activitypub/user/mod.rs | 4 +- src/routes/activitypub/well_known.rs | 18 +- src/routes/mastodon/accounts.rs | 7 +- src/server/admin.rs | 85 ++++--- src/server/auth.rs | 45 ++-- src/server/context.rs | 41 ++-- src/server/dispatcher.rs | 30 +-- src/server/fetcher.rs | 230 ++++++++++-------- 18 files changed, 322 insertions(+), 226 deletions(-) diff --git a/src/migrations/m20240524_000004_create_addressing_deliveries.rs b/src/migrations/m20240524_000004_create_addressing_deliveries.rs index 6d7e330..dd980f7 100644 --- a/src/migrations/m20240524_000004_create_addressing_deliveries.rs +++ b/src/migrations/m20240524_000004_create_addressing_deliveries.rs @@ -43,7 +43,7 @@ impl MigrationTrait for Migration { .auto_increment() .primary_key() ) - .col(ColumnDef::new(Addressing::Actor).integer().not_null()) + .col(ColumnDef::new(Addressing::Actor).integer().null()) .foreign_key( ForeignKey::create() .name("fkey-addressing-actor") @@ -51,7 +51,7 @@ impl MigrationTrait for Migration { .to(Actors::Table, Actors::Id) .on_update(ForeignKeyAction::Cascade) ) - .col(ColumnDef::new(Addressing::Instance).integer().not_null()) + .col(ColumnDef::new(Addressing::Instance).integer().null()) .foreign_key( ForeignKey::create() .name("fkey-addressing-instance") diff --git a/src/model/activity.rs b/src/model/activity.rs index 351d2f4..6f0c856 100644 --- a/src/model/activity.rs +++ b/src/model/activity.rs @@ -71,6 +71,12 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} +impl Entity { + pub fn find_by_ap_id(ap_id: &str) -> Select { + Entity::find().filter(Column::ApId.eq(ap_id)) + } +} + impl ActiveModel { pub fn new(activity: &impl apb::Activity) -> Result { Ok(ActiveModel { diff --git a/src/model/actor.rs b/src/model/actor.rs index 66de0a1..0c72d6e 100644 --- a/src/model/actor.rs +++ b/src/model/actor.rs @@ -41,9 +41,9 @@ pub enum Relation { Addressing, #[sea_orm(has_many = "super::announce::Entity")] Announces, - #[sea_orm(has_many = "super::config::Entity")] + #[sea_orm(has_one = "super::config::Entity")] Configs, - #[sea_orm(has_many = "super::credential::Entity")] + #[sea_orm(has_one = "super::credential::Entity")] Credentials, #[sea_orm(has_many = "super::delivery::Entity")] Deliveries, @@ -133,6 +133,17 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} +impl Entity { + pub fn find_by_ap_id(ap_id: &str) -> Select { + Entity::find().filter(Column::ApId.eq(ap_id)) + } + + pub fn find_with_instance() -> Select { + Entity::find() + .left_join(Relation::Instances.def()) + } +} + impl ActiveModel { pub fn new(object: &impl Actor, instance: i32) -> Result { let ap_id = object.id().ok_or(super::FieldError("id"))?.to_string(); diff --git a/src/model/instance.rs b/src/model/instance.rs index 125fc1f..dbf9d47 100644 --- a/src/model/instance.rs +++ b/src/model/instance.rs @@ -37,3 +37,9 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl Entity { + pub fn find_by_domain(domain: &str) -> Select { + Entity::find().filter(Column::Domain.eq(domain)) + } +} diff --git a/src/model/object.rs b/src/model/object.rs index a56257b..e7721f0 100644 --- a/src/model/object.rs +++ b/src/model/object.rs @@ -13,7 +13,7 @@ pub struct Model { #[sea_orm(unique)] pub ap_id: String, pub object_type: String, - pub attributed_to: Option, + pub attributed_to: Option, pub name: Option, pub summary: Option, pub content: Option, @@ -108,6 +108,12 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} +impl Entity { + pub fn find_by_ap_id(ap_id: &str) -> Select { + Entity::find().filter(Column::ApId.eq(ap_id)) + } +} + impl ActiveModel { pub fn new(object: &impl apb::Object) -> Result { Ok(ActiveModel { diff --git a/src/model/relation.rs b/src/model/relation.rs index 44d01df..be33372 100644 --- a/src/model/relation.rs +++ b/src/model/relation.rs @@ -1,4 +1,4 @@ -use sea_orm::entity::prelude::*; +use sea_orm::{entity::prelude::*, sea_query::Alias, QuerySelect, SelectColumns}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "relations")] @@ -20,7 +20,7 @@ pub enum Relation { on_update = "Cascade", on_delete = "NoAction" )] - Activities2, + ActivitiesAccept, #[sea_orm( belongs_to = "super::activity::Entity", from = "Column::Activity", @@ -28,7 +28,7 @@ pub enum Relation { on_update = "Cascade", on_delete = "NoAction" )] - Activities1, + ActivitiesFollow, #[sea_orm( belongs_to = "super::actor::Entity", from = "Column::Follower", @@ -36,7 +36,7 @@ pub enum Relation { on_update = "Cascade", on_delete = "Cascade" )] - Actors2, + ActorsFollower, #[sea_orm( belongs_to = "super::actor::Entity", from = "Column::Following", @@ -44,7 +44,19 @@ pub enum Relation { on_update = "Cascade", on_delete = "Cascade" )] - Actors1, + ActorsFollowing, } impl ActiveModelBehavior for ActiveModel {} + +impl Entity { + pub fn find_followers(id: &str) -> Select { + Entity::find() + .inner_join(Relation::ActorsFollowing.def()) + .filter(super::actor::Column::ApId.eq(id)) + .left_join(Relation::ActorsFollower.def()) + .select_only() + .select_column(super::actor::Column::ApId) + .into_tuple::() + } +} diff --git a/src/model/session.rs b/src/model/session.rs index 0e7dfea..6496d28 100644 --- a/src/model/session.rs +++ b/src/model/session.rs @@ -29,3 +29,9 @@ impl Related for Entity { } impl ActiveModelBehavior for ActiveModel {} + +impl Entity { + pub fn find_by_secret(secret: &str) -> Select { + Entity::find().filter(Column::Secret.eq(secret)) + } +} diff --git a/src/routes/activitypub/activity.rs b/src/routes/activitypub/activity.rs index bc15a35..6881408 100644 --- a/src/routes/activitypub/activity.rs +++ b/src/routes/activitypub/activity.rs @@ -13,8 +13,8 @@ pub async fn view( let aid = ctx.aid(&id); if auth.is_local() && query.fetch && !ctx.is_local(&aid) { let obj = ctx.fetch_activity(&aid).await?; - if obj.id != aid { - return Err(UpubError::Redirect(obj.id)); + if obj.ap_id != aid { + return Err(UpubError::Redirect(obj.ap_id)); } } diff --git a/src/routes/activitypub/auth.rs b/src/routes/activitypub/auth.rs index acb01b5..029167a 100644 --- a/src/routes/activitypub/auth.rs +++ b/src/routes/activitypub/auth.rs @@ -25,13 +25,17 @@ pub async fn login( // TODO salt the pwd match model::credential::Entity::find() .filter(Condition::all() - .add(model::credential::Column::Email.eq(login.email)) + .add(model::credential::Column::Login.eq(login.email)) .add(model::credential::Column::Password.eq(sha256::digest(login.password))) ) .one(ctx.db()) .await? { Some(x) => { + let user = model::actor::Entity::find_by_id(x.actor) + .one(ctx.db()) + .await? + .ok_or_else(UpubError::not_found)?; // TODO should probably use crypto-safe rng let token : String = rand::thread_rng() .sample_iter(&rand::distributions::Alphanumeric) @@ -41,7 +45,8 @@ pub async fn login( let expires = chrono::Utc::now() + std::time::Duration::from_secs(3600 * 6); model::session::Entity::insert( model::session::ActiveModel { - id: sea_orm::ActiveValue::Set(token.clone()), + id: sea_orm::ActiveValue::NotSet, + secret: sea_orm::ActiveValue::Set(token.clone()), actor: sea_orm::ActiveValue::Set(x.id.clone()), expires: sea_orm::ActiveValue::Set(expires), } @@ -50,7 +55,7 @@ pub async fn login( .await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(AuthSuccess { token, expires, - user: x.id + user: user.ap_id, })) }, None => Err(UpubError::unauthorized()), diff --git a/src/routes/activitypub/object/mod.rs b/src/routes/activitypub/object/mod.rs index 59a4fc9..ba7a1da 100644 --- a/src/routes/activitypub/object/mod.rs +++ b/src/routes/activitypub/object/mod.rs @@ -18,8 +18,8 @@ pub async fn view( if auth.is_local() && query.fetch && !ctx.is_local(&oid) { let obj = ctx.fetch_object(&oid).await?; // some implementations serve statuses on different urls than their AP id - if obj.id != oid { - return Err(UpubError::Redirect(crate::url!(ctx, "/objects/{}", ctx.id(&obj.id)))); + if obj.ap_id != oid { + return Err(UpubError::Redirect(crate::url!(ctx, "/objects/{}", ctx.id(&obj.ap_id)))); } } @@ -62,7 +62,7 @@ pub async fn view( // .set_id(Some(&crate::url!(ctx, "/objects/{id}/replies"))) // .set_first(apb::Node::link(crate::url!(ctx, "/objects/{id}/replies/page"))) .set_collection_type(Some(apb::CollectionType::Collection)) - .set_total_items(Some(object.comments as u64)) + .set_total_items(Some(object.replies as u64)) .set_items(apb::Node::links(replies_ids)) ); } diff --git a/src/routes/activitypub/user/mod.rs b/src/routes/activitypub/user/mod.rs index 5819f24..a8076bf 100644 --- a/src/routes/activitypub/user/mod.rs +++ b/src/routes/activitypub/user/mod.rs @@ -8,7 +8,7 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use apb::{ActorMut, EndpointsMut, Node}; -use crate::{errors::UpubError, model::{self, user}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url}; +use crate::{errors::UpubError, model, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url}; use super::{jsonld::LD, JsonLD, TryFetch}; @@ -61,7 +61,7 @@ pub async fn view( }, }; - match user::Entity::find_by_id(&uid) + match model::actor::Entity::find_by_ap_id(&uid) .find_also_related(model::config::Entity) .one(ctx.db()).await? { diff --git a/src/routes/activitypub/well_known.rs b/src/routes/activitypub/well_known.rs index 732f690..80293a1 100644 --- a/src/routes/activitypub/well_known.rs +++ b/src/routes/activitypub/well_known.rs @@ -35,7 +35,7 @@ pub async fn nodeinfo_discovery(State(ctx): State) -> Json, Path(version): Path) -> Result, StatusCode> { // TODO it's unsustainable to count these every time, especially comments since it's a complex // filter! keep these numbers caches somewhere, maybe db, so that we can just look them up - let total_users = model::user::Entity::find().count(ctx.db()).await.ok(); + let total_users = model::actor::Entity::find().count(ctx.db()).await.ok(); let total_posts = None; let total_comments = None; let (software, version) = match version.as_str() { @@ -124,19 +124,19 @@ pub async fn webfinger(State(ctx): State, Query(query): Query, Query(query): Query ) -> Result, StatusCode> { - match model::user::Entity::find_by_id(ctx.uid(&id)) + match model::actor::Entity::find_by_ap_id(&ctx.uid(&id)) .find_also_related(model::config::Entity) .one(ctx.db()) .await @@ -28,11 +27,11 @@ pub async fn view( following_count: x.following_count as u64, header: x.image.as_deref().unwrap_or("").to_string(), header_static: x.image.unwrap_or_default(), - id: AccountId::new(x.id.clone()), + id: AccountId::new(x.ap_id.clone()), locked: !cfg.accept_follow_requests, note: x.summary.unwrap_or_default(), statuses_count: 0, // TODO keep track in each user - url: x.id, + url: x.ap_id, username: x.preferred_username, source: None, moved: None, diff --git a/src/server/admin.rs b/src/server/admin.rs index 0ee1dd7..e24af96 100644 --- a/src/server/admin.rs +++ b/src/server/admin.rs @@ -1,4 +1,6 @@ -use sea_orm::{EntityTrait, IntoActiveModel}; +use sea_orm::{ActiveValue::{Set, NotSet}, EntityTrait}; + +use crate::errors::UpubError; #[axum::async_trait] pub trait Administrable { @@ -24,56 +26,69 @@ impl Administrable for super::Context { avatar_url: Option, banner_url: Option, ) -> crate::Result<()> { + let local_instance = crate::model::instance::Entity::find_by_domain(self.domain()) + .one(self.db()) + .await? + .ok_or_else(UpubError::internal_server_error)?; let key = openssl::rsa::Rsa::generate(2048).unwrap(); let ap_id = self.uid(&username); let db = self.db(); let domain = self.domain().to_string(); - let user_model = crate::model::user::Model { - id: ap_id.clone(), - name: display_name, - domain, summary, - preferred_username: username.clone(), - following: None, - following_count: 0, - followers: None, - followers_count: 0, - statuses_count: 0, - icon: avatar_url, - image: banner_url, - inbox: None, - shared_inbox: None, - outbox: None, - actor_type: apb::ActorType::Person, - created: chrono::Utc::now(), - updated: chrono::Utc::now(), - private_key: Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string()), - public_key: std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string(), + let user_model = crate::model::actor::ActiveModel { + id: NotSet, + ap_id: Set(ap_id.clone()), + name: Set(display_name), + instance: Set(local_instance.id), + summary: Set(summary), + preferred_username: Set(username.clone()), + following: Set(None), + following_count: Set(0), + followers: Set(None), + followers_count: Set(0), + statuses_count: Set(0), + icon: Set(avatar_url), + image: Set(banner_url), + inbox: Set(None), + shared_inbox: Set(None), + outbox: Set(None), + actor_type: Set(apb::ActorType::Person), + created: Set(chrono::Utc::now()), + updated: Set(chrono::Utc::now()), + private_key: Set(Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string())), + public_key: Set(std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string()), }; - crate::model::user::Entity::insert(user_model.into_active_model()) + crate::model::actor::Entity::insert(user_model) .exec(db) .await?; - let config_model = crate::model::config::Model { - id: ap_id.clone(), - accept_follow_requests: true, - show_followers_count: true, - show_following_count: true, - show_followers: false, - show_following: false, + let user_model = crate::model::actor::Entity::find_by_ap_id(&ap_id) + .one(db) + .await? + .ok_or_else(UpubError::internal_server_error)?; + + let config_model = crate::model::config::ActiveModel { + id: NotSet, + actor: Set(user_model.id), + accept_follow_requests: Set(true), + show_followers_count: Set(true), + show_following_count: Set(true), + show_followers: Set(false), + show_following: Set(false), }; - crate::model::config::Entity::insert(config_model.into_active_model()) + crate::model::config::Entity::insert(config_model) .exec(db) .await?; - let credentials_model = crate::model::credential::Model { - id: ap_id, - email: username, - password, + let credentials_model = crate::model::credential::ActiveModel { + id: NotSet, + actor: Set(user_model.id), + login: Set(username), + password: Set(password), }; - crate::model::credential::Entity::insert(credentials_model.into_active_model()) + crate::model::credential::Entity::insert(credentials_model) .exec(db) .await?; diff --git a/src/server/auth.rs b/src/server/auth.rs index 2470e86..bcf7f7b 100644 --- a/src/server/auth.rs +++ b/src/server/auth.rs @@ -9,8 +9,8 @@ use super::{fetcher::Fetcher, httpsign::HttpSignature}; #[derive(Debug, Clone)] pub enum Identity { Anonymous, - Local(String), - Remote(String), + Local(i64), + Remote(i64), } impl Identity { @@ -18,27 +18,34 @@ impl Identity { let base_cond = Condition::any().add(model::addressing::Column::Actor.eq(apb::target::PUBLIC)); match self { Identity::Anonymous => base_cond, - Identity::Remote(server) => base_cond.add(model::addressing::Column::Server.eq(server)), + Identity::Remote(server_id) => base_cond.add(model::addressing::Column::Instance.eq(*server_id)), // TODO should we allow all users on same server to see? or just specific user?? - Identity::Local(uid) => base_cond - .add(model::addressing::Column::Actor.eq(uid)) - .add(model::activity::Column::Actor.eq(uid)) - .add(model::object::Column::AttributedTo.eq(uid)), + Identity::Local(user_id) => base_cond + .add(model::addressing::Column::Actor.eq(*user_id)) + .add(model::activity::Column::Actor.eq(*user_id)) + .add(model::object::Column::AttributedTo.eq(*user_id)), } } - pub fn my_id(&self) -> Option<&str> { + pub fn user_id(&self) -> Option { match self { - Identity::Local(x) => Some(x.as_str()), + Identity::Local(x) => Some(*x), _ => None, } } - pub fn is(&self, id: &str) -> bool { + pub fn server_id(&self) -> Option { + match self { + Identity::Remote(x) => Some(*x), + _ => None, + } + } + + pub fn is(&self, id: i64) -> bool { match self { Identity::Anonymous => false, Identity::Remote(_) => false, // TODO per-actor server auth should check this - Identity::Local(uid) => uid.as_str() == id + Identity::Local(user_id) => *user_id == id } } @@ -54,18 +61,12 @@ impl Identity { matches!(self, Self::Remote(_)) } - pub fn is_local_user(&self, uid: &str) -> bool { - match self { - Self::Local(x) => x == uid, - _ => false, - } + pub fn is_user(&self, usr: i64) -> bool { + self.user_id().map(|id| id == usr).unwrap_or(false) } - pub fn is_remote_server(&self, uid: &str) -> bool { - match self { - Self::Remote(x) => x == uid, - _ => false, - } + pub fn is_server(&self, server: i64) -> bool { + self.server_id().map(|id| id == server).unwrap_or(false) } } @@ -90,7 +91,7 @@ where .unwrap_or(""); if auth_header.starts_with("Bearer ") { - match model::session::Entity::find_by_id(auth_header.replace("Bearer ", "")) + match model::session::Entity::find_by_secret(&auth_header.replace("Bearer ", "")) .filter(model::session::Column::Expires.gt(chrono::Utc::now())) .one(ctx.db()) .await diff --git a/src/server/context.rs b/src/server/context.rs index 2021622..3ee6a1a 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeSet, sync::Arc}; use openssl::rsa::Rsa; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; +use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; use crate::{config::Config, model, server::fetcher::Fetcher}; use uriproxy::UriClass; @@ -19,7 +19,7 @@ struct ContextInner { base_url: String, dispatcher: Dispatcher, // TODO keep these pre-parsed - app: model::application::Model, + app: model::actor::Model, relays: BTreeSet, } @@ -42,45 +42,52 @@ impl Context { if domain.starts_with("http") { domain = domain.replace("https://", "").replace("http://", ""); } + let base_url = format!("{}{}", protocol, domain); + let dispatcher = Dispatcher::default(); for _ in 0..1 { // TODO customize delivery workers amount dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! } - let app = match model::application::Entity::find().one(&db).await? { + + let app = match model::actor::Entity::find_by_ap_id(&base_url).one(&db).await? { Some(model) => model, None => { tracing::info!("generating application keys"); let rsa = Rsa::generate(2048)?; let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string(); let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string(); - let system = model::application::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - private_key: sea_orm::ActiveValue::Set(privk.clone()), - public_key: sea_orm::ActiveValue::Set(pubk.clone()), - created: sea_orm::ActiveValue::Set(chrono::Utc::now()), + let system = model::actor::ActiveModel { + id: NotSet, + ap_id: Set(base_url.clone()), + instance: NotSet, // TODO!!! this will fail + preferred_username: Set(domain.clone()), + name: Set(Some("μpub".to_string())), + icon: Set(Some("https://cdn.alemi.dev/social/circle-square.png".to_string())), + actor_type: Set(apb::ActorType::Application), + private_key: Set(Some(privk.clone())), + public_key: Set(pubk.clone()), + created: Set(chrono::Utc::now()), + updated: Set(chrono::Utc::now()), + ..Default::default() }; - model::application::Entity::insert(system).exec(&db).await?; + model::actor::Entity::insert(system).exec(&db).await?; // sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time - model::application::Entity::find().one(&db).await?.expect("could not find app config just inserted") + model::actor::Entity::find_by_ap_id(&base_url).one(&db).await?.expect("could not find app config just inserted") } }; - let relays = model::relay::Entity::find() - .select_only() - .select_column(model::relay::Column::Id) - .filter(model::relay::Column::Accepted.eq(true)) + let relays = model::relation::Entity::find_followers(&base_url) .into_tuple::() .all(&db) .await?; Ok(Context(Arc::new(ContextInner { - base_url: format!("{}{}", protocol, domain), - db, domain, protocol, app, dispatcher, config, + base_url, db, domain, protocol, app, dispatcher, config, relays: BTreeSet::from_iter(relays.into_iter()), }))) } - pub fn app(&self) -> &model::application::Model { + pub fn app(&self) -> &model::actor::Model { &self.0.app } diff --git a/src/server/dispatcher.rs b/src/server/dispatcher.rs index 4a54550..3ae0e3c 100644 --- a/src/server/dispatcher.rs +++ b/src/server/dispatcher.rs @@ -72,7 +72,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker tracing::info!("delivering {} to {}", delivery.activity, delivery.target); - let payload = match model::activity::Entity::find_by_id(&delivery.activity) + let payload = match model::activity::Entity::find_by_id(delivery.activity) .find_also_related(model::object::Entity) .one(db) .await? // TODO probably should not fail here and at least re-insert the delivery @@ -99,29 +99,23 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker }, }; - let key = if delivery.actor == format!("https://{domain}") { - let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find() - .one(db).await? - else { - tracing::error!("no private key configured for application"); - continue; - }; - key - } else { - let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor) - .one(db).await? - else { - tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor); - continue; - }; - key + let Some(actor) = model::actor::Entity::find_by_id(delivery.actor) + .one(db) + .await? + else { + tracing::error!("failed delivery, missing actor {}", delivery.actor); + continue; }; + let Some(key) = actor.private_key else { + tracing::error!("can not dispatch activity for actor without private key: {}", delivery.actor); + continue; + }; if let Err(e) = Context::request( Method::POST, &delivery.target, Some(&serde_json::to_string(&payload).unwrap()), - &delivery.actor, &key, domain + &actor.ap_id, &key, domain ).await { tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target); let new_delivery = model::delivery::ActiveModel { diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 44830ff..a56baed 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -1,9 +1,9 @@ use std::collections::BTreeMap; -use apb::{target::Addressed, Activity, Base, Collection, CollectionPage, Link, Object}; +use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, CollectionPage, Link, Object}; use base64::Engine; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; -use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; +use sea_orm::EntityTrait; use crate::{errors::UpubError, model, VERSION}; @@ -13,14 +13,14 @@ use super::{httpsign::HttpSignature, normalizer::Normalizer, Context}; pub trait Fetcher { async fn webfinger(&self, user: &str, host: &str) -> crate::Result; - async fn fetch_user(&self, id: &str) -> crate::Result; - async fn pull_user(&self, id: &str) -> crate::Result; + async fn fetch_user(&self, id: &str) -> crate::Result; + async fn pull_user(&self, id: &str) -> crate::Result; async fn fetch_object(&self, id: &str) -> crate::Result; - async fn pull_object(&self, id: &str) -> crate::Result; + async fn pull_object(&self, id: &str) -> crate::Result; async fn fetch_activity(&self, id: &str) -> crate::Result; - async fn pull_activity(&self, id: &str) -> crate::Result; + async fn pull_activity(&self, id: &str) -> crate::Result; async fn fetch_thread(&self, id: &str) -> crate::Result<()>; @@ -115,80 +115,100 @@ impl Fetcher for Context { } - async fn fetch_user(&self, id: &str) -> crate::Result { - if let Some(x) = model::user::Entity::find_by_id(id).one(self.db()).await? { + async fn fetch_user(&self, id: &str) -> crate::Result { + if let Some(x) = model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } - let user_model = self.pull_user(id).await?; + // TODO PULL INSTANCE!!!!!!! + + let user = self.pull_user(id).await?; + let user_model = model::actor::ActiveModel::new(&user, 0)?; // TODO this may fail: while fetching, remote server may fetch our service actor. // if it does so with http signature, we will fetch that actor in background // meaning that, once we reach here, it's already inserted and returns an UNIQUE error - model::user::Entity::insert(user_model.clone().into_active_model()) + model::actor::Entity::insert(user_model) .exec(self.db()).await?; - Ok(user_model) + // TODO we could fetch only the internal id and avoid getting back the whole user, but this + // happens rarely anyway because after the first time we just get the cached one in our db + let user = model::actor::Entity::find_by_ap_id(id) + .one(self.db()) + .await? + .ok_or_else(UpubError::internal_server_error)?; + + Ok(user) } - async fn pull_user(&self, id: &str) -> crate::Result { - let user = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), - ).await?.json::().await?; - let mut user_model = model::user::Model::new(&user)?; + async fn pull_user(&self, id: &str) -> crate::Result { + let pkey = self.app().private_key.ok_or_else(UpubError::internal_server_error)?; - // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs - if let Some(followers_url) = &user_model.followers { + let mut user = Self::request( + Method::GET, id, None, &format!("https://{}", self.domain()), &pkey, self.domain(), + ) + .await? + .json::() + .await?; + + // TODO try fetching these numbers from audience/generator fields + if let Some(followers_url) = user.followers().id() { let req = Self::request( - Method::GET, followers_url, None, - &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + Method::GET, &followers_url, None, + &format!("https://{}", self.domain()), &pkey, self.domain(), ).await; if let Ok(res) = req { if let Ok(user_followers) = res.json::().await { if let Some(total) = user_followers.total_items() { - user_model.followers_count = total as i64; + user = user.set_followers_count(Some(total)); } } } } - if let Some(following_url) = &user_model.following { + if let Some(following_url) = user.following().id() { let req = Self::request( - Method::GET, following_url, None, - &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + Method::GET, &following_url, None, + &format!("https://{}", self.domain()), &pkey, self.domain(), ).await; if let Ok(res) = req { if let Ok(user_following) = res.json::().await { if let Some(total) = user_following.total_items() { - user_model.following_count = total as i64; + user = user.set_following_count(Some(total)); } } } } - Ok(user_model) + Ok(user) } async fn fetch_activity(&self, id: &str) -> crate::Result { - if let Some(x) = model::activity::Entity::find_by_id(id).one(self.db()).await? { + if let Some(x) = model::activity::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } - let activity_model = self.pull_activity(id).await?; + let activity = self.pull_activity(id).await?; + let activity_model = model::activity::ActiveModel::new(&activity)?; - model::activity::Entity::insert(activity_model.clone().into_active_model()) - .exec(self.db()).await?; + model::activity::Entity::insert(activity_model).exec(self.db()).await?; + + let activity = model::activity::Entity::find_by_ap_id(id) + .one(self.db()) + .await? + .ok_or_else(UpubError::internal_server_error)?; - let addressed = activity_model.addressed(); + let addressed = activity.addressed(); let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?; + self.address_to(Some(&activity.ap_id), None, &expanded_addresses).await?; - Ok(activity_model) + Ok(activity) } - async fn pull_activity(&self, id: &str) -> crate::Result { + async fn pull_activity(&self, id: &str) -> crate::Result { + let pkey = self.app().private_key.ok_or_else(UpubError::internal_server_error)?; let activity = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + Method::GET, id, None, &format!("https://{}", self.domain()), &pkey, self.domain(), ).await?.json::().await?; if let Some(activity_actor) = activity.actor().id() { @@ -203,94 +223,47 @@ impl Fetcher for Context { } } - let activity_model = model::activity::Model::new(&activity)?; - - Ok(activity_model) + Ok(activity) } async fn fetch_thread(&self, id: &str) -> crate::Result<()> { - crawl_replies(self, id, 0).await + // crawl_replies(self, id, 0).await + todo!() } async fn fetch_object(&self, id: &str) -> crate::Result { fetch_object_inner(self, id, 0).await } - async fn pull_object(&self, id: &str) -> crate::Result { + async fn pull_object(&self, id: &str) -> crate::Result { + let pkey = self.app().private_key.ok_or_else(UpubError::internal_server_error)?; + let object = Context::request( - Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), - ).await?.json::().await?; + Method::GET, id, None, &format!("https://{}", self.domain()), &pkey, self.domain(), + ) + .await? + .json::() + .await?; - Ok(model::object::Model::new(&object)?) + Ok(object) } } -#[async_recursion::async_recursion] -async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> { - tracing::info!("crawling replies of '{id}'"); - let object = Context::request( - Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), - ).await?.json::().await?; - - let object_model = model::object::Model::new(&object)?; - match model::object::Entity::insert(object_model.into_active_model()) - .exec(ctx.db()).await - { - Ok(_) => {}, - Err(sea_orm::DbErr::RecordNotInserted) => {}, - Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite - Err(e) => return Err(e.into()), - } - - if depth > 16 { - tracing::warn!("stopping thread crawling: too deep!"); - return Ok(()); - } - - let mut page_url = match object.replies().get() { - Some(serde_json::Value::String(x)) => { - let replies = Context::request( - Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), - ).await?.json::().await?; - replies.first().id() - }, - Some(serde_json::Value::Object(x)) => { - let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO! - obj.first().id() - }, - _ => return Ok(()), - }; - - while let Some(ref url) = page_url { - let replies = Context::request( - Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), - ).await?.json::().await?; - - for reply in replies.items() { - // TODO right now it crawls one by one, could be made in parallel but would be quite more - // abusive, so i'll keep it like this while i try it out - crawl_replies(ctx, reply.href(), depth + 1).await?; - } - - page_url = replies.next().id(); - } - - Ok(()) -} - #[async_recursion::async_recursion] async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result { - if let Some(x) = model::object::Entity::find_by_id(id).one(ctx.db()).await? { + if let Some(x) = model::object::Entity::find_by_ap_id(id).one(ctx.db()).await? { return Ok(x); // already in db, easy } + let pkey = ctx.app().private_key.ok_or_else(UpubError::internal_server_error)?; + let object = Context::request( - Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), + Method::GET, id, None, &format!("https://{}", ctx.domain()), &pkey, ctx.domain(), ).await?.json::().await?; if let Some(oid) = object.id() { if oid != id { - if let Some(x) = model::object::Entity::find_by_id(oid).one(ctx.db()).await? { + if let Some(x) = model::object::Entity::find_by_ap_id(oid).one(ctx.db()).await? { return Ok(x); // already in db, but with id different that given url } } @@ -315,7 +288,7 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res let object_model = ctx.insert_object(object, None).await?; let expanded_addresses = ctx.expand_addressing(addressed).await?; - ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?; + ctx.address_to(None, Some(&object_model.ap_id), &expanded_addresses).await?; Ok(object_model) } @@ -330,7 +303,7 @@ impl Fetchable for apb::Node { async fn fetch(&mut self, ctx: &crate::server::Context) -> crate::Result<&mut Self> { if let apb::Node::Link(uri) = self { let from = format!("{}{}", ctx.protocol(), ctx.domain()); // TODO helper to avoid this? - let pkey = &ctx.app().private_key; + let pkey = &ctx.app().private_key.ok_or_else(UpubError::internal_server_error)?; *self = Context::request(Method::GET, uri.href(), None, &from, pkey, ctx.domain()) .await? .json::() @@ -341,3 +314,58 @@ impl Fetchable for apb::Node { Ok(self) } } + + + +// #[async_recursion::async_recursion] +// async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> { +// tracing::info!("crawling replies of '{id}'"); +// let object = Context::request( +// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), +// ).await?.json::().await?; +// +// let object_model = model::object::Model::new(&object)?; +// match model::object::Entity::insert(object_model.into_active_model()) +// .exec(ctx.db()).await +// { +// Ok(_) => {}, +// Err(sea_orm::DbErr::RecordNotInserted) => {}, +// Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite +// Err(e) => return Err(e.into()), +// } +// +// if depth > 16 { +// tracing::warn!("stopping thread crawling: too deep!"); +// return Ok(()); +// } +// +// let mut page_url = match object.replies().get() { +// Some(serde_json::Value::String(x)) => { +// let replies = Context::request( +// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), +// ).await?.json::().await?; +// replies.first().id() +// }, +// Some(serde_json::Value::Object(x)) => { +// let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO! +// obj.first().id() +// }, +// _ => return Ok(()), +// }; +// +// while let Some(ref url) = page_url { +// let replies = Context::request( +// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), +// ).await?.json::().await?; +// +// for reply in replies.items() { +// // TODO right now it crawls one by one, could be made in parallel but would be quite more +// // abusive, so i'll keep it like this while i try it out +// crawl_replies(ctx, reply.href(), depth + 1).await?; +// } +// +// page_url = replies.next().id(); +// } +// +// Ok(()) +// }