diff --git a/src/migrations/m20240524_000001_create_actor_activity_object_tables.rs b/src/migrations/m20240524_000001_create_actor_activity_object_tables.rs index 7c3439f..a7143af 100644 --- a/src/migrations/m20240524_000001_create_actor_activity_object_tables.rs +++ b/src/migrations/m20240524_000001_create_actor_activity_object_tables.rs @@ -21,7 +21,7 @@ pub enum Actors { StatusesCount, PublicKey, PrivateKey, - Created, + Published, Updated, } @@ -159,7 +159,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Actors::StatusesCount).integer().not_null().default(0)) .col(ColumnDef::new(Actors::PublicKey).string().not_null()) .col(ColumnDef::new(Actors::PrivateKey).string().null()) - .col(ColumnDef::new(Actors::Created).date_time().not_null().default(Expr::current_timestamp())) + .col(ColumnDef::new(Actors::Published).date_time().not_null().default(Expr::current_timestamp())) .col(ColumnDef::new(Actors::Updated).date_time().not_null().default(Expr::current_timestamp())) .to_owned() ) @@ -174,7 +174,7 @@ impl MigrationTrait for Migration { .await?; manager - .create_index(Index::create().name("index-actors-instance").table(Actors::Table).col(Actors::Instance).to_owned()) + .create_index(Index::create().name("index-actors-domain").table(Actors::Table).col(Actors::Domain).to_owned()) .await?; @@ -321,7 +321,7 @@ impl MigrationTrait for Migration { .await?; manager - .drop_index(Index::drop().name("index-actors-instance").table(Actors::Table).to_owned()) + .drop_index(Index::drop().name("index-actors-domain").table(Actors::Table).to_owned()) .await?; diff --git a/src/migrations/m20240524_000002_create_relations_likes_shares.rs b/src/migrations/m20240524_000002_create_relations_likes_shares.rs index cceddd9..fbd8504 100644 --- a/src/migrations/m20240524_000002_create_relations_likes_shares.rs +++ b/src/migrations/m20240524_000002_create_relations_likes_shares.rs @@ -136,7 +136,7 @@ impl MigrationTrait for Migration { .await?; manager - .create_index(Index::create().name("index-likes-object").table(Likes::Table).col(Likes::Likes).to_owned()) + .create_index(Index::create().name("index-likes-object").table(Likes::Table).col(Likes::Object).to_owned()) .await?; manager @@ -190,7 +190,7 @@ impl MigrationTrait for Migration { .await?; manager - .create_index(Index::create().name("index-announces-object").table(Announces::Table).col(Announces::Announces).to_owned()) + .create_index(Index::create().name("index-announces-object").table(Announces::Table).col(Announces::Object).to_owned()) .await?; Ok(()) diff --git a/src/migrations/m20240524_000004_create_addressing_deliveries.rs b/src/migrations/m20240524_000004_create_addressing_deliveries.rs index 2c82461..0e1ecb2 100644 --- a/src/migrations/m20240524_000004_create_addressing_deliveries.rs +++ b/src/migrations/m20240524_000004_create_addressing_deliveries.rs @@ -20,7 +20,7 @@ pub enum Deliveries { Actor, Target, Activity, - Created, + Published, NotBefore, Attempt, } @@ -141,7 +141,7 @@ impl MigrationTrait for Migration { .on_update(ForeignKeyAction::Cascade) .on_delete(ForeignKeyAction::Cascade) ) - .col(ColumnDef::new(Deliveries::Created).date_time().not_null().default(Expr::current_timestamp())) + .col(ColumnDef::new(Deliveries::Published).date_time().not_null().default(Expr::current_timestamp())) .col(ColumnDef::new(Deliveries::NotBefore).date_time().not_null().default(Expr::current_timestamp())) .col(ColumnDef::new(Deliveries::Attempt).integer().not_null().default(0)) .to_owned() diff --git a/src/migrations/m20240524_000005_create_attachments_tags_mentions.rs b/src/migrations/m20240524_000005_create_attachments_tags_mentions.rs index bf0c2c5..863bcf5 100644 --- a/src/migrations/m20240524_000005_create_attachments_tags_mentions.rs +++ b/src/migrations/m20240524_000005_create_attachments_tags_mentions.rs @@ -11,7 +11,7 @@ pub enum Attachments { Object, Name, MediaType, - Created, + Published, } #[derive(DeriveIden)] @@ -63,7 +63,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Attachments::DocumentType).string().not_null()) .col(ColumnDef::new(Attachments::Name).string().null()) .col(ColumnDef::new(Attachments::MediaType).string().not_null()) - .col(ColumnDef::new(Attachments::Created).date_time().not_null().default(Expr::current_timestamp())) + .col(ColumnDef::new(Attachments::Published).date_time().not_null().default(Expr::current_timestamp())) .to_owned() ) .await?; diff --git a/src/model/actor.rs b/src/model/actor.rs index cd41d08..6a8086f 100644 --- a/src/model/actor.rs +++ b/src/model/actor.rs @@ -29,7 +29,7 @@ pub struct Model { pub statuses_count: i32, pub public_key: String, pub private_key: Option, - pub created: ChronoDateTimeUtc, + pub published: ChronoDateTimeUtc, pub updated: ChronoDateTimeUtc, } @@ -169,7 +169,7 @@ impl ActiveModel { shared_inbox: sea_orm::ActiveValue::Set(object.endpoints().get().and_then(|x| Some(x.shared_inbox()?.to_string()))), followers: sea_orm::ActiveValue::Set(object.followers().id()), following: sea_orm::ActiveValue::Set(object.following().id()), - created: sea_orm::ActiveValue::Set(object.published().unwrap_or(chrono::Utc::now())), + published: sea_orm::ActiveValue::Set(object.published().unwrap_or(chrono::Utc::now())), updated: sea_orm::ActiveValue::Set(chrono::Utc::now()), following_count: sea_orm::ActiveValue::Set(object.following_count().unwrap_or(0) as i32), followers_count: sea_orm::ActiveValue::Set(object.followers_count().unwrap_or(0) as i32), @@ -197,7 +197,7 @@ impl Model { .set_document_type(Some(apb::DocumentType::Image)) .set_url(apb::Node::link(i.clone())) ))) - .set_published(Some(self.created)) + .set_published(Some(self.published)) .set_preferred_username(Some(&self.preferred_username)) .set_statuses_count(Some(self.statuses_count as u64)) .set_followers_count(Some(self.followers_count as u64)) diff --git a/src/model/addressing.rs b/src/model/addressing.rs index 2eb6ecf..72ac9b3 100644 --- a/src/model/addressing.rs +++ b/src/model/addressing.rs @@ -106,6 +106,15 @@ impl Event { } } + pub fn internal(&self) -> i64 { + match self { + Event::Tombstone => 0, + Event::Activity(x) => x.internal, + Event::StrayObject { object, liked: _ } => object.internal, + Event::DeepActivity { activity: _, liked: _, object } => object.internal, + } + } + pub fn ap(self, attachment: Option>) -> serde_json::Value { let attachment = match attachment { None => apb::Node::Empty, diff --git a/src/model/attachment.rs b/src/model/attachment.rs index 93c6ea4..ac32246 100644 --- a/src/model/attachment.rs +++ b/src/model/attachment.rs @@ -16,7 +16,7 @@ pub struct Model { pub document_type: DocumentType, pub name: Option, pub media_type: String, - pub created: ChronoDateTimeUtc, + pub published: ChronoDateTimeUtc, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -46,7 +46,7 @@ impl Model { .set_document_type(Some(self.document_type)) .set_media_type(Some(&self.media_type)) .set_name(self.name.as_deref()) - .set_published(Some(self.created)) + .set_published(Some(self.published)) } } diff --git a/src/model/delivery.rs b/src/model/delivery.rs index eac7aa0..b767f67 100644 --- a/src/model/delivery.rs +++ b/src/model/delivery.rs @@ -8,7 +8,7 @@ pub struct Model { pub actor: String, pub target: String, pub activity: String, - pub created: ChronoDateTimeUtc, + pub published: ChronoDateTimeUtc, pub not_before: ChronoDateTimeUtc, pub attempt: i32, } @@ -61,6 +61,6 @@ impl Model { } pub fn expired(&self) -> bool { - chrono::Utc::now() - self.created > chrono::Duration::days(7) + chrono::Utc::now() - self.published > chrono::Duration::days(7) } } diff --git a/src/routes/activitypub/activity.rs b/src/routes/activitypub/activity.rs index bc15a35..a7880bc 100644 --- a/src/routes/activitypub/activity.rs +++ b/src/routes/activitypub/activity.rs @@ -27,7 +27,7 @@ pub async fn view( .ok_or_else(UpubError::not_found)?; let mut attachments = row.load_attachments_batch(ctx.db()).await?; - let attach = attachments.remove(row.id()); + let attach = attachments.remove(&row.internal()); Ok(JsonLD(row.ap(attach).ld_context())) } diff --git a/src/routes/activitypub/application.rs b/src/routes/activitypub/application.rs index 7d9aa4e..594d265 100644 --- a/src/routes/activitypub/application.rs +++ b/src/routes/activitypub/application.rs @@ -2,7 +2,7 @@ use apb::{ActorMut, BaseMut, ObjectMut, PublicKeyMut}; use axum::{extract::{Query, State}, http::HeaderMap, response::{IntoResponse, Redirect, Response}, Form, Json}; use reqwest::Method; -use crate::{errors::UpubError, server::{auth::{AuthIdentity, Identity}, fetcher::Fetcher, Context}, url}; +use crate::{errors::UpubError, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url}; use super::{jsonld::LD, JsonLD}; @@ -26,7 +26,7 @@ pub async fn view( .set_summary(Some(&ctx.cfg().instance.description)) .set_inbox(apb::Node::link(url!(ctx, "/inbox"))) .set_outbox(apb::Node::link(url!(ctx, "/outbox"))) - .set_published(Some(ctx.app().created)) + .set_published(Some(ctx.app().published)) .set_endpoints(apb::Node::Empty) .set_preferred_username(Some(ctx.domain())) .set_public_key(apb::Node::object( @@ -50,7 +50,7 @@ pub async fn proxy_get( AuthIdentity(auth): AuthIdentity, ) -> crate::Result> { // only local users can request fetches - if !ctx.cfg().security.allow_public_debugger && !matches!(auth, Identity::Local(_)) { + if !ctx.cfg().security.allow_public_debugger && !auth.is_local() { return Err(UpubError::unauthorized()); } Ok(Json( @@ -59,7 +59,7 @@ pub async fn proxy_get( &query.id, None, ctx.base(), - &ctx.app().private_key, + ctx.pkey(), &format!("{}+proxy", ctx.domain()), ) .await? @@ -74,7 +74,7 @@ pub async fn proxy_form( Form(query): Form, ) -> crate::Result> { // only local users can request fetches - if !ctx.cfg().security.allow_public_debugger && !matches!(auth, Identity::Local(_)) { + if !ctx.cfg().security.allow_public_debugger && auth.is_local() { return Err(UpubError::unauthorized()); } Ok(Json( @@ -83,7 +83,7 @@ pub async fn proxy_form( &query.id, None, ctx.base(), - &ctx.app().private_key, + ctx.pkey(), &format!("{}+proxy", ctx.domain()), ) .await? diff --git a/src/routes/activitypub/inbox.rs b/src/routes/activitypub/inbox.rs index 1aec234..a0a0a23 100644 --- a/src/routes/activitypub/inbox.rs +++ b/src/routes/activitypub/inbox.rs @@ -41,7 +41,7 @@ pub async fn post( AuthIdentity(auth): AuthIdentity, Json(activity): Json ) -> crate::Result<()> { - let Identity::Remote(server) = auth else { + let Identity::Remote { domain: server, .. } = auth else { if activity.activity_type() == Some(ActivityType::Delete) { // this is spammy af, ignore them! // we basically received a delete for a user we can't fetch and verify, meaning remote @@ -63,8 +63,7 @@ pub async fn post( return Err(UpubError::bad_request()); }; - // TODO add whitelist of relays - if !server.ends_with(&Context::server(&actor)) { + if !(server == Context::server(&actor)) { return Err(UpubError::unauthorized()); } diff --git a/src/routes/activitypub/user/inbox.rs b/src/routes/activitypub/user/inbox.rs index 1d02f09..5e9a7a9 100644 --- a/src/routes/activitypub/user/inbox.rs +++ b/src/routes/activitypub/user/inbox.rs @@ -10,8 +10,8 @@ pub async fn get( ) -> crate::Result> { match auth { Identity::Anonymous => Err(StatusCode::FORBIDDEN.into()), - Identity::Remote(_) => Err(StatusCode::FORBIDDEN.into()), - Identity::Local(user) => if ctx.uid(&id) == user { + Identity::Remote { .. } => Err(StatusCode::FORBIDDEN.into()), + Identity::Local { id: user, .. } => if ctx.uid(&id) == user { crate::server::builders::collection(&url!(ctx, "/users/{id}/inbox"), None) } else { Err(StatusCode::FORBIDDEN.into()) @@ -25,7 +25,7 @@ pub async fn page( AuthIdentity(auth): AuthIdentity, Query(page): Query, ) -> crate::Result> { - let Identity::Local(uid) = &auth else { + let Identity::Local { id: uid, .. } = &auth else { // local inbox is only for local users return Err(UpubError::forbidden()); }; diff --git a/src/routes/activitypub/user/outbox.rs b/src/routes/activitypub/user/outbox.rs index d1be16f..774eb75 100644 --- a/src/routes/activitypub/user/outbox.rs +++ b/src/routes/activitypub/user/outbox.rs @@ -42,8 +42,8 @@ pub async fn post( ) -> Result { match auth { Identity::Anonymous => Err(StatusCode::UNAUTHORIZED.into()), - Identity::Remote(_) => Err(StatusCode::NOT_IMPLEMENTED.into()), - Identity::Local(uid) => if ctx.uid(&id) == uid { + Identity::Remote { .. } => Err(StatusCode::NOT_IMPLEMENTED.into()), + Identity::Local { id: uid, .. } => if ctx.uid(&id) == uid { tracing::debug!("processing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default()); match activity.base_type() { None => Err(StatusCode::BAD_REQUEST.into()), diff --git a/src/routes/mastodon/accounts.rs b/src/routes/mastodon/accounts.rs index f1bea0e..ef9581f 100644 --- a/src/routes/mastodon/accounts.rs +++ b/src/routes/mastodon/accounts.rs @@ -20,7 +20,7 @@ pub async fn view( acct: x.preferred_username.clone(), avatar: x.icon.as_deref().unwrap_or("").to_string(), avatar_static: x.icon.unwrap_or_default(), - created_at: time::OffsetDateTime::from_unix_timestamp(x.created.timestamp()).unwrap(), + created_at: time::OffsetDateTime::from_unix_timestamp(x.published.timestamp()).unwrap(), display_name: x.name.unwrap_or_default(), // TODO hide these maybe followers_count: x.followers_count as u64, diff --git a/src/server/admin.rs b/src/server/admin.rs index c5f70e9..ca69feb 100644 --- a/src/server/admin.rs +++ b/src/server/admin.rs @@ -28,7 +28,7 @@ impl Administrable for super::Context { let ap_id = self.uid(&username); let db = self.db(); let domain = self.domain().to_string(); - let user_model = crate::model::actor::Model { + let user_model = crate::model::actor::ActiveModel { internal: NotSet, id: Set(ap_id.clone()), name: Set(display_name), @@ -46,7 +46,7 @@ impl Administrable for super::Context { shared_inbox: Set(None), outbox: Set(None), actor_type: Set(apb::ActorType::Person), - created: Set(chrono::Utc::now()), + published: Set(chrono::Utc::now()), updated: Set(chrono::Utc::now()), private_key: Set(Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string())), public_key: Set(std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string()), diff --git a/src/server/auth.rs b/src/server/auth.rs index 2470e86..1bba370 100644 --- a/src/server/auth.rs +++ b/src/server/auth.rs @@ -9,8 +9,14 @@ use super::{fetcher::Fetcher, httpsign::HttpSignature}; #[derive(Debug, Clone)] pub enum Identity { Anonymous, - Local(String), - Remote(String), + Remote { + domain: String, + internal: i64, + }, + Local { + id: String, + internal: i64, + }, } impl Identity { @@ -18,18 +24,18 @@ impl Identity { let base_cond = Condition::any().add(model::addressing::Column::Actor.eq(apb::target::PUBLIC)); match self { Identity::Anonymous => base_cond, - Identity::Remote(server) => base_cond.add(model::addressing::Column::Server.eq(server)), + Identity::Remote { internal, .. } => base_cond.add(model::addressing::Column::Instance.eq(*internal)), // TODO should we allow all users on same server to see? or just specific user?? - Identity::Local(uid) => base_cond - .add(model::addressing::Column::Actor.eq(uid)) - .add(model::activity::Column::Actor.eq(uid)) - .add(model::object::Column::AttributedTo.eq(uid)), + Identity::Local { id, internal } => base_cond + .add(model::addressing::Column::Actor.eq(*internal)) + .add(model::activity::Column::Actor.eq(id)) + .add(model::object::Column::AttributedTo.eq(id)), } } - pub fn my_id(&self) -> Option<&str> { + pub fn my_id(&self) -> Option { match self { - Identity::Local(x) => Some(x.as_str()), + Identity::Local { internal, .. } => Some(*internal), _ => None, } } @@ -37,8 +43,8 @@ impl Identity { pub fn is(&self, id: &str) -> bool { match self { Identity::Anonymous => false, - Identity::Remote(_) => false, // TODO per-actor server auth should check this - Identity::Local(uid) => uid.as_str() == id + Identity::Remote { .. } => false, // TODO per-actor server auth should check this + Identity::Local { id, .. } => id.as_str() == id } } @@ -47,23 +53,16 @@ impl Identity { } pub fn is_local(&self) -> bool { - matches!(self, Self::Local(_)) + matches!(self, Self::Local { .. }) } pub fn is_remote(&self) -> bool { - matches!(self, Self::Remote(_)) + matches!(self, Self::Remote { .. }) } pub fn is_local_user(&self, uid: &str) -> bool { match self { - Self::Local(x) => x == uid, - _ => false, - } - } - - pub fn is_remote_server(&self, uid: &str) -> bool { - match self { - Self::Remote(x) => x == uid, + Self::Local { id, .. } => id == uid, _ => false, } } @@ -90,13 +89,19 @@ where .unwrap_or(""); if auth_header.starts_with("Bearer ") { - match model::session::Entity::find_by_id(auth_header.replace("Bearer ", "")) + match model::session::Entity::find() + .filter(model::session::Column::Secret.eq(auth_header.replace("Bearer ", ""))) .filter(model::session::Column::Expires.gt(chrono::Utc::now())) .one(ctx.db()) .await { - Ok(Some(x)) => identity = Identity::Local(x.actor), Ok(None) => return Err(UpubError::unauthorized()), + Ok(Some(x)) => { + // TODO could we store both actor ap id and internal id in session? to avoid this extra + // lookup on *every* local authed request we receive... + let internal = model::actor::Entity::ap_to_internal(&x.actor, ctx.db()).await?; + identity = Identity::Local { id: x.actor, internal }; + }, Err(e) => { tracing::error!("failed querying user session: {e}"); return Err(UpubError::internal_server_error()) @@ -122,7 +127,13 @@ where .build_from_parts(parts) .verify(&user.public_key) { - Ok(true) => identity = Identity::Remote(Context::server(&user_id)), + Ok(true) => { + // TODO can we avoid this extra db rountrip made on each server fetch? + let domain = Context::server(&user_id); + // TODO this will fail because we never fetch and insert into instance oops + let internal = model::instance::Entity::domain_to_internal(&domain, ctx.db()).await?; + identity = Identity::Remote { domain, internal }; + }, Ok(false) => tracing::warn!("invalid signature: {http_signature:?}"), Err(e) => tracing::error!("error verifying signature: {e}"), }, diff --git a/src/server/builders.rs b/src/server/builders.rs index 7664d63..039883b 100644 --- a/src/server/builders.rs +++ b/src/server/builders.rs @@ -8,7 +8,7 @@ pub async fn paginate( filter: Condition, db: &DatabaseConnection, page: Pagination, - my_id: Option<&str>, + my_id: Option, ) -> crate::Result> { let limit = page.batch.unwrap_or(20).min(50); let offset = page.offset.unwrap_or(0); @@ -27,7 +27,7 @@ pub async fn paginate( let items : Vec = items .into_iter() .map(|item| { - let attach = attachments.remove(item.id()); + let attach = attachments.remove(&item.internal()); item.ap(attach) }) .collect(); diff --git a/src/server/context.rs b/src/server/context.rs index 2021622..3064568 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -1,9 +1,9 @@ use std::{collections::BTreeSet, sync::Arc}; use openssl::rsa::Rsa; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; +use sea_orm::{ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, RelationTrait, SelectColumns, Set}; -use crate::{config::Config, model, server::fetcher::Fetcher}; +use crate::{config::Config, errors::UpubError, model, server::fetcher::Fetcher}; use uriproxy::UriClass; use super::dispatcher::Dispatcher; @@ -19,7 +19,8 @@ struct ContextInner { base_url: String, dispatcher: Dispatcher, // TODO keep these pre-parsed - app: model::application::Model, + app: model::actor::Model, + pkey: String, relays: BTreeSet, } @@ -46,44 +47,73 @@ impl Context { for _ in 0..1 { // TODO customize delivery workers amount dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! } - let app = match model::application::Entity::find().one(&db).await? { + let base_url = format!("{}{}", protocol, domain); + let app = match model::actor::Entity::find_by_ap_id(&base_url).one(&db).await? { Some(model) => model, None => { tracing::info!("generating application keys"); let rsa = Rsa::generate(2048)?; let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string(); let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string(); - let system = model::application::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - private_key: sea_orm::ActiveValue::Set(privk.clone()), - public_key: sea_orm::ActiveValue::Set(pubk.clone()), - created: sea_orm::ActiveValue::Set(chrono::Utc::now()), + let system = model::actor::ActiveModel { + internal: NotSet, + id: NotSet, + domain: Set(domain.clone()), + preferred_username: Set(domain.clone()), + actor_type: Set(apb::ActorType::Application), + private_key: Set(Some(privk)), + public_key: Set(pubk), + following: Set(None), + following_count: Set(0), + followers: Set(None), + followers_count: Set(0), + statuses_count: Set(0), + summary: Set(Some("micro social network, federated".to_string())), + name: Set(Some("μpub".to_string())), + image: Set(None), + icon: Set(Some("https://cdn.alemi.dev/social/circle-square.png".to_string())), + inbox: Set(Some(format!("{base_url}/inbox"))), + shared_inbox: Set(Some(format!("{base_url}/inbox"))), + outbox: Set(Some(format!("{base_url}/outbox"))), + published: Set(chrono::Utc::now()), + updated: Set(chrono::Utc::now()), }; - model::application::Entity::insert(system).exec(&db).await?; + model::actor::Entity::insert(system).exec(&db).await?; // sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time - model::application::Entity::find().one(&db).await?.expect("could not find app config just inserted") + model::actor::Entity::find().one(&db).await?.expect("could not find app config just inserted") } }; - let relays = model::relay::Entity::find() - .select_only() - .select_column(model::relay::Column::Id) - .filter(model::relay::Column::Accepted.eq(true)) - .into_tuple::() - .all(&db) - .await?; + // TODO maybe we could provide a more descriptive error... + let pkey = app.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string(); + + // TODO how to handle relations when there's two to the same model??? + // let relays = model::relation::Entity::find() + // .filter(model::relation::Column::Following.eq(app.internal)) + // .filter(model::relation::Column::Accept.is_not_null()) + // .left_join(model::relation::Relation::ActorsFollower.def()) + // .select_only() + // .select_column(model::actor::Column::Id) + // .into_tuple::() + // .all(&db) + // .await?; + + let relays = Vec::new(); Ok(Context(Arc::new(ContextInner { - base_url: format!("{}{}", protocol, domain), - db, domain, protocol, app, dispatcher, config, + base_url, db, domain, protocol, app, dispatcher, config, pkey, relays: BTreeSet::from_iter(relays.into_iter()), }))) } - pub fn app(&self) -> &model::application::Model { + pub fn app(&self) -> &model::actor::Model { &self.0.app } + pub fn pkey(&self) -> &str { + &self.0.pkey + } + pub fn db(&self) -> &DatabaseConnection { &self.0.db } @@ -150,6 +180,18 @@ impl Context { id.starts_with(self.base()) } + pub async fn is_local_internal_object(&self, internal: i64) -> crate::Result { + Ok(model::object::Entity::find_by_id(internal).select_only().one(self.db()).await?.is_some()) + } + + pub async fn is_local_internal_activity(&self, internal: i64) -> crate::Result { + Ok(model::activity::Entity::find_by_id(internal).select_only().one(self.db()).await?.is_some()) + } + + pub async fn is_local_internal_actor(&self, internal: i64) -> crate::Result { + Ok(model::actor::Entity::find_by_id(internal).select_only().one(self.db()).await?.is_some()) + } + pub async fn expand_addressing(&self, targets: Vec) -> crate::Result> { let mut out = Vec::new(); for target in targets { @@ -171,26 +213,38 @@ impl Context { Ok(out) } - pub async fn address_to(&self, aid: Option<&str>, oid: Option<&str>, targets: &[String]) -> crate::Result<()> { - let local_activity = aid.map(|x| self.is_local(x)).unwrap_or(false); - let local_object = oid.map(|x| self.is_local(x)).unwrap_or(false); - let addressings : Vec = targets + pub async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> crate::Result<()> { + // TODO address_to became kind of expensive, with these two selects right away and then another + // select for each target we're addressing to... can this be improved?? + let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await? } else { false }; + let local_object = if let Some(x) = oid { self.is_local_internal_object(x).await? } else { false }; + let mut addressing = Vec::new(); + for target in targets .iter() .filter(|to| !to.is_empty()) .filter(|to| !to.ends_with("/followers")) .filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to)) - .map(|to| model::addressing::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - server: Set(Context::server(to)), - actor: Set(to.to_string()), - activity: Set(aid.map(|x| x.to_string())), - object: Set(oid.map(|x| x.to_string())), - published: Set(chrono::Utc::now()), - }) - .collect(); + { + let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { + ( + Some(model::instance::Entity::domain_to_internal(&Context::server(&target), self.db()).await?), + Some(model::actor::Entity::ap_to_internal(&target, self.db()).await?), + ) + }; + addressing.push( + model::addressing::ActiveModel { + internal: sea_orm::ActiveValue::NotSet, + instance: Set(server), + actor: Set(actor), + activity: Set(aid), + object: Set(oid), + published: Set(chrono::Utc::now()), + } + ); + } - if !addressings.is_empty() { - model::addressing::Entity::insert_many(addressings) + if !addressing.is_empty() { + model::addressing::Entity::insert_many(addressing) .exec(self.db()) .await?; } @@ -207,15 +261,15 @@ impl Context { { // TODO fetch concurrently match self.fetch_user(target).await { - Ok(model::user::Model { inbox: Some(inbox), .. }) => deliveries.push( + Ok(model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( model::delivery::ActiveModel { - id: sea_orm::ActiveValue::NotSet, + internal: sea_orm::ActiveValue::NotSet, actor: Set(from.to_string()), // TODO we should resolve each user by id and check its inbox because we can't assume // it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now target: Set(inbox), activity: Set(aid.to_string()), - created: Set(chrono::Utc::now()), + published: Set(chrono::Utc::now()), not_before: Set(chrono::Utc::now()), attempt: Set(0), } @@ -238,7 +292,9 @@ impl Context { pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { let addressed = self.expand_addressing(activity_targets).await?; - self.address_to(Some(aid), oid, &addressed).await?; + let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?; + let internal_oid = if let Some(o) = oid { Some(model::object::Entity::ap_to_internal(o, self.db()).await?) } else { None }; + self.address_to(Some(internal_aid), internal_oid, &addressed).await?; self.deliver_to(aid, uid, &addressed).await?; Ok(()) } diff --git a/src/server/dispatcher.rs b/src/server/dispatcher.rs index f48f5a6..e24620c 100644 --- a/src/server/dispatcher.rs +++ b/src/server/dispatcher.rs @@ -125,7 +125,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker actor: sea_orm::ActiveValue::Set(delivery.actor), target: sea_orm::ActiveValue::Set(delivery.target), activity: sea_orm::ActiveValue::Set(delivery.activity), - created: sea_orm::ActiveValue::Set(delivery.created), + published: sea_orm::ActiveValue::Set(delivery.published), attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1), }; model::delivery::Entity::insert(new_delivery).exec(db).await?; diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 9abeb88..4762fc1 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Link, Object}; +use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object}; use base64::Engine; use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use sea_orm::EntityTrait; @@ -139,14 +139,14 @@ impl Fetcher for Context { async fn pull_user(&self, id: &str) -> crate::Result { let mut user = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(), ).await?.json::().await?; // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs if let Some(followers_url) = &user.followers().id() { let req = Self::request( Method::GET, followers_url, None, - &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + &format!("https://{}", self.domain()), self.pkey(), self.domain(), ).await; if let Ok(res) = req { if let Ok(user_followers) = res.json::().await { @@ -160,7 +160,7 @@ impl Fetcher for Context { if let Some(following_url) = &user.following().id() { let req = Self::request( Method::GET, following_url, None, - &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + &format!("https://{}", self.domain()), self.pkey(), self.domain(), ).await; if let Ok(res) = req { if let Ok(user_following) = res.json::().await { @@ -191,14 +191,14 @@ impl Fetcher for Context { let addressed = activity.addressed(); let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(Some(&activity.id), None, &expanded_addresses).await?; + self.address_to(Some(activity.internal), None, &expanded_addresses).await?; Ok(activity) } async fn pull_activity(&self, id: &str) -> crate::Result { let activity = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(), ).await?.json::().await?; if let Some(activity_actor) = activity.actor().id() { @@ -228,7 +228,7 @@ impl Fetcher for Context { async fn pull_object(&self, id: &str) -> crate::Result { Ok( Context::request( - Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(), ) .await? .json::() @@ -244,7 +244,7 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res } let object = Context::request( - Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), + Method::GET, id, None, &format!("https://{}", ctx.domain()), ctx.pkey(), ctx.domain(), ).await?.json::().await?; if let Some(oid) = object.id() { @@ -274,7 +274,7 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res let object_model = ctx.insert_object(object, None).await?; let expanded_addresses = ctx.expand_addressing(addressed).await?; - ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?; + ctx.address_to(None, Some(object_model.internal), &expanded_addresses).await?; Ok(object_model) } @@ -288,9 +288,7 @@ pub trait Fetchable : Sync + Send { impl Fetchable for apb::Node { async fn fetch(&mut self, ctx: &crate::server::Context) -> crate::Result<&mut Self> { if let apb::Node::Link(uri) = self { - let from = format!("{}{}", ctx.protocol(), ctx.domain()); // TODO helper to avoid this? - let pkey = &ctx.app().private_key; - *self = Context::request(Method::GET, uri.href(), None, &from, pkey, ctx.domain()) + *self = Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain()) .await? .json::() .await? diff --git a/src/server/normalizer.rs b/src/server/normalizer.rs index a4d2fde..4d358c2 100644 --- a/src/server/normalizer.rs +++ b/src/server/normalizer.rs @@ -83,7 +83,7 @@ impl Normalizer for super::Context { document_type: Set(apb::DocumentType::Page), name: Set(l.link_name().map(|x| x.to_string())), media_type: Set(l.link_media_type().unwrap_or("link").to_string()), - created: Set(chrono::Utc::now()), + published: Set(chrono::Utc::now()), }, Node::Object(o) => model::attachment::ActiveModel { internal: sea_orm::ActiveValue::NotSet, @@ -92,7 +92,7 @@ impl Normalizer for super::Context { document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), name: Set(o.name().map(|x| x.to_string())), media_type: Set(o.media_type().unwrap_or("link").to_string()), - created: Set(o.published().unwrap_or_else(chrono::Utc::now)), + published: Set(o.published().unwrap_or_else(chrono::Utc::now)), }, }; model::attachment::Entity::insert(attachment_model) @@ -120,7 +120,7 @@ impl Normalizer for super::Context { document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))), name: Set(img.name().map(|x| x.to_string())), media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()), - created: Set(img.published().unwrap_or_else(chrono::Utc::now)), + published: Set(img.published().unwrap_or_else(chrono::Utc::now)), }; model::attachment::Entity::insert(attachment_model) .exec(self.db()) diff --git a/src/server/outbox.rs b/src/server/outbox.rs index 6131b59..742c323 100644 --- a/src/server/outbox.rs +++ b/src/server/outbox.rs @@ -1,6 +1,6 @@ use apb::{target::Addressed, Activity, ActivityMut, ActorMut, BaseMut, Node, Object, ObjectMut, PublicKeyMut}; use reqwest::StatusCode; -use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set}; +use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns}; use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD}; @@ -25,11 +25,11 @@ impl apb::server::Outbox for Context { if let Some(c) = content { let mut tmp = mdhtml::safe_markdown(&c); for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) { - if let Ok(Some(uid)) = model::user::Entity::find() - .filter(model::user::Column::PreferredUsername.eq(user)) - .filter(model::user::Column::Domain.eq(domain)) + if let Ok(Some(uid)) = model::actor::Entity::find() + .filter(model::actor::Column::PreferredUsername.eq(user)) + .filter(model::actor::Column::Domain.eq(domain)) .select_only() - .select_column(model::user::Column::Id) + .select_column(model::actor::Column::Id) .into_tuple::() .one(self.db()) .await @@ -178,12 +178,12 @@ impl apb::server::Outbox for Context { match accepted_activity.activity_type { apb::ActivityType::Follow => { - model::user::Entity::update_many() + model::actor::Entity::update_many() .col_expr( - model::user::Column::FollowersCount, - Expr::col(model::user::Column::FollowersCount).add(1) + model::actor::Column::FollowersCount, + Expr::col(model::actor::Column::FollowersCount).add(1) ) - .filter(model::user::Column::Id.eq(&uid)) + .filter(model::actor::Column::Id.eq(&uid)) .exec(self.db()) .await?; model::relation::Entity::insert( @@ -303,7 +303,7 @@ impl apb::server::Outbox for Context { match object_node.object_type() { Some(apb::ObjectType::Actor(_)) => { - let mut actor_model = model::user::Model::new( + let mut actor_model = model::actor::Model::new( &object_node // TODO must set these, but we will ignore them .set_actor_type(Some(apb::ActorType::Person)) @@ -311,7 +311,7 @@ impl apb::server::Outbox for Context { serde_json::Value::new_object().set_public_key_pem("") )) )?; - let old_actor_model = model::user::Entity::find_by_id(&actor_model.id) + let old_actor_model = model::actor::Entity::find_by_id(&actor_model.id) .one(self.db()) .await? .ok_or_else(UpubError::not_found)?; @@ -328,12 +328,12 @@ impl apb::server::Outbox for Context { let mut update_model = actor_model.into_active_model(); update_model.updated = sea_orm::Set(chrono::Utc::now()); - update_model.reset(model::user::Column::Name); - update_model.reset(model::user::Column::Summary); - update_model.reset(model::user::Column::Image); - update_model.reset(model::user::Column::Icon); + update_model.reset(model::actor::Column::Name); + update_model.reset(model::actor::Column::Summary); + update_model.reset(model::actor::Column::Image); + update_model.reset(model::actor::Column::Icon); - model::user::Entity::update(update_model) + model::actor::Entity::update(update_model) .exec(self.db()).await?; }, Some(apb::ObjectType::Note) => { @@ -398,17 +398,17 @@ impl apb::server::Outbox for Context { .set_actor(Node::link(uid.clone())) )?; - let share_model = model::share::ActiveModel { + let share_model = model::announce::ActiveModel { + internal: NotSet, actor: Set(uid.clone()), - shares: Set(oid.clone()), - date: Set(chrono::Utc::now()), - ..Default::default() + object: Set(oid.clone()), + published: Set(chrono::Utc::now()), }; - model::share::Entity::insert(share_model).exec(self.db()).await?; + model::announce::Entity::insert(share_model).exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()) .exec(self.db()).await?; model::object::Entity::update_many() - .col_expr(model::object::Column::Shares, Expr::col(model::object::Column::Shares).add(1)) + .col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1)) .filter(model::object::Column::Id.eq(oid)) .exec(self.db()) .await?;