feat: now with 100% more sqlite

This commit is contained in:
əlemi 2024-12-02 23:29:43 +01:00
parent d2b42feab4
commit 7ef5f956b9
Signed by: alemi
GPG key ID: A4895B84D311642C
3 changed files with 325 additions and 85 deletions

132
Cargo.lock generated
View file

@ -17,6 +17,18 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "ahash"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
@ -81,6 +93,17 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "async-recursion"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.83"
@ -311,6 +334,18 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "2.2.0"
@ -386,12 +421,30 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
[[package]]
name = "hashlink"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "heck"
version = "0.5.0"
@ -668,7 +721,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
dependencies = [
"equivalent",
"hashbrown",
"hashbrown 0.15.2",
]
[[package]]
@ -705,6 +758,17 @@ version = "0.2.167"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc"
[[package]]
name = "libsqlite3-sys"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.14"
@ -927,6 +991,20 @@ dependencies = [
"windows-registry",
]
[[package]]
name = "rusqlite"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e"
dependencies = [
"bitflags",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"smallvec",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -1152,6 +1230,26 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "thiserror"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tinystr"
version = "0.7.6"
@ -1295,13 +1393,17 @@ checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
[[package]]
name = "uppe-rs"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-recursion",
"axum",
"chrono",
"clap",
"reqwest",
"rusqlite",
"serde",
"serde_json",
"thiserror",
"tokio",
"toml",
]
@ -1341,6 +1443,12 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "want"
version = "0.3.1"
@ -1600,6 +1708,26 @@ dependencies = [
"synstructure",
]
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zerofrom"
version = "0.1.5"

View file

@ -1,6 +1,6 @@
[package]
name = "uppe-rs"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
[dependencies]
@ -11,3 +11,11 @@ tokio = { version = "1.41.1", features = ["rt", "macros"] }
reqwest = { version = "0.12.9", default-features = false, features = ["default-tls", "native-tls"] }
chrono = { version = "0.4.38", features = ["serde"] }
axum = "0.7.9"
rusqlite = "0.32.1"
thiserror = "2.0.3"
serde_json = "1.0.133"
async-recursion = "1.1.1"
[features]
default = []
bundled = ["rusqlite/bundled"]

View file

@ -1,13 +1,16 @@
use std::{collections::{HashMap, VecDeque}, sync::Arc};
use std::{collections::HashMap, sync::Arc};
use clap::Parser;
use tokio::sync::RwLock;
use rusqlite::{named_params, params, Connection, OptionalExtension};
use tokio::sync::Mutex;
#[derive(Parser)]
struct Cli {
/// path to storage file, if not given will operate in memory
storage: Option<String>,
/// path to config file
#[arg(short, long, default_value = "uppe-rs.toml")]
#[arg(short, long, default_value = "uppe.toml")]
config: String,
/// host to bind api onto
@ -21,7 +24,7 @@ struct Config {
service: std::collections::BTreeMap<String, Service>,
/// how many samples of history to keep
history: usize,
//history: usize,
/// poll services at this interval
interval_s: u64,
@ -36,61 +39,6 @@ struct Service {
interval_s: Option<u64>,
}
type AppState = Arc<RwLock<StateStorage>>;
type Event = (i64, Option<i64>);
struct StateStorage {
size: usize,
store: HashMap<String, VecDeque<Event>>,
}
impl StateStorage {
fn new(size: usize) -> AppState {
Arc::new(RwLock::new(Self {
size, store: HashMap::default(),
}))
}
fn get(&self, k: &str) -> Vec<Event> {
match self.store.get(k) {
Some(x) => x.clone().into(),
None => Vec::new(),
}
}
fn put(&mut self, key: &str, timestamp: i64, rtt: Option<i64>) {
match self.store.get_mut(key) {
Some(x) => {
x.push_back((timestamp, rtt));
while x.len() > self.size {
x.pop_front();
}
},
None => {
let mut q = VecDeque::new();
q.push_back((timestamp, rtt));
self.store.insert(key.to_string(), q);
},
}
}
fn up(&self, key: &str) -> bool {
match self.store.get(key) {
None => false, // this key is not being tracked, or we never polled it
Some(x) => match x.back() {
None => false, // this key has never been polled yet
Some((_, None)) => false, // last poll was a failure
Some((_, Some(_))) => true, // last poll was a success
}
}
}
fn services(&self) -> Vec<String> {
self.store.keys().cloned().collect()
}
}
fn main() {
let cli = Cli::parse();
@ -100,32 +48,37 @@ fn main() {
let config = toml::from_str::<Config>(&raw_config)
.expect("invalid config format");
let db = Database::open(cli.storage.as_deref())
.expect("failed instantiating database");
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("could not create tokio runtime")
.block_on(entry(cli, config))
.block_on(entry(cli, config, db))
.expect("event loop terminated with error");
}
async fn entry(cli: Cli, config: Config) -> Result<(), Box<dyn std::error::Error>> {
let state = StateStorage::new(config.history);
async fn entry(cli: Cli, config: Config, db: Database) -> Result<(), Box<dyn std::error::Error>> {
let default_interval = config.interval_s;
for (key, service) in config.service {
let interval = service.interval_s.unwrap_or(default_interval);
let state = state.clone();
let db = db.clone();
let sid = db.sid(&key).await?;
tokio::spawn(async move {
loop {
let res = test(&service.endpoint).await;
let timestamp = chrono::Utc::now().timestamp();
match res {
Ok(rtt) => state.write().await.put(&key, timestamp, Some(rtt)),
let value = match res {
Ok(rtt) => Some(rtt),
Err(e) => {
eprintln!("[!] error polling service {key}: {e}");
state.write().await.put(&key, timestamp, None);
eprintln!(" ? error polling service {key}: {e} -- {e:?}");
None
},
};
if let Err(e) = db.insert(sid, value).await {
eprintln!("[!] error inserting value in database: {e} -- {e:?}");
}
tokio::time::sleep(std::time::Duration::from_secs(interval)).await;
}
@ -137,7 +90,7 @@ async fn entry(cli: Cli, config: Config) -> Result<(), Box<dyn std::error::Error
.route("/", axum::routing::get(root))
.route("/api/status", axum::routing::get(api_status))
.route("/api/status/:service", axum::routing::get(api_status_service))
.with_state(state);
.with_state(db);
let listener = tokio::net::TcpListener::bind(&cli.addr).await?;
axum::serve(listener, app).await?;
@ -145,6 +98,33 @@ async fn entry(cli: Cli, config: Config) -> Result<(), Box<dyn std::error::Error
Ok(())
}
// ============= APIs
type ApiResult<T> = Result<Json<T>, ApiError>;
#[derive(Debug, thiserror::Error)]
enum ApiError {
#[error("error interacting with database: {0}")]
Db(#[from] rusqlite::Error),
}
impl IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
match self {
ApiError::Db(error) => (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "database",
"message": format!("{error}"),
"struct": format!("{error:?}"),
}))
).into_response(),
}
}
}
async fn root() -> Html<&'static str> {
Html(include_str!("../index.html"))
}
@ -158,23 +138,147 @@ async fn test(url: &str) -> reqwest::Result<i64> {
Ok(delta.num_milliseconds())
}
use axum::{extract::{Path, State}, response::Html, Json};
use axum::{extract::{Path, Query, State}, response::{Html, IntoResponse}, Json};
#[derive(serde::Deserialize)]
struct StatusQuery {
since: Option<i64>,
}
async fn api_status(
State(state): State<AppState>,
) -> Json<HashMap<String, bool>> {
let services = state.read().await.services();
let mut out = HashMap::new();
for service in services {
let up = state.read().await.up(&service);
out.insert(service, up);
State(db): State<Database>,
Query(q): Query<StatusQuery>,
) -> ApiResult<HashMap<String, Option<i64>>> {
let mut state = HashMap::new();
let five_min_ago = (chrono::Utc::now() - chrono::Duration::minutes(5)).timestamp();
let since = q.since.unwrap_or(five_min_ago);
for (sid, name) in db.services().await? {
state.insert(
name,
db.up(sid, since).await?
);
}
Json(out)
Ok(Json(state))
}
#[derive(serde::Deserialize)]
struct ServiceStatusQuery {
limit: Option<i64>,
}
async fn api_status_service(
State(state): State<AppState>,
State(db): State<Database>,
Path(service): axum::extract::Path<String>,
) -> Json<Vec<(i64, Option<i64>)>> {
Json(state.read().await.get(&service))
Query(q): Query<ServiceStatusQuery>,
) -> ApiResult<Vec<(i64, Option<i64>)>> {
let limit = q.limit.unwrap_or(50).min(250);
let sid = db.sid(&service).await?;
Ok(Json(db.get(sid, Some(limit)).await?))
}
// ============= DATABASE
type Event = (i64, Option<i64>);
#[derive(Clone)]
struct Database(Arc<Mutex<Connection>>);
impl Database {
fn open(path: Option<&str>) -> rusqlite::Result<Self> {
let db = match path {
Some(p) => Connection::open(p)?,
None => Connection::open_in_memory()?,
};
db.execute(
"CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTO INCREMENT,
service INTEGER NOT NULL,
time BIG INTEGER NOT NULL,
value BIG INTEGER NULL,
)", params![]
)?;
db.execute(
"CREATE INDEX IF NOT EXISTS event_per_service
ON events (service)",
params![],
)?;
db.execute(
"CREATE TABLE IF NOT EXISTS services (
id INTEGER PRIMARY KEY AUTO INCREMENT,
name STRING NOT NULL,
)", params![]
)?;
db.execute(
"CREATE INDEX IF NOT EXISTS services_names_lookup
ON services (name)",
params![],
)?;
Ok(Self(Arc::new(Mutex::new(db))))
}
async fn services(&self) -> rusqlite::Result<Vec<(i64, String)>> {
let db = self.0.lock().await;
let mut stmt = db.prepare("SELECT * FROM services")?;
let res = stmt.query_map(
params![],
|row| Ok((row.get(0)?, row.get(1)?))
)?;
Ok(res.filter_map(|x| x.ok()).collect())
}
async fn insert(&self, sid: i64, value: Option<i64>) -> rusqlite::Result<()> {
self.0.lock().await.execute(
"INSERT INTO events(service, time, value) VALUES (:sid, :time, :value)",
named_params! { ":sid": sid, ":time": chrono::Utc::now().timestamp(), ":value": value }
)?;
Ok(())
}
async fn get(&self, sid: i64, limit: Option<i64>) -> rusqlite::Result<Vec<Event>> {
let db = self.0.lock().await;
let mut stmt = db.prepare("SELECT time, value FROM events WHERE sid = :sid LIMIT :limit")?;
let results = stmt.query_map(
named_params! { ":sid": sid, ":limit": limit },
|row| Ok((row.get(0)?, row.get(1).optional()?)),
)?;
Ok(
results
.filter_map(|x| x.ok())
.collect()
)
}
#[async_recursion::async_recursion]
async fn sid(&self, service: &str) -> rusqlite::Result<i64> {
let res = {
let db = self.0.lock().await;
let mut stmt = db.prepare("SELECT id FROM services WHERE name = ?")?;
stmt.query_row(params![service], |row| row.get(0)).optional()?
};
match res {
Some(sid) => Ok(sid),
None => {
self.0.lock().await.execute("INSERT INTO services(name) VALUES ?", params![service])?;
self.sid(service).await
}
}
}
async fn up(&self, sid: i64, since: i64) -> rusqlite::Result<Option<i64>> {
let db = self.0.lock().await;
let mut stmt = db.prepare("SELECT value FROM events WHERE service = :sid AND time > :time")?;
stmt.query_row(
named_params! { ":sid": sid, ":time": since },
|row| row.get(0).optional()
)
}
}