From bd9f8551077b91df39a6004c7ca497e10c5d94a2 Mon Sep 17 00:00:00 2001 From: alemi Date: Tue, 3 Dec 2024 02:16:38 +0100 Subject: [PATCH] chore: huge refactor --- src/api.rs | 85 ++++++++++++++++ src/config.rs | 28 +++++ src/db.rs | 112 ++++++++++++++++++++ src/main.rs | 276 ++++---------------------------------------------- src/up.rs | 40 ++++++++ 5 files changed, 282 insertions(+), 259 deletions(-) create mode 100644 src/api.rs create mode 100644 src/config.rs create mode 100644 src/db.rs create mode 100644 src/up.rs diff --git a/src/api.rs b/src/api.rs new file mode 100644 index 0000000..1d33e6f --- /dev/null +++ b/src/api.rs @@ -0,0 +1,85 @@ +use std::collections::HashMap; + +use axum::{extract::{Path, Query, State}, response::{Html, IntoResponse}, Json}; + +use crate::{db::Database, Config}; + +pub async fn serve(config: Config, db: Database, addr: &str) -> std::io::Result<()>{ + // whats a jinja + let index = include_str!("../index.html") + .replacen("%%DESCRIPTION%%", config.description.as_deref().unwrap_or("keeping track of your infra's up status"), 1) + .replacen("%%THRESHOLD%%", &config.threshold.unwrap_or(1000).to_string(), 1); + + let app = axum::Router::new() + .route("/", axum::routing::get(|| async { Html(index) })) + .route("/api/status", axum::routing::get(api_status)) + .route("/api/status/:service", axum::routing::get(api_status_service)) + .with_state(db); + + let listener = tokio::net::TcpListener::bind(addr).await?; + + // TODO graceful shutdown + // TODO maybe don't block here but allow parent to compose things? + axum::serve(listener, app).await?; + + Ok(()) +} + +type ApiResult = Result, ApiError>; + +#[derive(Debug, thiserror::Error)] +enum ApiError { + #[error("error interacting with database: {0}")] + Db(#[from] rusqlite::Error), +} + +impl IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + match self { + ApiError::Db(error) => ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "error": "database", + "message": format!("{error}"), + "struct": format!("{error:?}"), + })) + ).into_response(), + } + } +} + +#[derive(serde::Deserialize)] +struct StatusQuery { + since: Option, +} + +async fn api_status( + State(db): State, + Query(q): Query, +) -> ApiResult>> { + let mut state = HashMap::new(); + let five_min_ago = (chrono::Utc::now() - chrono::Duration::minutes(5)).timestamp(); + let since = q.since.unwrap_or(five_min_ago); + for (sid, name) in db.services().await? { + state.insert( + name, + db.up(sid, since).await? + ); + } + Ok(Json(state)) +} + +#[derive(serde::Deserialize)] +struct ServiceStatusQuery { + limit: Option, +} + +async fn api_status_service( + State(db): State, + Path(service): axum::extract::Path, + Query(q): Query, +) -> ApiResult)>> { + let limit = q.limit.unwrap_or(50).min(250); + let sid = db.sid(&service, false).await?; + Ok(Json(db.get(sid, limit).await?)) +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..9365abe --- /dev/null +++ b/src/config.rs @@ -0,0 +1,28 @@ + +#[derive(Debug, Clone, serde::Deserialize)] +pub struct Config { + /// defined services, singular because makes more sense in toml + pub service: std::collections::BTreeMap, + + /// service description shown in web page + pub description: Option, + + /// requests taking longer than this limit (in ms) will be marked as "slow" in FE + pub threshold: Option, + + // TODO reintroduce this! should allow to optionally trim db periodically + /// how many samples of history to keep + //history: usize, + + /// poll services at this interval + pub interval_s: u64, +} + +#[derive(Debug, Clone, serde::Deserialize)] +pub struct Service { + /// url to query + pub endpoint: String, + + /// override poll rate for this service + pub interval_s: Option, +} diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..26e8539 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,112 @@ +use std::sync::Arc; + +use rusqlite::{named_params, params, Connection, OptionalExtension}; +use tokio::sync::Mutex; + + +pub type Event = (i64, Option); + +#[derive(Clone)] +pub struct Database(Arc>); + +impl Database { + pub fn open(path: Option<&str>) -> rusqlite::Result { + let db = match path { + Some(p) => Connection::open(p)?, + None => Connection::open_in_memory()?, + }; + db.execute( + "CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + service INTEGER NOT NULL, + time BIGINT NOT NULL, + value BIGINT NULL + )", params![] + )?; + + db.execute( + "CREATE INDEX IF NOT EXISTS event_per_service + ON events (service)", + params![], + )?; + + db.execute( + "CREATE TABLE IF NOT EXISTS services ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name STRING NOT NULL + )", params![] + )?; + + db.execute( + "CREATE INDEX IF NOT EXISTS services_names_lookup + ON services (name)", + params![], + )?; + + Ok(Self(Arc::new(Mutex::new(db)))) + } + + pub async fn services(&self) -> rusqlite::Result> { + let db = self.0.lock().await; + let mut stmt = db.prepare("SELECT id, name FROM services")?; + let res = stmt.query_map( + params![], + |row| Ok((row.get(0)?, row.get(1)?)) + )?; + Ok(res.filter_map(|x| x.ok()).collect()) + } + + pub async fn insert(&self, sid: i64, value: Option) -> rusqlite::Result<()> { + self.0.lock().await.execute( + "INSERT INTO events(service, time, value) VALUES (:sid, :time, :value)", + named_params! { ":sid": sid, ":time": chrono::Utc::now().timestamp(), ":value": value } + )?; + + Ok(()) + } + + pub async fn get(&self, sid: i64, limit: i64) -> rusqlite::Result> { + let db = self.0.lock().await; + let mut stmt = db.prepare("SELECT time, value FROM events WHERE service = :sid LIMIT :limit")?; + let results = stmt.query_map( + named_params! { ":sid": sid, ":limit": limit }, + |row| Ok((row.get(0)?, row.get(1)?)), + )?; + + Ok( + results + .filter_map(|x| x.ok()) + .collect() + ) + } + + #[async_recursion::async_recursion] // TODO can we not??? + pub async fn sid(&self, service: &str, upsert: bool) -> rusqlite::Result { + let res = { + let db = self.0.lock().await; + let mut stmt = db.prepare("SELECT id FROM services WHERE name = ?")?; + stmt.query_row(params![service], |row| row.get(0)).optional()? + }; + + match res { + Some(sid) => Ok(sid), + None => { + if upsert { + self.0.lock().await.execute("INSERT INTO services(name) VALUES (?)", params![service])?; + self.sid(service, upsert).await + } else { + Err(rusqlite::Error::QueryReturnedNoRows) + } + } + } + } + + pub async fn up(&self, sid: i64, since: i64) -> rusqlite::Result> { + let db = self.0.lock().await; + let mut stmt = db.prepare("SELECT value FROM events WHERE service = :sid AND time > :time")?; + stmt.query_row( + named_params! { ":sid": sid, ":time": since }, + |row| row.get::>(0) + ) + } +} diff --git a/src/main.rs b/src/main.rs index 1cf7769..07e125e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,13 @@ -use std::{collections::HashMap, sync::Arc}; +mod db; +mod api; +mod config; +mod up; + +use config::Config; +use db::Database; use clap::Parser; -use rusqlite::{named_params, params, Connection, OptionalExtension}; -use tokio::sync::Mutex; + #[derive(Parser)] struct Cli { @@ -18,33 +23,6 @@ struct Cli { addr: String, } -#[derive(serde::Deserialize)] -struct Config { - /// defined services, singular because makes more sense in toml - service: std::collections::BTreeMap, - - /// service description shown in web page - description: Option, - - /// requests taking longer than this limit (in ms) will be marked as "slow" in FE - threshold: Option, - - /// how many samples of history to keep - //history: usize, - - /// poll services at this interval - interval_s: u64, -} - -#[derive(serde::Deserialize)] -struct Service { - /// url to query - endpoint: String, - - /// override poll rate for this service - interval_s: Option, -} - fn main() { let cli = Cli::parse(); @@ -76,237 +54,17 @@ fn main() { .enable_all() .build() .expect("could not create tokio runtime") - .block_on(entry(cli, config, db)) + .block_on(async move { + up::work(config.clone(), db.clone()).await?; // <<-- this spawns background workers + + api::serve(config, db, &cli.addr).await?; // <<-- this blocks! + + // TODO it's a bit weird that these two work so differently, can we make them more similar? + + Ok::<(), Box>(()) // ughhh + }) { println!("event loop terminated with error: {e}"); eprintln!("{e:?}"); } } - -async fn entry(cli: Cli, config: Config, db: Database) -> Result<(), Box> { - let default_interval = config.interval_s; - - for (key, service) in config.service { - let interval = service.interval_s.unwrap_or(default_interval); - let db = db.clone(); - let sid = db.sid(&key, true).await?; - - tokio::spawn(async move { - loop { - let res = test_route(&service.endpoint).await; - let value = match res { - Ok(rtt) => Some(rtt), - Err(e) => { - eprintln!(" ? error polling service {key}: {e} -- {e:?}"); - None - }, - }; - if let Err(e) = db.insert(sid, value).await { - eprintln!("[!] error inserting value in database: {e} -- {e:?}"); - } - tokio::time::sleep(std::time::Duration::from_secs(interval)).await; - } - }); - } - - let index = include_str!("../index.html") - .replacen("%%DESCRIPTION%%", config.description.as_deref().unwrap_or("keeping track of your infra's up status"), 1) - .replacen("%%THRESHOLD%%", &config.threshold.unwrap_or(1000).to_string(), 1); - - // build our application with a single route - let app = axum::Router::new() - .route("/", axum::routing::get(|| async { Html(index) })) - .route("/api/status", axum::routing::get(api_status)) - .route("/api/status/:service", axum::routing::get(api_status_service)) - .with_state(db); - - let listener = tokio::net::TcpListener::bind(&cli.addr).await?; - axum::serve(listener, app).await?; - - Ok(()) -} - -async fn test_route(url: &str) -> reqwest::Result { - let before = chrono::Utc::now(); - reqwest::get(url) - .await? - .error_for_status()?; - let delta = chrono::Utc::now() - before; - Ok(delta.num_milliseconds()) -} - - -// ============= APIs - - -type ApiResult = Result, ApiError>; - -#[derive(Debug, thiserror::Error)] -enum ApiError { - #[error("error interacting with database: {0}")] - Db(#[from] rusqlite::Error), -} - -impl IntoResponse for ApiError { - fn into_response(self) -> axum::response::Response { - match self { - ApiError::Db(error) => ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "error": "database", - "message": format!("{error}"), - "struct": format!("{error:?}"), - })) - ).into_response(), - } - } -} - -use axum::{extract::{Path, Query, State}, response::{Html, IntoResponse}, Json}; - -#[derive(serde::Deserialize)] -struct StatusQuery { - since: Option, -} - -async fn api_status( - State(db): State, - Query(q): Query, -) -> ApiResult>> { - let mut state = HashMap::new(); - let five_min_ago = (chrono::Utc::now() - chrono::Duration::minutes(5)).timestamp(); - let since = q.since.unwrap_or(five_min_ago); - for (sid, name) in db.services().await? { - state.insert( - name, - db.up(sid, since).await? - ); - } - Ok(Json(state)) -} - -#[derive(serde::Deserialize)] -struct ServiceStatusQuery { - limit: Option, -} - -async fn api_status_service( - State(db): State, - Path(service): axum::extract::Path, - Query(q): Query, -) -> ApiResult)>> { - let limit = q.limit.unwrap_or(50).min(250); - let sid = db.sid(&service, false).await?; - Ok(Json(db.get(sid, limit).await?)) -} - - -// ============= DATABASE - - -type Event = (i64, Option); - -#[derive(Clone)] -struct Database(Arc>); - -impl Database { - fn open(path: Option<&str>) -> rusqlite::Result { - let db = match path { - Some(p) => Connection::open(p)?, - None => Connection::open_in_memory()?, - }; - db.execute( - "CREATE TABLE IF NOT EXISTS events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - service INTEGER NOT NULL, - time BIGINT NOT NULL, - value BIGINT NULL - )", params![] - )?; - - db.execute( - "CREATE INDEX IF NOT EXISTS event_per_service - ON events (service)", - params![], - )?; - - db.execute( - "CREATE TABLE IF NOT EXISTS services ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name STRING NOT NULL - )", params![] - )?; - - db.execute( - "CREATE INDEX IF NOT EXISTS services_names_lookup - ON services (name)", - params![], - )?; - - Ok(Self(Arc::new(Mutex::new(db)))) - } - - async fn services(&self) -> rusqlite::Result> { - let db = self.0.lock().await; - let mut stmt = db.prepare("SELECT id, name FROM services")?; - let res = stmt.query_map( - params![], - |row| Ok((row.get(0)?, row.get(1)?)) - )?; - Ok(res.filter_map(|x| x.ok()).collect()) - } - - async fn insert(&self, sid: i64, value: Option) -> rusqlite::Result<()> { - self.0.lock().await.execute( - "INSERT INTO events(service, time, value) VALUES (:sid, :time, :value)", - named_params! { ":sid": sid, ":time": chrono::Utc::now().timestamp(), ":value": value } - )?; - - Ok(()) - } - - async fn get(&self, sid: i64, limit: i64) -> rusqlite::Result> { - let db = self.0.lock().await; - let mut stmt = db.prepare("SELECT time, value FROM events WHERE service = :sid LIMIT :limit")?; - let results = stmt.query_map( - named_params! { ":sid": sid, ":limit": limit }, - |row| Ok((row.get(0)?, row.get(1)?)), - )?; - - Ok( - results - .filter_map(|x| x.ok()) - .collect() - ) - } - - #[async_recursion::async_recursion] - async fn sid(&self, service: &str, upsert: bool) -> rusqlite::Result { - let res = { - let db = self.0.lock().await; - let mut stmt = db.prepare("SELECT id FROM services WHERE name = ?")?; - stmt.query_row(params![service], |row| row.get(0)).optional()? - }; - - match res { - Some(sid) => Ok(sid), - None => { - if upsert { - self.0.lock().await.execute("INSERT INTO services(name) VALUES (?)", params![service])?; - self.sid(service, upsert).await - } else { - Err(rusqlite::Error::QueryReturnedNoRows) - } - } - } - } - - async fn up(&self, sid: i64, since: i64) -> rusqlite::Result> { - let db = self.0.lock().await; - let mut stmt = db.prepare("SELECT value FROM events WHERE service = :sid AND time > :time")?; - stmt.query_row( - named_params! { ":sid": sid, ":time": since }, - |row| row.get::>(0) - ) - } -} diff --git a/src/up.rs b/src/up.rs new file mode 100644 index 0000000..273508b --- /dev/null +++ b/src/up.rs @@ -0,0 +1,40 @@ +use crate::{config::Config, db::Database}; + + +pub async fn work(config: Config, db: Database) -> Result<(), rusqlite::Error> { + let default_interval = config.interval_s; + for (key, service) in config.service { + let interval = service.interval_s.unwrap_or(default_interval); + let db = db.clone(); + let sid = db.sid(&key, true).await?; + + tokio::spawn(async move { + loop { + let res = test_route(&service.endpoint).await; + let value = match res { + Ok(rtt) => Some(rtt), + Err(e) => { + eprintln!(" ? error polling service {key}: {e} -- {e:?}"); + None + }, + }; + if let Err(e) = db.insert(sid, value).await { + eprintln!("[!] error inserting value in database: {e} -- {e:?}"); + } + tokio::time::sleep(std::time::Duration::from_secs(interval)).await; + } + }); + } + + Ok(()) +} + + +async fn test_route(url: &str) -> reqwest::Result { + let before = chrono::Utc::now(); + reqwest::get(url) + .await? + .error_for_status()?; + let delta = chrono::Utc::now() - before; + Ok(delta.num_milliseconds()) +}