diff --git a/Cargo.lock b/Cargo.lock index b31fefd..4c5aa34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -81,6 +93,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.83" @@ -311,6 +334,18 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.2.0" @@ -386,12 +421,30 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.5.0" @@ -668,7 +721,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -705,6 +758,17 @@ version = "0.2.167" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -927,6 +991,20 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1152,6 +1230,26 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "thiserror" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -1295,13 +1393,17 @@ checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" [[package]] name = "uppe-rs" -version = "0.1.0" +version = "0.2.0" dependencies = [ + "async-recursion", "axum", "chrono", "clap", "reqwest", + "rusqlite", "serde", + "serde_json", + "thiserror", "tokio", "toml", ] @@ -1341,6 +1443,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "want" version = "0.3.1" @@ -1600,6 +1708,26 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zerofrom" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 714433d..0e79478 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "uppe-rs" -version = "0.1.0" +version = "0.2.0" edition = "2021" [dependencies] @@ -11,3 +11,11 @@ tokio = { version = "1.41.1", features = ["rt", "macros"] } reqwest = { version = "0.12.9", default-features = false, features = ["default-tls", "native-tls"] } chrono = { version = "0.4.38", features = ["serde"] } axum = "0.7.9" +rusqlite = "0.32.1" +thiserror = "2.0.3" +serde_json = "1.0.133" +async-recursion = "1.1.1" + +[features] +default = [] +bundled = ["rusqlite/bundled"] diff --git a/src/main.rs b/src/main.rs index 63adfd6..ccbc87b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,16 @@ - -use std::{collections::{HashMap, VecDeque}, sync::Arc}; +use std::{collections::HashMap, sync::Arc}; use clap::Parser; -use tokio::sync::RwLock; +use rusqlite::{named_params, params, Connection, OptionalExtension}; +use tokio::sync::Mutex; #[derive(Parser)] struct Cli { + /// path to storage file, if not given will operate in memory + storage: Option, + /// path to config file - #[arg(short, long, default_value = "uppe-rs.toml")] + #[arg(short, long, default_value = "uppe.toml")] config: String, /// host to bind api onto @@ -21,7 +24,7 @@ struct Config { service: std::collections::BTreeMap, /// how many samples of history to keep - history: usize, + //history: usize, /// poll services at this interval interval_s: u64, @@ -36,61 +39,6 @@ struct Service { interval_s: Option, } -type AppState = Arc>; - -type Event = (i64, Option); - -struct StateStorage { - size: usize, - store: HashMap>, -} - -impl StateStorage { - fn new(size: usize) -> AppState { - Arc::new(RwLock::new(Self { - size, store: HashMap::default(), - })) - } - - fn get(&self, k: &str) -> Vec { - match self.store.get(k) { - Some(x) => x.clone().into(), - None => Vec::new(), - } - } - - fn put(&mut self, key: &str, timestamp: i64, rtt: Option) { - match self.store.get_mut(key) { - Some(x) => { - x.push_back((timestamp, rtt)); - while x.len() > self.size { - x.pop_front(); - } - }, - None => { - let mut q = VecDeque::new(); - q.push_back((timestamp, rtt)); - self.store.insert(key.to_string(), q); - }, - } - } - - fn up(&self, key: &str) -> bool { - match self.store.get(key) { - None => false, // this key is not being tracked, or we never polled it - Some(x) => match x.back() { - None => false, // this key has never been polled yet - Some((_, None)) => false, // last poll was a failure - Some((_, Some(_))) => true, // last poll was a success - } - } - } - - fn services(&self) -> Vec { - self.store.keys().cloned().collect() - } -} - fn main() { let cli = Cli::parse(); @@ -100,32 +48,37 @@ fn main() { let config = toml::from_str::(&raw_config) .expect("invalid config format"); + let db = Database::open(cli.storage.as_deref()) + .expect("failed instantiating database"); + tokio::runtime::Builder::new_current_thread() .enable_all() .build() .expect("could not create tokio runtime") - .block_on(entry(cli, config)) + .block_on(entry(cli, config, db)) .expect("event loop terminated with error"); } -async fn entry(cli: Cli, config: Config) -> Result<(), Box> { - let state = StateStorage::new(config.history); +async fn entry(cli: Cli, config: Config, db: Database) -> Result<(), Box> { let default_interval = config.interval_s; for (key, service) in config.service { let interval = service.interval_s.unwrap_or(default_interval); - let state = state.clone(); + let db = db.clone(); + let sid = db.sid(&key).await?; tokio::spawn(async move { loop { let res = test(&service.endpoint).await; - let timestamp = chrono::Utc::now().timestamp(); - match res { - Ok(rtt) => state.write().await.put(&key, timestamp, Some(rtt)), + let value = match res { + Ok(rtt) => Some(rtt), Err(e) => { - eprintln!("[!] error polling service {key}: {e}"); - state.write().await.put(&key, timestamp, None); + eprintln!(" ? error polling service {key}: {e} -- {e:?}"); + None }, + }; + if let Err(e) = db.insert(sid, value).await { + eprintln!("[!] error inserting value in database: {e} -- {e:?}"); } tokio::time::sleep(std::time::Duration::from_secs(interval)).await; } @@ -137,7 +90,7 @@ async fn entry(cli: Cli, config: Config) -> Result<(), Box Result<(), Box = Result, ApiError>; + +#[derive(Debug, thiserror::Error)] +enum ApiError { + #[error("error interacting with database: {0}")] + Db(#[from] rusqlite::Error), +} + +impl IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + match self { + ApiError::Db(error) => ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "error": "database", + "message": format!("{error}"), + "struct": format!("{error:?}"), + })) + ).into_response(), + } + } +} + async fn root() -> Html<&'static str> { Html(include_str!("../index.html")) } @@ -158,23 +138,147 @@ async fn test(url: &str) -> reqwest::Result { Ok(delta.num_milliseconds()) } -use axum::{extract::{Path, State}, response::Html, Json}; +use axum::{extract::{Path, Query, State}, response::{Html, IntoResponse}, Json}; + +#[derive(serde::Deserialize)] +struct StatusQuery { + since: Option, +} async fn api_status( - State(state): State, -) -> Json> { - let services = state.read().await.services(); - let mut out = HashMap::new(); - for service in services { - let up = state.read().await.up(&service); - out.insert(service, up); + State(db): State, + Query(q): Query, +) -> ApiResult>> { + let mut state = HashMap::new(); + let five_min_ago = (chrono::Utc::now() - chrono::Duration::minutes(5)).timestamp(); + let since = q.since.unwrap_or(five_min_ago); + for (sid, name) in db.services().await? { + state.insert( + name, + db.up(sid, since).await? + ); } - Json(out) + Ok(Json(state)) +} + +#[derive(serde::Deserialize)] +struct ServiceStatusQuery { + limit: Option, } async fn api_status_service( - State(state): State, + State(db): State, Path(service): axum::extract::Path, -) -> Json)>> { - Json(state.read().await.get(&service)) + Query(q): Query, +) -> ApiResult)>> { + let limit = q.limit.unwrap_or(50).min(250); + let sid = db.sid(&service).await?; + Ok(Json(db.get(sid, Some(limit)).await?)) +} + + +// ============= DATABASE + + +type Event = (i64, Option); + +#[derive(Clone)] +struct Database(Arc>); + +impl Database { + fn open(path: Option<&str>) -> rusqlite::Result { + let db = match path { + Some(p) => Connection::open(p)?, + None => Connection::open_in_memory()?, + }; + db.execute( + "CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTO INCREMENT, + service INTEGER NOT NULL, + time BIG INTEGER NOT NULL, + value BIG INTEGER NULL, + )", params![] + )?; + + db.execute( + "CREATE INDEX IF NOT EXISTS event_per_service + ON events (service)", + params![], + )?; + + db.execute( + "CREATE TABLE IF NOT EXISTS services ( + id INTEGER PRIMARY KEY AUTO INCREMENT, + name STRING NOT NULL, + )", params![] + )?; + + db.execute( + "CREATE INDEX IF NOT EXISTS services_names_lookup + ON services (name)", + params![], + )?; + + Ok(Self(Arc::new(Mutex::new(db)))) + } + + async fn services(&self) -> rusqlite::Result> { + let db = self.0.lock().await; + let mut stmt = db.prepare("SELECT * FROM services")?; + let res = stmt.query_map( + params![], + |row| Ok((row.get(0)?, row.get(1)?)) + )?; + Ok(res.filter_map(|x| x.ok()).collect()) + } + + async fn insert(&self, sid: i64, value: Option) -> rusqlite::Result<()> { + self.0.lock().await.execute( + "INSERT INTO events(service, time, value) VALUES (:sid, :time, :value)", + named_params! { ":sid": sid, ":time": chrono::Utc::now().timestamp(), ":value": value } + )?; + + Ok(()) + } + + async fn get(&self, sid: i64, limit: Option) -> rusqlite::Result> { + let db = self.0.lock().await; + let mut stmt = db.prepare("SELECT time, value FROM events WHERE sid = :sid LIMIT :limit")?; + let results = stmt.query_map( + named_params! { ":sid": sid, ":limit": limit }, + |row| Ok((row.get(0)?, row.get(1).optional()?)), + )?; + + Ok( + results + .filter_map(|x| x.ok()) + .collect() + ) + } + + #[async_recursion::async_recursion] + async fn sid(&self, service: &str) -> rusqlite::Result { + let res = { + let db = self.0.lock().await; + let mut stmt = db.prepare("SELECT id FROM services WHERE name = ?")?; + stmt.query_row(params![service], |row| row.get(0)).optional()? + }; + + match res { + Some(sid) => Ok(sid), + None => { + self.0.lock().await.execute("INSERT INTO services(name) VALUES ?", params![service])?; + self.sid(service).await + } + } + } + + async fn up(&self, sid: i64, since: i64) -> rusqlite::Result> { + let db = self.0.lock().await; + let mut stmt = db.prepare("SELECT value FROM events WHERE service = :sid AND time > :time")?; + stmt.query_row( + named_params! { ":sid": sid, ":time": since }, + |row| row.get(0).optional() + ) + } }