From c83e1df110c0ab79d45f8c9cb2f2fe89b4de9daf Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 6 Jun 2024 02:21:36 +0200 Subject: [PATCH] feat: added worker and monolith modes --- Cargo.lock | 46 ++++++++++++++++++------------------ Cargo.toml | 8 ++++--- main.rs | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 96 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64b1cecd..de555e8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4693,9 +4693,10 @@ name = "upub" version = "0.2.0" dependencies = [ "apb", - "axum", + "async-trait", "base64 0.22.1", "chrono", + "httpsign", "jrd", "mdhtml", "nodeinfo", @@ -4718,7 +4719,7 @@ dependencies = [ [[package]] name = "upub-bin" -version = "0.2.0" +version = "0.3.0" dependencies = [ "clap", "sea-orm", @@ -4730,6 +4731,7 @@ dependencies = [ "upub-cli", "upub-migrations", "upub-routes", + "upub-worker", ] [[package]] @@ -4746,7 +4748,6 @@ dependencies = [ "sha256", "tracing", "upub", - "upub-processor", "uuid", ] @@ -4757,25 +4758,6 @@ dependencies = [ "sea-orm-migration", ] -[[package]] -name = "upub-processor" -version = "0.2.0" -dependencies = [ - "apb", - "async-trait", - "chrono", - "httpsign", - "jrd", - "mdhtml", - "reqwest", - "sea-orm", - "serde_json", - "thiserror", - "tokio", - "tracing", - "upub", -] - [[package]] name = "upub-routes" version = "0.2.0" @@ -4829,6 +4811,26 @@ dependencies = [ "web-sys", ] +[[package]] +name = "upub-worker" +version = "0.2.0" +dependencies = [ + "apb", + "async-trait", + "chrono", + "httpsign", + "jrd", + "mdhtml", + "regex", + "reqwest", + "sea-orm", + "serde_json", + "thiserror", + "tokio", + "tracing", + "upub", +] + [[package]] name = "uriproxy" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cd3c7879..15b64ee1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ members = [ "upub/cli", "upub/migrations", "upub/routes", - "upub/processor", + "upub/worker", "web", "utils/httpsign", "utils/mdhtml", @@ -14,7 +14,7 @@ members = [ [package] name = "upub-bin" -version = "0.2.0" +version = "0.3.0" edition = "2021" authors = [ "alemi " ] description = "Traits and types to handle ActivityPub objects" @@ -39,9 +39,11 @@ upub = { path = "upub/core" } upub-cli = { path = "upub/cli", optional = true } upub-migrations = { path = "upub/migrations", optional = true } upub-routes = { path = "upub/routes", optional = true } +upub-worker = { path = "upub/worker", optional = true } [features] -default = ["serve", "migrate", "cli"] +default = ["serve", "migrate", "cli", "worker"] serve = ["dep:upub-routes"] migrate = ["dep:upub-migrations"] cli = ["dep:upub-cli"] +worker = ["dep:upub-worker"] diff --git a/main.rs b/main.rs index 171d5595..61560530 100644 --- a/main.rs +++ b/main.rs @@ -11,6 +11,9 @@ use upub_migrations as migrations; #[cfg(feature = "serve")] use upub_routes as routes; +#[cfg(feature = "worker")] +use upub_worker as worker; + #[derive(Parser)] /// all names were taken @@ -53,13 +56,52 @@ enum Mode { command: cli::CliCommand, }, + #[cfg(all(feature = "serve", feature = "worker"))] + /// start both api routes and background workers + Monolith { + #[arg(short, long, default_value="127.0.0.1:3000")] + /// addr to bind and serve onto + bind: String, + + #[arg(short, long, default_value_t = 4)] + /// how many concurrent jobs to process with this worker + tasks: usize, + + #[arg(short, long, default_value_t = 20)] + /// interval for polling new tasks + poll: u64, + }, + #[cfg(feature = "serve")] - /// run fediverse server + /// start api routes server Serve { #[arg(short, long, default_value="127.0.0.1:3000")] /// addr to bind and serve onto bind: String, }, + + #[cfg(feature = "worker")] + /// start background job worker + Work { + /// only run tasks of this type, run all if not given + filter: Filter, + + /// how many concurrent jobs to process with this worker + #[arg(short, long, default_value_t = 4)] + tasks: usize, + + #[arg(short, long, default_value_t = 20)] + /// interval for polling new tasks + poll: u64, + }, +} + +#[derive(Debug, Clone, clap::ValueEnum)] +enum Filter { + All, + Local, + Inbound, + Outbound, } #[tokio::main] @@ -118,5 +160,29 @@ async fn main() { Mode::Serve { bind } => routes::serve(ctx, bind) .await.expect("failed serving api routes"), + + #[cfg(feature = "worker")] + Mode::Work { filter, tasks, poll } => + worker::spawn(ctx, tasks, poll, filter.into()) + .await.expect("failed running worker"), + + #[cfg(all(feature = "serve", feature = "worker"))] + Mode::Monolith { bind, tasks, poll } => { + worker::spawn(ctx.clone(), tasks, poll, None); + + routes::serve(ctx, bind) + .await.expect("failed serving api routes"); + }, + } +} + +impl From for Option { + fn from(value: Filter) -> Self { + match value { + Filter::All => None, + Filter::Local => Some(upub::model::job::JobType::Local), + Filter::Inbound => Some(upub::model::job::JobType::Inbound), + Filter::Outbound => Some(upub::model::job::JobType::Outbound), + } } }