From 52f12380522ed285bfe54a2afc65f7219b64ada2 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 6 Jun 2024 02:14:35 +0200 Subject: [PATCH] chore: traits are back in core worker is just a worker, everything else is upub --- upub/core/Cargo.toml | 3 +- upub/core/src/config.rs | 4 + upub/core/src/{server => }/context.rs | 14 +- upub/core/src/errors.rs | 160 ------------------ upub/core/src/ext.rs | 32 +++- upub/core/src/{server => }/init.rs | 14 +- upub/core/src/lib.rs | 17 +- upub/core/src/model/attachment.rs | 8 +- upub/core/src/model/mod.rs | 2 +- upub/core/src/server/mod.rs | 5 - .../src => core/src/traits}/address.rs | 26 +-- upub/core/src/{server => traits}/admin.rs | 6 +- .../src => core/src/traits}/fetch.rs | 94 +++++----- upub/core/src/traits/mod.rs | 11 ++ .../src => core/src/traits}/normalize.rs | 62 +++---- .../src => core/src/traits}/process.rs | 147 ++++++++-------- upub/processor/src/dispatcher.rs | 134 --------------- upub/processor/src/lib.rs | 6 - upub/routes/src/activitypub/activity.rs | 14 +- upub/routes/src/activitypub/application.rs | 56 +++--- upub/routes/src/activitypub/auth.rs | 2 +- upub/routes/src/activitypub/inbox.rs | 61 +++---- upub/routes/src/activitypub/object/mod.rs | 16 +- upub/routes/src/activitypub/object/replies.rs | 2 +- upub/routes/src/activitypub/user/inbox.rs | 4 +- upub/routes/src/activitypub/user/mod.rs | 24 +-- upub/routes/src/activitypub/user/outbox.rs | 58 +++---- upub/routes/src/activitypub/well_known.rs | 9 +- upub/routes/src/auth.rs | 13 +- upub/routes/src/error.rs | 16 +- upub/routes/src/lib.rs | 5 +- upub/{processor => worker}/Cargo.toml | 5 +- upub/{processor => worker}/README.md | 0 upub/worker/src/dispatcher.rs | 134 +++++++++++++++ upub/worker/src/inbound.rs | 20 +++ upub/worker/src/lib.rs | 18 ++ .../src/outbox.rs => worker/src/local.rs} | 95 ++++++++--- upub/worker/src/outbound.rs | 63 +++++++ 38 files changed, 684 insertions(+), 676 deletions(-) rename upub/core/src/{server => }/context.rs (90%) delete mode 100644 upub/core/src/errors.rs rename upub/core/src/{server => }/init.rs (87%) delete mode 100644 upub/core/src/server/mod.rs rename upub/{processor/src => core/src/traits}/address.rs (83%) rename upub/core/src/{server => traits}/admin.rs (95%) rename upub/{processor/src => core/src/traits}/fetch.rs (80%) create mode 100644 upub/core/src/traits/mod.rs rename upub/{processor/src => core/src/traits}/normalize.rs (79%) rename upub/{processor/src => core/src/traits}/process.rs (66%) delete mode 100644 upub/processor/src/dispatcher.rs delete mode 100644 upub/processor/src/lib.rs rename upub/{processor => worker}/Cargo.toml (86%) rename upub/{processor => worker}/README.md (100%) create mode 100644 upub/worker/src/dispatcher.rs create mode 100644 upub/worker/src/inbound.rs create mode 100644 upub/worker/src/lib.rs rename upub/{processor/src/outbox.rs => worker/src/local.rs} (88%) create mode 100644 upub/worker/src/outbound.rs diff --git a/upub/core/Cargo.toml b/upub/core/Cargo.toml index 5a1d28cd..60f9f0f8 100644 --- a/upub/core/Cargo.toml +++ b/upub/core/Cargo.toml @@ -12,6 +12,7 @@ readme = "README.md" [dependencies] thiserror = "1" +async-trait = "0.1" sha256 = "1.5" openssl = "0.10" # TODO handle pubkeys with a smaller crate base64 = "0.22" @@ -25,12 +26,12 @@ serde-inline-default = "0.2" toml = "0.8" mdhtml = { path = "../../utils/mdhtml", features = ["markdown"] } uriproxy = { path = "../../utils/uriproxy" } +httpsign = { path = "../../utils/httpsign/" } jrd = "0.1" tracing = "0.1" tokio = { version = "1.35", features = ["full"] } # TODO slim this down sea-orm = { version = "0.12", features = ["macros", "sqlx-sqlite", "runtime-tokio-rustls"] } reqwest = { version = "0.12", features = ["json"] } -axum = "0.7" apb = { path = "../../apb", features = ["unstructured", "orm", "activitypub-fe", "activitypub-counters", "litepub", "ostatus", "toot"] } # nodeinfo = "0.0.2" # the version on crates.io doesn't re-export necessary types to build the struct!!! nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "e865094804" } diff --git a/upub/core/src/config.rs b/upub/core/src/config.rs index a040a305..ec8e10f9 100644 --- a/upub/core/src/config.rs +++ b/upub/core/src/config.rs @@ -94,4 +94,8 @@ impl Config { } Config::default() } + + pub fn frontend_url(&self, url: &str) -> Option { + Some(format!("{}{}", self.instance.frontend.as_deref()?, url)) + } } diff --git a/upub/core/src/server/context.rs b/upub/core/src/context.rs similarity index 90% rename from upub/core/src/server/context.rs rename to upub/core/src/context.rs index b7b26c33..d7c83e7c 100644 --- a/upub/core/src/server/context.rs +++ b/upub/core/src/context.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeSet, sync::Arc}; use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{config::Config, errors::UpubError, model, ext::AnyQuery}; +use crate::{config::Config, ext::AnyQuery, model}; use uriproxy::UriClass; #[derive(Clone)] @@ -36,7 +36,7 @@ macro_rules! url { impl Context { // TODO slim constructor down, maybe make a builder? - pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> crate::Result { + pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result { let protocol = if domain.starts_with("http://") { "http://" } else { "https://" }.to_string(); if domain.ends_with('/') { @@ -50,10 +50,10 @@ impl Context { let (actor, instance) = super::init::application(domain.clone(), base_url.clone(), &db).await?; // TODO maybe we could provide a more descriptive error... - let pkey = actor.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string(); + let pkey = actor.private_key.as_deref().ok_or_else(|| DbErr::RecordNotFound("application private key".into()))?.to_string(); - let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?.ok_or_else(UpubError::internal_server_error)?; - let relay_sources = model::relation::Entity::following(&actor.id, &db).await?.ok_or_else(UpubError::internal_server_error)?; + let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?.ok_or_else(|| DbErr::RecordNotFound(actor.id.clone()))?; + let relay_sources = model::relation::Entity::following(&actor.id, &db).await?.ok_or_else(|| DbErr::RecordNotFound(actor.id.clone()))?; let relay = Relays { sources: BTreeSet::from_iter(relay_sources), @@ -98,6 +98,10 @@ impl Context { &self.0.base_url } + pub fn new_id() -> String { + uuid::Uuid::new_v4().to_string() + } + /// get full user id uri pub fn uid(&self, id: &str) -> String { uriproxy::uri(self.base(), UriClass::Actor, id) diff --git a/upub/core/src/errors.rs b/upub/core/src/errors.rs deleted file mode 100644 index 594f6740..00000000 --- a/upub/core/src/errors.rs +++ /dev/null @@ -1,160 +0,0 @@ -use axum::{http::StatusCode, response::Redirect}; - -#[derive(Debug, thiserror::Error)] -pub enum UpubError { - #[error("database error: {0:?}")] - Database(#[from] sea_orm::DbErr), - - #[error("{0}")] - Status(axum::http::StatusCode), - - #[error("{0}")] - Field(#[from] apb::FieldErr), - - #[error("openssl error: {0:?}")] - OpenSSL(#[from] openssl::error::ErrorStack), - - #[error("invalid UTF8 in key: {0:?}")] - OpenSSLParse(#[from] std::str::Utf8Error), - - #[error("fetch error: {0:?}")] - Reqwest(#[from] reqwest::Error), - - // TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut - // helps with debugging! - #[error("fetch error: {0:?} -- server responded with {1}")] - FetchError(reqwest::Error, String), - - #[error("invalid base64 string: {0:?}")] - Base64(#[from] base64::DecodeError), - - #[error("type mismatch on object: expected {0:?}, found {1:?}")] - Mismatch(apb::ObjectType, apb::ObjectType), - - #[error("os I/O error: {0}")] - IO(#[from] std::io::Error), - - // TODO this isn't really an error but i need to redirect from some routes so this allows me to - // keep the type hints on the return type, still what the hell!!!! - #[error("redirecting to {0}")] - Redirect(String), -} - -impl UpubError { - pub fn bad_request() -> Self { - Self::Status(axum::http::StatusCode::BAD_REQUEST) - } - - pub fn unprocessable() -> Self { - Self::Status(axum::http::StatusCode::UNPROCESSABLE_ENTITY) - } - - pub fn not_found() -> Self { - Self::Status(axum::http::StatusCode::NOT_FOUND) - } - - pub fn forbidden() -> Self { - Self::Status(axum::http::StatusCode::FORBIDDEN) - } - - pub fn unauthorized() -> Self { - Self::Status(axum::http::StatusCode::UNAUTHORIZED) - } - - pub fn not_modified() -> Self { - Self::Status(axum::http::StatusCode::NOT_MODIFIED) - } - - pub fn internal_server_error() -> Self { - Self::Status(axum::http::StatusCode::INTERNAL_SERVER_ERROR) - } -} - -pub type UpubResult = Result; - -impl From for UpubError { - fn from(value: axum::http::StatusCode) -> Self { - UpubError::Status(value) - } -} - -impl axum::response::IntoResponse for UpubError { - fn into_response(self) -> axum::response::Response { - // TODO it's kind of jank to hide this print down here, i should probably learn how spans work - // in tracing and use the library's features but ehhhh - tracing::debug!("emitting error response: {self:?}"); - let descr = self.to_string(); - match self { - UpubError::Redirect(to) => Redirect::to(&to).into_response(), - UpubError::Status(status) => status.into_response(), - UpubError::Database(e) => ( - StatusCode::SERVICE_UNAVAILABLE, - axum::Json(serde_json::json!({ - "error": "database", - "inner": format!("{e:#?}"), - })) - ).into_response(), - UpubError::Reqwest(x) | UpubError::FetchError(x, _) => ( - x.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), - axum::Json(serde_json::json!({ - "error": "request", - "status": x.status().map(|s| s.to_string()).unwrap_or_default(), - "url": x.url().map(|x| x.to_string()).unwrap_or_default(), - "description": descr, - "inner": format!("{x:#?}"), - })) - ).into_response(), - UpubError::Field(x) => ( - axum::http::StatusCode::BAD_REQUEST, - axum::Json(serde_json::json!({ - "error": "field", - "field": x.0.to_string(), - "description": descr, - })) - ).into_response(), - UpubError::Mismatch(expected, found) => ( - axum::http::StatusCode::UNPROCESSABLE_ENTITY, - axum::Json(serde_json::json!({ - "error": "type", - "expected": expected.as_ref().to_string(), - "found": found.as_ref().to_string(), - "description": descr, - })) - ).into_response(), - x => ( - StatusCode::INTERNAL_SERVER_ERROR, - axum::Json(serde_json::json!({ - "error": "unknown", - "description": descr, - "inner": format!("{x:#?}"), - })) - ).into_response(), - } - } -} - -pub trait LoggableError { - fn info_failed(self, msg: &str); - fn warn_failed(self, msg: &str); - fn err_failed(self, msg: &str); -} - -impl LoggableError for Result { - fn info_failed(self, msg: &str) { - if let Err(e) = self { - tracing::info!("{} : {}", msg, e); - } - } - - fn warn_failed(self, msg: &str) { - if let Err(e) = self { - tracing::warn!("{} : {}", msg, e); - } - } - - fn err_failed(self, msg: &str) { - if let Err(e) = self { - tracing::error!("{} : {}", msg, e); - } - } -} diff --git a/upub/core/src/ext.rs b/upub/core/src/ext.rs index 7b11045b..1ec194b6 100644 --- a/upub/core/src/ext.rs +++ b/upub/core/src/ext.rs @@ -1,19 +1,45 @@ -#[axum::async_trait] +#[async_trait::async_trait] pub trait AnyQuery { async fn any(self, db: &sea_orm::DatabaseConnection) -> Result; } -#[axum::async_trait] +#[async_trait::async_trait] impl AnyQuery for sea_orm::Select { async fn any(self, db: &sea_orm::DatabaseConnection) -> Result { Ok(self.one(db).await?.is_some()) } } -#[axum::async_trait] +#[async_trait::async_trait] impl AnyQuery for sea_orm::Selector { async fn any(self, db: &sea_orm::DatabaseConnection) -> Result { Ok(self.one(db).await?.is_some()) } } + +pub trait LoggableError { + fn info_failed(self, msg: &str); + fn warn_failed(self, msg: &str); + fn err_failed(self, msg: &str); +} + +impl LoggableError for Result { + fn info_failed(self, msg: &str) { + if let Err(e) = self { + tracing::info!("{} : {}", msg, e); + } + } + + fn warn_failed(self, msg: &str) { + if let Err(e) = self { + tracing::warn!("{} : {}", msg, e); + } + } + + fn err_failed(self, msg: &str) { + if let Err(e) = self { + tracing::error!("{} : {}", msg, e); + } + } +} diff --git a/upub/core/src/server/init.rs b/upub/core/src/init.rs similarity index 87% rename from upub/core/src/server/init.rs rename to upub/core/src/init.rs index b1541372..8522c620 100644 --- a/upub/core/src/server/init.rs +++ b/upub/core/src/init.rs @@ -3,11 +3,23 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseConnection, EntityTrait}; use crate::model; +#[derive(Debug, thiserror::Error)] +pub enum InitError { + #[error("database error: {0:?}")] + Database(#[from] sea_orm::DbErr), + + #[error("openssl error: {0:?}")] + OpenSSL(#[from] openssl::error::ErrorStack), + + #[error("pem format error: {0:?}")] + KeyError(#[from] std::str::Utf8Error), +} + pub async fn application( domain: String, base_url: String, db: &DatabaseConnection -) -> crate::Result<(model::actor::Model, model::instance::Model)> { +) -> Result<(model::actor::Model, model::instance::Model), InitError> { Ok(( match model::actor::Entity::find_by_ap_id(&base_url).one(db).await? { Some(model) => model, diff --git a/upub/core/src/lib.rs b/upub/core/src/lib.rs index d6bfc92f..bbff05e9 100644 --- a/upub/core/src/lib.rs +++ b/upub/core/src/lib.rs @@ -1,12 +1,15 @@ -pub mod config; -pub mod errors; -pub mod server; pub mod model; +pub mod traits; + +pub mod context; +pub use context::Context; + +pub mod config; +pub use config::Config; + +pub mod init; pub mod ext; -pub use server::Context; -pub use config::Config; -pub use errors::UpubResult as Result; -pub use errors::UpubError as Error; +pub use traits::normalize::AP; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/upub/core/src/model/attachment.rs b/upub/core/src/model/attachment.rs index 19077f38..811d7a8d 100644 --- a/upub/core/src/model/attachment.rs +++ b/upub/core/src/model/attachment.rs @@ -48,12 +48,12 @@ impl Model { } } -#[axum::async_trait] +#[async_trait::async_trait] pub trait BatchFillable { async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr>; } -#[axum::async_trait] +#[async_trait::async_trait] impl BatchFillable for &[Event] { async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { let objects : Vec = self @@ -80,14 +80,14 @@ impl BatchFillable for &[Event] { } } -#[axum::async_trait] +#[async_trait::async_trait] impl BatchFillable for Vec { async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { self.as_slice().load_attachments_batch(db).await } } -#[axum::async_trait] +#[async_trait::async_trait] impl BatchFillable for Event { async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result>, DbErr> { let x = vec![self.clone()]; // TODO wasteful clone and vec![] but ehhh convenient diff --git a/upub/core/src/model/mod.rs b/upub/core/src/model/mod.rs index ac209032..2315231d 100644 --- a/upub/core/src/model/mod.rs +++ b/upub/core/src/model/mod.rs @@ -9,7 +9,7 @@ pub mod session; pub mod addressing; pub mod instance; pub mod delivery; -pub mod processing; +pub mod job; pub mod relation; pub mod announce; diff --git a/upub/core/src/server/mod.rs b/upub/core/src/server/mod.rs deleted file mode 100644 index 90135f06..00000000 --- a/upub/core/src/server/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod admin; -pub mod context; -pub mod init; - -pub use context::Context; diff --git a/upub/processor/src/address.rs b/upub/core/src/traits/address.rs similarity index 83% rename from upub/processor/src/address.rs rename to upub/core/src/traits/address.rs index eca3d356..30ac338d 100644 --- a/upub/processor/src/address.rs +++ b/upub/core/src/traits/address.rs @@ -1,6 +1,6 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait}; -use crate::fetch::Fetcher; +use crate::traits::fetch::Fetcher; #[async_trait::async_trait] pub trait Addresser { @@ -12,13 +12,13 @@ pub trait Addresser { } #[async_trait::async_trait] -impl Addresser for upub::Context { +impl Addresser for crate::Context { async fn expand_addressing(&self, targets: Vec) -> Result, DbErr> { let mut out = Vec::new(); for target in targets { if target.ends_with("/followers") { let target_id = target.replace("/followers", ""); - let mut followers = upub::model::relation::Entity::followers(&target_id, self.db()) + let mut followers = crate::model::relation::Entity::followers(&target_id, self.db()) .await? .unwrap_or_else(Vec::new); if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO @@ -48,8 +48,8 @@ impl Addresser for upub::Context { { let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { match ( - upub::model::instance::Entity::domain_to_internal(&upub::Context::server(target), self.db()).await?, - upub::model::actor::Entity::ap_to_internal(target, self.db()).await?, + crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), self.db()).await?, + crate::model::actor::Entity::ap_to_internal(target, self.db()).await?, ) { (Some(server), Some(actor)) => (Some(server), Some(actor)), (None, _) => { tracing::error!("failed resolving domain"); continue; }, @@ -57,7 +57,7 @@ impl Addresser for upub::Context { } }; addressing.push( - upub::model::addressing::ActiveModel { + crate::model::addressing::ActiveModel { internal: NotSet, instance: Set(server), actor: Set(actor), @@ -69,7 +69,7 @@ impl Addresser for upub::Context { } if !addressing.is_empty() { - upub::model::addressing::Entity::insert_many(addressing) + crate::model::addressing::Entity::insert_many(addressing) .exec(self.db()) .await?; } @@ -81,13 +81,13 @@ impl Addresser for upub::Context { let mut deliveries = Vec::new(); for target in targets.iter() .filter(|to| !to.is_empty()) - .filter(|to| upub::Context::server(to) != self.domain()) + .filter(|to| crate::Context::server(to) != self.domain()) .filter(|to| to != &apb::target::PUBLIC) { // TODO fetch concurrently match self.fetch_user(target).await { - Ok(upub::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( - upub::model::delivery::ActiveModel { + Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push( + crate::model::delivery::ActiveModel { internal: sea_orm::ActiveValue::NotSet, actor: Set(from.to_string()), // TODO we should resolve each user by id and check its inbox because we can't assume @@ -105,7 +105,7 @@ impl Addresser for upub::Context { } if !deliveries.is_empty() { - upub::model::delivery::Entity::insert_many(deliveries) + crate::model::delivery::Entity::insert_many(deliveries) .exec(self.db()) .await?; } @@ -119,12 +119,12 @@ impl Addresser for upub::Context { //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> Result<(), DbErr> { let addressed = self.expand_addressing(activity_targets).await?; - let internal_aid = upub::model::activity::Entity::ap_to_internal(aid, self.db()) + let internal_aid = crate::model::activity::Entity::ap_to_internal(aid, self.db()) .await? .ok_or_else(|| DbErr::RecordNotFound(aid.to_string()))?; let internal_oid = if let Some(o) = oid { Some( - upub::model::object::Entity::ap_to_internal(o, self.db()) + crate::model::object::Entity::ap_to_internal(o, self.db()) .await? .ok_or_else(|| DbErr::RecordNotFound(o.to_string()))? ) diff --git a/upub/core/src/server/admin.rs b/upub/core/src/traits/admin.rs similarity index 95% rename from upub/core/src/server/admin.rs rename to upub/core/src/traits/admin.rs index 35c10c27..3a6d47d6 100644 --- a/upub/core/src/server/admin.rs +++ b/upub/core/src/traits/admin.rs @@ -1,6 +1,6 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait}; -#[axum::async_trait] +#[async_trait::async_trait] pub trait Administrable { async fn register_user( &self, @@ -13,8 +13,8 @@ pub trait Administrable { ) -> Result<(), DbErr>; } -#[axum::async_trait] -impl Administrable for super::Context { +#[async_trait::async_trait] +impl Administrable for crate::Context { async fn register_user( &self, username: String, diff --git a/upub/processor/src/fetch.rs b/upub/core/src/traits/fetch.rs similarity index 80% rename from upub/processor/src/fetch.rs rename to upub/core/src/traits/fetch.rs index ca77f623..ba9703f1 100644 --- a/upub/processor/src/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -4,7 +4,9 @@ use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response}; use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet}; -use super::{address::Addresser, normalize::Normalizer}; +use crate::traits::normalize::AP; + +use super::{Addresser, Normalizer}; use httpsign::HttpSignature; #[derive(Debug, Clone)] @@ -32,7 +34,7 @@ pub enum PullError { Malformed(#[from] apb::FieldErr), #[error("error normalizing resource: {0:?}")] - Normalization(#[from] crate::normalize::NormalizerError), + Normalization(#[from] crate::traits::normalize::NormalizerError), #[error("too many redirects while resolving resource id, aborting")] TooManyRedirects, @@ -84,19 +86,19 @@ pub trait Fetcher { async fn webfinger(&self, user: &str, host: &str) -> Result, PullError>; - async fn fetch_domain(&self, domain: &str) -> Result; + async fn fetch_domain(&self, domain: &str) -> Result; - async fn fetch_user(&self, id: &str) -> Result; - async fn resolve_user(&self, actor: serde_json::Value) -> Result; + async fn fetch_user(&self, id: &str) -> Result; + async fn resolve_user(&self, actor: serde_json::Value) -> Result; - async fn fetch_activity(&self, id: &str) -> Result; - async fn resolve_activity(&self, activity: serde_json::Value) -> Result; + async fn fetch_activity(&self, id: &str) -> Result; + async fn resolve_activity(&self, activity: serde_json::Value) -> Result; - async fn fetch_object(&self, id: &str) -> Result { self.fetch_object_r(id, 0).await } - #[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result { self.resolve_object_r(object, 0).await } + async fn fetch_object(&self, id: &str) -> Result { self.fetch_object_r(id, 0).await } + #[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result { self.resolve_object_r(object, 0).await } - async fn fetch_object_r(&self, id: &str, depth: u32) -> Result; - async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result; + async fn fetch_object_r(&self, id: &str, depth: u32) -> Result; + async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result; async fn fetch_thread(&self, id: &str) -> Result<(), PullError>; @@ -109,7 +111,7 @@ pub trait Fetcher { key: &str, domain: &str, ) -> Result { - let host = upub::Context::server(url); + let host = crate::Context::server(url); let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT" let path = url.replace("https://", "").replace("http://", "").replace(&host, ""); let digest = httpsign::digest(payload.unwrap_or_default()); @@ -136,7 +138,7 @@ pub trait Fetcher { .request(method.clone(), url) .header(ACCEPT, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"") .header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"") - .header(USER_AGENT, format!("upub+{} ({domain})", upub::VERSION)) + .header(USER_AGENT, format!("upub+{} ({domain})", crate::VERSION)) .header("Host", host.clone()) .header("Date", date.clone()) .header("Digest", digest) @@ -159,9 +161,9 @@ pub trait Fetcher { #[async_trait::async_trait] -impl Fetcher for upub::Context { +impl Fetcher for crate::Context { async fn pull_r(&self, id: &str, depth: u32) -> Result, PullError> { - let _domain = self.fetch_domain(&upub::Context::server(id)).await?; + let _domain = self.fetch_domain(&crate::Context::server(id)).await?; let document = Self::request( Method::GET, id, None, @@ -195,7 +197,7 @@ impl Fetcher for upub::Context { let resource = reqwest::Client::new() .get(webfinger_uri) .header(ACCEPT, "application/jrd+json") - .header(USER_AGENT, format!("upub+{} ({})", upub::VERSION, self.domain())) + .header(USER_AGENT, format!("upub+{} ({})", crate::VERSION, self.domain())) .send() .await? .json::() @@ -221,12 +223,12 @@ impl Fetcher for upub::Context { Ok(None) } - async fn fetch_domain(&self, domain: &str) -> Result { - if let Some(x) = upub::model::instance::Entity::find_by_domain(domain).one(self.db()).await? { + async fn fetch_domain(&self, domain: &str) -> Result { + if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(self.db()).await? { return Ok(x); // already in db, easy } - let mut instance_model = upub::model::instance::Model { + let mut instance_model = crate::model::instance::Model { internal: 0, domain: domain.to_string(), name: None, @@ -254,7 +256,7 @@ impl Fetcher for upub::Context { } } - if let Ok(nodeinfo) = upub::model::instance::Entity::nodeinfo(domain).await { + if let Ok(nodeinfo) = crate::model::instance::Entity::nodeinfo(domain).await { instance_model.software = Some(nodeinfo.software.name); instance_model.version = nodeinfo.software.version; instance_model.users = nodeinfo.usage.users.and_then(|x| x.total); @@ -263,8 +265,8 @@ impl Fetcher for upub::Context { let mut active_model = instance_model.clone().into_active_model(); active_model.internal = NotSet; - upub::model::instance::Entity::insert(active_model).exec(self.db()).await?; - let internal = upub::model::instance::Entity::domain_to_internal(domain, self.db()) + crate::model::instance::Entity::insert(active_model).exec(self.db()).await?; + let internal = crate::model::instance::Entity::domain_to_internal(domain, self.db()) .await? .ok_or_else(|| DbErr::RecordNotFound(domain.to_string()))?; instance_model.internal = internal; @@ -272,7 +274,7 @@ impl Fetcher for upub::Context { Ok(instance_model) } - async fn resolve_user(&self, mut document: serde_json::Value) -> Result { + async fn resolve_user(&self, mut document: serde_json::Value) -> Result { let id = document.id()?.to_string(); // TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time @@ -304,24 +306,24 @@ impl Fetcher for upub::Context { } } - let user_model = upub::model::actor::ActiveModel::new(&document)?; + let user_model = AP::actor_q(&document)?; // TODO this may fail: while fetching, remote server may fetch our service actor. // if it does so with http signature, we will fetch that actor in background // meaning that, once we reach here, it's already inserted and returns an UNIQUE error - upub::model::actor::Entity::insert(user_model).exec(self.db()).await?; + crate::model::actor::Entity::insert(user_model).exec(self.db()).await?; // TODO fetch it back to get the internal id Ok( - upub::model::actor::Entity::find_by_ap_id(&id) + crate::model::actor::Entity::find_by_ap_id(&id) .one(self.db()) .await? .ok_or_else(|| DbErr::RecordNotFound(id.to_string()))? ) } - async fn fetch_user(&self, id: &str) -> Result { - if let Some(x) = upub::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { + async fn fetch_user(&self, id: &str) -> Result { + if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } @@ -330,8 +332,8 @@ impl Fetcher for upub::Context { self.resolve_user(document).await } - async fn fetch_activity(&self, id: &str) -> Result { - if let Some(x) = upub::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? { + async fn fetch_activity(&self, id: &str) -> Result { + if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } @@ -340,15 +342,15 @@ impl Fetcher for upub::Context { self.resolve_activity(activity).await } - async fn resolve_activity(&self, activity: serde_json::Value) -> Result { + async fn resolve_activity(&self, activity: serde_json::Value) -> Result { if let Ok(activity_actor) = activity.actor().id() { - if let Err(e) = self.fetch_user(&activity_actor).await { + if let Err(e) = self.fetch_user(activity_actor).await { tracing::warn!("could not get actor of fetched activity: {e}"); } } if let Ok(activity_object) = activity.object().id() { - if let Err(e) = self.fetch_object(&activity_object).await { + if let Err(e) = self.fetch_object(activity_object).await { tracing::warn!("could not get object of fetched activity: {e}"); } } @@ -367,8 +369,8 @@ impl Fetcher for upub::Context { todo!() } - async fn fetch_object_r(&self, id: &str, depth: u32) -> Result { - if let Some(x) = upub::model::object::Entity::find_by_ap_id(id).one(self.db()).await? { + async fn fetch_object_r(&self, id: &str, depth: u32) -> Result { + if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } @@ -377,12 +379,12 @@ impl Fetcher for upub::Context { self.resolve_object_r(object, depth).await } - async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result { + async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result { let id = object.id()?.to_string(); if let Ok(oid) = object.id() { if oid != id { - if let Some(x) = upub::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? { + if let Some(x) = crate::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? { return Ok(x); // already in db, but with id different that given url } } @@ -415,14 +417,14 @@ impl Fetcher for upub::Context { #[async_trait::async_trait] pub trait Fetchable : Sync + Send { - async fn fetch(&mut self, ctx: &upub::Context) -> Result<&mut Self, PullError>; + async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>; } #[async_trait::async_trait] impl Fetchable for apb::Node { - async fn fetch(&mut self, ctx: &upub::Context) -> Result<&mut Self, PullError> { + async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError> { if let apb::Node::Link(uri) = self { - *self = upub::Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain()) + *self = crate::Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain()) .await? .json::() .await? @@ -434,14 +436,14 @@ impl Fetchable for apb::Node { } // #[async_recursion::async_recursion] -// async fn crawl_replies(ctx: &upub::Context, id: &str, depth: usize) -> Result<(), PullError> { +// async fn crawl_replies(ctx: &crate::Context, id: &str, depth: usize) -> Result<(), PullError> { // tracing::info!("crawling replies of '{id}'"); -// let object = upub::Context::request( +// let object = crate::Context::request( // Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), // ).await?.json::().await?; // -// let object_model = upub::model::object::Model::new(&object)?; -// match upub::model::object::Entity::insert(object_model.into_active_model()) +// let object_model = crate::model::object::Model::new(&object)?; +// match crate::model::object::Entity::insert(object_model.into_active_model()) // .exec(ctx.db()).await // { // Ok(_) => {}, @@ -457,7 +459,7 @@ impl Fetchable for apb::Node { // // let mut page_url = match object.replies().get() { // Some(serde_json::Value::String(x)) => { -// let replies = upub::Context::request( +// let replies = crate::Context::request( // Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), // ).await?.json::().await?; // replies.first().id() @@ -470,7 +472,7 @@ impl Fetchable for apb::Node { // }; // // while let Some(ref url) = page_url { -// let replies = upub::Context::request( +// let replies = crate::Context::request( // Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), // ).await?.json::().await?; // diff --git a/upub/core/src/traits/mod.rs b/upub/core/src/traits/mod.rs new file mode 100644 index 00000000..e2229a64 --- /dev/null +++ b/upub/core/src/traits/mod.rs @@ -0,0 +1,11 @@ +pub mod address; +pub mod fetch; +pub mod normalize; +pub mod process; +pub mod admin; + +pub use admin::Administrable; +pub use address::Addresser; +pub use normalize::Normalizer; +pub use process::Processor; +pub use fetch::Fetcher; diff --git a/upub/processor/src/normalize.rs b/upub/core/src/traits/normalize.rs similarity index 79% rename from upub/processor/src/normalize.rs rename to upub/core/src/traits/normalize.rs index 30928ff5..6db5e970 100644 --- a/upub/processor/src/normalize.rs +++ b/upub/core/src/traits/normalize.rs @@ -12,14 +12,14 @@ pub enum NormalizerError { #[async_trait::async_trait] pub trait Normalizer { - async fn insert_object(&self, obj: impl apb::Object) -> Result; - async fn insert_activity(&self, act: impl apb::Activity) -> Result; + async fn insert_object(&self, obj: impl apb::Object) -> Result; + async fn insert_activity(&self, act: impl apb::Activity) -> Result; } #[async_trait::async_trait] -impl Normalizer for upub::Context { +impl Normalizer for crate::Context { - async fn insert_object(&self, object: impl apb::Object) -> Result { + async fn insert_object(&self, object: impl apb::Object) -> Result { let oid = object.id()?.to_string(); let uid = object.attributed_to().id().str(); let t = object.object_type()?; @@ -45,7 +45,7 @@ impl Normalizer for upub::Context { // > kind of dumb. there should be a job system so this can be done in waves. or maybe there's // > some whole other way to do this?? im thinking but misskey aaaa!! TODO if let Set(Some(ref reply)) = object_active_model.in_reply_to { - if let Some(o) = upub::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? { + if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? { object_active_model.context = Set(o.context); } else { object_active_model.context = Set(None); // TODO to be filled by some other task @@ -54,25 +54,25 @@ impl Normalizer for upub::Context { object_active_model.context = Set(Some(oid.clone())); } - upub::model::object::Entity::insert(object_active_model).exec(self.db()).await?; - let object_model = upub::model::object::Entity::find_by_ap_id(&oid) + crate::model::object::Entity::insert(object_active_model).exec(self.db()).await?; + let object_model = crate::model::object::Entity::find_by_ap_id(&oid) .one(self.db()) .await? .ok_or_else(|| DbErr::RecordNotFound(oid.to_string()))?; // update replies counter if let Some(ref in_reply_to) = object_model.in_reply_to { - upub::model::object::Entity::update_many() - .filter(upub::model::object::Column::Id.eq(in_reply_to)) - .col_expr(upub::model::object::Column::Replies, Expr::col(upub::model::object::Column::Replies).add(1)) + crate::model::object::Entity::update_many() + .filter(crate::model::object::Column::Id.eq(in_reply_to)) + .col_expr(crate::model::object::Column::Replies, Expr::col(crate::model::object::Column::Replies).add(1)) .exec(self.db()) .await?; } // update statuses counter if let Some(object_author) = uid { - upub::model::actor::Entity::update_many() - .col_expr(upub::model::actor::Column::StatusesCount, Expr::col(upub::model::actor::Column::StatusesCount).add(1)) - .filter(upub::model::actor::Column::Id.eq(&object_author)) + crate::model::actor::Entity::update_many() + .col_expr(crate::model::actor::Column::StatusesCount, Expr::col(crate::model::actor::Column::StatusesCount).add(1)) + .filter(crate::model::actor::Column::Id.eq(&object_author)) .exec(self.db()) .await?; } @@ -84,7 +84,7 @@ impl Normalizer for upub::Context { tracing::warn!("ignoring array-in-array while processing attachments"); continue }, - Node::Link(l) => upub::model::attachment::ActiveModel { + Node::Link(l) => crate::model::attachment::ActiveModel { internal: sea_orm::ActiveValue::NotSet, url: Set(l.href().to_string()), object: Set(object_model.internal), @@ -96,7 +96,7 @@ impl Normalizer for upub::Context { Node::Object(o) => AP::attachment_q(o.as_document()?, object_model.internal)?, }; - upub::model::attachment::Entity::insert(attachment_model) + crate::model::attachment::Entity::insert(attachment_model) .exec(self.db()) .await?; } @@ -123,7 +123,7 @@ impl Normalizer for upub::Context { } } - upub::model::attachment::Entity::insert(attachment_model) + crate::model::attachment::Entity::insert(attachment_model) .exec(self.db()) .await?; } @@ -131,16 +131,16 @@ impl Normalizer for upub::Context { Ok(object_model) } - async fn insert_activity(&self, activity: impl apb::Activity) -> Result { + async fn insert_activity(&self, activity: impl apb::Activity) -> Result { let mut activity_model = AP::activity(&activity)?; let mut active_model = activity_model.clone().into_active_model(); active_model.internal = NotSet; - upub::model::activity::Entity::insert(active_model) + crate::model::activity::Entity::insert(active_model) .exec(self.db()) .await?; - let internal = upub::model::activity::Entity::ap_to_internal(&activity_model.id, self.db()) + let internal = crate::model::activity::Entity::ap_to_internal(&activity_model.id, self.db()) .await? .ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?; activity_model.internal = internal; @@ -152,8 +152,8 @@ impl Normalizer for upub::Context { pub struct AP; impl AP { - pub fn activity(activity: &impl apb::Activity) -> Result { - Ok(upub::model::activity::Model { + pub fn activity(activity: &impl apb::Activity) -> Result { + Ok(crate::model::activity::Model { internal: 0, id: activity.id()?.to_string(), activity_type: activity.activity_type()?, @@ -168,7 +168,7 @@ impl AP { }) } - pub fn activity_q(activity: &impl apb::Activity) -> Result { + pub fn activity_q(activity: &impl apb::Activity) -> Result { let mut m = AP::activity(activity)?.into_active_model(); m.internal = NotSet; Ok(m) @@ -177,8 +177,8 @@ impl AP { - pub fn attachment(document: &impl apb::Document, parent: i64) -> Result { - Ok(upub::model::attachment::Model { + pub fn attachment(document: &impl apb::Document, parent: i64) -> Result { + Ok(crate::model::attachment::Model { internal: 0, url: document.url().id().str().unwrap_or_default(), object: parent, @@ -189,7 +189,7 @@ impl AP { }) } - pub fn attachment_q(document: &impl apb::Document, parent: i64) -> Result { + pub fn attachment_q(document: &impl apb::Document, parent: i64) -> Result { let mut m = AP::attachment(document, parent)?.into_active_model(); m.internal = NotSet; Ok(m) @@ -197,7 +197,7 @@ impl AP { - pub fn object(object: &impl apb::Object) -> Result { + pub fn object(object: &impl apb::Object) -> Result { let t = object.object_type()?; if matches!(t, apb::ObjectType::Activity(_) @@ -207,7 +207,7 @@ impl AP { ) { return Err(apb::FieldErr("type")); } - Ok(upub::model::object::Model { + Ok(crate::model::object::Model { internal: 0, id: object.id()?.to_string(), object_type: t, @@ -235,7 +235,7 @@ impl AP { }) } - pub fn object_q(object: &impl apb::Object) -> Result { + pub fn object_q(object: &impl apb::Object) -> Result { let mut m = AP::object(object)?.into_active_model(); m.internal = NotSet; Ok(m) @@ -243,7 +243,7 @@ impl AP { - pub fn actor(actor: &impl apb::Actor) -> Result { + pub fn actor(actor: &impl apb::Actor) -> Result { let ap_id = actor.id()?.to_string(); let (domain, fallback_preferred_username) = { let clean = ap_id @@ -254,7 +254,7 @@ impl AP { let last = splits.last().unwrap_or(first); (first.to_string(), last.to_string()) }; - Ok(upub::model::actor::Model { + Ok(crate::model::actor::Model { internal: 0, domain, id: ap_id, @@ -279,7 +279,7 @@ impl AP { }) } - pub fn actor_q(actor: &impl apb::Actor) -> Result { + pub fn actor_q(actor: &impl apb::Actor) -> Result { let mut m = AP::actor(actor)?.into_active_model(); m.internal = NotSet; Ok(m) diff --git a/upub/processor/src/process.rs b/upub/core/src/traits/process.rs similarity index 66% rename from upub/processor/src/process.rs rename to upub/core/src/traits/process.rs index ad4647fc..36420dac 100644 --- a/upub/processor/src/process.rs +++ b/upub/core/src/traits/process.rs @@ -1,7 +1,6 @@ use apb::{target::Addressed, Activity, Base, Object}; use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use upub::{errors::LoggableError, ext::AnyQuery}; -use crate::{address::Addresser, fetch::{Fetcher, Pull}, normalize::Normalizer}; +use crate::{ext::{AnyQuery, LoggableError}, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}}; #[derive(Debug, thiserror::Error)] pub enum ProcessorError { @@ -24,10 +23,10 @@ pub enum ProcessorError { Unprocessable, #[error("failed normalizing and inserting entity: {0:?}")] - NormalizerError(#[from] crate::normalize::NormalizerError), + NormalizerError(#[from] crate::traits::normalize::NormalizerError), #[error("failed fetching resource: {0:?}")] - PullError(#[from] crate::fetch::PullError), + PullError(#[from] crate::traits::fetch::PullError), } #[async_trait::async_trait] @@ -36,7 +35,7 @@ pub trait Processor { } #[async_trait::async_trait] -impl Processor for upub::Context { +impl Processor for crate::Context { async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError> { // TODO we could process Links and bare Objects maybe, but probably out of AP spec? match activity.activity_type()? { @@ -55,7 +54,7 @@ impl Processor for upub::Context { } } -pub async fn create(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn create(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { let Some(object_node) = activity.object().extract() else { // TODO we could process non-embedded activities or arrays but im lazy rn tracing::error!("refusing to process activity without embedded object"); @@ -74,15 +73,15 @@ pub async fn create(ctx: &upub::Context, activity: impl apb::Activity) -> Result Ok(()) } -pub async fn like(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); - let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; let object_uri = activity.object().id()?.to_string(); let published = activity.published().unwrap_or_else(|_|chrono::Utc::now()); let obj = ctx.fetch_object(&object_uri).await?; - if upub::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal) + if crate::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal) .any(ctx.db()) .await? { @@ -91,26 +90,26 @@ pub async fn like(ctx: &upub::Context, activity: impl apb::Activity) -> Result<( let activity_model = ctx.insert_activity(activity).await?; - let like = upub::model::like::ActiveModel { + let like = crate::model::like::ActiveModel { internal: NotSet, actor: Set(internal_uid), object: Set(obj.internal), activity: Set(activity_model.internal), published: Set(published), }; - upub::model::like::Entity::insert(like).exec(ctx.db()).await?; - upub::model::object::Entity::update_many() - .col_expr(upub::model::object::Column::Likes, Expr::col(upub::model::object::Column::Likes).add(1)) - .filter(upub::model::object::Column::Internal.eq(obj.internal)) + crate::model::like::Entity::insert(like).exec(ctx.db()).await?; + crate::model::object::Entity::update_many() + .col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).add(1)) + .filter(crate::model::object::Column::Internal.eq(obj.internal)) .exec(ctx.db()) .await?; let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!! expanded_addressing.push( - upub::model::object::Entity::find_by_id(obj.internal) + crate::model::object::Entity::find_by_id(obj.internal) .select_only() - .select_column(upub::model::object::Column::AttributedTo) + .select_column(crate::model::object::Column::AttributedTo) .into_tuple::() .one(ctx.db()) .await? @@ -122,22 +121,22 @@ pub async fn like(ctx: &upub::Context, activity: impl apb::Activity) -> Result<( Ok(()) } -pub async fn follow(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { let source_actor = activity.actor().id()?.to_string(); - let source_actor_internal = upub::model::actor::Entity::ap_to_internal(&source_actor, ctx.db()) + let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; let target_actor = activity.object().id()?.to_string(); let usr = ctx.fetch_user(&target_actor).await?; let activity_model = ctx.insert_activity(activity).await?; - let relation_model = upub::model::relation::ActiveModel { + let relation_model = crate::model::relation::ActiveModel { internal: NotSet, accept: Set(None), activity: Set(activity_model.internal), follower: Set(source_actor_internal), following: Set(usr.internal), }; - upub::model::relation::Entity::insert(relation_model) + crate::model::relation::Entity::insert(relation_model) .exec(ctx.db()).await?; let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?; if !expanded_addressing.contains(&target_actor) { @@ -148,11 +147,11 @@ pub async fn follow(ctx: &upub::Context, activity: impl apb::Activity) -> Result Ok(()) } -pub async fn accept(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { // TODO what about TentativeAccept let target_actor = activity.actor().id()?.to_string(); let follow_request_id = activity.object().id()?.to_string(); - let follow_activity = upub::model::activity::Entity::find_by_ap_id(&follow_request_id) + let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id) .one(ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; @@ -163,26 +162,26 @@ pub async fn accept(ctx: &upub::Context, activity: impl apb::Activity) -> Result let activity_model = ctx.insert_activity(activity).await?; - upub::model::actor::Entity::update_many() + crate::model::actor::Entity::update_many() .col_expr( - upub::model::actor::Column::FollowingCount, - Expr::col(upub::model::actor::Column::FollowingCount).add(1) + crate::model::actor::Column::FollowingCount, + Expr::col(crate::model::actor::Column::FollowingCount).add(1) ) - .filter(upub::model::actor::Column::Id.eq(&follow_activity.actor)) + .filter(crate::model::actor::Column::Id.eq(&follow_activity.actor)) .exec(ctx.db()) .await?; - upub::model::actor::Entity::update_many() + crate::model::actor::Entity::update_many() .col_expr( - upub::model::actor::Column::FollowersCount, - Expr::col(upub::model::actor::Column::FollowersCount).add(1) + crate::model::actor::Column::FollowersCount, + Expr::col(crate::model::actor::Column::FollowersCount).add(1) ) - .filter(upub::model::actor::Column::Id.eq(&follow_activity.actor)) + .filter(crate::model::actor::Column::Id.eq(&follow_activity.actor)) .exec(ctx.db()) .await?; - upub::model::relation::Entity::update_many() - .col_expr(upub::model::relation::Column::Accept, Expr::value(Some(activity_model.internal))) - .filter(upub::model::relation::Column::Activity.eq(follow_activity.internal)) + crate::model::relation::Entity::update_many() + .col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal))) + .filter(crate::model::relation::Column::Activity.eq(follow_activity.internal)) .exec(ctx.db()).await?; tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor); @@ -195,11 +194,11 @@ pub async fn accept(ctx: &upub::Context, activity: impl apb::Activity) -> Result Ok(()) } -pub async fn reject(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { // TODO what about TentativeReject? let uid = activity.actor().id()?.to_string(); let follow_request_id = activity.object().id()?.to_string(); - let follow_activity = upub::model::activity::Entity::find_by_ap_id(&follow_request_id) + let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id) .one(ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; @@ -210,8 +209,8 @@ pub async fn reject(ctx: &upub::Context, activity: impl apb::Activity) -> Result let activity_model = ctx.insert_activity(activity).await?; - upub::model::relation::Entity::delete_many() - .filter(upub::model::relation::Column::Activity.eq(activity_model.internal)) + crate::model::relation::Entity::delete_many() + .filter(crate::model::relation::Column::Activity.eq(activity_model.internal)) .exec(ctx.db()) .await?; @@ -226,17 +225,16 @@ pub async fn reject(ctx: &upub::Context, activity: impl apb::Activity) -> Result Ok(()) } -pub async fn delete(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { let oid = activity.object().id()?.to_string(); - upub::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users"); - upub::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects"); + crate::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users"); + crate::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects"); tracing::debug!("deleted '{oid}'"); Ok(()) } -pub async fn update(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); - let aid = activity.id()?.to_string(); let Some(object_node) = activity.object().extract() else { tracing::error!("refusing to process activity without embedded object"); return Err(ProcessorError::Unprocessable); @@ -247,24 +245,24 @@ pub async fn update(ctx: &upub::Context, activity: impl apb::Activity) -> Result match object_node.object_type()? { apb::ObjectType::Actor(_) => { - let internal_uid = upub::model::actor::Entity::ap_to_internal(&oid, ctx.db()) + let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; - let mut actor_model = upub::model::actor::ActiveModel::new(object_node.as_actor()?)?; + let mut actor_model = crate::AP::actor_q(object_node.as_actor()?)?; actor_model.internal = Set(internal_uid); actor_model.updated = Set(chrono::Utc::now()); - upub::model::actor::Entity::update(actor_model) + crate::model::actor::Entity::update(actor_model) .exec(ctx.db()) .await?; }, apb::ObjectType::Note => { - let internal_oid = upub::model::object::Entity::ap_to_internal(&oid, ctx.db()) + let internal_oid = crate::model::object::Entity::ap_to_internal(&oid, ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; - let mut object_model = upub::model::object::ActiveModel::new(&object_node)?; + let mut object_model = crate::AP::object_q(&object_node)?; object_model.internal = Set(internal_oid); object_model.updated = Set(chrono::Utc::now()); - upub::model::object::Entity::update(object_model) + crate::model::object::Entity::update(object_model) .exec(ctx.db()) .await?; }, @@ -277,11 +275,10 @@ pub async fn update(ctx: &upub::Context, activity: impl apb::Activity) -> Result Ok(()) } -pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); // TODO in theory we could work with just object_id but right now only accept embedded let undone_activity = activity.object().extract().ok_or(apb::FieldErr("object"))?; - let undone_activity_id = undone_activity.id()?; let undone_activity_author = undone_activity.as_activity()?.actor().id()?.to_string(); if uid != undone_activity_author { @@ -290,7 +287,7 @@ pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<( let undone_activity_target = undone_activity.as_activity()?.object().id()?.to_string(); - let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; @@ -301,40 +298,40 @@ pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<( match activity_type { apb::ActivityType::Like => { - let internal_oid = upub::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db()) + let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; - upub::model::like::Entity::delete_many() + crate::model::like::Entity::delete_many() .filter( Condition::all() - .add(upub::model::like::Column::Actor.eq(internal_uid)) - .add(upub::model::like::Column::Object.eq(internal_oid)) + .add(crate::model::like::Column::Actor.eq(internal_uid)) + .add(crate::model::like::Column::Object.eq(internal_oid)) ) .exec(ctx.db()) .await?; - upub::model::object::Entity::update_many() - .filter(upub::model::object::Column::Internal.eq(internal_oid)) - .col_expr(upub::model::object::Column::Likes, Expr::col(upub::model::object::Column::Likes).sub(1)) + crate::model::object::Entity::update_many() + .filter(crate::model::object::Column::Internal.eq(internal_oid)) + .col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).sub(1)) .exec(ctx.db()) .await?; }, apb::ActivityType::Follow => { - let internal_uid_following = upub::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db()) + let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; - upub::model::relation::Entity::delete_many() - .filter(upub::model::relation::Column::Follower.eq(internal_uid)) - .filter(upub::model::relation::Column::Following.eq(internal_uid_following)) + crate::model::relation::Entity::delete_many() + .filter(crate::model::relation::Column::Follower.eq(internal_uid)) + .filter(crate::model::relation::Column::Following.eq(internal_uid_following)) .exec(ctx.db()) .await?; - upub::model::actor::Entity::update_many() - .filter(upub::model::actor::Column::Internal.eq(internal_uid)) - .col_expr(upub::model::actor::Column::FollowingCount, Expr::col(upub::model::actor::Column::FollowingCount).sub(1)) + crate::model::actor::Entity::update_many() + .filter(crate::model::actor::Column::Internal.eq(internal_uid)) + .col_expr(crate::model::actor::Column::FollowingCount, Expr::col(crate::model::actor::Column::FollowingCount).sub(1)) .exec(ctx.db()) .await?; - upub::model::actor::Entity::update_many() - .filter(upub::model::actor::Column::Internal.eq(internal_uid_following)) - .col_expr(upub::model::actor::Column::FollowersCount, Expr::col(upub::model::actor::Column::FollowersCount).sub(1)) + crate::model::actor::Entity::update_many() + .filter(crate::model::actor::Column::Internal.eq(internal_uid_following)) + .col_expr(crate::model::actor::Column::FollowersCount, Expr::col(crate::model::actor::Column::FollowersCount).sub(1)) .exec(ctx.db()) .await?; }, @@ -347,10 +344,10 @@ pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<( Ok(()) } -pub async fn announce(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { +pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> { let uid = activity.actor().id()?.to_string(); let actor = ctx.fetch_user(&uid).await?; - let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()) + let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db()) .await? .ok_or(ProcessorError::Incomplete)?; let announced_id = activity.object().id()?.to_string(); @@ -371,7 +368,7 @@ pub async fn announce(ctx: &upub::Context, activity: impl apb::Activity) -> Resu return Ok(()) } - let share = upub::model::announce::ActiveModel { + let share = crate::model::announce::ActiveModel { internal: NotSet, actor: Set(internal_uid), object: Set(object_model.internal), @@ -380,11 +377,11 @@ pub async fn announce(ctx: &upub::Context, activity: impl apb::Activity) -> Resu let expanded_addressing = ctx.expand_addressing(addressed).await?; ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; - upub::model::announce::Entity::insert(share) + crate::model::announce::Entity::insert(share) .exec(ctx.db()).await?; - upub::model::object::Entity::update_many() - .col_expr(upub::model::object::Column::Announces, Expr::col(upub::model::object::Column::Announces).add(1)) - .filter(upub::model::object::Column::Internal.eq(object_model.internal)) + crate::model::object::Entity::update_many() + .col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1)) + .filter(crate::model::object::Column::Internal.eq(object_model.internal)) .exec(ctx.db()) .await?; diff --git a/upub/processor/src/dispatcher.rs b/upub/processor/src/dispatcher.rs deleted file mode 100644 index 6885c4f0..00000000 --- a/upub/processor/src/dispatcher.rs +++ /dev/null @@ -1,134 +0,0 @@ -use reqwest::Method; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder}; -use tokio::{sync::broadcast, task::JoinHandle}; - -use apb::{ActivityMut, Node}; -use crate::{model, Context, server::{fetcher::Fetcher, jsonld::LD}}; - -pub struct Dispatcher { - waker: broadcast::Sender<()>, -} - -impl Default for Dispatcher { - fn default() -> Self { - let (waker, _) = broadcast::channel(1); - Dispatcher { waker } - } -} - -impl Dispatcher { - pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> { - let mut waker = self.waker.subscribe(); - tokio::spawn(async move { - loop { - if let Err(e) = worker(&db, &domain, poll_interval, &mut waker).await { - tracing::error!("delivery worker exited with error: {e}"); - } - tokio::time::sleep(std::time::Duration::from_secs(poll_interval * 10)).await; - } - }) - } - - pub fn wakeup(&self) { - match self.waker.send(()) { - Err(_) => tracing::error!("no worker to wakeup"), - Ok(n) => tracing::debug!("woken {n} workers"), - } - } -} - -async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker: &mut broadcast::Receiver<()>) -> crate::Result<()> { - loop { - let Some(delivery) = model::delivery::Entity::find() - .filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now())) - .order_by(model::delivery::Column::NotBefore, Order::Asc) - .one(db) - .await? - else { - tokio::select! { - biased; - _ = waker.recv() => {}, - _ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {}, - } - continue - }; - - let del_row = model::delivery::ActiveModel { - internal: sea_orm::ActiveValue::Set(delivery.internal), - ..Default::default() - }; - let del = model::delivery::Entity::delete(del_row) - .exec(db) - .await?; - - if del.rows_affected == 0 { - // another worker claimed this delivery - continue; // go back to the top - } - if delivery.expired() { - // try polling for another one - continue; // go back to top - } - - tracing::info!("delivering {} to {}", delivery.activity, delivery.target); - - let payload = match model::activity::Entity::find_by_ap_id(&delivery.activity) - .find_also_related(model::object::Entity) - .one(db) - .await? // TODO probably should not fail here and at least re-insert the delivery - { - Some((activity, None)) => activity.ap().ld_context(), - Some((activity, Some(object))) => { - let always_embed = matches!( - activity.activity_type, - apb::ActivityType::Create - | apb::ActivityType::Undo - | apb::ActivityType::Update - | apb::ActivityType::Accept(_) - | apb::ActivityType::Reject(_) - ); - if always_embed { - activity.ap().set_object(Node::object(object.ap())).ld_context() - } else { - activity.ap().ld_context() - } - }, - None => { - tracing::warn!("skipping dispatch for deleted object {}", delivery.activity); - continue; - }, - }; - - let Some(actor) = model::actor::Entity::find_by_ap_id(&delivery.actor) - .one(db) - .await? - else { - tracing::error!("abandoning delivery of {} from non existant actor: {}", delivery.activity, delivery.actor); - continue; - }; - - let Some(key) = actor.private_key - else { - tracing::error!("abandoning delivery of {} from actor without private key: {}", delivery.activity, delivery.actor); - continue; - }; - - if let Err(e) = Context::request( - Method::POST, &delivery.target, - Some(&serde_json::to_string(&payload).unwrap()), - &delivery.actor, &key, domain - ).await { - tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target); - let new_delivery = model::delivery::ActiveModel { - internal: sea_orm::ActiveValue::NotSet, - not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()), - actor: sea_orm::ActiveValue::Set(delivery.actor), - target: sea_orm::ActiveValue::Set(delivery.target), - activity: sea_orm::ActiveValue::Set(delivery.activity), - published: sea_orm::ActiveValue::Set(delivery.published), - attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1), - }; - model::delivery::Entity::insert(new_delivery).exec(db).await?; - } - } -} diff --git a/upub/processor/src/lib.rs b/upub/processor/src/lib.rs deleted file mode 100644 index db5aa6e2..00000000 --- a/upub/processor/src/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod address; -pub mod normalize; -pub mod process; -pub mod fetch; - -// pub mod dispatcher; diff --git a/upub/routes/src/activitypub/activity.rs b/upub/routes/src/activitypub/activity.rs index f3c87f92..91cefd50 100644 --- a/upub/routes/src/activitypub/activity.rs +++ b/upub/routes/src/activitypub/activity.rs @@ -1,6 +1,6 @@ use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, QueryFilter}; -use upub::{model::{self, addressing::Event, attachment::BatchFillable}, Context}; +use upub::{model::{self, addressing::Event, attachment::BatchFillable}, traits::Fetcher, Context}; use apb::LD; use crate::{builders::JsonLD, AuthIdentity}; @@ -14,12 +14,12 @@ pub async fn view( Query(query): Query, ) -> crate::ApiResult> { let aid = ctx.aid(&id); - // if auth.is_local() && query.fetch && !ctx.is_local(&aid) { - // let obj = ctx.fetch_activity(&aid).await?; - // if obj.id != aid { - // return Err(crate::ApiError::Redirect(obj.id)); - // } - // } + if auth.is_local() && query.fetch && !ctx.is_local(&aid) { + let obj = ctx.fetch_activity(&aid).await?; + if obj.id != aid { + return Err(crate::ApiError::Redirect(obj.id)); + } + } let row = model::addressing::Entity::find_addressed(auth.my_id()) .filter(model::activity::Column::Id.eq(&aid)) diff --git a/upub/routes/src/activitypub/application.rs b/upub/routes/src/activitypub/application.rs index e3a5e23c..99b572ac 100644 --- a/upub/routes/src/activitypub/application.rs +++ b/upub/routes/src/activitypub/application.rs @@ -1,7 +1,7 @@ use apb::{LD, ActorMut, BaseMut, ObjectMut, PublicKeyMut}; use axum::{extract::{Query, State}, http::HeaderMap, response::{IntoResponse, Redirect, Response}, Form, Json}; use reqwest::Method; -use upub::Context; +use upub::{traits::Fetcher, Context}; use crate::{builders::JsonLD, AuthIdentity}; @@ -52,20 +52,19 @@ pub async fn proxy_get( if !ctx.cfg().security.allow_public_debugger && !auth.is_local() { return Err(crate::ApiError::unauthorized()); } - todo!() - // Ok(Json( - // Context::request( - // Method::GET, - // &query.id, - // None, - // ctx.base(), - // ctx.pkey(), - // &format!("{}+proxy", ctx.domain()), - // ) - // .await? - // .json::() - // .await? - // )) + Ok(Json( + Context::request( + Method::GET, + &query.id, + None, + ctx.base(), + ctx.pkey(), + &format!("{}+proxy", ctx.domain()), + ) + .await? + .json::() + .await? + )) } pub async fn proxy_form( @@ -77,18 +76,17 @@ pub async fn proxy_form( if !ctx.cfg().security.allow_public_debugger && auth.is_local() { return Err(crate::ApiError::unauthorized()); } - todo!() - // Ok(Json( - // Context::request( - // Method::GET, - // &query.id, - // None, - // ctx.base(), - // ctx.pkey(), - // &format!("{}+proxy", ctx.domain()), - // ) - // .await? - // .json::() - // .await? - // )) + Ok(Json( + Context::request( + Method::GET, + &query.id, + None, + ctx.base(), + ctx.pkey(), + &format!("{}+proxy", ctx.domain()), + ) + .await? + .json::() + .await? + )) } diff --git a/upub/routes/src/activitypub/auth.rs b/upub/routes/src/activitypub/auth.rs index 015eff73..d31e0cc0 100644 --- a/upub/routes/src/activitypub/auth.rs +++ b/upub/routes/src/activitypub/auth.rs @@ -1,7 +1,7 @@ use axum::{http::StatusCode, extract::State, Json}; use rand::Rng; use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, Condition, EntityTrait, QueryFilter}; -use upub::{server::admin::Administrable, Context}; +use upub::{traits::Administrable, Context}; #[derive(Debug, Clone, serde::Deserialize)] diff --git a/upub/routes/src/activitypub/inbox.rs b/upub/routes/src/activitypub/inbox.rs index 500a3396..2f5b4c9b 100644 --- a/upub/routes/src/activitypub/inbox.rs +++ b/upub/routes/src/activitypub/inbox.rs @@ -1,7 +1,7 @@ use apb::{Activity, ActivityType, Base}; use axum::{extract::{Query, State}, http::StatusCode, Json}; -use sea_orm::{sea_query::IntoCondition, ColumnTrait}; -use upub::Context; +use sea_orm::{sea_query::IntoCondition, ActiveValue::{NotSet, Set}, ColumnTrait, EntityTrait}; +use upub::{model::job::JobType, Context}; use crate::{AuthIdentity, Identity, builders::JsonLD}; @@ -42,8 +42,8 @@ pub async fn post( State(ctx): State, AuthIdentity(auth): AuthIdentity, Json(activity): Json -) -> crate::ApiResult<()> { - let Identity::Remote { domain: server, user: uid, .. } = auth else { +) -> crate::ApiResult { + let Identity::Remote { domain: _server, user: uid, .. } = auth else { if matches!(activity.activity_type(), Ok(ActivityType::Delete)) { // this is spammy af, ignore them! // we basically received a delete for a user we can't fetch and verify, meaning remote @@ -51,48 +51,35 @@ pub async fn post( // but mastodon keeps hammering us trying to delete this user, so just make mastodon happy // and return 200 without even bothering checking this stuff // would be cool if mastodon played nicer with the network... - return Ok(()); + return Ok(StatusCode::OK); } tracing::warn!("refusing unauthorized activity: {}", pretty_json!(activity)); if matches!(auth, Identity::Anonymous) { - return Err(crate::ApiError::unauthorized()); + return Ok(StatusCode::UNAUTHORIZED); } else { - return Err(crate::ApiError::forbidden()); + return Ok(StatusCode::FORBIDDEN); } }; - todo!() + let aid = activity.id()?.to_string(); - // let aid = activity.id().ok_or_else(|| crate::ApiError::field("id"))?.to_string(); - // let actor = activity.actor().id().ok_or_else(|| crate::ApiError::field("actor"))?; + if let Some(_internal) = upub::model::activity::Entity::ap_to_internal(&aid, ctx.db()).await? { + return Ok(StatusCode::OK); // already processed + } - // if uid != actor { - // return Err(crate::ApiError::unauthorized()); - // } + let job = upub::model::job::ActiveModel { + internal: NotSet, + job_type: Set(JobType::Inbound), + actor: Set(uid), + target: Set(None), + activity: Set(aid), + payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing json payload"))), + published: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + attempt: Set(0) + }; - // tracing::debug!("processing federated activity: '{:#}'", activity); + upub::model::job::Entity::insert(job).exec(ctx.db()).await?; - // // TODO we could process Links and bare Objects maybe, but probably out of AP spec? - // match activity.activity_type().ok_or_else(crate::ApiError::bad_request)? { - // ActivityType::Activity => { - // tracing::warn!("skipping unprocessable base activity: {}", pretty_json!(activity)); - // Err(StatusCode::UNPROCESSABLE_ENTITY.into()) // won't ingest useless stuff - // }, - - // // TODO emojireacts are NOT likes, but let's process them like ones for now maybe? - // ActivityType::Like | ActivityType::EmojiReact => Ok(ctx.like(server, activity).await?), - // ActivityType::Create => Ok(ctx.create(server, activity).await?), - // ActivityType::Follow => Ok(ctx.follow(server, activity).await?), - // ActivityType::Announce => Ok(ctx.announce(server, activity).await?), - // ActivityType::Accept(_) => Ok(ctx.accept(server, activity).await?), - // ActivityType::Reject(_) => Ok(ctx.reject(server, activity).await?), - // ActivityType::Undo => Ok(ctx.undo(server, activity).await?), - // ActivityType::Delete => Ok(ctx.delete(server, activity).await?), - // ActivityType::Update => Ok(ctx.update(server, activity).await?), - - // _x => { - // tracing::info!("received unimplemented activity on inbox: {}", pretty_json!(activity)); - // Err(StatusCode::NOT_IMPLEMENTED.into()) - // }, - // } + Ok(StatusCode::ACCEPTED) } diff --git a/upub/routes/src/activitypub/object/mod.rs b/upub/routes/src/activitypub/object/mod.rs index 69f7e340..baad5b37 100644 --- a/upub/routes/src/activitypub/object/mod.rs +++ b/upub/routes/src/activitypub/object/mod.rs @@ -3,7 +3,7 @@ pub mod replies; use apb::{CollectionMut, ObjectMut, LD}; use axum::extract::{Path, Query, State}; use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns}; -use upub::{model::{self, addressing::Event}, Context}; +use upub::{model::{self, addressing::Event}, traits::Fetcher, Context}; use crate::{builders::JsonLD, AuthIdentity}; @@ -16,13 +16,13 @@ pub async fn view( Query(query): Query, ) -> crate::ApiResult> { let oid = ctx.oid(&id); - // if auth.is_local() && query.fetch && !ctx.is_local(&oid) { - // let obj = ctx.fetch_object(&oid).await?; - // // some implementations serve statuses on different urls than their AP id - // if obj.id != oid { - // return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id)))); - // } - // } + if auth.is_local() && query.fetch && !ctx.is_local(&oid) { + let obj = ctx.fetch_object(&oid).await?; + // some implementations serve statuses on different urls than their AP id + if obj.id != oid { + return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id)))); + } + } let item = model::addressing::Entity::find_addressed(auth.my_id()) .filter(model::object::Column::Id.eq(&oid)) diff --git a/upub/routes/src/activitypub/object/replies.rs b/upub/routes/src/activitypub/object/replies.rs index d88804e7..6a033aea 100644 --- a/upub/routes/src/activitypub/object/replies.rs +++ b/upub/routes/src/activitypub/object/replies.rs @@ -8,7 +8,7 @@ pub async fn get( State(ctx): State, Path(id): Path, AuthIdentity(auth): AuthIdentity, - Query(q): Query, + Query(_q): Query, ) -> crate::ApiResult> { let replies_id = upub::url!(ctx, "/objects/{id}/replies"); let oid = ctx.oid(&id); diff --git a/upub/routes/src/activitypub/user/inbox.rs b/upub/routes/src/activitypub/user/inbox.rs index a8777986..6ae79918 100644 --- a/upub/routes/src/activitypub/user/inbox.rs +++ b/upub/routes/src/activitypub/user/inbox.rs @@ -1,4 +1,4 @@ -use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; +use axum::{http::StatusCode, extract::{Path, Query, State}, Json}; use sea_orm::{ColumnTrait, Condition}; use upub::{model, Context}; @@ -54,7 +54,7 @@ pub async fn post( Path(_id): Path, AuthIdentity(_auth): AuthIdentity, Json(activity): Json, -) -> crate::ApiResult<()> { +) -> crate::ApiResult { // POSTing to user inboxes is effectively the same as POSTing to the main inbox super::super::inbox::post(State(ctx), AuthIdentity(_auth), Json(activity)).await } diff --git a/upub/routes/src/activitypub/user/mod.rs b/upub/routes/src/activitypub/user/mod.rs index 332089e9..88cd4326 100644 --- a/upub/routes/src/activitypub/user/mod.rs +++ b/upub/routes/src/activitypub/user/mod.rs @@ -7,7 +7,7 @@ pub mod following; use axum::extract::{Path, Query, State}; use apb::{LD, ActorMut, EndpointsMut, Node, ObjectMut}; -use upub::{ext::AnyQuery, model, Context}; +use upub::{ext::AnyQuery, model, traits::Fetcher, Context}; use crate::{builders::JsonLD, ApiError, AuthIdentity}; @@ -21,16 +21,18 @@ pub async fn view( Query(query): Query, ) -> crate::ApiResult> { let mut uid = ctx.uid(&id); - // if auth.is_local() { - // if id.starts_with('@') { - // if let Some((user, host)) = id.replacen('@', "", 1).split_once('@') { - // uid = ctx.webfinger(user, host).await?; - // } - // } - // if query.fetch && !ctx.is_local(&uid) { - // ctx.fetch_user(&uid).await?; - // } - // } + if auth.is_local() { + if id.starts_with('@') { + if let Some((user, host)) = id.replacen('@', "", 1).split_once('@') { + if let Some(webfinger) = ctx.webfinger(user, host).await? { + uid = webfinger; + } + } + } + if query.fetch && !ctx.is_local(&uid) { + ctx.fetch_user(&uid).await?; + } + } let internal_uid = model::actor::Entity::ap_to_internal(&uid, ctx.db()) .await? .ok_or_else(ApiError::not_found)?; diff --git a/upub/routes/src/activitypub/user/outbox.rs b/upub/routes/src/activitypub/user/outbox.rs index cf2ba65f..2913953d 100644 --- a/upub/routes/src/activitypub/user/outbox.rs +++ b/upub/routes/src/activitypub/user/outbox.rs @@ -1,7 +1,6 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; -use sea_orm::{ColumnTrait, Condition}; +use sea_orm::{ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait}; -use apb::{AcceptType, ActivityType, Base, BaseType, ObjectType, RejectType}; use upub::{model, Context}; use crate::{activitypub::{CreationResult, Pagination}, builders::JsonLD, AuthIdentity, Identity}; @@ -46,46 +45,29 @@ pub async fn post( match auth { Identity::Anonymous => Err(StatusCode::UNAUTHORIZED.into()), Identity::Remote { .. } => Err(StatusCode::NOT_IMPLEMENTED.into()), - Identity::Local { id: uid, .. } => if ctx.uid(&id) == uid { - tracing::debug!("processing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default()); - todo!() - // match activity.base_type()? { - // BaseType::Link(_) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()), + Identity::Local { id: uid, .. } => { + if ctx.uid(&id) != uid { + return Err(crate::ApiError::forbidden()); + } - // BaseType::Object(ObjectType::Note) => - // Ok(CreationResult(ctx.create_note(uid, activity).await?)), + tracing::debug!("enqueuing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default()); + let aid = ctx.aid(&Context::new_id()); - // BaseType::Object(ObjectType::Activity(ActivityType::Create)) => - // Ok(CreationResult(ctx.create(uid, activity).await?)), + let job = model::job::ActiveModel { + internal: NotSet, + activity: Set(aid.clone()), + job_type: Set(model::job::JobType::Local), + actor: Set(uid.clone()), + target: Set(None), + published: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + attempt: Set(0), + payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing back json object"))), + }; - // BaseType::Object(ObjectType::Activity(ActivityType::Like)) => - // Ok(CreationResult(ctx.like(uid, activity).await?)), + model::job::Entity::insert(job).exec(ctx.db()).await?; - // BaseType::Object(ObjectType::Activity(ActivityType::Follow)) => - // Ok(CreationResult(ctx.follow(uid, activity).await?)), - - // BaseType::Object(ObjectType::Activity(ActivityType::Announce)) => - // Ok(CreationResult(ctx.announce(uid, activity).await?)), - - // BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept))) => - // Ok(CreationResult(ctx.accept(uid, activity).await?)), - - // BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject))) => - // Ok(CreationResult(ctx.reject(uid, activity).await?)), - - // BaseType::Object(ObjectType::Activity(ActivityType::Undo)) => - // Ok(CreationResult(ctx.undo(uid, activity).await?)), - - // BaseType::Object(ObjectType::Activity(ActivityType::Delete)) => - // Ok(CreationResult(ctx.delete(uid, activity).await?)), - - // BaseType::Object(ObjectType::Activity(ActivityType::Update)) => - // Ok(CreationResult(ctx.update(uid, activity).await?)), - - // _ => Err(StatusCode::NOT_IMPLEMENTED.into()), - // } - } else { - Err(StatusCode::FORBIDDEN.into()) + Ok(CreationResult(aid)) } } } diff --git a/upub/routes/src/activitypub/well_known.rs b/upub/routes/src/activitypub/well_known.rs index 3d1477f0..15e6e9be 100644 --- a/upub/routes/src/activitypub/well_known.rs +++ b/upub/routes/src/activitypub/well_known.rs @@ -95,7 +95,10 @@ impl IntoResponse for JsonRD { } } -pub async fn webfinger(State(ctx): State, Query(query): Query) -> upub::Result> { +pub async fn webfinger( + State(ctx): State, + Query(query): Query +) -> crate::ApiResult> { if let Some((user, domain)) = query .resource .replace("acct:", "") @@ -106,7 +109,7 @@ pub async fn webfinger(State(ctx): State, Query(query): Query) -> upub::Result> { +pub async fn oauth_authorization_server(State(ctx): State) -> crate::ApiResult> { Ok(Json(OauthAuthorizationServerResponse { issuer: upub::url!(ctx, ""), authorization_endpoint: upub::url!(ctx, "/auth"), diff --git a/upub/routes/src/auth.rs b/upub/routes/src/auth.rs index 8a0af895..0a63d2ae 100644 --- a/upub/routes/src/auth.rs +++ b/upub/routes/src/auth.rs @@ -1,7 +1,7 @@ use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}}; -use reqwest::StatusCode; use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter}; use httpsign::HttpSignature; +use upub::traits::Fetcher; use crate::ApiError; @@ -120,11 +120,9 @@ where .next().ok_or(ApiError::bad_request())? .to_string(); - match upub::model::actor::Entity::find_by_ap_id(&user_id) - .one(ctx.db()) - .await? - { - Some(user) => match http_signature + match ctx.fetch_user(&user_id).await { + Err(e) => tracing::warn!("failed resolving http signature actor: {e}"), + Ok(user) => match http_signature .build_from_parts(parts) .verify(&user.public_key) { @@ -137,9 +135,6 @@ where Ok(false) => tracing::warn!("invalid signature: {http_signature:?}"), Err(e) => tracing::error!("error verifying signature: {e}"), }, - None => { - // TODO enqueue fetching who tried signing this - } } } diff --git a/upub/routes/src/error.rs b/upub/routes/src/error.rs index 8c7dbaf0..a9970072 100644 --- a/upub/routes/src/error.rs +++ b/upub/routes/src/error.rs @@ -11,13 +11,13 @@ pub enum ApiError { #[error("http signature error: {0:?}")] HttpSignature(#[from] httpsign::HttpSignatureError), - #[error("fetch error: {0:?}")] + #[error("outgoing request error: {0:?}")] Reqwest(#[from] reqwest::Error), // TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut // helps with debugging! - #[error("fetch error: {0:?} -- server responded with {1}")] - FetchError(reqwest::Error, String), + #[error("fetch error: {0:?}")] + FetchError(#[from] upub::traits::fetch::PullError), // wrapper error to return arbitraty status codes #[error("{0}")] @@ -83,7 +83,7 @@ impl axum::response::IntoResponse for ApiError { "inner": format!("{e:#?}"), })) ).into_response(), - ApiError::Reqwest(x) | ApiError::FetchError(x, _) => ( + ApiError::Reqwest(x) => ( x.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), axum::Json(serde_json::json!({ "error": "request", @@ -93,6 +93,14 @@ impl axum::response::IntoResponse for ApiError { "inner": format!("{x:#?}"), })) ).into_response(), + ApiError::FetchError(pull) => ( + StatusCode::INTERNAL_SERVER_ERROR, + axum::Json(serde_json::json!({ + "error": "fetch", + "description": descr, + "inner": format!("{pull:#?}"), + })) + ).into_response(), ApiError::Field(x) => ( axum::http::StatusCode::BAD_REQUEST, axum::Json(serde_json::json!({ diff --git a/upub/routes/src/lib.rs b/upub/routes/src/lib.rs index 4abbddd5..c58addb4 100644 --- a/upub/routes/src/lib.rs +++ b/upub/routes/src/lib.rs @@ -25,7 +25,7 @@ pub mod mastodon { impl MastodonRouter for axum::Router {} } -pub async fn serve(ctx: upub::Context, bind: String) -> upub::Result<()> { +pub async fn serve(ctx: upub::Context, bind: String) -> Result<(), std::io::Error> { use activitypub::ActivityPubRouter; use mastodon::MastodonRouter; use tower_http::{cors::CorsLayer, trace::TraceLayer}; @@ -50,8 +50,7 @@ pub async fn serve(ctx: upub::Context, bind: String) -> upub::Result<()> { .with_state(ctx); // run our app with hyper, listening locally on port 3000 - let listener = tokio::net::TcpListener::bind(bind) - .await.expect("could not bind tcp socket"); + let listener = tokio::net::TcpListener::bind(bind).await?; axum::serve(listener, router).await?; diff --git a/upub/processor/Cargo.toml b/upub/worker/Cargo.toml similarity index 86% rename from upub/processor/Cargo.toml rename to upub/worker/Cargo.toml index a0cac856..0abd5a96 100644 --- a/upub/processor/Cargo.toml +++ b/upub/worker/Cargo.toml @@ -1,9 +1,9 @@ [package] -name = "upub-processor" +name = "upub-worker" version = "0.2.0" edition = "2021" authors = [ "alemi " ] -description = "upub background activity processing worker" +description = "upub background activity processing and dispatching workers" license = "AGPL-3.0" repository = "https://git.alemi.dev/upub.git" readme = "README.md" @@ -17,6 +17,7 @@ async-trait = "0.1" serde_json = "1" sea-orm = "0.12" jrd = "0.1" +regex = "1.10" chrono = { version = "0.4", features = ["serde"] } tokio = { version = "1.35", features = ["full"] } # TODO slim this down reqwest = { version = "0.12", features = ["json"] } diff --git a/upub/processor/README.md b/upub/worker/README.md similarity index 100% rename from upub/processor/README.md rename to upub/worker/README.md diff --git a/upub/worker/src/dispatcher.rs b/upub/worker/src/dispatcher.rs new file mode 100644 index 00000000..e59cd242 --- /dev/null +++ b/upub/worker/src/dispatcher.rs @@ -0,0 +1,134 @@ +use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder}; + +use upub::{model, Context}; + +#[derive(Debug, thiserror::Error)] +pub enum JobError { + #[error("database error: {0:?}")] + Database(#[from] sea_orm::DbErr), + + #[error("invalid payload json: {0:?}")] + Json(#[from] serde_json::Error), + + #[error("malformed payload: {0}")] + Malformed(#[from] apb::FieldErr), + + #[error("malformed job: missing payload")] + MissingPayload, + + #[error("no available implementation to process job")] + Unprocessable, + + #[error("error processing activity: {0:?}")] + ProcessorError(#[from] upub::traits::process::ProcessorError), +} + +pub type JobResult = Result; + +#[async_trait::async_trait] +pub trait JobDispatcher : Sized { + async fn poll(&self, filter: Option) -> JobResult>; + async fn lock(&self, job_internal: i64) -> JobResult; + async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option); +} + +#[async_trait::async_trait] +impl JobDispatcher for Context { + async fn poll(&self, filter: Option) -> JobResult> { + let mut s = model::job::Entity::find() + .filter(model::job::Column::NotBefore.lte(chrono::Utc::now())); + + if let Some(t) = filter { + s = s.filter(model::job::Column::JobType.eq(t)); + } + + Ok( + s + .order_by(model::job::Column::NotBefore, Order::Asc) + .one(self.db()) + .await? + ) + } + + async fn lock(&self, job_internal: i64) -> JobResult { + let res = model::job::Entity::delete( + model::job::ActiveModel { + internal: sea_orm::ActiveValue::Set(job_internal), + ..Default::default() + } + ) + .exec(self.db()) + .await?; + + if res.rows_affected < 1 { + return Ok(false); + } + + Ok(true) + } + + async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option) { + macro_rules! restart { + (now) => { continue }; + () => { + { + tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; + continue; + } + } + } + + let mut pool = tokio::task::JoinSet::new(); + + loop { + let job = match self.poll(job_filter).await { + Ok(Some(j)) => j, + Ok(None) => restart!(), + Err(e) => { + tracing::error!("error polling for jobs: {e}"); + restart!() + }, + }; + + match self.lock(job.internal).await { + Ok(true) => {}, + Ok(false) => restart!(now), + Err(e) => { + tracing::error!("error locking job: {e}"); + restart!() + }, + } + + if job.expired() { + tracing::info!("dropping expired job {job:?}"); + restart!(now); + } + + let _ctx = self.clone(); + pool.spawn(async move { + let res = match job.job_type { + model::job::JobType::Inbound => crate::inbound::process(_ctx.clone(), &job).await, + model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await, + model::job::JobType::Local => crate::local::process(_ctx.clone(), &job).await, + }; + + if let Err(e) = res { + tracing::error!("failed processing job #{}: {e}", job.internal); + let active = job.clone().repeat(); + if let Err(e) = model::job::Entity::insert(active) + .exec(_ctx.db()) + .await + { + tracing::error!("could not insert back job ({e}), dropping:\n{job:#?}") + } + } + }); + + while pool.len() >= concurrency { + if let Some(Err(e)) = pool.join_next().await { + tracing::error!("failed joining processing task: {e}"); + } + } + } + } +} diff --git a/upub/worker/src/inbound.rs b/upub/worker/src/inbound.rs new file mode 100644 index 00000000..399ee507 --- /dev/null +++ b/upub/worker/src/inbound.rs @@ -0,0 +1,20 @@ +use upub::traits::Processor; + + +pub async fn process(ctx: upub::Context, job: &upub::model::job::Model) -> crate::JobResult<()> { + let Some(ref payload) = job.payload else { + tracing::error!("abandoning inbound job without payload: {job:#?}"); + return Ok(()); + }; + + let Ok(activity) = serde_json::from_str::(payload) else { + tracing::error!("abandoning inbound job with invalid payload: {job:#?}"); + return Ok(()); + }; + + if let Err(e) = ctx.process(activity).await { + tracing::error!("failed processing job #{}: {e}", job.internal); + } + + Ok(()) +} diff --git a/upub/worker/src/lib.rs b/upub/worker/src/lib.rs new file mode 100644 index 00000000..1021f1a9 --- /dev/null +++ b/upub/worker/src/lib.rs @@ -0,0 +1,18 @@ +pub mod dispatcher; +pub mod inbound; +pub mod outbound; +pub mod local; + +pub use dispatcher::{JobError, JobResult}; + +pub fn spawn( + ctx: upub::Context, + concurrency: usize, + poll: u64, + filter: Option, +) -> tokio::task::JoinHandle<()> { + use dispatcher::JobDispatcher; + tokio::spawn(async move { + ctx.run(concurrency, poll, filter).await + }) +} diff --git a/upub/processor/src/outbox.rs b/upub/worker/src/local.rs similarity index 88% rename from upub/processor/src/outbox.rs rename to upub/worker/src/local.rs index 23c0150c..18eb1d69 100644 --- a/upub/processor/src/outbox.rs +++ b/upub/worker/src/local.rs @@ -1,11 +1,73 @@ -use apb::{target::Addressed, Activity, ActivityMut, Base, BaseMut, Node, Object, ObjectMut}; -use reqwest::StatusCode; -use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet, Unchanged}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns}; +use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut}; +use sea_orm::{EntityTrait, QueryFilter, QuerySelect, SelectColumns, ColumnTrait}; +use upub::{model, traits::{Addresser, Processor}, Context}; -use crate::{errors::UpubError, model, ext::AnyQuery}; -use super::{addresser::Addresser, fetcher::Fetcher, normalizer::Normalizer, side_effects::SideEffects, Context}; +pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> { + let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?; + let mut activity : serde_json::Value = serde_json::from_str(payload)?; + let mut t = activity.object_type()?; + if matches!(t, apb::ObjectType::Note) { + activity = apb::new() + .set_activity_type(Some(apb::ActivityType::Create)) + .set_object(apb::Node::object(activity)); + t = apb::ObjectType::Activity(apb::ActivityType::Create); + } + + activity = activity + .set_id(Some(&job.activity)) + .set_actor(apb::Node::link(job.actor.clone())) + .set_published(Some(chrono::Utc::now())); + + if matches!(t, apb::ObjectType::Activity(apb::ActivityType::Create)) { + let raw_oid = Context::new_id(); + let oid = ctx.oid(&raw_oid); + // object must be embedded, wont dereference here + let object = activity.object().extract().ok_or(apb::FieldErr("object"))?; + // TODO regex hell here i come... + let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern"); + let mut content = object.content().map(|x| x.to_string()).ok(); + if let Some(c) = content { + let mut tmp = mdhtml::safe_markdown(&c); + for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) { + if let Ok(Some(uid)) = model::actor::Entity::find() + .filter(model::actor::Column::PreferredUsername.eq(user)) + .filter(model::actor::Column::Domain.eq(domain)) + .select_only() + .select_column(model::actor::Column::Id) + .into_tuple::() + .one(ctx.db()) + .await + { + tmp = tmp.replacen(full, &format!("@{user}"), 1); + } + } + content = Some(tmp); + } + + activity = activity + .set_object(apb::Node::object( + object + .set_id(Some(&oid)) + .set_content(content.as_deref()) + .set_attributed_to(apb::Node::link(job.actor.clone())) + .set_published(Some(chrono::Utc::now())) + .set_url(apb::Node::maybe_link(ctx.cfg().frontend_url(&format!("/objects/{raw_oid}")))), + )); + } + + // TODO we expand addressing twice, ugghhhhh + let targets = ctx.expand_addressing(activity.addressed()).await?; + + ctx.process(activity).await?; + + ctx.deliver_to(&job.activity, &job.actor, &targets).await?; + + Ok(()) +} + +/* #[axum::async_trait] impl apb::server::Outbox for Context { @@ -40,27 +102,6 @@ impl apb::server::Outbox for Context { self.fetch_object(&reply).await?; } - // TODO regex hell here i come... - let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern"); - let mut content = object.content().map(|x| x.to_string()); - if let Some(c) = content { - let mut tmp = mdhtml::safe_markdown(&c); - for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) { - if let Ok(Some(uid)) = model::actor::Entity::find() - .filter(model::actor::Column::PreferredUsername.eq(user)) - .filter(model::actor::Column::Domain.eq(domain)) - .select_only() - .select_column(model::actor::Column::Id) - .into_tuple::() - .one(self.db()) - .await - { - tmp = tmp.replacen(full, &format!("@{user}"), 1); - } - } - content = Some(tmp); - } - self.insert_object( object .set_id(Some(&oid)) @@ -421,3 +462,5 @@ impl apb::server::Outbox for Context { Ok(aid) } } + +*/ diff --git a/upub/worker/src/outbound.rs b/upub/worker/src/outbound.rs new file mode 100644 index 00000000..b8a59d45 --- /dev/null +++ b/upub/worker/src/outbound.rs @@ -0,0 +1,63 @@ +use sea_orm::EntityTrait; +use reqwest::Method; + +use apb::{LD, Node, ActivityMut}; +use upub::{Context, model, traits::Fetcher}; + +pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> { + tracing::info!("delivering {} to {:?}", job.activity, job.target); + + let payload = match model::activity::Entity::find_by_ap_id(&job.activity) + .find_also_related(model::object::Entity) + .one(ctx.db()) + .await? + { + Some((activity, None)) => activity.ap().ld_context(), + Some((activity, Some(object))) => { + let always_embed = matches!( + activity.activity_type, + apb::ActivityType::Create + | apb::ActivityType::Undo + | apb::ActivityType::Update + | apb::ActivityType::Accept(_) + | apb::ActivityType::Reject(_) + ); + if always_embed { + activity.ap().set_object(Node::object(object.ap())).ld_context() + } else { + activity.ap().ld_context() + } + }, + None => { + tracing::info!("skipping dispatch for deleted object {}", job.activity); + return Ok(()); + }, + }; + + let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor) + .one(ctx.db()) + .await? + else { + tracing::error!("abandoning delivery from non existant actor {}: {job:#?}", job.actor); + return Ok(()); + }; + + let Some(key) = actor.private_key + else { + tracing::error!("abandoning delivery from actor without private key {}: {job:#?}", job.actor); + return Ok(()); + }; + + if let Err(e) = Context::request( + Method::POST, job.target.as_deref().unwrap_or(""), + Some(&serde_json::to_string(&payload).unwrap()), + &job.actor, &key, ctx.domain() + ).await { + tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target); + model::job::Entity::insert(job.clone().repeat()) + .exec(ctx.db()) + .await?; + } + + Ok(()) +}