diff --git a/src/cli/relay.rs b/src/cli/relay.rs index 4203499..d618229 100644 --- a/src/cli/relay.rs +++ b/src/cli/relay.rs @@ -1,5 +1,7 @@ use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder}; +use crate::server::addresser::Addresser; + pub async fn relay(ctx: crate::server::Context, actor: String, accept: bool) -> crate::Result<()> { let aid = ctx.aid(&uuid::Uuid::new_v4().to_string()); diff --git a/src/routes/activitypub/application.rs b/src/routes/activitypub/application.rs index 594d265..fdadcd2 100644 --- a/src/routes/activitypub/application.rs +++ b/src/routes/activitypub/application.rs @@ -26,14 +26,14 @@ pub async fn view( .set_summary(Some(&ctx.cfg().instance.description)) .set_inbox(apb::Node::link(url!(ctx, "/inbox"))) .set_outbox(apb::Node::link(url!(ctx, "/outbox"))) - .set_published(Some(ctx.app().published)) + .set_published(Some(ctx.actor().published)) .set_endpoints(apb::Node::Empty) .set_preferred_username(Some(ctx.domain())) .set_public_key(apb::Node::object( serde_json::Value::new_object() .set_id(Some(&url!(ctx, "#main-key"))) .set_owner(Some(&url!(ctx, ""))) - .set_public_key_pem(&ctx.app().public_key) + .set_public_key_pem(&ctx.actor().public_key) )) .ld_context() ).into_response()) diff --git a/src/server/addresser.rs b/src/server/addresser.rs new file mode 100644 index 0000000..cf90c80 --- /dev/null +++ b/src/server/addresser.rs @@ -0,0 +1,130 @@ +use sea_orm::{ActiveValue::{NotSet, Set}, EntityTrait}; + +use crate::model; + +use super::{fetcher::Fetcher, Context}; + + +#[axum::async_trait] +pub trait Addresser { + async fn expand_addressing(&self, targets: Vec) -> crate::Result>; + async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> crate::Result<()>; + async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()>; + //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] + async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()>; +} + +#[axum::async_trait] +impl Addresser for super::Context { + async fn expand_addressing(&self, targets: Vec) -> crate::Result> { + let mut out = Vec::new(); + for target in targets { + if target.ends_with("/followers") { + let target_id = target.replace("/followers", ""); + let mut followers = model::relation::Entity::followers(&target_id, self.db()).await?; + if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO + followers.push(target_id); + } + for follower in followers { + out.push(follower); + } + } else { + out.push(target); + } + } + Ok(out) + } + + async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> crate::Result<()> { + // TODO address_to became kind of expensive, with these two selects right away and then another + // select for each target we're addressing to... can this be improved?? + let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false }; + let local_object = if let Some(x) = oid { self.is_local_internal_object(x).await.unwrap_or(false) } else { false }; + let mut addressing = Vec::new(); + for target in targets + .iter() + .filter(|to| !to.is_empty()) + .filter(|to| !to.ends_with("/followers")) + .filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to)) + { + let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { + match ( + model::instance::Entity::domain_to_internal(&Context::server(target), self.db()).await, + model::actor::Entity::ap_to_internal(target, self.db()).await, + ) { + (Ok(server), Ok(actor)) => (Some(server), Some(actor)), + (Err(e), Ok(_)) => { tracing::error!("failed resolving domain: {e}"); continue; }, + (Ok(_), Err(e)) => { tracing::error!("failed resolving actor: {e}"); continue; }, + (Err(es), Err(ea)) => { tracing::error!("failed resolving domain ({es}) and actor ({ea})"); continue; }, + } + }; + addressing.push( + model::addressing::ActiveModel { + internal: NotSet, + instance: Set(server), + actor: Set(actor), + activity: Set(aid), + object: Set(oid), + published: Set(chrono::Utc::now()), + } + ); + } + + if !addressing.is_empty() { + model::addressing::Entity::insert_many(addressing) + .exec(self.db()) + .await?; + } + + Ok(()) + } + + async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()> { + let mut deliveries = Vec::new(); + for target in targets.iter() + .filter(|to| !to.is_empty()) + .filter(|to| Context::server(to) != self.domain()) + .filter(|to| to != &apb::target::PUBLIC) + { + // TODO fetch concurrently + match self.fetch_user(target).await { + Ok(model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( + model::delivery::ActiveModel { + internal: sea_orm::ActiveValue::NotSet, + actor: Set(from.to_string()), + // TODO we should resolve each user by id and check its inbox because we can't assume + // it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now + target: Set(inbox), + activity: Set(aid.to_string()), + published: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + attempt: Set(0), + } + ), + Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"), + Err(e) => tracing::error!("failed resolving target inbox: {e}, skipping delivery to '{target}'"), + } + } + + if !deliveries.is_empty() { + model::delivery::Entity::insert_many(deliveries) + .exec(self.db()) + .await?; + } + + self.dispatcher().wakeup(); + + Ok(()) + } + + //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] + async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { + let addressed = self.expand_addressing(activity_targets).await?; + let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?; + let internal_oid = if let Some(o) = oid { Some(model::object::Entity::ap_to_internal(o, self.db()).await?) } else { None }; + self.address_to(Some(internal_aid), internal_oid, &addressed).await?; + self.deliver_to(aid, uid, &addressed).await?; + Ok(()) + } + +} diff --git a/src/server/context.rs b/src/server/context.rs index 0af77db..e62e976 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -1,9 +1,8 @@ use std::{collections::BTreeSet, sync::Arc}; -use openssl::rsa::Rsa; -use sea_orm::{ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{config::Config, errors::UpubError, model, server::fetcher::Fetcher}; +use crate::{config::Config, errors::UpubError, model}; use uriproxy::UriClass; use super::{builders::AnyQuery, dispatcher::Dispatcher}; @@ -19,7 +18,8 @@ struct ContextInner { base_url: String, dispatcher: Dispatcher, // TODO keep these pre-parsed - app: model::actor::Model, + actor: model::actor::Model, + instance: model::instance::Model, pkey: String, relay: Relays, } @@ -53,47 +53,14 @@ impl Context { dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! } let base_url = format!("{}{}", protocol, domain); - let app = match model::actor::Entity::find_by_ap_id(&base_url).one(&db).await? { - Some(model) => model, - None => { - tracing::info!("generating application keys"); - let rsa = Rsa::generate(2048)?; - let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string(); - let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string(); - let system = model::actor::ActiveModel { - internal: NotSet, - id: Set(base_url.clone()), - domain: Set(domain.clone()), - preferred_username: Set(domain.clone()), - actor_type: Set(apb::ActorType::Application), - private_key: Set(Some(privk)), - public_key: Set(pubk), - following: Set(None), - following_count: Set(0), - followers: Set(None), - followers_count: Set(0), - statuses_count: Set(0), - summary: Set(Some("micro social network, federated".to_string())), - name: Set(Some("μpub".to_string())), - image: Set(None), - icon: Set(Some("https://cdn.alemi.dev/social/circle-square.png".to_string())), - inbox: Set(Some(format!("{base_url}/inbox"))), - shared_inbox: Set(Some(format!("{base_url}/inbox"))), - outbox: Set(Some(format!("{base_url}/outbox"))), - published: Set(chrono::Utc::now()), - updated: Set(chrono::Utc::now()), - }; - model::actor::Entity::insert(system).exec(&db).await?; - // sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time - model::actor::Entity::find().one(&db).await?.expect("could not find app config just inserted") - } - }; + + let (actor, instance) = super::init::application(domain.clone(), base_url.clone(), &db).await?; // TODO maybe we could provide a more descriptive error... - let pkey = app.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string(); + let pkey = actor.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string(); - let relay_sinks = model::relation::Entity::followers(&app.id, &db).await?; - let relay_sources = model::relation::Entity::following(&app.id, &db).await?; + let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?; + let relay_sources = model::relation::Entity::following(&actor.id, &db).await?; let relay = Relays { sources: BTreeSet::from_iter(relay_sources), @@ -101,12 +68,16 @@ impl Context { }; Ok(Context(Arc::new(ContextInner { - base_url, db, domain, protocol, app, dispatcher, config, pkey, relay, + base_url, db, domain, protocol, actor, instance, dispatcher, config, pkey, relay, }))) } - pub fn app(&self) -> &model::actor::Model { - &self.0.app + pub fn actor(&self) -> &model::actor::Model { + &self.0.actor + } + + pub fn instance(&self) -> &model::instance::Model { + &self.0.instance } pub fn pkey(&self) -> &str { @@ -133,6 +104,10 @@ impl Context { &self.0.base_url } + pub fn dispatcher(&self) -> &Dispatcher { + &self.0.dispatcher + } + /// get full user id uri pub fn uid(&self, id: &str) -> String { uriproxy::uri(self.base(), UriClass::User, id) @@ -211,112 +186,6 @@ impl Context { .await } - pub async fn expand_addressing(&self, targets: Vec) -> crate::Result> { - let mut out = Vec::new(); - for target in targets { - if target.ends_with("/followers") { - let target_id = target.replace("/followers", ""); - let mut followers = model::relation::Entity::followers(&target_id, self.db()).await?; - if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO - followers.push(target_id); - } - for follower in followers { - out.push(follower); - } - } else { - out.push(target); - } - } - Ok(out) - } - - pub async fn address_to(&self, aid: Option, oid: Option, targets: &[String]) -> crate::Result<()> { - // TODO address_to became kind of expensive, with these two selects right away and then another - // select for each target we're addressing to... can this be improved?? - let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await? } else { false }; - let local_object = if let Some(x) = oid { self.is_local_internal_object(x).await? } else { false }; - let mut addressing = Vec::new(); - for target in targets - .iter() - .filter(|to| !to.is_empty()) - .filter(|to| !to.ends_with("/followers")) - .filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to)) - { - let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { - ( - Some(model::instance::Entity::domain_to_internal(&Context::server(target), self.db()).await?), - Some(model::actor::Entity::ap_to_internal(target, self.db()).await?), - ) - }; - addressing.push( - model::addressing::ActiveModel { - internal: sea_orm::ActiveValue::NotSet, - instance: Set(server), - actor: Set(actor), - activity: Set(aid), - object: Set(oid), - published: Set(chrono::Utc::now()), - } - ); - } - - if !addressing.is_empty() { - model::addressing::Entity::insert_many(addressing) - .exec(self.db()) - .await?; - } - - Ok(()) - } - - pub async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()> { - let mut deliveries = Vec::new(); - for target in targets.iter() - .filter(|to| !to.is_empty()) - .filter(|to| Context::server(to) != self.domain()) - .filter(|to| to != &apb::target::PUBLIC) - { - // TODO fetch concurrently - match self.fetch_user(target).await { - Ok(model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( - model::delivery::ActiveModel { - internal: sea_orm::ActiveValue::NotSet, - actor: Set(from.to_string()), - // TODO we should resolve each user by id and check its inbox because we can't assume - // it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now - target: Set(inbox), - activity: Set(aid.to_string()), - published: Set(chrono::Utc::now()), - not_before: Set(chrono::Utc::now()), - attempt: Set(0), - } - ), - Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"), - Err(e) => tracing::error!("failed resolving target inbox: {e}, skipping delivery to '{target}'"), - } - } - - if !deliveries.is_empty() { - model::delivery::Entity::insert_many(deliveries) - .exec(self.db()) - .await?; - } - - self.0.dispatcher.wakeup(); - - Ok(()) - } - - //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] - pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { - let addressed = self.expand_addressing(activity_targets).await?; - let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?; - let internal_oid = if let Some(o) = oid { Some(model::object::Entity::ap_to_internal(o, self.db()).await?) } else { None }; - self.address_to(Some(internal_aid), internal_oid, &addressed).await?; - self.deliver_to(aid, uid, &addressed).await?; - Ok(()) - } - pub fn is_relay(&self, id: &str) -> bool { self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id) } diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 60ecd8f..83d1adc 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -7,7 +7,7 @@ use sea_orm::{EntityTrait, IntoActiveModel, NotSet}; use crate::{errors::UpubError, model, VERSION}; -use super::{httpsign::HttpSignature, normalizer::Normalizer, Context}; +use super::{addresser::Addresser, httpsign::HttpSignature, normalizer::Normalizer, Context}; #[axum::async_trait] pub trait Fetcher { diff --git a/src/server/inbox.rs b/src/server/inbox.rs index 10369c2..93cf29e 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -2,7 +2,7 @@ use apb::{target::Addressed, Activity, Base, Object}; use reqwest::StatusCode; use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{errors::{LoggableError, UpubError}, model, server::{builders::AnyQuery, normalizer::Normalizer}}; +use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}}; use super::{fetcher::Fetcher, Context}; diff --git a/src/server/init.rs b/src/server/init.rs new file mode 100644 index 0000000..b154137 --- /dev/null +++ b/src/server/init.rs @@ -0,0 +1,71 @@ +use openssl::rsa::Rsa; +use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseConnection, EntityTrait}; + +use crate::model; + +pub async fn application( + domain: String, + base_url: String, + db: &DatabaseConnection +) -> crate::Result<(model::actor::Model, model::instance::Model)> { + Ok(( + match model::actor::Entity::find_by_ap_id(&base_url).one(db).await? { + Some(model) => model, + None => { + tracing::info!("generating application keys"); + let rsa = Rsa::generate(2048)?; + let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string(); + let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string(); + let system = model::actor::ActiveModel { + internal: NotSet, + id: Set(base_url.clone()), + domain: Set(domain.clone()), + preferred_username: Set(domain.clone()), + actor_type: Set(apb::ActorType::Application), + private_key: Set(Some(privk)), + public_key: Set(pubk), + following: Set(None), + following_count: Set(0), + followers: Set(None), + followers_count: Set(0), + statuses_count: Set(0), + summary: Set(Some("micro social network, federated".to_string())), + name: Set(Some("μpub".to_string())), + image: Set(None), + icon: Set(Some("https://cdn.alemi.dev/social/circle-square.png".to_string())), + inbox: Set(Some(format!("{base_url}/inbox"))), + shared_inbox: Set(Some(format!("{base_url}/inbox"))), + outbox: Set(Some(format!("{base_url}/outbox"))), + published: Set(chrono::Utc::now()), + updated: Set(chrono::Utc::now()), + }; + model::actor::Entity::insert(system).exec(db).await?; + // sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time + model::actor::Entity::find().one(db).await?.expect("could not find app actor just inserted") + } + }, + + match model::instance::Entity::find_by_domain(&domain).one(db).await? { + Some(model) => model, + None => { + tracing::info!("generating instance counters"); + let system = model::instance::ActiveModel { + internal: NotSet, + domain: Set(domain.clone()), + down_since: Set(None), + software: Set(Some("upub".to_string())), + version: Set(Some(crate::VERSION.to_string())), + name: Set(None), + icon: Set(None), + users: Set(Some(0)), + posts: Set(Some(0)), + published: Set(chrono::Utc::now()), + updated: Set(chrono::Utc::now()), + }; + model::instance::Entity::insert(system).exec(db).await?; + // sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time + model::instance::Entity::find().one(db).await?.expect("could not find app instance just inserted") + } + } + )) +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 392c281..457fa08 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,8 +1,10 @@ +pub mod addresser; pub mod admin; pub mod context; pub mod dispatcher; pub mod fetcher; pub mod inbox; +pub mod init; pub mod outbox; pub mod auth; pub mod builders; diff --git a/src/server/outbox.rs b/src/server/outbox.rs index 6ac32bb..077a1d5 100644 --- a/src/server/outbox.rs +++ b/src/server/outbox.rs @@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet, Unchanged}, ColumnTrai use crate::{errors::UpubError, model}; -use super::{builders::AnyQuery, fetcher::Fetcher, normalizer::Normalizer, Context}; +use super::{addresser::Addresser, builders::AnyQuery, fetcher::Fetcher, normalizer::Normalizer, Context}; #[axum::async_trait]