feat: added worker and monolith modes

This commit is contained in:
əlemi 2024-06-06 02:21:36 +02:00
parent bbcc46d0ee
commit c83e1df110
Signed by: alemi
GPG key ID: A4895B84D311642C
3 changed files with 96 additions and 26 deletions

46
Cargo.lock generated
View file

@ -4693,9 +4693,10 @@ name = "upub"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"apb", "apb",
"axum", "async-trait",
"base64 0.22.1", "base64 0.22.1",
"chrono", "chrono",
"httpsign",
"jrd", "jrd",
"mdhtml", "mdhtml",
"nodeinfo", "nodeinfo",
@ -4718,7 +4719,7 @@ dependencies = [
[[package]] [[package]]
name = "upub-bin" name = "upub-bin"
version = "0.2.0" version = "0.3.0"
dependencies = [ dependencies = [
"clap", "clap",
"sea-orm", "sea-orm",
@ -4730,6 +4731,7 @@ dependencies = [
"upub-cli", "upub-cli",
"upub-migrations", "upub-migrations",
"upub-routes", "upub-routes",
"upub-worker",
] ]
[[package]] [[package]]
@ -4746,7 +4748,6 @@ dependencies = [
"sha256", "sha256",
"tracing", "tracing",
"upub", "upub",
"upub-processor",
"uuid", "uuid",
] ]
@ -4757,25 +4758,6 @@ dependencies = [
"sea-orm-migration", "sea-orm-migration",
] ]
[[package]]
name = "upub-processor"
version = "0.2.0"
dependencies = [
"apb",
"async-trait",
"chrono",
"httpsign",
"jrd",
"mdhtml",
"reqwest",
"sea-orm",
"serde_json",
"thiserror",
"tokio",
"tracing",
"upub",
]
[[package]] [[package]]
name = "upub-routes" name = "upub-routes"
version = "0.2.0" version = "0.2.0"
@ -4829,6 +4811,26 @@ dependencies = [
"web-sys", "web-sys",
] ]
[[package]]
name = "upub-worker"
version = "0.2.0"
dependencies = [
"apb",
"async-trait",
"chrono",
"httpsign",
"jrd",
"mdhtml",
"regex",
"reqwest",
"sea-orm",
"serde_json",
"thiserror",
"tokio",
"tracing",
"upub",
]
[[package]] [[package]]
name = "uriproxy" name = "uriproxy"
version = "0.1.0" version = "0.1.0"

View file

@ -5,7 +5,7 @@ members = [
"upub/cli", "upub/cli",
"upub/migrations", "upub/migrations",
"upub/routes", "upub/routes",
"upub/processor", "upub/worker",
"web", "web",
"utils/httpsign", "utils/httpsign",
"utils/mdhtml", "utils/mdhtml",
@ -14,7 +14,7 @@ members = [
[package] [package]
name = "upub-bin" name = "upub-bin"
version = "0.2.0" version = "0.3.0"
edition = "2021" edition = "2021"
authors = [ "alemi <me@alemi.dev>" ] authors = [ "alemi <me@alemi.dev>" ]
description = "Traits and types to handle ActivityPub objects" description = "Traits and types to handle ActivityPub objects"
@ -39,9 +39,11 @@ upub = { path = "upub/core" }
upub-cli = { path = "upub/cli", optional = true } upub-cli = { path = "upub/cli", optional = true }
upub-migrations = { path = "upub/migrations", optional = true } upub-migrations = { path = "upub/migrations", optional = true }
upub-routes = { path = "upub/routes", optional = true } upub-routes = { path = "upub/routes", optional = true }
upub-worker = { path = "upub/worker", optional = true }
[features] [features]
default = ["serve", "migrate", "cli"] default = ["serve", "migrate", "cli", "worker"]
serve = ["dep:upub-routes"] serve = ["dep:upub-routes"]
migrate = ["dep:upub-migrations"] migrate = ["dep:upub-migrations"]
cli = ["dep:upub-cli"] cli = ["dep:upub-cli"]
worker = ["dep:upub-worker"]

68
main.rs
View file

@ -11,6 +11,9 @@ use upub_migrations as migrations;
#[cfg(feature = "serve")] #[cfg(feature = "serve")]
use upub_routes as routes; use upub_routes as routes;
#[cfg(feature = "worker")]
use upub_worker as worker;
#[derive(Parser)] #[derive(Parser)]
/// all names were taken /// all names were taken
@ -53,13 +56,52 @@ enum Mode {
command: cli::CliCommand, command: cli::CliCommand,
}, },
#[cfg(all(feature = "serve", feature = "worker"))]
/// start both api routes and background workers
Monolith {
#[arg(short, long, default_value="127.0.0.1:3000")]
/// addr to bind and serve onto
bind: String,
#[arg(short, long, default_value_t = 4)]
/// how many concurrent jobs to process with this worker
tasks: usize,
#[arg(short, long, default_value_t = 20)]
/// interval for polling new tasks
poll: u64,
},
#[cfg(feature = "serve")] #[cfg(feature = "serve")]
/// run fediverse server /// start api routes server
Serve { Serve {
#[arg(short, long, default_value="127.0.0.1:3000")] #[arg(short, long, default_value="127.0.0.1:3000")]
/// addr to bind and serve onto /// addr to bind and serve onto
bind: String, bind: String,
}, },
#[cfg(feature = "worker")]
/// start background job worker
Work {
/// only run tasks of this type, run all if not given
filter: Filter,
/// how many concurrent jobs to process with this worker
#[arg(short, long, default_value_t = 4)]
tasks: usize,
#[arg(short, long, default_value_t = 20)]
/// interval for polling new tasks
poll: u64,
},
}
#[derive(Debug, Clone, clap::ValueEnum)]
enum Filter {
All,
Local,
Inbound,
Outbound,
} }
#[tokio::main] #[tokio::main]
@ -118,5 +160,29 @@ async fn main() {
Mode::Serve { bind } => Mode::Serve { bind } =>
routes::serve(ctx, bind) routes::serve(ctx, bind)
.await.expect("failed serving api routes"), .await.expect("failed serving api routes"),
#[cfg(feature = "worker")]
Mode::Work { filter, tasks, poll } =>
worker::spawn(ctx, tasks, poll, filter.into())
.await.expect("failed running worker"),
#[cfg(all(feature = "serve", feature = "worker"))]
Mode::Monolith { bind, tasks, poll } => {
worker::spawn(ctx.clone(), tasks, poll, None);
routes::serve(ctx, bind)
.await.expect("failed serving api routes");
},
}
}
impl From<Filter> for Option<upub::model::job::JobType> {
fn from(value: Filter) -> Self {
match value {
Filter::All => None,
Filter::Local => Some(upub::model::job::JobType::Local),
Filter::Inbound => Some(upub::model::job::JobType::Inbound),
Filter::Outbound => Some(upub::model::job::JobType::Outbound),
}
} }
} }