1
0
Fork 0
forked from alemi/upub

chore: split down context.rs a little

This commit is contained in:
əlemi 2024-05-29 01:49:44 +02:00
parent 38fa6df39d
commit 40c80fa181
Signed by: alemi
GPG key ID: A4895B84D311642C
9 changed files with 230 additions and 156 deletions

View file

@ -1,5 +1,7 @@
use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder}; use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder};
use crate::server::addresser::Addresser;
pub async fn relay(ctx: crate::server::Context, actor: String, accept: bool) -> crate::Result<()> { pub async fn relay(ctx: crate::server::Context, actor: String, accept: bool) -> crate::Result<()> {
let aid = ctx.aid(&uuid::Uuid::new_v4().to_string()); let aid = ctx.aid(&uuid::Uuid::new_v4().to_string());

View file

@ -26,14 +26,14 @@ pub async fn view(
.set_summary(Some(&ctx.cfg().instance.description)) .set_summary(Some(&ctx.cfg().instance.description))
.set_inbox(apb::Node::link(url!(ctx, "/inbox"))) .set_inbox(apb::Node::link(url!(ctx, "/inbox")))
.set_outbox(apb::Node::link(url!(ctx, "/outbox"))) .set_outbox(apb::Node::link(url!(ctx, "/outbox")))
.set_published(Some(ctx.app().published)) .set_published(Some(ctx.actor().published))
.set_endpoints(apb::Node::Empty) .set_endpoints(apb::Node::Empty)
.set_preferred_username(Some(ctx.domain())) .set_preferred_username(Some(ctx.domain()))
.set_public_key(apb::Node::object( .set_public_key(apb::Node::object(
serde_json::Value::new_object() serde_json::Value::new_object()
.set_id(Some(&url!(ctx, "#main-key"))) .set_id(Some(&url!(ctx, "#main-key")))
.set_owner(Some(&url!(ctx, ""))) .set_owner(Some(&url!(ctx, "")))
.set_public_key_pem(&ctx.app().public_key) .set_public_key_pem(&ctx.actor().public_key)
)) ))
.ld_context() .ld_context()
).into_response()) ).into_response())

130
src/server/addresser.rs Normal file
View file

@ -0,0 +1,130 @@
use sea_orm::{ActiveValue::{NotSet, Set}, EntityTrait};
use crate::model;
use super::{fetcher::Fetcher, Context};
#[axum::async_trait]
pub trait Addresser {
async fn expand_addressing(&self, targets: Vec<String>) -> crate::Result<Vec<String>>;
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String]) -> crate::Result<()>;
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()>;
//#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"]
async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()>;
}
#[axum::async_trait]
impl Addresser for super::Context {
async fn expand_addressing(&self, targets: Vec<String>) -> crate::Result<Vec<String>> {
let mut out = Vec::new();
for target in targets {
if target.ends_with("/followers") {
let target_id = target.replace("/followers", "");
let mut followers = model::relation::Entity::followers(&target_id, self.db()).await?;
if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO
followers.push(target_id);
}
for follower in followers {
out.push(follower);
}
} else {
out.push(target);
}
}
Ok(out)
}
async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String]) -> crate::Result<()> {
// TODO address_to became kind of expensive, with these two selects right away and then another
// select for each target we're addressing to... can this be improved??
let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await.unwrap_or(false) } else { false };
let local_object = if let Some(x) = oid { self.is_local_internal_object(x).await.unwrap_or(false) } else { false };
let mut addressing = Vec::new();
for target in targets
.iter()
.filter(|to| !to.is_empty())
.filter(|to| !to.ends_with("/followers"))
.filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to))
{
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
match (
model::instance::Entity::domain_to_internal(&Context::server(target), self.db()).await,
model::actor::Entity::ap_to_internal(target, self.db()).await,
) {
(Ok(server), Ok(actor)) => (Some(server), Some(actor)),
(Err(e), Ok(_)) => { tracing::error!("failed resolving domain: {e}"); continue; },
(Ok(_), Err(e)) => { tracing::error!("failed resolving actor: {e}"); continue; },
(Err(es), Err(ea)) => { tracing::error!("failed resolving domain ({es}) and actor ({ea})"); continue; },
}
};
addressing.push(
model::addressing::ActiveModel {
internal: NotSet,
instance: Set(server),
actor: Set(actor),
activity: Set(aid),
object: Set(oid),
published: Set(chrono::Utc::now()),
}
);
}
if !addressing.is_empty() {
model::addressing::Entity::insert_many(addressing)
.exec(self.db())
.await?;
}
Ok(())
}
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()> {
let mut deliveries = Vec::new();
for target in targets.iter()
.filter(|to| !to.is_empty())
.filter(|to| Context::server(to) != self.domain())
.filter(|to| to != &apb::target::PUBLIC)
{
// TODO fetch concurrently
match self.fetch_user(target).await {
Ok(model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
model::delivery::ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
actor: Set(from.to_string()),
// TODO we should resolve each user by id and check its inbox because we can't assume
// it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now
target: Set(inbox),
activity: Set(aid.to_string()),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
}
),
Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"),
Err(e) => tracing::error!("failed resolving target inbox: {e}, skipping delivery to '{target}'"),
}
}
if !deliveries.is_empty() {
model::delivery::Entity::insert_many(deliveries)
.exec(self.db())
.await?;
}
self.dispatcher().wakeup();
Ok(())
}
//#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"]
async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
let addressed = self.expand_addressing(activity_targets).await?;
let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?;
let internal_oid = if let Some(o) = oid { Some(model::object::Entity::ap_to_internal(o, self.db()).await?) } else { None };
self.address_to(Some(internal_aid), internal_oid, &addressed).await?;
self.deliver_to(aid, uid, &addressed).await?;
Ok(())
}
}

View file

@ -1,9 +1,8 @@
use std::{collections::BTreeSet, sync::Arc}; use std::{collections::BTreeSet, sync::Arc};
use openssl::rsa::Rsa; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
use sea_orm::{ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set};
use crate::{config::Config, errors::UpubError, model, server::fetcher::Fetcher}; use crate::{config::Config, errors::UpubError, model};
use uriproxy::UriClass; use uriproxy::UriClass;
use super::{builders::AnyQuery, dispatcher::Dispatcher}; use super::{builders::AnyQuery, dispatcher::Dispatcher};
@ -19,7 +18,8 @@ struct ContextInner {
base_url: String, base_url: String,
dispatcher: Dispatcher, dispatcher: Dispatcher,
// TODO keep these pre-parsed // TODO keep these pre-parsed
app: model::actor::Model, actor: model::actor::Model,
instance: model::instance::Model,
pkey: String, pkey: String,
relay: Relays, relay: Relays,
} }
@ -53,47 +53,14 @@ impl Context {
dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!!
} }
let base_url = format!("{}{}", protocol, domain); let base_url = format!("{}{}", protocol, domain);
let app = match model::actor::Entity::find_by_ap_id(&base_url).one(&db).await? {
Some(model) => model, let (actor, instance) = super::init::application(domain.clone(), base_url.clone(), &db).await?;
None => {
tracing::info!("generating application keys");
let rsa = Rsa::generate(2048)?;
let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string();
let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string();
let system = model::actor::ActiveModel {
internal: NotSet,
id: Set(base_url.clone()),
domain: Set(domain.clone()),
preferred_username: Set(domain.clone()),
actor_type: Set(apb::ActorType::Application),
private_key: Set(Some(privk)),
public_key: Set(pubk),
following: Set(None),
following_count: Set(0),
followers: Set(None),
followers_count: Set(0),
statuses_count: Set(0),
summary: Set(Some("micro social network, federated".to_string())),
name: Set(Some("μpub".to_string())),
image: Set(None),
icon: Set(Some("https://cdn.alemi.dev/social/circle-square.png".to_string())),
inbox: Set(Some(format!("{base_url}/inbox"))),
shared_inbox: Set(Some(format!("{base_url}/inbox"))),
outbox: Set(Some(format!("{base_url}/outbox"))),
published: Set(chrono::Utc::now()),
updated: Set(chrono::Utc::now()),
};
model::actor::Entity::insert(system).exec(&db).await?;
// sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time
model::actor::Entity::find().one(&db).await?.expect("could not find app config just inserted")
}
};
// TODO maybe we could provide a more descriptive error... // TODO maybe we could provide a more descriptive error...
let pkey = app.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string(); let pkey = actor.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string();
let relay_sinks = model::relation::Entity::followers(&app.id, &db).await?; let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?;
let relay_sources = model::relation::Entity::following(&app.id, &db).await?; let relay_sources = model::relation::Entity::following(&actor.id, &db).await?;
let relay = Relays { let relay = Relays {
sources: BTreeSet::from_iter(relay_sources), sources: BTreeSet::from_iter(relay_sources),
@ -101,12 +68,16 @@ impl Context {
}; };
Ok(Context(Arc::new(ContextInner { Ok(Context(Arc::new(ContextInner {
base_url, db, domain, protocol, app, dispatcher, config, pkey, relay, base_url, db, domain, protocol, actor, instance, dispatcher, config, pkey, relay,
}))) })))
} }
pub fn app(&self) -> &model::actor::Model { pub fn actor(&self) -> &model::actor::Model {
&self.0.app &self.0.actor
}
pub fn instance(&self) -> &model::instance::Model {
&self.0.instance
} }
pub fn pkey(&self) -> &str { pub fn pkey(&self) -> &str {
@ -133,6 +104,10 @@ impl Context {
&self.0.base_url &self.0.base_url
} }
pub fn dispatcher(&self) -> &Dispatcher {
&self.0.dispatcher
}
/// get full user id uri /// get full user id uri
pub fn uid(&self, id: &str) -> String { pub fn uid(&self, id: &str) -> String {
uriproxy::uri(self.base(), UriClass::User, id) uriproxy::uri(self.base(), UriClass::User, id)
@ -211,112 +186,6 @@ impl Context {
.await .await
} }
pub async fn expand_addressing(&self, targets: Vec<String>) -> crate::Result<Vec<String>> {
let mut out = Vec::new();
for target in targets {
if target.ends_with("/followers") {
let target_id = target.replace("/followers", "");
let mut followers = model::relation::Entity::followers(&target_id, self.db()).await?;
if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO
followers.push(target_id);
}
for follower in followers {
out.push(follower);
}
} else {
out.push(target);
}
}
Ok(out)
}
pub async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String]) -> crate::Result<()> {
// TODO address_to became kind of expensive, with these two selects right away and then another
// select for each target we're addressing to... can this be improved??
let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await? } else { false };
let local_object = if let Some(x) = oid { self.is_local_internal_object(x).await? } else { false };
let mut addressing = Vec::new();
for target in targets
.iter()
.filter(|to| !to.is_empty())
.filter(|to| !to.ends_with("/followers"))
.filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to))
{
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
(
Some(model::instance::Entity::domain_to_internal(&Context::server(target), self.db()).await?),
Some(model::actor::Entity::ap_to_internal(target, self.db()).await?),
)
};
addressing.push(
model::addressing::ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
instance: Set(server),
actor: Set(actor),
activity: Set(aid),
object: Set(oid),
published: Set(chrono::Utc::now()),
}
);
}
if !addressing.is_empty() {
model::addressing::Entity::insert_many(addressing)
.exec(self.db())
.await?;
}
Ok(())
}
pub async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()> {
let mut deliveries = Vec::new();
for target in targets.iter()
.filter(|to| !to.is_empty())
.filter(|to| Context::server(to) != self.domain())
.filter(|to| to != &apb::target::PUBLIC)
{
// TODO fetch concurrently
match self.fetch_user(target).await {
Ok(model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
model::delivery::ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
actor: Set(from.to_string()),
// TODO we should resolve each user by id and check its inbox because we can't assume
// it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now
target: Set(inbox),
activity: Set(aid.to_string()),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
}
),
Ok(_) => tracing::error!("resolved target but missing inbox: '{target}', skipping delivery"),
Err(e) => tracing::error!("failed resolving target inbox: {e}, skipping delivery to '{target}'"),
}
}
if !deliveries.is_empty() {
model::delivery::Entity::insert_many(deliveries)
.exec(self.db())
.await?;
}
self.0.dispatcher.wakeup();
Ok(())
}
//#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"]
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
let addressed = self.expand_addressing(activity_targets).await?;
let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?;
let internal_oid = if let Some(o) = oid { Some(model::object::Entity::ap_to_internal(o, self.db()).await?) } else { None };
self.address_to(Some(internal_aid), internal_oid, &addressed).await?;
self.deliver_to(aid, uid, &addressed).await?;
Ok(())
}
pub fn is_relay(&self, id: &str) -> bool { pub fn is_relay(&self, id: &str) -> bool {
self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id) self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id)
} }

View file

@ -7,7 +7,7 @@ use sea_orm::{EntityTrait, IntoActiveModel, NotSet};
use crate::{errors::UpubError, model, VERSION}; use crate::{errors::UpubError, model, VERSION};
use super::{httpsign::HttpSignature, normalizer::Normalizer, Context}; use super::{addresser::Addresser, httpsign::HttpSignature, normalizer::Normalizer, Context};
#[axum::async_trait] #[axum::async_trait]
pub trait Fetcher { pub trait Fetcher {

View file

@ -2,7 +2,7 @@ use apb::{target::Addressed, Activity, Base, Object};
use reqwest::StatusCode; use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
use crate::{errors::{LoggableError, UpubError}, model, server::{builders::AnyQuery, normalizer::Normalizer}}; use crate::{errors::{LoggableError, UpubError}, model, server::{addresser::Addresser, builders::AnyQuery, normalizer::Normalizer}};
use super::{fetcher::Fetcher, Context}; use super::{fetcher::Fetcher, Context};

71
src/server/init.rs Normal file
View file

@ -0,0 +1,71 @@
use openssl::rsa::Rsa;
use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseConnection, EntityTrait};
use crate::model;
pub async fn application(
domain: String,
base_url: String,
db: &DatabaseConnection
) -> crate::Result<(model::actor::Model, model::instance::Model)> {
Ok((
match model::actor::Entity::find_by_ap_id(&base_url).one(db).await? {
Some(model) => model,
None => {
tracing::info!("generating application keys");
let rsa = Rsa::generate(2048)?;
let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string();
let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string();
let system = model::actor::ActiveModel {
internal: NotSet,
id: Set(base_url.clone()),
domain: Set(domain.clone()),
preferred_username: Set(domain.clone()),
actor_type: Set(apb::ActorType::Application),
private_key: Set(Some(privk)),
public_key: Set(pubk),
following: Set(None),
following_count: Set(0),
followers: Set(None),
followers_count: Set(0),
statuses_count: Set(0),
summary: Set(Some("micro social network, federated".to_string())),
name: Set(Some("μpub".to_string())),
image: Set(None),
icon: Set(Some("https://cdn.alemi.dev/social/circle-square.png".to_string())),
inbox: Set(Some(format!("{base_url}/inbox"))),
shared_inbox: Set(Some(format!("{base_url}/inbox"))),
outbox: Set(Some(format!("{base_url}/outbox"))),
published: Set(chrono::Utc::now()),
updated: Set(chrono::Utc::now()),
};
model::actor::Entity::insert(system).exec(db).await?;
// sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time
model::actor::Entity::find().one(db).await?.expect("could not find app actor just inserted")
}
},
match model::instance::Entity::find_by_domain(&domain).one(db).await? {
Some(model) => model,
None => {
tracing::info!("generating instance counters");
let system = model::instance::ActiveModel {
internal: NotSet,
domain: Set(domain.clone()),
down_since: Set(None),
software: Set(Some("upub".to_string())),
version: Set(Some(crate::VERSION.to_string())),
name: Set(None),
icon: Set(None),
users: Set(Some(0)),
posts: Set(Some(0)),
published: Set(chrono::Utc::now()),
updated: Set(chrono::Utc::now()),
};
model::instance::Entity::insert(system).exec(db).await?;
// sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time
model::instance::Entity::find().one(db).await?.expect("could not find app instance just inserted")
}
}
))
}

View file

@ -1,8 +1,10 @@
pub mod addresser;
pub mod admin; pub mod admin;
pub mod context; pub mod context;
pub mod dispatcher; pub mod dispatcher;
pub mod fetcher; pub mod fetcher;
pub mod inbox; pub mod inbox;
pub mod init;
pub mod outbox; pub mod outbox;
pub mod auth; pub mod auth;
pub mod builders; pub mod builders;

View file

@ -4,7 +4,7 @@ use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet, Unchanged}, ColumnTrai
use crate::{errors::UpubError, model}; use crate::{errors::UpubError, model};
use super::{builders::AnyQuery, fetcher::Fetcher, normalizer::Normalizer, Context}; use super::{addresser::Addresser, builders::AnyQuery, fetcher::Fetcher, normalizer::Normalizer, Context};
#[axum::async_trait] #[axum::async_trait]