Compare commits
No commits in common. "c83e1df110c0ab79d45f8c9cb2f2fe89b4de9daf" and "0c1160b42f6b2fc9c606a1c84db873662737c7e9" have entirely different histories.
c83e1df110
...
0c1160b42f
54 changed files with 772 additions and 973 deletions
46
Cargo.lock
generated
46
Cargo.lock
generated
|
@ -4693,10 +4693,9 @@ name = "upub"
|
|||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"apb",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"chrono",
|
||||
"httpsign",
|
||||
"jrd",
|
||||
"mdhtml",
|
||||
"nodeinfo",
|
||||
|
@ -4719,7 +4718,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "upub-bin"
|
||||
version = "0.3.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"sea-orm",
|
||||
|
@ -4731,7 +4730,6 @@ dependencies = [
|
|||
"upub-cli",
|
||||
"upub-migrations",
|
||||
"upub-routes",
|
||||
"upub-worker",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -4748,6 +4746,7 @@ dependencies = [
|
|||
"sha256",
|
||||
"tracing",
|
||||
"upub",
|
||||
"upub-processor",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
@ -4758,6 +4757,25 @@ dependencies = [
|
|||
"sea-orm-migration",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "upub-processor"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"apb",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"httpsign",
|
||||
"jrd",
|
||||
"mdhtml",
|
||||
"reqwest",
|
||||
"sea-orm",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"upub",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "upub-routes"
|
||||
version = "0.2.0"
|
||||
|
@ -4811,26 +4829,6 @@ dependencies = [
|
|||
"web-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "upub-worker"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"apb",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"httpsign",
|
||||
"jrd",
|
||||
"mdhtml",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"sea-orm",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"upub",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "uriproxy"
|
||||
version = "0.1.0"
|
||||
|
|
|
@ -5,7 +5,7 @@ members = [
|
|||
"upub/cli",
|
||||
"upub/migrations",
|
||||
"upub/routes",
|
||||
"upub/worker",
|
||||
"upub/processor",
|
||||
"web",
|
||||
"utils/httpsign",
|
||||
"utils/mdhtml",
|
||||
|
@ -14,7 +14,7 @@ members = [
|
|||
|
||||
[package]
|
||||
name = "upub-bin"
|
||||
version = "0.3.0"
|
||||
version = "0.2.0"
|
||||
edition = "2021"
|
||||
authors = [ "alemi <me@alemi.dev>" ]
|
||||
description = "Traits and types to handle ActivityPub objects"
|
||||
|
@ -39,11 +39,9 @@ upub = { path = "upub/core" }
|
|||
upub-cli = { path = "upub/cli", optional = true }
|
||||
upub-migrations = { path = "upub/migrations", optional = true }
|
||||
upub-routes = { path = "upub/routes", optional = true }
|
||||
upub-worker = { path = "upub/worker", optional = true }
|
||||
|
||||
[features]
|
||||
default = ["serve", "migrate", "cli", "worker"]
|
||||
default = ["serve", "migrate", "cli"]
|
||||
serve = ["dep:upub-routes"]
|
||||
migrate = ["dep:upub-migrations"]
|
||||
cli = ["dep:upub-cli"]
|
||||
worker = ["dep:upub-worker"]
|
||||
|
|
68
main.rs
68
main.rs
|
@ -11,9 +11,6 @@ use upub_migrations as migrations;
|
|||
#[cfg(feature = "serve")]
|
||||
use upub_routes as routes;
|
||||
|
||||
#[cfg(feature = "worker")]
|
||||
use upub_worker as worker;
|
||||
|
||||
|
||||
#[derive(Parser)]
|
||||
/// all names were taken
|
||||
|
@ -56,52 +53,13 @@ enum Mode {
|
|||
command: cli::CliCommand,
|
||||
},
|
||||
|
||||
#[cfg(all(feature = "serve", feature = "worker"))]
|
||||
/// start both api routes and background workers
|
||||
Monolith {
|
||||
#[arg(short, long, default_value="127.0.0.1:3000")]
|
||||
/// addr to bind and serve onto
|
||||
bind: String,
|
||||
|
||||
#[arg(short, long, default_value_t = 4)]
|
||||
/// how many concurrent jobs to process with this worker
|
||||
tasks: usize,
|
||||
|
||||
#[arg(short, long, default_value_t = 20)]
|
||||
/// interval for polling new tasks
|
||||
poll: u64,
|
||||
},
|
||||
|
||||
#[cfg(feature = "serve")]
|
||||
/// start api routes server
|
||||
/// run fediverse server
|
||||
Serve {
|
||||
#[arg(short, long, default_value="127.0.0.1:3000")]
|
||||
/// addr to bind and serve onto
|
||||
bind: String,
|
||||
},
|
||||
|
||||
#[cfg(feature = "worker")]
|
||||
/// start background job worker
|
||||
Work {
|
||||
/// only run tasks of this type, run all if not given
|
||||
filter: Filter,
|
||||
|
||||
/// how many concurrent jobs to process with this worker
|
||||
#[arg(short, long, default_value_t = 4)]
|
||||
tasks: usize,
|
||||
|
||||
#[arg(short, long, default_value_t = 20)]
|
||||
/// interval for polling new tasks
|
||||
poll: u64,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, clap::ValueEnum)]
|
||||
enum Filter {
|
||||
All,
|
||||
Local,
|
||||
Inbound,
|
||||
Outbound,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
@ -160,29 +118,5 @@ async fn main() {
|
|||
Mode::Serve { bind } =>
|
||||
routes::serve(ctx, bind)
|
||||
.await.expect("failed serving api routes"),
|
||||
|
||||
#[cfg(feature = "worker")]
|
||||
Mode::Work { filter, tasks, poll } =>
|
||||
worker::spawn(ctx, tasks, poll, filter.into())
|
||||
.await.expect("failed running worker"),
|
||||
|
||||
#[cfg(all(feature = "serve", feature = "worker"))]
|
||||
Mode::Monolith { bind, tasks, poll } => {
|
||||
worker::spawn(ctx.clone(), tasks, poll, None);
|
||||
|
||||
routes::serve(ctx, bind)
|
||||
.await.expect("failed serving api routes");
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Filter> for Option<upub::model::job::JobType> {
|
||||
fn from(value: Filter) -> Self {
|
||||
match value {
|
||||
Filter::All => None,
|
||||
Filter::Local => Some(upub::model::job::JobType::Local),
|
||||
Filter::Inbound => Some(upub::model::job::JobType::Inbound),
|
||||
Filter::Outbound => Some(upub::model::job::JobType::Outbound),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ readme = "README.md"
|
|||
[dependencies]
|
||||
apb = { path = "../../apb/" }
|
||||
upub = { path = "../core" }
|
||||
upub-processor = { path = "../processor/" }
|
||||
tracing = "0.1"
|
||||
serde_json = "1"
|
||||
sha256 = "1.5"
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use sea_orm::EntityTrait;
|
||||
use upub::traits::{fetch::{Fetchable, PullError}, Normalizer};
|
||||
use upub_processor::{fetch::{Fetchable, PullError}, normalize::{AP, Normalizer}};
|
||||
|
||||
pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), PullError> {
|
||||
use apb::Base;
|
||||
|
@ -15,7 +15,7 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool) -> Result<(), Pu
|
|||
match obj.base_type() {
|
||||
Ok(apb::BaseType::Object(apb::ObjectType::Actor(_))) => {
|
||||
upub::model::actor::Entity::insert(
|
||||
upub::AP::actor_q(&obj).unwrap()
|
||||
AP::actor_q(&obj).unwrap()
|
||||
).exec(ctx.db()).await.unwrap();
|
||||
},
|
||||
Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use sea_orm::EntityTrait;
|
||||
|
||||
pub async fn fix(ctx: upub::Context, likes: bool, shares: bool, replies: bool) -> Result<(), sea_orm::DbErr> {
|
||||
pub async fn fix(ctx: upub::Context, likes: bool, shares: bool, replies: bool) -> upub::Result<()> {
|
||||
use futures::TryStreamExt;
|
||||
let db = ctx.db();
|
||||
|
||||
|
|
|
@ -93,12 +93,12 @@ pub enum CliCommand {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn run(ctx: upub::Context, command: CliCommand) -> Result<(), Box<dyn std::error::Error>> {
|
||||
pub async fn run(ctx: upub::Context, command: CliCommand) -> upub::Result<()> {
|
||||
match command {
|
||||
CliCommand::Faker { count } =>
|
||||
Ok(faker(ctx, count as i64).await?),
|
||||
CliCommand::Fetch { uri, save } =>
|
||||
Ok(fetch(ctx, uri, save).await?),
|
||||
Ok(fetch(ctx, uri, save).await.map_err(|_e| upub::Error::internal_server_error())?),
|
||||
CliCommand::Relay { actor, accept } =>
|
||||
Ok(relay(ctx, actor, accept).await?),
|
||||
CliCommand::Fix { likes, shares, replies } =>
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use upub::traits::Administrable;
|
||||
use upub::server::admin::Administrable;
|
||||
|
||||
pub async fn register(
|
||||
ctx: upub::Context,
|
||||
|
@ -8,7 +8,7 @@ pub async fn register(
|
|||
summary: Option<String>,
|
||||
avatar_url: Option<String>,
|
||||
banner_url: Option<String>,
|
||||
) -> Result<(), sea_orm::DbErr> {
|
||||
) -> upub::Result<()> {
|
||||
ctx.register_user(
|
||||
username.clone(),
|
||||
password,
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder};
|
||||
use upub::traits::Addresser;
|
||||
|
||||
pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> Result<(), sea_orm::DbErr> {
|
||||
use upub_processor::address::Addresser;
|
||||
|
||||
pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> upub::Result<()> {
|
||||
let aid = ctx.aid(&uuid::Uuid::new_v4().to_string());
|
||||
|
||||
let mut activity_model = upub::model::activity::ActiveModel {
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use futures::TryStreamExt;
|
||||
use sea_orm::{ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter};
|
||||
use upub::traits::Fetcher;
|
||||
|
||||
pub async fn update_users(ctx: upub::Context, days: i64) -> Result<(), sea_orm::DbErr> {
|
||||
use upub_processor::{fetch::Fetcher, normalize::AP};
|
||||
|
||||
pub async fn update_users(ctx: upub::Context, days: i64) -> upub::Result<()> {
|
||||
let mut count = 0;
|
||||
let mut insertions = Vec::new();
|
||||
|
||||
|
@ -18,7 +19,7 @@ pub async fn update_users(ctx: upub::Context, days: i64) -> Result<(), sea_orm::
|
|||
match ctx.pull(&user.id).await.map(|x| x.actor()) {
|
||||
Err(e) => tracing::warn!("could not update user {}: {e}", user.id),
|
||||
Ok(Err(e)) => tracing::warn!("could not update user {}: {e}", user.id),
|
||||
Ok(Ok(doc)) => match upub::AP::actor_q(&doc) {
|
||||
Ok(Ok(doc)) => match AP::actor_q(&doc) {
|
||||
Ok(mut u) => {
|
||||
u.internal = Set(user.internal);
|
||||
u.updated = Set(chrono::Utc::now());
|
||||
|
|
|
@ -12,7 +12,6 @@ readme = "README.md"
|
|||
|
||||
[dependencies]
|
||||
thiserror = "1"
|
||||
async-trait = "0.1"
|
||||
sha256 = "1.5"
|
||||
openssl = "0.10" # TODO handle pubkeys with a smaller crate
|
||||
base64 = "0.22"
|
||||
|
@ -26,12 +25,12 @@ serde-inline-default = "0.2"
|
|||
toml = "0.8"
|
||||
mdhtml = { path = "../../utils/mdhtml", features = ["markdown"] }
|
||||
uriproxy = { path = "../../utils/uriproxy" }
|
||||
httpsign = { path = "../../utils/httpsign/" }
|
||||
jrd = "0.1"
|
||||
tracing = "0.1"
|
||||
tokio = { version = "1.35", features = ["full"] } # TODO slim this down
|
||||
sea-orm = { version = "0.12", features = ["macros", "sqlx-sqlite", "runtime-tokio-rustls"] }
|
||||
reqwest = { version = "0.12", features = ["json"] }
|
||||
axum = "0.7"
|
||||
apb = { path = "../../apb", features = ["unstructured", "orm", "activitypub-fe", "activitypub-counters", "litepub", "ostatus", "toot"] }
|
||||
# nodeinfo = "0.0.2" # the version on crates.io doesn't re-export necessary types to build the struct!!!
|
||||
nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "e865094804" }
|
||||
|
|
|
@ -94,8 +94,4 @@ impl Config {
|
|||
}
|
||||
Config::default()
|
||||
}
|
||||
|
||||
pub fn frontend_url(&self, url: &str) -> Option<String> {
|
||||
Some(format!("{}{}", self.instance.frontend.as_deref()?, url))
|
||||
}
|
||||
}
|
||||
|
|
160
upub/core/src/errors.rs
Normal file
160
upub/core/src/errors.rs
Normal file
|
@ -0,0 +1,160 @@
|
|||
use axum::{http::StatusCode, response::Redirect};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum UpubError {
|
||||
#[error("database error: {0:?}")]
|
||||
Database(#[from] sea_orm::DbErr),
|
||||
|
||||
#[error("{0}")]
|
||||
Status(axum::http::StatusCode),
|
||||
|
||||
#[error("{0}")]
|
||||
Field(#[from] apb::FieldErr),
|
||||
|
||||
#[error("openssl error: {0:?}")]
|
||||
OpenSSL(#[from] openssl::error::ErrorStack),
|
||||
|
||||
#[error("invalid UTF8 in key: {0:?}")]
|
||||
OpenSSLParse(#[from] std::str::Utf8Error),
|
||||
|
||||
#[error("fetch error: {0:?}")]
|
||||
Reqwest(#[from] reqwest::Error),
|
||||
|
||||
// TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut
|
||||
// helps with debugging!
|
||||
#[error("fetch error: {0:?} -- server responded with {1}")]
|
||||
FetchError(reqwest::Error, String),
|
||||
|
||||
#[error("invalid base64 string: {0:?}")]
|
||||
Base64(#[from] base64::DecodeError),
|
||||
|
||||
#[error("type mismatch on object: expected {0:?}, found {1:?}")]
|
||||
Mismatch(apb::ObjectType, apb::ObjectType),
|
||||
|
||||
#[error("os I/O error: {0}")]
|
||||
IO(#[from] std::io::Error),
|
||||
|
||||
// TODO this isn't really an error but i need to redirect from some routes so this allows me to
|
||||
// keep the type hints on the return type, still what the hell!!!!
|
||||
#[error("redirecting to {0}")]
|
||||
Redirect(String),
|
||||
}
|
||||
|
||||
impl UpubError {
|
||||
pub fn bad_request() -> Self {
|
||||
Self::Status(axum::http::StatusCode::BAD_REQUEST)
|
||||
}
|
||||
|
||||
pub fn unprocessable() -> Self {
|
||||
Self::Status(axum::http::StatusCode::UNPROCESSABLE_ENTITY)
|
||||
}
|
||||
|
||||
pub fn not_found() -> Self {
|
||||
Self::Status(axum::http::StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
pub fn forbidden() -> Self {
|
||||
Self::Status(axum::http::StatusCode::FORBIDDEN)
|
||||
}
|
||||
|
||||
pub fn unauthorized() -> Self {
|
||||
Self::Status(axum::http::StatusCode::UNAUTHORIZED)
|
||||
}
|
||||
|
||||
pub fn not_modified() -> Self {
|
||||
Self::Status(axum::http::StatusCode::NOT_MODIFIED)
|
||||
}
|
||||
|
||||
pub fn internal_server_error() -> Self {
|
||||
Self::Status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
|
||||
pub type UpubResult<T> = Result<T, UpubError>;
|
||||
|
||||
impl From<axum::http::StatusCode> for UpubError {
|
||||
fn from(value: axum::http::StatusCode) -> Self {
|
||||
UpubError::Status(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl axum::response::IntoResponse for UpubError {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
// TODO it's kind of jank to hide this print down here, i should probably learn how spans work
|
||||
// in tracing and use the library's features but ehhhh
|
||||
tracing::debug!("emitting error response: {self:?}");
|
||||
let descr = self.to_string();
|
||||
match self {
|
||||
UpubError::Redirect(to) => Redirect::to(&to).into_response(),
|
||||
UpubError::Status(status) => status.into_response(),
|
||||
UpubError::Database(e) => (
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "database",
|
||||
"inner": format!("{e:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
UpubError::Reqwest(x) | UpubError::FetchError(x, _) => (
|
||||
x.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "request",
|
||||
"status": x.status().map(|s| s.to_string()).unwrap_or_default(),
|
||||
"url": x.url().map(|x| x.to_string()).unwrap_or_default(),
|
||||
"description": descr,
|
||||
"inner": format!("{x:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
UpubError::Field(x) => (
|
||||
axum::http::StatusCode::BAD_REQUEST,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "field",
|
||||
"field": x.0.to_string(),
|
||||
"description": descr,
|
||||
}))
|
||||
).into_response(),
|
||||
UpubError::Mismatch(expected, found) => (
|
||||
axum::http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "type",
|
||||
"expected": expected.as_ref().to_string(),
|
||||
"found": found.as_ref().to_string(),
|
||||
"description": descr,
|
||||
}))
|
||||
).into_response(),
|
||||
x => (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "unknown",
|
||||
"description": descr,
|
||||
"inner": format!("{x:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait LoggableError {
|
||||
fn info_failed(self, msg: &str);
|
||||
fn warn_failed(self, msg: &str);
|
||||
fn err_failed(self, msg: &str);
|
||||
}
|
||||
|
||||
impl<T, E: std::error::Error> LoggableError for Result<T, E> {
|
||||
fn info_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::info!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
fn warn_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::warn!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
fn err_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::error!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,45 +1,19 @@
|
|||
|
||||
#[async_trait::async_trait]
|
||||
#[axum::async_trait]
|
||||
pub trait AnyQuery {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[axum::async_trait]
|
||||
impl<T : sea_orm::EntityTrait> AnyQuery for sea_orm::Select<T> {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr> {
|
||||
Ok(self.one(db).await?.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[axum::async_trait]
|
||||
impl<T : sea_orm::SelectorTrait + Send> AnyQuery for sea_orm::Selector<T> {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr> {
|
||||
Ok(self.one(db).await?.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait LoggableError {
|
||||
fn info_failed(self, msg: &str);
|
||||
fn warn_failed(self, msg: &str);
|
||||
fn err_failed(self, msg: &str);
|
||||
}
|
||||
|
||||
impl<T, E: std::error::Error> LoggableError for Result<T, E> {
|
||||
fn info_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::info!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
fn warn_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::warn!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
fn err_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::error!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
pub mod model;
|
||||
pub mod traits;
|
||||
|
||||
pub mod context;
|
||||
pub use context::Context;
|
||||
|
||||
pub mod config;
|
||||
pub use config::Config;
|
||||
|
||||
pub mod init;
|
||||
pub mod errors;
|
||||
pub mod server;
|
||||
pub mod model;
|
||||
pub mod ext;
|
||||
|
||||
pub use traits::normalize::AP;
|
||||
pub use server::Context;
|
||||
pub use config::Config;
|
||||
pub use errors::UpubResult as Result;
|
||||
pub use errors::UpubError as Error;
|
||||
|
||||
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
|
|
@ -33,6 +33,8 @@ pub enum Relation {
|
|||
Actors,
|
||||
#[sea_orm(has_many = "super::addressing::Entity")]
|
||||
Addressing,
|
||||
#[sea_orm(has_many = "super::delivery::Entity")]
|
||||
Deliveries,
|
||||
#[sea_orm(
|
||||
belongs_to = "super::object::Entity",
|
||||
from = "Column::Object",
|
||||
|
@ -55,6 +57,12 @@ impl Related<super::addressing::Entity> for Entity {
|
|||
}
|
||||
}
|
||||
|
||||
impl Related<super::delivery::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Deliveries.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl Related<super::object::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Objects.def()
|
||||
|
|
|
@ -42,6 +42,8 @@ pub enum Relation {
|
|||
Configs,
|
||||
#[sea_orm(has_many = "super::credential::Entity")]
|
||||
Credentials,
|
||||
#[sea_orm(has_many = "super::delivery::Entity")]
|
||||
Deliveries,
|
||||
#[sea_orm(
|
||||
belongs_to = "super::instance::Entity",
|
||||
from = "Column::Domain",
|
||||
|
@ -92,6 +94,12 @@ impl Related<super::credential::Entity> for Entity {
|
|||
}
|
||||
}
|
||||
|
||||
impl Related<super::delivery::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Deliveries.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl Related<super::instance::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Instances.def()
|
||||
|
|
|
@ -48,12 +48,12 @@ impl Model {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[axum::async_trait]
|
||||
pub trait BatchFillable {
|
||||
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[axum::async_trait]
|
||||
impl BatchFillable for &[Event] {
|
||||
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
|
||||
let objects : Vec<crate::model::object::Model> = self
|
||||
|
@ -80,14 +80,14 @@ impl BatchFillable for &[Event] {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[axum::async_trait]
|
||||
impl BatchFillable for Vec<Event> {
|
||||
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
|
||||
self.as_slice().load_attachments_batch(db).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[axum::async_trait]
|
||||
impl BatchFillable for Event {
|
||||
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
|
||||
let x = vec![self.clone()]; // TODO wasteful clone and vec![] but ehhh convenient
|
||||
|
|
|
@ -1,36 +1,54 @@
|
|||
use sea_orm::entity::prelude::*;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
|
||||
#[sea_orm(rs_type = "i32", db_type = "Integer")]
|
||||
pub enum JobType {
|
||||
Inbound = 1,
|
||||
Outbound = 2,
|
||||
Local = 3,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
|
||||
#[sea_orm(table_name = "jobs")]
|
||||
#[sea_orm(table_name = "deliveries")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key)]
|
||||
pub internal: i64,
|
||||
pub job_type: JobType,
|
||||
pub actor: String,
|
||||
pub target: Option<String>,
|
||||
#[sea_orm(unique)]
|
||||
pub target: String,
|
||||
pub activity: String,
|
||||
pub payload: Option<String>,
|
||||
pub published: ChronoDateTimeUtc,
|
||||
pub not_before: ChronoDateTimeUtc,
|
||||
pub attempt: i32,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {}
|
||||
pub enum Relation {
|
||||
#[sea_orm(
|
||||
belongs_to = "super::activity::Entity",
|
||||
from = "Column::Activity",
|
||||
to = "super::activity::Column::Id",
|
||||
on_update = "Cascade",
|
||||
on_delete = "Cascade"
|
||||
)]
|
||||
Activities,
|
||||
#[sea_orm(
|
||||
belongs_to = "super::actor::Entity",
|
||||
from = "Column::Actor",
|
||||
to = "super::actor::Column::Id",
|
||||
on_update = "Cascade",
|
||||
on_delete = "Cascade"
|
||||
)]
|
||||
Actors,
|
||||
}
|
||||
|
||||
impl Related<super::activity::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Activities.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl Related<super::actor::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Actors.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
impl Model {
|
||||
pub fn next_attempt(&self) -> ChronoDateTimeUtc {
|
||||
pub fn next_delivery(&self) -> ChronoDateTimeUtc {
|
||||
match self.attempt {
|
||||
0 => chrono::Utc::now() + std::time::Duration::from_secs(10),
|
||||
1 => chrono::Utc::now() + std::time::Duration::from_secs(60),
|
||||
|
@ -45,18 +63,4 @@ impl Model {
|
|||
pub fn expired(&self) -> bool {
|
||||
chrono::Utc::now() - self.published > chrono::Duration::days(7)
|
||||
}
|
||||
|
||||
pub fn repeat(self) -> ActiveModel {
|
||||
ActiveModel {
|
||||
internal: sea_orm::ActiveValue::NotSet,
|
||||
job_type: sea_orm::ActiveValue::Set(self.job_type),
|
||||
not_before: sea_orm::ActiveValue::Set(self.next_attempt()),
|
||||
actor: sea_orm::ActiveValue::Set(self.actor),
|
||||
target: sea_orm::ActiveValue::Set(self.target),
|
||||
payload: sea_orm::ActiveValue::Set(self.payload),
|
||||
activity: sea_orm::ActiveValue::Set(self.activity),
|
||||
published: sea_orm::ActiveValue::Set(self.published),
|
||||
attempt: sea_orm::ActiveValue::Set(self.attempt + 1),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,9 +6,10 @@ pub mod config;
|
|||
pub mod credential;
|
||||
pub mod session;
|
||||
|
||||
pub mod instance;
|
||||
pub mod addressing;
|
||||
pub mod job;
|
||||
pub mod instance;
|
||||
pub mod delivery;
|
||||
pub mod processing;
|
||||
|
||||
pub mod relation;
|
||||
pub mod announce;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[axum::async_trait]
|
||||
pub trait Administrable {
|
||||
async fn register_user(
|
||||
&self,
|
||||
|
@ -13,8 +13,8 @@ pub trait Administrable {
|
|||
) -> Result<(), DbErr>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Administrable for crate::Context {
|
||||
#[axum::async_trait]
|
||||
impl Administrable for super::Context {
|
||||
async fn register_user(
|
||||
&self,
|
||||
username: String,
|
|
@ -2,7 +2,7 @@ use std::{collections::BTreeSet, sync::Arc};
|
|||
|
||||
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
|
||||
|
||||
use crate::{config::Config, ext::AnyQuery, model};
|
||||
use crate::{config::Config, errors::UpubError, model, ext::AnyQuery};
|
||||
use uriproxy::UriClass;
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -36,7 +36,7 @@ macro_rules! url {
|
|||
impl Context {
|
||||
|
||||
// TODO slim constructor down, maybe make a builder?
|
||||
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result<Self, crate::init::InitError> {
|
||||
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> crate::Result<Self> {
|
||||
let protocol = if domain.starts_with("http://")
|
||||
{ "http://" } else { "https://" }.to_string();
|
||||
if domain.ends_with('/') {
|
||||
|
@ -50,10 +50,10 @@ impl Context {
|
|||
let (actor, instance) = super::init::application(domain.clone(), base_url.clone(), &db).await?;
|
||||
|
||||
// TODO maybe we could provide a more descriptive error...
|
||||
let pkey = actor.private_key.as_deref().ok_or_else(|| DbErr::RecordNotFound("application private key".into()))?.to_string();
|
||||
let pkey = actor.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string();
|
||||
|
||||
let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?.ok_or_else(|| DbErr::RecordNotFound(actor.id.clone()))?;
|
||||
let relay_sources = model::relation::Entity::following(&actor.id, &db).await?.ok_or_else(|| DbErr::RecordNotFound(actor.id.clone()))?;
|
||||
let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?.ok_or_else(UpubError::internal_server_error)?;
|
||||
let relay_sources = model::relation::Entity::following(&actor.id, &db).await?.ok_or_else(UpubError::internal_server_error)?;
|
||||
|
||||
let relay = Relays {
|
||||
sources: BTreeSet::from_iter(relay_sources),
|
||||
|
@ -98,10 +98,6 @@ impl Context {
|
|||
&self.0.base_url
|
||||
}
|
||||
|
||||
pub fn new_id() -> String {
|
||||
uuid::Uuid::new_v4().to_string()
|
||||
}
|
||||
|
||||
/// get full user id uri
|
||||
pub fn uid(&self, id: &str) -> String {
|
||||
uriproxy::uri(self.base(), UriClass::Actor, id)
|
|
@ -3,23 +3,11 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseConnection, EntityTrait};
|
|||
|
||||
use crate::model;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum InitError {
|
||||
#[error("database error: {0:?}")]
|
||||
Database(#[from] sea_orm::DbErr),
|
||||
|
||||
#[error("openssl error: {0:?}")]
|
||||
OpenSSL(#[from] openssl::error::ErrorStack),
|
||||
|
||||
#[error("pem format error: {0:?}")]
|
||||
KeyError(#[from] std::str::Utf8Error),
|
||||
}
|
||||
|
||||
pub async fn application(
|
||||
domain: String,
|
||||
base_url: String,
|
||||
db: &DatabaseConnection
|
||||
) -> Result<(model::actor::Model, model::instance::Model), InitError> {
|
||||
) -> crate::Result<(model::actor::Model, model::instance::Model)> {
|
||||
Ok((
|
||||
match model::actor::Entity::find_by_ap_id(&base_url).one(db).await? {
|
||||
Some(model) => model,
|
5
upub/core/src/server/mod.rs
Normal file
5
upub/core/src/server/mod.rs
Normal file
|
@ -0,0 +1,5 @@
|
|||
pub mod admin;
|
||||
pub mod context;
|
||||
pub mod init;
|
||||
|
||||
pub use context::Context;
|
|
@ -1,11 +0,0 @@
|
|||
pub mod address;
|
||||
pub mod fetch;
|
||||
pub mod normalize;
|
||||
pub mod process;
|
||||
pub mod admin;
|
||||
|
||||
pub use admin::Administrable;
|
||||
pub use address::Addresser;
|
||||
pub use normalize::Normalizer;
|
||||
pub use process::Processor;
|
||||
pub use fetch::Fetcher;
|
3
upub/migrations/src/README.md
Normal file
3
upub/migrations/src/README.md
Normal file
|
@ -0,0 +1,3 @@
|
|||
# migrations
|
||||
|
||||
there are sea_orm migrations to apply to your database
|
|
@ -6,7 +6,6 @@ mod m20240524_000003_create_users_auth_and_config;
|
|||
mod m20240524_000004_create_addressing_deliveries;
|
||||
mod m20240524_000005_create_attachments_tags_mentions;
|
||||
mod m20240529_000001_add_relation_unique_index;
|
||||
mod m20240605_000001_add_jobs_table;
|
||||
|
||||
pub struct Migrator;
|
||||
|
||||
|
@ -20,7 +19,6 @@ impl MigratorTrait for Migrator {
|
|||
Box::new(m20240524_000004_create_addressing_deliveries::Migration),
|
||||
Box::new(m20240524_000005_create_attachments_tags_mentions::Migration),
|
||||
Box::new(m20240529_000001_add_relation_unique_index::Migration),
|
||||
Box::new(m20240605_000001_add_jobs_table::Migration),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,146 +0,0 @@
|
|||
use sea_orm_migration::prelude::*;
|
||||
|
||||
use crate::{m20240524_000001_create_actor_activity_object_tables::{Activities, Actors}, m20240524_000004_create_addressing_deliveries::Deliveries};
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
pub enum Jobs {
|
||||
Table,
|
||||
Internal,
|
||||
JobType,
|
||||
Actor,
|
||||
Target,
|
||||
Activity,
|
||||
Payload,
|
||||
Published,
|
||||
NotBefore,
|
||||
Attempt,
|
||||
}
|
||||
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
|
||||
manager
|
||||
.drop_table(Table::drop().table(Deliveries::Table).to_owned())
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.drop_index(Index::drop().name("index-deliveries-not-before").table(Deliveries::Table).to_owned())
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_table(
|
||||
Table::create()
|
||||
.table(Jobs::Table)
|
||||
.comment("background job queue: delivery, fetch and processing tasks")
|
||||
.col(
|
||||
ColumnDef::new(Jobs::Internal)
|
||||
.big_integer()
|
||||
.not_null()
|
||||
.auto_increment()
|
||||
.primary_key()
|
||||
)
|
||||
.col(ColumnDef::new(Jobs::JobType).small_integer().not_null())
|
||||
.col(ColumnDef::new(Jobs::Actor).string().not_null())
|
||||
.col(ColumnDef::new(Jobs::Target).string().null())
|
||||
.col(ColumnDef::new(Jobs::Activity).string().not_null().unique_key())
|
||||
.col(ColumnDef::new(Jobs::Payload).string().null())
|
||||
.col(ColumnDef::new(Jobs::Published).date_time().not_null().default(Expr::current_timestamp()))
|
||||
.col(ColumnDef::new(Jobs::NotBefore).date_time().not_null().default(Expr::current_timestamp()))
|
||||
.col(ColumnDef::new(Jobs::Attempt).small_integer().not_null().default(0))
|
||||
.to_owned()
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_index(
|
||||
Index::create()
|
||||
.unique()
|
||||
.name("index-jobs-activity")
|
||||
.table(Jobs::Table)
|
||||
.col(Jobs::Activity)
|
||||
.to_owned()
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_index(
|
||||
Index::create()
|
||||
.name("index-jobs-not-before")
|
||||
.table(Jobs::Table)
|
||||
.col((Jobs::NotBefore, IndexOrder::Asc))
|
||||
.to_owned()
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
manager
|
||||
.drop_table(Table::drop().table(Jobs::Table).to_owned())
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.drop_index(Index::drop().name("index-jobs-activity").table(Jobs::Table).to_owned())
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.drop_index(Index::drop().name("index-jobs-not-before").table(Jobs::Table).to_owned())
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_table(
|
||||
Table::create()
|
||||
.table(Deliveries::Table)
|
||||
.comment("this table contains all enqueued outgoing delivery jobs")
|
||||
.col(
|
||||
ColumnDef::new(Deliveries::Internal)
|
||||
.big_integer()
|
||||
.not_null()
|
||||
.auto_increment()
|
||||
.primary_key()
|
||||
)
|
||||
.col(ColumnDef::new(Deliveries::Actor).string().not_null())
|
||||
.foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fkey-deliveries-actor")
|
||||
.from(Deliveries::Table, Deliveries::Actor)
|
||||
.to(Actors::Table, Actors::Id)
|
||||
.on_update(ForeignKeyAction::Cascade)
|
||||
.on_delete(ForeignKeyAction::Cascade)
|
||||
)
|
||||
.col(ColumnDef::new(Deliveries::Target).string().not_null())
|
||||
.col(ColumnDef::new(Deliveries::Activity).string().not_null())
|
||||
.foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fkey-deliveries-activity")
|
||||
.from(Deliveries::Table, Deliveries::Activity)
|
||||
.to(Activities::Table, Activities::Id)
|
||||
.on_update(ForeignKeyAction::Cascade)
|
||||
.on_delete(ForeignKeyAction::Cascade)
|
||||
)
|
||||
.col(ColumnDef::new(Deliveries::Published).date_time().not_null().default(Expr::current_timestamp()))
|
||||
.col(ColumnDef::new(Deliveries::NotBefore).date_time().not_null().default(Expr::current_timestamp()))
|
||||
.col(ColumnDef::new(Deliveries::Attempt).integer().not_null().default(0))
|
||||
.to_owned()
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_index(
|
||||
Index::create()
|
||||
.name("index-deliveries-not-before")
|
||||
.table(Deliveries::Table)
|
||||
.col((Deliveries::NotBefore, IndexOrder::Asc))
|
||||
.to_owned()
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,9 +1,9 @@
|
|||
[package]
|
||||
name = "upub-worker"
|
||||
name = "upub-processor"
|
||||
version = "0.2.0"
|
||||
edition = "2021"
|
||||
authors = [ "alemi <me@alemi.dev>" ]
|
||||
description = "upub background activity processing and dispatching workers"
|
||||
description = "upub background activity processing worker"
|
||||
license = "AGPL-3.0"
|
||||
repository = "https://git.alemi.dev/upub.git"
|
||||
readme = "README.md"
|
||||
|
@ -17,7 +17,6 @@ async-trait = "0.1"
|
|||
serde_json = "1"
|
||||
sea-orm = "0.12"
|
||||
jrd = "0.1"
|
||||
regex = "1.10"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
tokio = { version = "1.35", features = ["full"] } # TODO slim this down
|
||||
reqwest = { version = "0.12", features = ["json"] }
|
|
@ -1,6 +1,6 @@
|
|||
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
|
||||
|
||||
use crate::traits::fetch::Fetcher;
|
||||
use crate::fetch::Fetcher;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Addresser {
|
||||
|
@ -12,13 +12,13 @@ pub trait Addresser {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Addresser for crate::Context {
|
||||
impl Addresser for upub::Context {
|
||||
async fn expand_addressing(&self, targets: Vec<String>) -> Result<Vec<String>, DbErr> {
|
||||
let mut out = Vec::new();
|
||||
for target in targets {
|
||||
if target.ends_with("/followers") {
|
||||
let target_id = target.replace("/followers", "");
|
||||
let mut followers = crate::model::relation::Entity::followers(&target_id, self.db())
|
||||
let mut followers = upub::model::relation::Entity::followers(&target_id, self.db())
|
||||
.await?
|
||||
.unwrap_or_else(Vec::new);
|
||||
if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO
|
||||
|
@ -48,8 +48,8 @@ impl Addresser for crate::Context {
|
|||
{
|
||||
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
|
||||
match (
|
||||
crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), self.db()).await?,
|
||||
crate::model::actor::Entity::ap_to_internal(target, self.db()).await?,
|
||||
upub::model::instance::Entity::domain_to_internal(&upub::Context::server(target), self.db()).await?,
|
||||
upub::model::actor::Entity::ap_to_internal(target, self.db()).await?,
|
||||
) {
|
||||
(Some(server), Some(actor)) => (Some(server), Some(actor)),
|
||||
(None, _) => { tracing::error!("failed resolving domain"); continue; },
|
||||
|
@ -57,7 +57,7 @@ impl Addresser for crate::Context {
|
|||
}
|
||||
};
|
||||
addressing.push(
|
||||
crate::model::addressing::ActiveModel {
|
||||
upub::model::addressing::ActiveModel {
|
||||
internal: NotSet,
|
||||
instance: Set(server),
|
||||
actor: Set(actor),
|
||||
|
@ -69,7 +69,7 @@ impl Addresser for crate::Context {
|
|||
}
|
||||
|
||||
if !addressing.is_empty() {
|
||||
crate::model::addressing::Entity::insert_many(addressing)
|
||||
upub::model::addressing::Entity::insert_many(addressing)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -81,20 +81,18 @@ impl Addresser for crate::Context {
|
|||
let mut deliveries = Vec::new();
|
||||
for target in targets.iter()
|
||||
.filter(|to| !to.is_empty())
|
||||
.filter(|to| crate::Context::server(to) != self.domain())
|
||||
.filter(|to| upub::Context::server(to) != self.domain())
|
||||
.filter(|to| to != &apb::target::PUBLIC)
|
||||
{
|
||||
// TODO fetch concurrently
|
||||
match self.fetch_user(target).await {
|
||||
Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
|
||||
crate::model::job::ActiveModel {
|
||||
Ok(upub::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
|
||||
upub::model::delivery::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::NotSet,
|
||||
actor: Set(from.to_string()),
|
||||
job_type: Set(crate::model::job::JobType::Outbound),
|
||||
payload: Set(None),
|
||||
// TODO we should resolve each user by id and check its inbox because we can't assume
|
||||
// it's /actors/{id}/inbox for every software, but oh well it's waaaaay easier now
|
||||
target: Set(Some(inbox)),
|
||||
target: Set(inbox),
|
||||
activity: Set(aid.to_string()),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
|
@ -107,7 +105,7 @@ impl Addresser for crate::Context {
|
|||
}
|
||||
|
||||
if !deliveries.is_empty() {
|
||||
crate::model::job::Entity::insert_many(deliveries)
|
||||
upub::model::delivery::Entity::insert_many(deliveries)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -121,12 +119,12 @@ impl Addresser for crate::Context {
|
|||
//#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"]
|
||||
async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> Result<(), DbErr> {
|
||||
let addressed = self.expand_addressing(activity_targets).await?;
|
||||
let internal_aid = crate::model::activity::Entity::ap_to_internal(aid, self.db())
|
||||
let internal_aid = upub::model::activity::Entity::ap_to_internal(aid, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(aid.to_string()))?;
|
||||
let internal_oid = if let Some(o) = oid {
|
||||
Some(
|
||||
crate::model::object::Entity::ap_to_internal(o, self.db())
|
||||
upub::model::object::Entity::ap_to_internal(o, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(o.to_string()))?
|
||||
)
|
134
upub/processor/src/dispatcher.rs
Normal file
134
upub/processor/src/dispatcher.rs
Normal file
|
@ -0,0 +1,134 @@
|
|||
use reqwest::Method;
|
||||
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder};
|
||||
use tokio::{sync::broadcast, task::JoinHandle};
|
||||
|
||||
use apb::{ActivityMut, Node};
|
||||
use crate::{model, Context, server::{fetcher::Fetcher, jsonld::LD}};
|
||||
|
||||
pub struct Dispatcher {
|
||||
waker: broadcast::Sender<()>,
|
||||
}
|
||||
|
||||
impl Default for Dispatcher {
|
||||
fn default() -> Self {
|
||||
let (waker, _) = broadcast::channel(1);
|
||||
Dispatcher { waker }
|
||||
}
|
||||
}
|
||||
|
||||
impl Dispatcher {
|
||||
pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> {
|
||||
let mut waker = self.waker.subscribe();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Err(e) = worker(&db, &domain, poll_interval, &mut waker).await {
|
||||
tracing::error!("delivery worker exited with error: {e}");
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_secs(poll_interval * 10)).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn wakeup(&self) {
|
||||
match self.waker.send(()) {
|
||||
Err(_) => tracing::error!("no worker to wakeup"),
|
||||
Ok(n) => tracing::debug!("woken {n} workers"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker: &mut broadcast::Receiver<()>) -> crate::Result<()> {
|
||||
loop {
|
||||
let Some(delivery) = model::delivery::Entity::find()
|
||||
.filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))
|
||||
.order_by(model::delivery::Column::NotBefore, Order::Asc)
|
||||
.one(db)
|
||||
.await?
|
||||
else {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = waker.recv() => {},
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
|
||||
}
|
||||
continue
|
||||
};
|
||||
|
||||
let del_row = model::delivery::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::Set(delivery.internal),
|
||||
..Default::default()
|
||||
};
|
||||
let del = model::delivery::Entity::delete(del_row)
|
||||
.exec(db)
|
||||
.await?;
|
||||
|
||||
if del.rows_affected == 0 {
|
||||
// another worker claimed this delivery
|
||||
continue; // go back to the top
|
||||
}
|
||||
if delivery.expired() {
|
||||
// try polling for another one
|
||||
continue; // go back to top
|
||||
}
|
||||
|
||||
tracing::info!("delivering {} to {}", delivery.activity, delivery.target);
|
||||
|
||||
let payload = match model::activity::Entity::find_by_ap_id(&delivery.activity)
|
||||
.find_also_related(model::object::Entity)
|
||||
.one(db)
|
||||
.await? // TODO probably should not fail here and at least re-insert the delivery
|
||||
{
|
||||
Some((activity, None)) => activity.ap().ld_context(),
|
||||
Some((activity, Some(object))) => {
|
||||
let always_embed = matches!(
|
||||
activity.activity_type,
|
||||
apb::ActivityType::Create
|
||||
| apb::ActivityType::Undo
|
||||
| apb::ActivityType::Update
|
||||
| apb::ActivityType::Accept(_)
|
||||
| apb::ActivityType::Reject(_)
|
||||
);
|
||||
if always_embed {
|
||||
activity.ap().set_object(Node::object(object.ap())).ld_context()
|
||||
} else {
|
||||
activity.ap().ld_context()
|
||||
}
|
||||
},
|
||||
None => {
|
||||
tracing::warn!("skipping dispatch for deleted object {}", delivery.activity);
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
||||
let Some(actor) = model::actor::Entity::find_by_ap_id(&delivery.actor)
|
||||
.one(db)
|
||||
.await?
|
||||
else {
|
||||
tracing::error!("abandoning delivery of {} from non existant actor: {}", delivery.activity, delivery.actor);
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(key) = actor.private_key
|
||||
else {
|
||||
tracing::error!("abandoning delivery of {} from actor without private key: {}", delivery.activity, delivery.actor);
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Err(e) = Context::request(
|
||||
Method::POST, &delivery.target,
|
||||
Some(&serde_json::to_string(&payload).unwrap()),
|
||||
&delivery.actor, &key, domain
|
||||
).await {
|
||||
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
|
||||
let new_delivery = model::delivery::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::NotSet,
|
||||
not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()),
|
||||
actor: sea_orm::ActiveValue::Set(delivery.actor),
|
||||
target: sea_orm::ActiveValue::Set(delivery.target),
|
||||
activity: sea_orm::ActiveValue::Set(delivery.activity),
|
||||
published: sea_orm::ActiveValue::Set(delivery.published),
|
||||
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
|
||||
};
|
||||
model::delivery::Entity::insert(new_delivery).exec(db).await?;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -4,9 +4,7 @@ use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object
|
|||
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
|
||||
use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet};
|
||||
|
||||
use crate::traits::normalize::AP;
|
||||
|
||||
use super::{Addresser, Normalizer};
|
||||
use super::{address::Addresser, normalize::Normalizer};
|
||||
use httpsign::HttpSignature;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -34,7 +32,7 @@ pub enum PullError {
|
|||
Malformed(#[from] apb::FieldErr),
|
||||
|
||||
#[error("error normalizing resource: {0:?}")]
|
||||
Normalization(#[from] crate::traits::normalize::NormalizerError),
|
||||
Normalization(#[from] crate::normalize::NormalizerError),
|
||||
|
||||
#[error("too many redirects while resolving resource id, aborting")]
|
||||
TooManyRedirects,
|
||||
|
@ -86,19 +84,19 @@ pub trait Fetcher {
|
|||
|
||||
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError>;
|
||||
|
||||
async fn fetch_domain(&self, domain: &str) -> Result<crate::model::instance::Model, PullError>;
|
||||
async fn fetch_domain(&self, domain: &str) -> Result<upub::model::instance::Model, PullError>;
|
||||
|
||||
async fn fetch_user(&self, id: &str) -> Result<crate::model::actor::Model, PullError>;
|
||||
async fn resolve_user(&self, actor: serde_json::Value) -> Result<crate::model::actor::Model, PullError>;
|
||||
async fn fetch_user(&self, id: &str) -> Result<upub::model::actor::Model, PullError>;
|
||||
async fn resolve_user(&self, actor: serde_json::Value) -> Result<upub::model::actor::Model, PullError>;
|
||||
|
||||
async fn fetch_activity(&self, id: &str) -> Result<crate::model::activity::Model, PullError>;
|
||||
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<crate::model::activity::Model, PullError>;
|
||||
async fn fetch_activity(&self, id: &str) -> Result<upub::model::activity::Model, PullError>;
|
||||
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<upub::model::activity::Model, PullError>;
|
||||
|
||||
async fn fetch_object(&self, id: &str) -> Result<crate::model::object::Model, PullError> { self.fetch_object_r(id, 0).await }
|
||||
#[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result<crate::model::object::Model, PullError> { self.resolve_object_r(object, 0).await }
|
||||
async fn fetch_object(&self, id: &str) -> Result<upub::model::object::Model, PullError> { self.fetch_object_r(id, 0).await }
|
||||
#[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result<upub::model::object::Model, PullError> { self.resolve_object_r(object, 0).await }
|
||||
|
||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<crate::model::object::Model, PullError>;
|
||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<crate::model::object::Model, PullError>;
|
||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<upub::model::object::Model, PullError>;
|
||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<upub::model::object::Model, PullError>;
|
||||
|
||||
|
||||
async fn fetch_thread(&self, id: &str) -> Result<(), PullError>;
|
||||
|
@ -111,7 +109,7 @@ pub trait Fetcher {
|
|||
key: &str,
|
||||
domain: &str,
|
||||
) -> Result<Response, PullError> {
|
||||
let host = crate::Context::server(url);
|
||||
let host = upub::Context::server(url);
|
||||
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
|
||||
let path = url.replace("https://", "").replace("http://", "").replace(&host, "");
|
||||
let digest = httpsign::digest(payload.unwrap_or_default());
|
||||
|
@ -138,7 +136,7 @@ pub trait Fetcher {
|
|||
.request(method.clone(), url)
|
||||
.header(ACCEPT, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"")
|
||||
.header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"")
|
||||
.header(USER_AGENT, format!("upub+{} ({domain})", crate::VERSION))
|
||||
.header(USER_AGENT, format!("upub+{} ({domain})", upub::VERSION))
|
||||
.header("Host", host.clone())
|
||||
.header("Date", date.clone())
|
||||
.header("Digest", digest)
|
||||
|
@ -161,9 +159,9 @@ pub trait Fetcher {
|
|||
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Fetcher for crate::Context {
|
||||
impl Fetcher for upub::Context {
|
||||
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError> {
|
||||
let _domain = self.fetch_domain(&crate::Context::server(id)).await?;
|
||||
let _domain = self.fetch_domain(&upub::Context::server(id)).await?;
|
||||
|
||||
let document = Self::request(
|
||||
Method::GET, id, None,
|
||||
|
@ -197,7 +195,7 @@ impl Fetcher for crate::Context {
|
|||
let resource = reqwest::Client::new()
|
||||
.get(webfinger_uri)
|
||||
.header(ACCEPT, "application/jrd+json")
|
||||
.header(USER_AGENT, format!("upub+{} ({})", crate::VERSION, self.domain()))
|
||||
.header(USER_AGENT, format!("upub+{} ({})", upub::VERSION, self.domain()))
|
||||
.send()
|
||||
.await?
|
||||
.json::<jrd::JsonResourceDescriptor>()
|
||||
|
@ -223,12 +221,12 @@ impl Fetcher for crate::Context {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
async fn fetch_domain(&self, domain: &str) -> Result<crate::model::instance::Model, PullError> {
|
||||
if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(self.db()).await? {
|
||||
async fn fetch_domain(&self, domain: &str) -> Result<upub::model::instance::Model, PullError> {
|
||||
if let Some(x) = upub::model::instance::Entity::find_by_domain(domain).one(self.db()).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
||||
let mut instance_model = crate::model::instance::Model {
|
||||
let mut instance_model = upub::model::instance::Model {
|
||||
internal: 0,
|
||||
domain: domain.to_string(),
|
||||
name: None,
|
||||
|
@ -256,7 +254,7 @@ impl Fetcher for crate::Context {
|
|||
}
|
||||
}
|
||||
|
||||
if let Ok(nodeinfo) = crate::model::instance::Entity::nodeinfo(domain).await {
|
||||
if let Ok(nodeinfo) = upub::model::instance::Entity::nodeinfo(domain).await {
|
||||
instance_model.software = Some(nodeinfo.software.name);
|
||||
instance_model.version = nodeinfo.software.version;
|
||||
instance_model.users = nodeinfo.usage.users.and_then(|x| x.total);
|
||||
|
@ -265,8 +263,8 @@ impl Fetcher for crate::Context {
|
|||
|
||||
let mut active_model = instance_model.clone().into_active_model();
|
||||
active_model.internal = NotSet;
|
||||
crate::model::instance::Entity::insert(active_model).exec(self.db()).await?;
|
||||
let internal = crate::model::instance::Entity::domain_to_internal(domain, self.db())
|
||||
upub::model::instance::Entity::insert(active_model).exec(self.db()).await?;
|
||||
let internal = upub::model::instance::Entity::domain_to_internal(domain, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(domain.to_string()))?;
|
||||
instance_model.internal = internal;
|
||||
|
@ -274,7 +272,7 @@ impl Fetcher for crate::Context {
|
|||
Ok(instance_model)
|
||||
}
|
||||
|
||||
async fn resolve_user(&self, mut document: serde_json::Value) -> Result<crate::model::actor::Model, PullError> {
|
||||
async fn resolve_user(&self, mut document: serde_json::Value) -> Result<upub::model::actor::Model, PullError> {
|
||||
let id = document.id()?.to_string();
|
||||
|
||||
// TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time
|
||||
|
@ -306,24 +304,24 @@ impl Fetcher for crate::Context {
|
|||
}
|
||||
}
|
||||
|
||||
let user_model = AP::actor_q(&document)?;
|
||||
let user_model = upub::model::actor::ActiveModel::new(&document)?;
|
||||
|
||||
// TODO this may fail: while fetching, remote server may fetch our service actor.
|
||||
// if it does so with http signature, we will fetch that actor in background
|
||||
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
|
||||
crate::model::actor::Entity::insert(user_model).exec(self.db()).await?;
|
||||
upub::model::actor::Entity::insert(user_model).exec(self.db()).await?;
|
||||
|
||||
// TODO fetch it back to get the internal id
|
||||
Ok(
|
||||
crate::model::actor::Entity::find_by_ap_id(&id)
|
||||
upub::model::actor::Entity::find_by_ap_id(&id)
|
||||
.one(self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(id.to_string()))?
|
||||
)
|
||||
}
|
||||
|
||||
async fn fetch_user(&self, id: &str) -> Result<crate::model::actor::Model, PullError> {
|
||||
if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
async fn fetch_user(&self, id: &str) -> Result<upub::model::actor::Model, PullError> {
|
||||
if let Some(x) = upub::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
||||
|
@ -332,8 +330,8 @@ impl Fetcher for crate::Context {
|
|||
self.resolve_user(document).await
|
||||
}
|
||||
|
||||
async fn fetch_activity(&self, id: &str) -> Result<crate::model::activity::Model, PullError> {
|
||||
if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
async fn fetch_activity(&self, id: &str) -> Result<upub::model::activity::Model, PullError> {
|
||||
if let Some(x) = upub::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
||||
|
@ -342,15 +340,15 @@ impl Fetcher for crate::Context {
|
|||
self.resolve_activity(activity).await
|
||||
}
|
||||
|
||||
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<crate::model::activity::Model, PullError> {
|
||||
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<upub::model::activity::Model, PullError> {
|
||||
if let Ok(activity_actor) = activity.actor().id() {
|
||||
if let Err(e) = self.fetch_user(activity_actor).await {
|
||||
if let Err(e) = self.fetch_user(&activity_actor).await {
|
||||
tracing::warn!("could not get actor of fetched activity: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(activity_object) = activity.object().id() {
|
||||
if let Err(e) = self.fetch_object(activity_object).await {
|
||||
if let Err(e) = self.fetch_object(&activity_object).await {
|
||||
tracing::warn!("could not get object of fetched activity: {e}");
|
||||
}
|
||||
}
|
||||
|
@ -369,8 +367,8 @@ impl Fetcher for crate::Context {
|
|||
todo!()
|
||||
}
|
||||
|
||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<crate::model::object::Model, PullError> {
|
||||
if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<upub::model::object::Model, PullError> {
|
||||
if let Some(x) = upub::model::object::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
||||
|
@ -379,12 +377,12 @@ impl Fetcher for crate::Context {
|
|||
self.resolve_object_r(object, depth).await
|
||||
}
|
||||
|
||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<crate::model::object::Model, PullError> {
|
||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<upub::model::object::Model, PullError> {
|
||||
let id = object.id()?.to_string();
|
||||
|
||||
if let Ok(oid) = object.id() {
|
||||
if oid != id {
|
||||
if let Some(x) = crate::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? {
|
||||
if let Some(x) = upub::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? {
|
||||
return Ok(x); // already in db, but with id different that given url
|
||||
}
|
||||
}
|
||||
|
@ -417,14 +415,14 @@ impl Fetcher for crate::Context {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Fetchable : Sync + Send {
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>;
|
||||
async fn fetch(&mut self, ctx: &upub::Context) -> Result<&mut Self, PullError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Fetchable for apb::Node<serde_json::Value> {
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError> {
|
||||
async fn fetch(&mut self, ctx: &upub::Context) -> Result<&mut Self, PullError> {
|
||||
if let apb::Node::Link(uri) = self {
|
||||
*self = crate::Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain())
|
||||
*self = upub::Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain())
|
||||
.await?
|
||||
.json::<serde_json::Value>()
|
||||
.await?
|
||||
|
@ -436,14 +434,14 @@ impl Fetchable for apb::Node<serde_json::Value> {
|
|||
}
|
||||
|
||||
// #[async_recursion::async_recursion]
|
||||
// async fn crawl_replies(ctx: &crate::Context, id: &str, depth: usize) -> Result<(), PullError> {
|
||||
// async fn crawl_replies(ctx: &upub::Context, id: &str, depth: usize) -> Result<(), PullError> {
|
||||
// tracing::info!("crawling replies of '{id}'");
|
||||
// let object = crate::Context::request(
|
||||
// let object = upub::Context::request(
|
||||
// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
//
|
||||
// let object_model = crate::model::object::Model::new(&object)?;
|
||||
// match crate::model::object::Entity::insert(object_model.into_active_model())
|
||||
// let object_model = upub::model::object::Model::new(&object)?;
|
||||
// match upub::model::object::Entity::insert(object_model.into_active_model())
|
||||
// .exec(ctx.db()).await
|
||||
// {
|
||||
// Ok(_) => {},
|
||||
|
@ -459,7 +457,7 @@ impl Fetchable for apb::Node<serde_json::Value> {
|
|||
//
|
||||
// let mut page_url = match object.replies().get() {
|
||||
// Some(serde_json::Value::String(x)) => {
|
||||
// let replies = crate::Context::request(
|
||||
// let replies = upub::Context::request(
|
||||
// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
// replies.first().id()
|
||||
|
@ -472,7 +470,7 @@ impl Fetchable for apb::Node<serde_json::Value> {
|
|||
// };
|
||||
//
|
||||
// while let Some(ref url) = page_url {
|
||||
// let replies = crate::Context::request(
|
||||
// let replies = upub::Context::request(
|
||||
// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
//
|
6
upub/processor/src/lib.rs
Normal file
6
upub/processor/src/lib.rs
Normal file
|
@ -0,0 +1,6 @@
|
|||
pub mod address;
|
||||
pub mod normalize;
|
||||
pub mod process;
|
||||
pub mod fetch;
|
||||
|
||||
// pub mod dispatcher;
|
|
@ -12,14 +12,14 @@ pub enum NormalizerError {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Normalizer {
|
||||
async fn insert_object(&self, obj: impl apb::Object) -> Result<crate::model::object::Model, NormalizerError>;
|
||||
async fn insert_activity(&self, act: impl apb::Activity) -> Result<crate::model::activity::Model, NormalizerError>;
|
||||
async fn insert_object(&self, obj: impl apb::Object) -> Result<upub::model::object::Model, NormalizerError>;
|
||||
async fn insert_activity(&self, act: impl apb::Activity) -> Result<upub::model::activity::Model, NormalizerError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Normalizer for crate::Context {
|
||||
impl Normalizer for upub::Context {
|
||||
|
||||
async fn insert_object(&self, object: impl apb::Object) -> Result<crate::model::object::Model, NormalizerError> {
|
||||
async fn insert_object(&self, object: impl apb::Object) -> Result<upub::model::object::Model, NormalizerError> {
|
||||
let oid = object.id()?.to_string();
|
||||
let uid = object.attributed_to().id().str();
|
||||
let t = object.object_type()?;
|
||||
|
@ -45,7 +45,7 @@ impl Normalizer for crate::Context {
|
|||
// > kind of dumb. there should be a job system so this can be done in waves. or maybe there's
|
||||
// > some whole other way to do this?? im thinking but misskey aaaa!! TODO
|
||||
if let Set(Some(ref reply)) = object_active_model.in_reply_to {
|
||||
if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? {
|
||||
if let Some(o) = upub::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? {
|
||||
object_active_model.context = Set(o.context);
|
||||
} else {
|
||||
object_active_model.context = Set(None); // TODO to be filled by some other task
|
||||
|
@ -54,25 +54,25 @@ impl Normalizer for crate::Context {
|
|||
object_active_model.context = Set(Some(oid.clone()));
|
||||
}
|
||||
|
||||
crate::model::object::Entity::insert(object_active_model).exec(self.db()).await?;
|
||||
let object_model = crate::model::object::Entity::find_by_ap_id(&oid)
|
||||
upub::model::object::Entity::insert(object_active_model).exec(self.db()).await?;
|
||||
let object_model = upub::model::object::Entity::find_by_ap_id(&oid)
|
||||
.one(self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(oid.to_string()))?;
|
||||
|
||||
// update replies counter
|
||||
if let Some(ref in_reply_to) = object_model.in_reply_to {
|
||||
crate::model::object::Entity::update_many()
|
||||
.filter(crate::model::object::Column::Id.eq(in_reply_to))
|
||||
.col_expr(crate::model::object::Column::Replies, Expr::col(crate::model::object::Column::Replies).add(1))
|
||||
upub::model::object::Entity::update_many()
|
||||
.filter(upub::model::object::Column::Id.eq(in_reply_to))
|
||||
.col_expr(upub::model::object::Column::Replies, Expr::col(upub::model::object::Column::Replies).add(1))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
// update statuses counter
|
||||
if let Some(object_author) = uid {
|
||||
crate::model::actor::Entity::update_many()
|
||||
.col_expr(crate::model::actor::Column::StatusesCount, Expr::col(crate::model::actor::Column::StatusesCount).add(1))
|
||||
.filter(crate::model::actor::Column::Id.eq(&object_author))
|
||||
upub::model::actor::Entity::update_many()
|
||||
.col_expr(upub::model::actor::Column::StatusesCount, Expr::col(upub::model::actor::Column::StatusesCount).add(1))
|
||||
.filter(upub::model::actor::Column::Id.eq(&object_author))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ impl Normalizer for crate::Context {
|
|||
tracing::warn!("ignoring array-in-array while processing attachments");
|
||||
continue
|
||||
},
|
||||
Node::Link(l) => crate::model::attachment::ActiveModel {
|
||||
Node::Link(l) => upub::model::attachment::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::NotSet,
|
||||
url: Set(l.href().to_string()),
|
||||
object: Set(object_model.internal),
|
||||
|
@ -96,7 +96,7 @@ impl Normalizer for crate::Context {
|
|||
Node::Object(o) =>
|
||||
AP::attachment_q(o.as_document()?, object_model.internal)?,
|
||||
};
|
||||
crate::model::attachment::Entity::insert(attachment_model)
|
||||
upub::model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ impl Normalizer for crate::Context {
|
|||
}
|
||||
}
|
||||
|
||||
crate::model::attachment::Entity::insert(attachment_model)
|
||||
upub::model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -131,16 +131,16 @@ impl Normalizer for crate::Context {
|
|||
Ok(object_model)
|
||||
}
|
||||
|
||||
async fn insert_activity(&self, activity: impl apb::Activity) -> Result<crate::model::activity::Model, NormalizerError> {
|
||||
async fn insert_activity(&self, activity: impl apb::Activity) -> Result<upub::model::activity::Model, NormalizerError> {
|
||||
let mut activity_model = AP::activity(&activity)?;
|
||||
|
||||
let mut active_model = activity_model.clone().into_active_model();
|
||||
active_model.internal = NotSet;
|
||||
crate::model::activity::Entity::insert(active_model)
|
||||
upub::model::activity::Entity::insert(active_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
|
||||
let internal = crate::model::activity::Entity::ap_to_internal(&activity_model.id, self.db())
|
||||
let internal = upub::model::activity::Entity::ap_to_internal(&activity_model.id, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?;
|
||||
activity_model.internal = internal;
|
||||
|
@ -152,8 +152,8 @@ impl Normalizer for crate::Context {
|
|||
pub struct AP;
|
||||
|
||||
impl AP {
|
||||
pub fn activity(activity: &impl apb::Activity) -> Result<crate::model::activity::Model, apb::FieldErr> {
|
||||
Ok(crate::model::activity::Model {
|
||||
pub fn activity(activity: &impl apb::Activity) -> Result<upub::model::activity::Model, apb::FieldErr> {
|
||||
Ok(upub::model::activity::Model {
|
||||
internal: 0,
|
||||
id: activity.id()?.to_string(),
|
||||
activity_type: activity.activity_type()?,
|
||||
|
@ -168,7 +168,7 @@ impl AP {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn activity_q(activity: &impl apb::Activity) -> Result<crate::model::activity::ActiveModel, apb::FieldErr> {
|
||||
pub fn activity_q(activity: &impl apb::Activity) -> Result<upub::model::activity::ActiveModel, apb::FieldErr> {
|
||||
let mut m = AP::activity(activity)?.into_active_model();
|
||||
m.internal = NotSet;
|
||||
Ok(m)
|
||||
|
@ -177,8 +177,8 @@ impl AP {
|
|||
|
||||
|
||||
|
||||
pub fn attachment(document: &impl apb::Document, parent: i64) -> Result<crate::model::attachment::Model, apb::FieldErr> {
|
||||
Ok(crate::model::attachment::Model {
|
||||
pub fn attachment(document: &impl apb::Document, parent: i64) -> Result<upub::model::attachment::Model, apb::FieldErr> {
|
||||
Ok(upub::model::attachment::Model {
|
||||
internal: 0,
|
||||
url: document.url().id().str().unwrap_or_default(),
|
||||
object: parent,
|
||||
|
@ -189,7 +189,7 @@ impl AP {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn attachment_q(document: &impl apb::Document, parent: i64) -> Result<crate::model::attachment::ActiveModel, apb::FieldErr> {
|
||||
pub fn attachment_q(document: &impl apb::Document, parent: i64) -> Result<upub::model::attachment::ActiveModel, apb::FieldErr> {
|
||||
let mut m = AP::attachment(document, parent)?.into_active_model();
|
||||
m.internal = NotSet;
|
||||
Ok(m)
|
||||
|
@ -197,7 +197,7 @@ impl AP {
|
|||
|
||||
|
||||
|
||||
pub fn object(object: &impl apb::Object) -> Result<crate::model::object::Model, apb::FieldErr> {
|
||||
pub fn object(object: &impl apb::Object) -> Result<upub::model::object::Model, apb::FieldErr> {
|
||||
let t = object.object_type()?;
|
||||
if matches!(t,
|
||||
apb::ObjectType::Activity(_)
|
||||
|
@ -207,7 +207,7 @@ impl AP {
|
|||
) {
|
||||
return Err(apb::FieldErr("type"));
|
||||
}
|
||||
Ok(crate::model::object::Model {
|
||||
Ok(upub::model::object::Model {
|
||||
internal: 0,
|
||||
id: object.id()?.to_string(),
|
||||
object_type: t,
|
||||
|
@ -235,7 +235,7 @@ impl AP {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn object_q(object: &impl apb::Object) -> Result<crate::model::object::ActiveModel, apb::FieldErr> {
|
||||
pub fn object_q(object: &impl apb::Object) -> Result<upub::model::object::ActiveModel, apb::FieldErr> {
|
||||
let mut m = AP::object(object)?.into_active_model();
|
||||
m.internal = NotSet;
|
||||
Ok(m)
|
||||
|
@ -243,7 +243,7 @@ impl AP {
|
|||
|
||||
|
||||
|
||||
pub fn actor(actor: &impl apb::Actor) -> Result<crate::model::actor::Model, apb::FieldErr> {
|
||||
pub fn actor(actor: &impl apb::Actor) -> Result<upub::model::actor::Model, apb::FieldErr> {
|
||||
let ap_id = actor.id()?.to_string();
|
||||
let (domain, fallback_preferred_username) = {
|
||||
let clean = ap_id
|
||||
|
@ -254,7 +254,7 @@ impl AP {
|
|||
let last = splits.last().unwrap_or(first);
|
||||
(first.to_string(), last.to_string())
|
||||
};
|
||||
Ok(crate::model::actor::Model {
|
||||
Ok(upub::model::actor::Model {
|
||||
internal: 0,
|
||||
domain,
|
||||
id: ap_id,
|
||||
|
@ -279,7 +279,7 @@ impl AP {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn actor_q(actor: &impl apb::Actor) -> Result<crate::model::actor::ActiveModel, apb::FieldErr> {
|
||||
pub fn actor_q(actor: &impl apb::Actor) -> Result<upub::model::actor::ActiveModel, apb::FieldErr> {
|
||||
let mut m = AP::actor(actor)?.into_active_model();
|
||||
m.internal = NotSet;
|
||||
Ok(m)
|
|
@ -1,73 +1,11 @@
|
|||
use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut};
|
||||
use sea_orm::{EntityTrait, QueryFilter, QuerySelect, SelectColumns, ColumnTrait};
|
||||
use upub::{model, traits::{Addresser, Processor}, Context};
|
||||
use apb::{target::Addressed, Activity, ActivityMut, Base, BaseMut, Node, Object, ObjectMut};
|
||||
use reqwest::StatusCode;
|
||||
use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet, Unchanged}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns};
|
||||
|
||||
use crate::{errors::UpubError, model, ext::AnyQuery};
|
||||
|
||||
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
||||
let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?;
|
||||
let mut activity : serde_json::Value = serde_json::from_str(payload)?;
|
||||
let mut t = activity.object_type()?;
|
||||
use super::{addresser::Addresser, fetcher::Fetcher, normalizer::Normalizer, side_effects::SideEffects, Context};
|
||||
|
||||
if matches!(t, apb::ObjectType::Note) {
|
||||
activity = apb::new()
|
||||
.set_activity_type(Some(apb::ActivityType::Create))
|
||||
.set_object(apb::Node::object(activity));
|
||||
t = apb::ObjectType::Activity(apb::ActivityType::Create);
|
||||
}
|
||||
|
||||
activity = activity
|
||||
.set_id(Some(&job.activity))
|
||||
.set_actor(apb::Node::link(job.actor.clone()))
|
||||
.set_published(Some(chrono::Utc::now()));
|
||||
|
||||
if matches!(t, apb::ObjectType::Activity(apb::ActivityType::Create)) {
|
||||
let raw_oid = Context::new_id();
|
||||
let oid = ctx.oid(&raw_oid);
|
||||
// object must be embedded, wont dereference here
|
||||
let object = activity.object().extract().ok_or(apb::FieldErr("object"))?;
|
||||
// TODO regex hell here i come...
|
||||
let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern");
|
||||
let mut content = object.content().map(|x| x.to_string()).ok();
|
||||
if let Some(c) = content {
|
||||
let mut tmp = mdhtml::safe_markdown(&c);
|
||||
for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) {
|
||||
if let Ok(Some(uid)) = model::actor::Entity::find()
|
||||
.filter(model::actor::Column::PreferredUsername.eq(user))
|
||||
.filter(model::actor::Column::Domain.eq(domain))
|
||||
.select_only()
|
||||
.select_column(model::actor::Column::Id)
|
||||
.into_tuple::<String>()
|
||||
.one(ctx.db())
|
||||
.await
|
||||
{
|
||||
tmp = tmp.replacen(full, &format!("<a href=\"{uid}\" class=\"u-url mention\">@{user}</a>"), 1);
|
||||
}
|
||||
}
|
||||
content = Some(tmp);
|
||||
}
|
||||
|
||||
activity = activity
|
||||
.set_object(apb::Node::object(
|
||||
object
|
||||
.set_id(Some(&oid))
|
||||
.set_content(content.as_deref())
|
||||
.set_attributed_to(apb::Node::link(job.actor.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
.set_url(apb::Node::maybe_link(ctx.cfg().frontend_url(&format!("/objects/{raw_oid}")))),
|
||||
));
|
||||
}
|
||||
|
||||
// TODO we expand addressing twice, ugghhhhh
|
||||
let targets = ctx.expand_addressing(activity.addressed()).await?;
|
||||
|
||||
ctx.process(activity).await?;
|
||||
|
||||
ctx.deliver_to(&job.activity, &job.actor, &targets).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
#[axum::async_trait]
|
||||
impl apb::server::Outbox for Context {
|
||||
|
@ -102,6 +40,27 @@ impl apb::server::Outbox for Context {
|
|||
self.fetch_object(&reply).await?;
|
||||
}
|
||||
|
||||
// TODO regex hell here i come...
|
||||
let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern");
|
||||
let mut content = object.content().map(|x| x.to_string());
|
||||
if let Some(c) = content {
|
||||
let mut tmp = mdhtml::safe_markdown(&c);
|
||||
for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) {
|
||||
if let Ok(Some(uid)) = model::actor::Entity::find()
|
||||
.filter(model::actor::Column::PreferredUsername.eq(user))
|
||||
.filter(model::actor::Column::Domain.eq(domain))
|
||||
.select_only()
|
||||
.select_column(model::actor::Column::Id)
|
||||
.into_tuple::<String>()
|
||||
.one(self.db())
|
||||
.await
|
||||
{
|
||||
tmp = tmp.replacen(full, &format!("<a href=\"{uid}\" class=\"u-url mention\">@{user}</a>"), 1);
|
||||
}
|
||||
}
|
||||
content = Some(tmp);
|
||||
}
|
||||
|
||||
self.insert_object(
|
||||
object
|
||||
.set_id(Some(&oid))
|
||||
|
@ -462,5 +421,3 @@ impl apb::server::Outbox for Context {
|
|||
Ok(aid)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
|
@ -1,6 +1,7 @@
|
|||
use apb::{target::Addressed, Activity, Base, Object};
|
||||
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
|
||||
use crate::{ext::{AnyQuery, LoggableError}, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}};
|
||||
use upub::{errors::LoggableError, ext::AnyQuery};
|
||||
use crate::{address::Addresser, fetch::{Fetcher, Pull}, normalize::Normalizer};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProcessorError {
|
||||
|
@ -23,10 +24,10 @@ pub enum ProcessorError {
|
|||
Unprocessable,
|
||||
|
||||
#[error("failed normalizing and inserting entity: {0:?}")]
|
||||
NormalizerError(#[from] crate::traits::normalize::NormalizerError),
|
||||
NormalizerError(#[from] crate::normalize::NormalizerError),
|
||||
|
||||
#[error("failed fetching resource: {0:?}")]
|
||||
PullError(#[from] crate::traits::fetch::PullError),
|
||||
PullError(#[from] crate::fetch::PullError),
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
@ -35,7 +36,7 @@ pub trait Processor {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Processor for crate::Context {
|
||||
impl Processor for upub::Context {
|
||||
async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
// TODO we could process Links and bare Objects maybe, but probably out of AP spec?
|
||||
match activity.activity_type()? {
|
||||
|
@ -54,7 +55,7 @@ impl Processor for crate::Context {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn create(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn create(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let Some(object_node) = activity.object().extract() else {
|
||||
// TODO we could process non-embedded activities or arrays but im lazy rn
|
||||
tracing::error!("refusing to process activity without embedded object");
|
||||
|
@ -73,15 +74,15 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn like(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let object_uri = activity.object().id()?.to_string();
|
||||
let published = activity.published().unwrap_or_else(|_|chrono::Utc::now());
|
||||
let obj = ctx.fetch_object(&object_uri).await?;
|
||||
if crate::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal)
|
||||
if upub::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal)
|
||||
.any(ctx.db())
|
||||
.await?
|
||||
{
|
||||
|
@ -90,26 +91,26 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
|
||||
let like = crate::model::like::ActiveModel {
|
||||
let like = upub::model::like::ActiveModel {
|
||||
internal: NotSet,
|
||||
actor: Set(internal_uid),
|
||||
object: Set(obj.internal),
|
||||
activity: Set(activity_model.internal),
|
||||
published: Set(published),
|
||||
};
|
||||
crate::model::like::Entity::insert(like).exec(ctx.db()).await?;
|
||||
crate::model::object::Entity::update_many()
|
||||
.col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).add(1))
|
||||
.filter(crate::model::object::Column::Internal.eq(obj.internal))
|
||||
upub::model::like::Entity::insert(like).exec(ctx.db()).await?;
|
||||
upub::model::object::Entity::update_many()
|
||||
.col_expr(upub::model::object::Column::Likes, Expr::col(upub::model::object::Column::Likes).add(1))
|
||||
.filter(upub::model::object::Column::Internal.eq(obj.internal))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
|
||||
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?;
|
||||
if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!!
|
||||
expanded_addressing.push(
|
||||
crate::model::object::Entity::find_by_id(obj.internal)
|
||||
upub::model::object::Entity::find_by_id(obj.internal)
|
||||
.select_only()
|
||||
.select_column(crate::model::object::Column::AttributedTo)
|
||||
.select_column(upub::model::object::Column::AttributedTo)
|
||||
.into_tuple::<String>()
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
|
@ -121,22 +122,22 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn follow(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let source_actor = activity.actor().id()?.to_string();
|
||||
let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, ctx.db())
|
||||
let source_actor_internal = upub::model::actor::Entity::ap_to_internal(&source_actor, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let target_actor = activity.object().id()?.to_string();
|
||||
let usr = ctx.fetch_user(&target_actor).await?;
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let relation_model = crate::model::relation::ActiveModel {
|
||||
let relation_model = upub::model::relation::ActiveModel {
|
||||
internal: NotSet,
|
||||
accept: Set(None),
|
||||
activity: Set(activity_model.internal),
|
||||
follower: Set(source_actor_internal),
|
||||
following: Set(usr.internal),
|
||||
};
|
||||
crate::model::relation::Entity::insert(relation_model)
|
||||
upub::model::relation::Entity::insert(relation_model)
|
||||
.exec(ctx.db()).await?;
|
||||
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?;
|
||||
if !expanded_addressing.contains(&target_actor) {
|
||||
|
@ -147,11 +148,11 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn accept(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
// TODO what about TentativeAccept
|
||||
let target_actor = activity.actor().id()?.to_string();
|
||||
let follow_request_id = activity.object().id()?.to_string();
|
||||
let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
let follow_activity = upub::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
@ -162,26 +163,26 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
|
||||
crate::model::actor::Entity::update_many()
|
||||
upub::model::actor::Entity::update_many()
|
||||
.col_expr(
|
||||
crate::model::actor::Column::FollowingCount,
|
||||
Expr::col(crate::model::actor::Column::FollowingCount).add(1)
|
||||
upub::model::actor::Column::FollowingCount,
|
||||
Expr::col(upub::model::actor::Column::FollowingCount).add(1)
|
||||
)
|
||||
.filter(crate::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.filter(upub::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
crate::model::actor::Entity::update_many()
|
||||
upub::model::actor::Entity::update_many()
|
||||
.col_expr(
|
||||
crate::model::actor::Column::FollowersCount,
|
||||
Expr::col(crate::model::actor::Column::FollowersCount).add(1)
|
||||
upub::model::actor::Column::FollowersCount,
|
||||
Expr::col(upub::model::actor::Column::FollowersCount).add(1)
|
||||
)
|
||||
.filter(crate::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.filter(upub::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
|
||||
crate::model::relation::Entity::update_many()
|
||||
.col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal)))
|
||||
.filter(crate::model::relation::Column::Activity.eq(follow_activity.internal))
|
||||
upub::model::relation::Entity::update_many()
|
||||
.col_expr(upub::model::relation::Column::Accept, Expr::value(Some(activity_model.internal)))
|
||||
.filter(upub::model::relation::Column::Activity.eq(follow_activity.internal))
|
||||
.exec(ctx.db()).await?;
|
||||
|
||||
tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor);
|
||||
|
@ -194,11 +195,11 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn reject(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
// TODO what about TentativeReject?
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let follow_request_id = activity.object().id()?.to_string();
|
||||
let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
let follow_activity = upub::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
@ -209,8 +210,8 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
|
||||
crate::model::relation::Entity::delete_many()
|
||||
.filter(crate::model::relation::Column::Activity.eq(activity_model.internal))
|
||||
upub::model::relation::Entity::delete_many()
|
||||
.filter(upub::model::relation::Column::Activity.eq(activity_model.internal))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
|
||||
|
@ -225,16 +226,17 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn delete(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let oid = activity.object().id()?.to_string();
|
||||
crate::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users");
|
||||
crate::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects");
|
||||
upub::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users");
|
||||
upub::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects");
|
||||
tracing::debug!("deleted '{oid}'");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn update(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let aid = activity.id()?.to_string();
|
||||
let Some(object_node) = activity.object().extract() else {
|
||||
tracing::error!("refusing to process activity without embedded object");
|
||||
return Err(ProcessorError::Unprocessable);
|
||||
|
@ -245,24 +247,24 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
|
||||
match object_node.object_type()? {
|
||||
apb::ObjectType::Actor(_) => {
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, ctx.db())
|
||||
let internal_uid = upub::model::actor::Entity::ap_to_internal(&oid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let mut actor_model = crate::AP::actor_q(object_node.as_actor()?)?;
|
||||
let mut actor_model = upub::model::actor::ActiveModel::new(object_node.as_actor()?)?;
|
||||
actor_model.internal = Set(internal_uid);
|
||||
actor_model.updated = Set(chrono::Utc::now());
|
||||
crate::model::actor::Entity::update(actor_model)
|
||||
upub::model::actor::Entity::update(actor_model)
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
},
|
||||
apb::ObjectType::Note => {
|
||||
let internal_oid = crate::model::object::Entity::ap_to_internal(&oid, ctx.db())
|
||||
let internal_oid = upub::model::object::Entity::ap_to_internal(&oid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let mut object_model = crate::AP::object_q(&object_node)?;
|
||||
let mut object_model = upub::model::object::ActiveModel::new(&object_node)?;
|
||||
object_model.internal = Set(internal_oid);
|
||||
object_model.updated = Set(chrono::Utc::now());
|
||||
crate::model::object::Entity::update(object_model)
|
||||
upub::model::object::Entity::update(object_model)
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
},
|
||||
|
@ -275,10 +277,11 @@ pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Resul
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
// TODO in theory we could work with just object_id but right now only accept embedded
|
||||
let undone_activity = activity.object().extract().ok_or(apb::FieldErr("object"))?;
|
||||
let undone_activity_id = undone_activity.id()?;
|
||||
let undone_activity_author = undone_activity.as_activity()?.actor().id()?.to_string();
|
||||
|
||||
if uid != undone_activity_author {
|
||||
|
@ -287,7 +290,7 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
|
||||
let undone_activity_target = undone_activity.as_activity()?.object().id()?.to_string();
|
||||
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
||||
|
@ -298,40 +301,40 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
|
||||
match activity_type {
|
||||
apb::ActivityType::Like => {
|
||||
let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
let internal_oid = upub::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
crate::model::like::Entity::delete_many()
|
||||
upub::model::like::Entity::delete_many()
|
||||
.filter(
|
||||
Condition::all()
|
||||
.add(crate::model::like::Column::Actor.eq(internal_uid))
|
||||
.add(crate::model::like::Column::Object.eq(internal_oid))
|
||||
.add(upub::model::like::Column::Actor.eq(internal_uid))
|
||||
.add(upub::model::like::Column::Object.eq(internal_oid))
|
||||
)
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
crate::model::object::Entity::update_many()
|
||||
.filter(crate::model::object::Column::Internal.eq(internal_oid))
|
||||
.col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).sub(1))
|
||||
upub::model::object::Entity::update_many()
|
||||
.filter(upub::model::object::Column::Internal.eq(internal_oid))
|
||||
.col_expr(upub::model::object::Column::Likes, Expr::col(upub::model::object::Column::Likes).sub(1))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
},
|
||||
apb::ActivityType::Follow => {
|
||||
let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
let internal_uid_following = upub::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
crate::model::relation::Entity::delete_many()
|
||||
.filter(crate::model::relation::Column::Follower.eq(internal_uid))
|
||||
.filter(crate::model::relation::Column::Following.eq(internal_uid_following))
|
||||
upub::model::relation::Entity::delete_many()
|
||||
.filter(upub::model::relation::Column::Follower.eq(internal_uid))
|
||||
.filter(upub::model::relation::Column::Following.eq(internal_uid_following))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
crate::model::actor::Entity::update_many()
|
||||
.filter(crate::model::actor::Column::Internal.eq(internal_uid))
|
||||
.col_expr(crate::model::actor::Column::FollowingCount, Expr::col(crate::model::actor::Column::FollowingCount).sub(1))
|
||||
upub::model::actor::Entity::update_many()
|
||||
.filter(upub::model::actor::Column::Internal.eq(internal_uid))
|
||||
.col_expr(upub::model::actor::Column::FollowingCount, Expr::col(upub::model::actor::Column::FollowingCount).sub(1))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
crate::model::actor::Entity::update_many()
|
||||
.filter(crate::model::actor::Column::Internal.eq(internal_uid_following))
|
||||
.col_expr(crate::model::actor::Column::FollowersCount, Expr::col(crate::model::actor::Column::FollowersCount).sub(1))
|
||||
upub::model::actor::Entity::update_many()
|
||||
.filter(upub::model::actor::Column::Internal.eq(internal_uid_following))
|
||||
.col_expr(upub::model::actor::Column::FollowersCount, Expr::col(upub::model::actor::Column::FollowersCount).sub(1))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
},
|
||||
|
@ -344,10 +347,10 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn announce(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let actor = ctx.fetch_user(&uid).await?;
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let announced_id = activity.object().id()?.to_string();
|
||||
|
@ -368,7 +371,7 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res
|
|||
return Ok(())
|
||||
}
|
||||
|
||||
let share = crate::model::announce::ActiveModel {
|
||||
let share = upub::model::announce::ActiveModel {
|
||||
internal: NotSet,
|
||||
actor: Set(internal_uid),
|
||||
object: Set(object_model.internal),
|
||||
|
@ -377,11 +380,11 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res
|
|||
|
||||
let expanded_addressing = ctx.expand_addressing(addressed).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||
crate::model::announce::Entity::insert(share)
|
||||
upub::model::announce::Entity::insert(share)
|
||||
.exec(ctx.db()).await?;
|
||||
crate::model::object::Entity::update_many()
|
||||
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
|
||||
.filter(crate::model::object::Column::Internal.eq(object_model.internal))
|
||||
upub::model::object::Entity::update_many()
|
||||
.col_expr(upub::model::object::Column::Announces, Expr::col(upub::model::object::Column::Announces).add(1))
|
||||
.filter(upub::model::object::Column::Internal.eq(object_model.internal))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
use axum::extract::{Path, Query, State};
|
||||
use sea_orm::{ColumnTrait, QueryFilter};
|
||||
use upub::{model::{self, addressing::Event, attachment::BatchFillable}, traits::Fetcher, Context};
|
||||
use upub::{model::{self, addressing::Event, attachment::BatchFillable}, Context};
|
||||
use apb::LD;
|
||||
|
||||
use crate::{builders::JsonLD, AuthIdentity};
|
||||
|
@ -14,12 +14,12 @@ pub async fn view(
|
|||
Query(query): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
let aid = ctx.aid(&id);
|
||||
if auth.is_local() && query.fetch && !ctx.is_local(&aid) {
|
||||
let obj = ctx.fetch_activity(&aid).await?;
|
||||
if obj.id != aid {
|
||||
return Err(crate::ApiError::Redirect(obj.id));
|
||||
}
|
||||
}
|
||||
// if auth.is_local() && query.fetch && !ctx.is_local(&aid) {
|
||||
// let obj = ctx.fetch_activity(&aid).await?;
|
||||
// if obj.id != aid {
|
||||
// return Err(crate::ApiError::Redirect(obj.id));
|
||||
// }
|
||||
// }
|
||||
|
||||
let row = model::addressing::Entity::find_addressed(auth.my_id())
|
||||
.filter(model::activity::Column::Id.eq(&aid))
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use apb::{LD, ActorMut, BaseMut, ObjectMut, PublicKeyMut};
|
||||
use axum::{extract::{Query, State}, http::HeaderMap, response::{IntoResponse, Redirect, Response}, Form, Json};
|
||||
use reqwest::Method;
|
||||
use upub::{traits::Fetcher, Context};
|
||||
use upub::Context;
|
||||
|
||||
use crate::{builders::JsonLD, AuthIdentity};
|
||||
|
||||
|
@ -52,19 +52,20 @@ pub async fn proxy_get(
|
|||
if !ctx.cfg().security.allow_public_debugger && !auth.is_local() {
|
||||
return Err(crate::ApiError::unauthorized());
|
||||
}
|
||||
Ok(Json(
|
||||
Context::request(
|
||||
Method::GET,
|
||||
&query.id,
|
||||
None,
|
||||
ctx.base(),
|
||||
ctx.pkey(),
|
||||
&format!("{}+proxy", ctx.domain()),
|
||||
)
|
||||
.await?
|
||||
.json::<serde_json::Value>()
|
||||
.await?
|
||||
))
|
||||
todo!()
|
||||
// Ok(Json(
|
||||
// Context::request(
|
||||
// Method::GET,
|
||||
// &query.id,
|
||||
// None,
|
||||
// ctx.base(),
|
||||
// ctx.pkey(),
|
||||
// &format!("{}+proxy", ctx.domain()),
|
||||
// )
|
||||
// .await?
|
||||
// .json::<serde_json::Value>()
|
||||
// .await?
|
||||
// ))
|
||||
}
|
||||
|
||||
pub async fn proxy_form(
|
||||
|
@ -76,17 +77,18 @@ pub async fn proxy_form(
|
|||
if !ctx.cfg().security.allow_public_debugger && auth.is_local() {
|
||||
return Err(crate::ApiError::unauthorized());
|
||||
}
|
||||
Ok(Json(
|
||||
Context::request(
|
||||
Method::GET,
|
||||
&query.id,
|
||||
None,
|
||||
ctx.base(),
|
||||
ctx.pkey(),
|
||||
&format!("{}+proxy", ctx.domain()),
|
||||
)
|
||||
.await?
|
||||
.json::<serde_json::Value>()
|
||||
.await?
|
||||
))
|
||||
todo!()
|
||||
// Ok(Json(
|
||||
// Context::request(
|
||||
// Method::GET,
|
||||
// &query.id,
|
||||
// None,
|
||||
// ctx.base(),
|
||||
// ctx.pkey(),
|
||||
// &format!("{}+proxy", ctx.domain()),
|
||||
// )
|
||||
// .await?
|
||||
// .json::<serde_json::Value>()
|
||||
// .await?
|
||||
// ))
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use axum::{http::StatusCode, extract::State, Json};
|
||||
use rand::Rng;
|
||||
use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, Condition, EntityTrait, QueryFilter};
|
||||
use upub::{traits::Administrable, Context};
|
||||
use upub::{server::admin::Administrable, Context};
|
||||
|
||||
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use apb::{Activity, ActivityType, Base};
|
||||
use axum::{extract::{Query, State}, http::StatusCode, Json};
|
||||
use sea_orm::{sea_query::IntoCondition, ActiveValue::{NotSet, Set}, ColumnTrait, EntityTrait};
|
||||
use upub::{model::job::JobType, Context};
|
||||
use sea_orm::{sea_query::IntoCondition, ColumnTrait};
|
||||
use upub::Context;
|
||||
|
||||
use crate::{AuthIdentity, Identity, builders::JsonLD};
|
||||
|
||||
|
@ -42,8 +42,8 @@ pub async fn post(
|
|||
State(ctx): State<Context>,
|
||||
AuthIdentity(auth): AuthIdentity,
|
||||
Json(activity): Json<serde_json::Value>
|
||||
) -> crate::ApiResult<StatusCode> {
|
||||
let Identity::Remote { domain: _server, user: uid, .. } = auth else {
|
||||
) -> crate::ApiResult<()> {
|
||||
let Identity::Remote { domain: server, user: uid, .. } = auth else {
|
||||
if matches!(activity.activity_type(), Ok(ActivityType::Delete)) {
|
||||
// this is spammy af, ignore them!
|
||||
// we basically received a delete for a user we can't fetch and verify, meaning remote
|
||||
|
@ -51,35 +51,48 @@ pub async fn post(
|
|||
// but mastodon keeps hammering us trying to delete this user, so just make mastodon happy
|
||||
// and return 200 without even bothering checking this stuff
|
||||
// would be cool if mastodon played nicer with the network...
|
||||
return Ok(StatusCode::OK);
|
||||
return Ok(());
|
||||
}
|
||||
tracing::warn!("refusing unauthorized activity: {}", pretty_json!(activity));
|
||||
if matches!(auth, Identity::Anonymous) {
|
||||
return Ok(StatusCode::UNAUTHORIZED);
|
||||
return Err(crate::ApiError::unauthorized());
|
||||
} else {
|
||||
return Ok(StatusCode::FORBIDDEN);
|
||||
return Err(crate::ApiError::forbidden());
|
||||
}
|
||||
};
|
||||
|
||||
let aid = activity.id()?.to_string();
|
||||
todo!()
|
||||
|
||||
if let Some(_internal) = upub::model::activity::Entity::ap_to_internal(&aid, ctx.db()).await? {
|
||||
return Ok(StatusCode::OK); // already processed
|
||||
}
|
||||
// let aid = activity.id().ok_or_else(|| crate::ApiError::field("id"))?.to_string();
|
||||
// let actor = activity.actor().id().ok_or_else(|| crate::ApiError::field("actor"))?;
|
||||
|
||||
let job = upub::model::job::ActiveModel {
|
||||
internal: NotSet,
|
||||
job_type: Set(JobType::Inbound),
|
||||
actor: Set(uid),
|
||||
target: Set(None),
|
||||
activity: Set(aid),
|
||||
payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing json payload"))),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0)
|
||||
};
|
||||
// if uid != actor {
|
||||
// return Err(crate::ApiError::unauthorized());
|
||||
// }
|
||||
|
||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
// tracing::debug!("processing federated activity: '{:#}'", activity);
|
||||
|
||||
Ok(StatusCode::ACCEPTED)
|
||||
// // TODO we could process Links and bare Objects maybe, but probably out of AP spec?
|
||||
// match activity.activity_type().ok_or_else(crate::ApiError::bad_request)? {
|
||||
// ActivityType::Activity => {
|
||||
// tracing::warn!("skipping unprocessable base activity: {}", pretty_json!(activity));
|
||||
// Err(StatusCode::UNPROCESSABLE_ENTITY.into()) // won't ingest useless stuff
|
||||
// },
|
||||
|
||||
// // TODO emojireacts are NOT likes, but let's process them like ones for now maybe?
|
||||
// ActivityType::Like | ActivityType::EmojiReact => Ok(ctx.like(server, activity).await?),
|
||||
// ActivityType::Create => Ok(ctx.create(server, activity).await?),
|
||||
// ActivityType::Follow => Ok(ctx.follow(server, activity).await?),
|
||||
// ActivityType::Announce => Ok(ctx.announce(server, activity).await?),
|
||||
// ActivityType::Accept(_) => Ok(ctx.accept(server, activity).await?),
|
||||
// ActivityType::Reject(_) => Ok(ctx.reject(server, activity).await?),
|
||||
// ActivityType::Undo => Ok(ctx.undo(server, activity).await?),
|
||||
// ActivityType::Delete => Ok(ctx.delete(server, activity).await?),
|
||||
// ActivityType::Update => Ok(ctx.update(server, activity).await?),
|
||||
|
||||
// _x => {
|
||||
// tracing::info!("received unimplemented activity on inbox: {}", pretty_json!(activity));
|
||||
// Err(StatusCode::NOT_IMPLEMENTED.into())
|
||||
// },
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ pub mod replies;
|
|||
use apb::{CollectionMut, ObjectMut, LD};
|
||||
use axum::extract::{Path, Query, State};
|
||||
use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns};
|
||||
use upub::{model::{self, addressing::Event}, traits::Fetcher, Context};
|
||||
use upub::{model::{self, addressing::Event}, Context};
|
||||
|
||||
use crate::{builders::JsonLD, AuthIdentity};
|
||||
|
||||
|
@ -16,13 +16,13 @@ pub async fn view(
|
|||
Query(query): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
let oid = ctx.oid(&id);
|
||||
if auth.is_local() && query.fetch && !ctx.is_local(&oid) {
|
||||
let obj = ctx.fetch_object(&oid).await?;
|
||||
// some implementations serve statuses on different urls than their AP id
|
||||
if obj.id != oid {
|
||||
return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id))));
|
||||
}
|
||||
}
|
||||
// if auth.is_local() && query.fetch && !ctx.is_local(&oid) {
|
||||
// let obj = ctx.fetch_object(&oid).await?;
|
||||
// // some implementations serve statuses on different urls than their AP id
|
||||
// if obj.id != oid {
|
||||
// return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id))));
|
||||
// }
|
||||
// }
|
||||
|
||||
let item = model::addressing::Entity::find_addressed(auth.my_id())
|
||||
.filter(model::object::Column::Id.eq(&oid))
|
||||
|
|
|
@ -8,7 +8,7 @@ pub async fn get(
|
|||
State(ctx): State<Context>,
|
||||
Path(id): Path<String>,
|
||||
AuthIdentity(auth): AuthIdentity,
|
||||
Query(_q): Query<TryFetch>,
|
||||
Query(q): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
let replies_id = upub::url!(ctx, "/objects/{id}/replies");
|
||||
let oid = ctx.oid(&id);
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use axum::{http::StatusCode, extract::{Path, Query, State}, Json};
|
||||
use axum::{extract::{Path, Query, State}, http::StatusCode, Json};
|
||||
|
||||
use sea_orm::{ColumnTrait, Condition};
|
||||
use upub::{model, Context};
|
||||
|
@ -54,7 +54,7 @@ pub async fn post(
|
|||
Path(_id): Path<String>,
|
||||
AuthIdentity(_auth): AuthIdentity,
|
||||
Json(activity): Json<serde_json::Value>,
|
||||
) -> crate::ApiResult<StatusCode> {
|
||||
) -> crate::ApiResult<()> {
|
||||
// POSTing to user inboxes is effectively the same as POSTing to the main inbox
|
||||
super::super::inbox::post(State(ctx), AuthIdentity(_auth), Json(activity)).await
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ pub mod following;
|
|||
use axum::extract::{Path, Query, State};
|
||||
|
||||
use apb::{LD, ActorMut, EndpointsMut, Node, ObjectMut};
|
||||
use upub::{ext::AnyQuery, model, traits::Fetcher, Context};
|
||||
use upub::{ext::AnyQuery, model, Context};
|
||||
|
||||
use crate::{builders::JsonLD, ApiError, AuthIdentity};
|
||||
|
||||
|
@ -21,18 +21,16 @@ pub async fn view(
|
|||
Query(query): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
let mut uid = ctx.uid(&id);
|
||||
if auth.is_local() {
|
||||
if id.starts_with('@') {
|
||||
if let Some((user, host)) = id.replacen('@', "", 1).split_once('@') {
|
||||
if let Some(webfinger) = ctx.webfinger(user, host).await? {
|
||||
uid = webfinger;
|
||||
}
|
||||
}
|
||||
}
|
||||
if query.fetch && !ctx.is_local(&uid) {
|
||||
ctx.fetch_user(&uid).await?;
|
||||
}
|
||||
}
|
||||
// if auth.is_local() {
|
||||
// if id.starts_with('@') {
|
||||
// if let Some((user, host)) = id.replacen('@', "", 1).split_once('@') {
|
||||
// uid = ctx.webfinger(user, host).await?;
|
||||
// }
|
||||
// }
|
||||
// if query.fetch && !ctx.is_local(&uid) {
|
||||
// ctx.fetch_user(&uid).await?;
|
||||
// }
|
||||
// }
|
||||
let internal_uid = model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
.await?
|
||||
.ok_or_else(ApiError::not_found)?;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use axum::{extract::{Path, Query, State}, http::StatusCode, Json};
|
||||
use sea_orm::{ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait};
|
||||
use sea_orm::{ColumnTrait, Condition};
|
||||
|
||||
use apb::{AcceptType, ActivityType, Base, BaseType, ObjectType, RejectType};
|
||||
use upub::{model, Context};
|
||||
|
||||
use crate::{activitypub::{CreationResult, Pagination}, builders::JsonLD, AuthIdentity, Identity};
|
||||
|
@ -45,29 +46,46 @@ pub async fn post(
|
|||
match auth {
|
||||
Identity::Anonymous => Err(StatusCode::UNAUTHORIZED.into()),
|
||||
Identity::Remote { .. } => Err(StatusCode::NOT_IMPLEMENTED.into()),
|
||||
Identity::Local { id: uid, .. } => {
|
||||
if ctx.uid(&id) != uid {
|
||||
return Err(crate::ApiError::forbidden());
|
||||
}
|
||||
Identity::Local { id: uid, .. } => if ctx.uid(&id) == uid {
|
||||
tracing::debug!("processing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default());
|
||||
todo!()
|
||||
// match activity.base_type()? {
|
||||
// BaseType::Link(_) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()),
|
||||
|
||||
tracing::debug!("enqueuing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default());
|
||||
let aid = ctx.aid(&Context::new_id());
|
||||
// BaseType::Object(ObjectType::Note) =>
|
||||
// Ok(CreationResult(ctx.create_note(uid, activity).await?)),
|
||||
|
||||
let job = model::job::ActiveModel {
|
||||
internal: NotSet,
|
||||
activity: Set(aid.clone()),
|
||||
job_type: Set(model::job::JobType::Local),
|
||||
actor: Set(uid.clone()),
|
||||
target: Set(None),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0),
|
||||
payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing back json object"))),
|
||||
};
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Create)) =>
|
||||
// Ok(CreationResult(ctx.create(uid, activity).await?)),
|
||||
|
||||
model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Like)) =>
|
||||
// Ok(CreationResult(ctx.like(uid, activity).await?)),
|
||||
|
||||
Ok(CreationResult(aid))
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Follow)) =>
|
||||
// Ok(CreationResult(ctx.follow(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Announce)) =>
|
||||
// Ok(CreationResult(ctx.announce(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept))) =>
|
||||
// Ok(CreationResult(ctx.accept(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject))) =>
|
||||
// Ok(CreationResult(ctx.reject(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Undo)) =>
|
||||
// Ok(CreationResult(ctx.undo(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Delete)) =>
|
||||
// Ok(CreationResult(ctx.delete(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Update)) =>
|
||||
// Ok(CreationResult(ctx.update(uid, activity).await?)),
|
||||
|
||||
// _ => Err(StatusCode::NOT_IMPLEMENTED.into()),
|
||||
// }
|
||||
} else {
|
||||
Err(StatusCode::FORBIDDEN.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,10 +95,7 @@ impl<T: serde::Serialize> IntoResponse for JsonRD<T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn webfinger(
|
||||
State(ctx): State<Context>,
|
||||
Query(query): Query<WebfingerQuery>
|
||||
) -> crate::ApiResult<JsonRD<JsonResourceDescriptor>> {
|
||||
pub async fn webfinger(State(ctx): State<Context>, Query(query): Query<WebfingerQuery>) -> upub::Result<JsonRD<JsonResourceDescriptor>> {
|
||||
if let Some((user, domain)) = query
|
||||
.resource
|
||||
.replace("acct:", "")
|
||||
|
@ -109,7 +106,7 @@ pub async fn webfinger(
|
|||
.filter(model::actor::Column::Domain.eq(domain))
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
.ok_or_else(crate::ApiError::not_found)?;
|
||||
.ok_or_else(upub::Error::not_found)?;
|
||||
|
||||
let expires = if domain == ctx.domain() {
|
||||
// TODO configurable webfinger TTL, also 30 days may be too much???
|
||||
|
@ -165,7 +162,7 @@ pub struct OauthAuthorizationServerResponse {
|
|||
authorization_response_iss_parameter_supported: bool,
|
||||
}
|
||||
|
||||
pub async fn oauth_authorization_server(State(ctx): State<Context>) -> crate::ApiResult<Json<OauthAuthorizationServerResponse>> {
|
||||
pub async fn oauth_authorization_server(State(ctx): State<Context>) -> upub::Result<Json<OauthAuthorizationServerResponse>> {
|
||||
Ok(Json(OauthAuthorizationServerResponse {
|
||||
issuer: upub::url!(ctx, ""),
|
||||
authorization_endpoint: upub::url!(ctx, "/auth"),
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}};
|
||||
use reqwest::StatusCode;
|
||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
|
||||
use httpsign::HttpSignature;
|
||||
use upub::traits::Fetcher;
|
||||
|
||||
use crate::ApiError;
|
||||
|
||||
|
@ -120,9 +120,11 @@ where
|
|||
.next().ok_or(ApiError::bad_request())?
|
||||
.to_string();
|
||||
|
||||
match ctx.fetch_user(&user_id).await {
|
||||
Err(e) => tracing::warn!("failed resolving http signature actor: {e}"),
|
||||
Ok(user) => match http_signature
|
||||
match upub::model::actor::Entity::find_by_ap_id(&user_id)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
{
|
||||
Some(user) => match http_signature
|
||||
.build_from_parts(parts)
|
||||
.verify(&user.public_key)
|
||||
{
|
||||
|
@ -135,6 +137,9 @@ where
|
|||
Ok(false) => tracing::warn!("invalid signature: {http_signature:?}"),
|
||||
Err(e) => tracing::error!("error verifying signature: {e}"),
|
||||
},
|
||||
None => {
|
||||
// TODO enqueue fetching who tried signing this
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,13 +11,13 @@ pub enum ApiError {
|
|||
#[error("http signature error: {0:?}")]
|
||||
HttpSignature(#[from] httpsign::HttpSignatureError),
|
||||
|
||||
#[error("outgoing request error: {0:?}")]
|
||||
#[error("fetch error: {0:?}")]
|
||||
Reqwest(#[from] reqwest::Error),
|
||||
|
||||
// TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut
|
||||
// helps with debugging!
|
||||
#[error("fetch error: {0:?}")]
|
||||
FetchError(#[from] upub::traits::fetch::PullError),
|
||||
#[error("fetch error: {0:?} -- server responded with {1}")]
|
||||
FetchError(reqwest::Error, String),
|
||||
|
||||
// wrapper error to return arbitraty status codes
|
||||
#[error("{0}")]
|
||||
|
@ -83,7 +83,7 @@ impl axum::response::IntoResponse for ApiError {
|
|||
"inner": format!("{e:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
ApiError::Reqwest(x) => (
|
||||
ApiError::Reqwest(x) | ApiError::FetchError(x, _) => (
|
||||
x.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "request",
|
||||
|
@ -93,14 +93,6 @@ impl axum::response::IntoResponse for ApiError {
|
|||
"inner": format!("{x:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
ApiError::FetchError(pull) => (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "fetch",
|
||||
"description": descr,
|
||||
"inner": format!("{pull:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
ApiError::Field(x) => (
|
||||
axum::http::StatusCode::BAD_REQUEST,
|
||||
axum::Json(serde_json::json!({
|
||||
|
|
|
@ -25,7 +25,7 @@ pub mod mastodon {
|
|||
impl MastodonRouter for axum::Router<upub::Context> {}
|
||||
}
|
||||
|
||||
pub async fn serve(ctx: upub::Context, bind: String) -> Result<(), std::io::Error> {
|
||||
pub async fn serve(ctx: upub::Context, bind: String) -> upub::Result<()> {
|
||||
use activitypub::ActivityPubRouter;
|
||||
use mastodon::MastodonRouter;
|
||||
use tower_http::{cors::CorsLayer, trace::TraceLayer};
|
||||
|
@ -50,7 +50,8 @@ pub async fn serve(ctx: upub::Context, bind: String) -> Result<(), std::io::Erro
|
|||
.with_state(ctx);
|
||||
|
||||
// run our app with hyper, listening locally on port 3000
|
||||
let listener = tokio::net::TcpListener::bind(bind).await?;
|
||||
let listener = tokio::net::TcpListener::bind(bind)
|
||||
.await.expect("could not bind tcp socket");
|
||||
|
||||
axum::serve(listener, router).await?;
|
||||
|
||||
|
|
|
@ -1,134 +0,0 @@
|
|||
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
|
||||
|
||||
use upub::{model, Context};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum JobError {
|
||||
#[error("database error: {0:?}")]
|
||||
Database(#[from] sea_orm::DbErr),
|
||||
|
||||
#[error("invalid payload json: {0:?}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
|
||||
#[error("malformed payload: {0}")]
|
||||
Malformed(#[from] apb::FieldErr),
|
||||
|
||||
#[error("malformed job: missing payload")]
|
||||
MissingPayload,
|
||||
|
||||
#[error("no available implementation to process job")]
|
||||
Unprocessable,
|
||||
|
||||
#[error("error processing activity: {0:?}")]
|
||||
ProcessorError(#[from] upub::traits::process::ProcessorError),
|
||||
}
|
||||
|
||||
pub type JobResult<T> = Result<T, JobError>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait JobDispatcher : Sized {
|
||||
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
|
||||
async fn lock(&self, job_internal: i64) -> JobResult<bool>;
|
||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>);
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl JobDispatcher for Context {
|
||||
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>> {
|
||||
let mut s = model::job::Entity::find()
|
||||
.filter(model::job::Column::NotBefore.lte(chrono::Utc::now()));
|
||||
|
||||
if let Some(t) = filter {
|
||||
s = s.filter(model::job::Column::JobType.eq(t));
|
||||
}
|
||||
|
||||
Ok(
|
||||
s
|
||||
.order_by(model::job::Column::NotBefore, Order::Asc)
|
||||
.one(self.db())
|
||||
.await?
|
||||
)
|
||||
}
|
||||
|
||||
async fn lock(&self, job_internal: i64) -> JobResult<bool> {
|
||||
let res = model::job::Entity::delete(
|
||||
model::job::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::Set(job_internal),
|
||||
..Default::default()
|
||||
}
|
||||
)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
|
||||
if res.rows_affected < 1 {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>) {
|
||||
macro_rules! restart {
|
||||
(now) => { continue };
|
||||
() => {
|
||||
{
|
||||
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut pool = tokio::task::JoinSet::new();
|
||||
|
||||
loop {
|
||||
let job = match self.poll(job_filter).await {
|
||||
Ok(Some(j)) => j,
|
||||
Ok(None) => restart!(),
|
||||
Err(e) => {
|
||||
tracing::error!("error polling for jobs: {e}");
|
||||
restart!()
|
||||
},
|
||||
};
|
||||
|
||||
match self.lock(job.internal).await {
|
||||
Ok(true) => {},
|
||||
Ok(false) => restart!(now),
|
||||
Err(e) => {
|
||||
tracing::error!("error locking job: {e}");
|
||||
restart!()
|
||||
},
|
||||
}
|
||||
|
||||
if job.expired() {
|
||||
tracing::info!("dropping expired job {job:?}");
|
||||
restart!(now);
|
||||
}
|
||||
|
||||
let _ctx = self.clone();
|
||||
pool.spawn(async move {
|
||||
let res = match job.job_type {
|
||||
model::job::JobType::Inbound => crate::inbound::process(_ctx.clone(), &job).await,
|
||||
model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await,
|
||||
model::job::JobType::Local => crate::local::process(_ctx.clone(), &job).await,
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::error!("failed processing job #{}: {e}", job.internal);
|
||||
let active = job.clone().repeat();
|
||||
if let Err(e) = model::job::Entity::insert(active)
|
||||
.exec(_ctx.db())
|
||||
.await
|
||||
{
|
||||
tracing::error!("could not insert back job ({e}), dropping:\n{job:#?}")
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while pool.len() >= concurrency {
|
||||
if let Some(Err(e)) = pool.join_next().await {
|
||||
tracing::error!("failed joining processing task: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,20 +0,0 @@
|
|||
use upub::traits::Processor;
|
||||
|
||||
|
||||
pub async fn process(ctx: upub::Context, job: &upub::model::job::Model) -> crate::JobResult<()> {
|
||||
let Some(ref payload) = job.payload else {
|
||||
tracing::error!("abandoning inbound job without payload: {job:#?}");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Ok(activity) = serde_json::from_str::<serde_json::Value>(payload) else {
|
||||
tracing::error!("abandoning inbound job with invalid payload: {job:#?}");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if let Err(e) = ctx.process(activity).await {
|
||||
tracing::error!("failed processing job #{}: {e}", job.internal);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -1,18 +0,0 @@
|
|||
pub mod dispatcher;
|
||||
pub mod inbound;
|
||||
pub mod outbound;
|
||||
pub mod local;
|
||||
|
||||
pub use dispatcher::{JobError, JobResult};
|
||||
|
||||
pub fn spawn(
|
||||
ctx: upub::Context,
|
||||
concurrency: usize,
|
||||
poll: u64,
|
||||
filter: Option<upub::model::job::JobType>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
use dispatcher::JobDispatcher;
|
||||
tokio::spawn(async move {
|
||||
ctx.run(concurrency, poll, filter).await
|
||||
})
|
||||
}
|
|
@ -1,63 +0,0 @@
|
|||
use sea_orm::EntityTrait;
|
||||
use reqwest::Method;
|
||||
|
||||
use apb::{LD, Node, ActivityMut};
|
||||
use upub::{Context, model, traits::Fetcher};
|
||||
|
||||
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
||||
tracing::info!("delivering {} to {:?}", job.activity, job.target);
|
||||
|
||||
let payload = match model::activity::Entity::find_by_ap_id(&job.activity)
|
||||
.find_also_related(model::object::Entity)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
{
|
||||
Some((activity, None)) => activity.ap().ld_context(),
|
||||
Some((activity, Some(object))) => {
|
||||
let always_embed = matches!(
|
||||
activity.activity_type,
|
||||
apb::ActivityType::Create
|
||||
| apb::ActivityType::Undo
|
||||
| apb::ActivityType::Update
|
||||
| apb::ActivityType::Accept(_)
|
||||
| apb::ActivityType::Reject(_)
|
||||
);
|
||||
if always_embed {
|
||||
activity.ap().set_object(Node::object(object.ap())).ld_context()
|
||||
} else {
|
||||
activity.ap().ld_context()
|
||||
}
|
||||
},
|
||||
None => {
|
||||
tracing::info!("skipping dispatch for deleted object {}", job.activity);
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
|
||||
let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
else {
|
||||
tracing::error!("abandoning delivery from non existant actor {}: {job:#?}", job.actor);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some(key) = actor.private_key
|
||||
else {
|
||||
tracing::error!("abandoning delivery from actor without private key {}: {job:#?}", job.actor);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if let Err(e) = Context::request(
|
||||
Method::POST, job.target.as_deref().unwrap_or(""),
|
||||
Some(&serde_json::to_string(&payload).unwrap()),
|
||||
&job.actor, &key, ctx.domain()
|
||||
).await {
|
||||
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
|
||||
model::job::Entity::insert(job.clone().repeat())
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in a new issue