Compare commits
3 commits
ec77fe6a99
...
bd9f855107
Author | SHA1 | Date | |
---|---|---|---|
bd9f855107 | |||
707525a35b | |||
fb1afda695 |
6 changed files with 305 additions and 260 deletions
|
@ -87,7 +87,7 @@
|
||||||
function cell(timestamp, rtt) {
|
function cell(timestamp, rtt) {
|
||||||
let d = new Date(timestamp * 1000);
|
let d = new Date(timestamp * 1000);
|
||||||
let warning = "";
|
let warning = "";
|
||||||
if (rtt !== null && rtt >= 1000) {
|
if (rtt !== null && rtt >= %%THRESHOLD%%) {
|
||||||
warning = " warning";
|
warning = " warning";
|
||||||
}
|
}
|
||||||
if (rtt === null) {
|
if (rtt === null) {
|
||||||
|
|
85
src/api.rs
Normal file
85
src/api.rs
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use axum::{extract::{Path, Query, State}, response::{Html, IntoResponse}, Json};
|
||||||
|
|
||||||
|
use crate::{db::Database, Config};
|
||||||
|
|
||||||
|
pub async fn serve(config: Config, db: Database, addr: &str) -> std::io::Result<()>{
|
||||||
|
// whats a jinja
|
||||||
|
let index = include_str!("../index.html")
|
||||||
|
.replacen("%%DESCRIPTION%%", config.description.as_deref().unwrap_or("keeping track of your infra's up status"), 1)
|
||||||
|
.replacen("%%THRESHOLD%%", &config.threshold.unwrap_or(1000).to_string(), 1);
|
||||||
|
|
||||||
|
let app = axum::Router::new()
|
||||||
|
.route("/", axum::routing::get(|| async { Html(index) }))
|
||||||
|
.route("/api/status", axum::routing::get(api_status))
|
||||||
|
.route("/api/status/:service", axum::routing::get(api_status_service))
|
||||||
|
.with_state(db);
|
||||||
|
|
||||||
|
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||||
|
|
||||||
|
// TODO graceful shutdown
|
||||||
|
// TODO maybe don't block here but allow parent to compose things?
|
||||||
|
axum::serve(listener, app).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
type ApiResult<T> = Result<Json<T>, ApiError>;
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
enum ApiError {
|
||||||
|
#[error("error interacting with database: {0}")]
|
||||||
|
Db(#[from] rusqlite::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoResponse for ApiError {
|
||||||
|
fn into_response(self) -> axum::response::Response {
|
||||||
|
match self {
|
||||||
|
ApiError::Db(error) => (
|
||||||
|
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(serde_json::json!({
|
||||||
|
"error": "database",
|
||||||
|
"message": format!("{error}"),
|
||||||
|
"struct": format!("{error:?}"),
|
||||||
|
}))
|
||||||
|
).into_response(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Deserialize)]
|
||||||
|
struct StatusQuery {
|
||||||
|
since: Option<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn api_status(
|
||||||
|
State(db): State<Database>,
|
||||||
|
Query(q): Query<StatusQuery>,
|
||||||
|
) -> ApiResult<HashMap<String, Option<i64>>> {
|
||||||
|
let mut state = HashMap::new();
|
||||||
|
let five_min_ago = (chrono::Utc::now() - chrono::Duration::minutes(5)).timestamp();
|
||||||
|
let since = q.since.unwrap_or(five_min_ago);
|
||||||
|
for (sid, name) in db.services().await? {
|
||||||
|
state.insert(
|
||||||
|
name,
|
||||||
|
db.up(sid, since).await?
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(Json(state))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Deserialize)]
|
||||||
|
struct ServiceStatusQuery {
|
||||||
|
limit: Option<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn api_status_service(
|
||||||
|
State(db): State<Database>,
|
||||||
|
Path(service): axum::extract::Path<String>,
|
||||||
|
Query(q): Query<ServiceStatusQuery>,
|
||||||
|
) -> ApiResult<Vec<(i64, Option<i64>)>> {
|
||||||
|
let limit = q.limit.unwrap_or(50).min(250);
|
||||||
|
let sid = db.sid(&service, false).await?;
|
||||||
|
Ok(Json(db.get(sid, limit).await?))
|
||||||
|
}
|
28
src/config.rs
Normal file
28
src/config.rs
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Deserialize)]
|
||||||
|
pub struct Config {
|
||||||
|
/// defined services, singular because makes more sense in toml
|
||||||
|
pub service: std::collections::BTreeMap<String, Service>,
|
||||||
|
|
||||||
|
/// service description shown in web page
|
||||||
|
pub description: Option<String>,
|
||||||
|
|
||||||
|
/// requests taking longer than this limit (in ms) will be marked as "slow" in FE
|
||||||
|
pub threshold: Option<u64>,
|
||||||
|
|
||||||
|
// TODO reintroduce this! should allow to optionally trim db periodically
|
||||||
|
/// how many samples of history to keep
|
||||||
|
//history: usize,
|
||||||
|
|
||||||
|
/// poll services at this interval
|
||||||
|
pub interval_s: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Deserialize)]
|
||||||
|
pub struct Service {
|
||||||
|
/// url to query
|
||||||
|
pub endpoint: String,
|
||||||
|
|
||||||
|
/// override poll rate for this service
|
||||||
|
pub interval_s: Option<u64>,
|
||||||
|
}
|
112
src/db.rs
Normal file
112
src/db.rs
Normal file
|
@ -0,0 +1,112 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use rusqlite::{named_params, params, Connection, OptionalExtension};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
|
||||||
|
pub type Event = (i64, Option<i64>);
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Database(Arc<Mutex<Connection>>);
|
||||||
|
|
||||||
|
impl Database {
|
||||||
|
pub fn open(path: Option<&str>) -> rusqlite::Result<Self> {
|
||||||
|
let db = match path {
|
||||||
|
Some(p) => Connection::open(p)?,
|
||||||
|
None => Connection::open_in_memory()?,
|
||||||
|
};
|
||||||
|
db.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS events (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
service INTEGER NOT NULL,
|
||||||
|
time BIGINT NOT NULL,
|
||||||
|
value BIGINT NULL
|
||||||
|
)", params![]
|
||||||
|
)?;
|
||||||
|
|
||||||
|
db.execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS event_per_service
|
||||||
|
ON events (service)",
|
||||||
|
params![],
|
||||||
|
)?;
|
||||||
|
|
||||||
|
db.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS services (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
name STRING NOT NULL
|
||||||
|
)", params![]
|
||||||
|
)?;
|
||||||
|
|
||||||
|
db.execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS services_names_lookup
|
||||||
|
ON services (name)",
|
||||||
|
params![],
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(Self(Arc::new(Mutex::new(db))))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn services(&self) -> rusqlite::Result<Vec<(i64, String)>> {
|
||||||
|
let db = self.0.lock().await;
|
||||||
|
let mut stmt = db.prepare("SELECT id, name FROM services")?;
|
||||||
|
let res = stmt.query_map(
|
||||||
|
params![],
|
||||||
|
|row| Ok((row.get(0)?, row.get(1)?))
|
||||||
|
)?;
|
||||||
|
Ok(res.filter_map(|x| x.ok()).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn insert(&self, sid: i64, value: Option<i64>) -> rusqlite::Result<()> {
|
||||||
|
self.0.lock().await.execute(
|
||||||
|
"INSERT INTO events(service, time, value) VALUES (:sid, :time, :value)",
|
||||||
|
named_params! { ":sid": sid, ":time": chrono::Utc::now().timestamp(), ":value": value }
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get(&self, sid: i64, limit: i64) -> rusqlite::Result<Vec<Event>> {
|
||||||
|
let db = self.0.lock().await;
|
||||||
|
let mut stmt = db.prepare("SELECT time, value FROM events WHERE service = :sid LIMIT :limit")?;
|
||||||
|
let results = stmt.query_map(
|
||||||
|
named_params! { ":sid": sid, ":limit": limit },
|
||||||
|
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(
|
||||||
|
results
|
||||||
|
.filter_map(|x| x.ok())
|
||||||
|
.collect()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_recursion::async_recursion] // TODO can we not???
|
||||||
|
pub async fn sid(&self, service: &str, upsert: bool) -> rusqlite::Result<i64> {
|
||||||
|
let res = {
|
||||||
|
let db = self.0.lock().await;
|
||||||
|
let mut stmt = db.prepare("SELECT id FROM services WHERE name = ?")?;
|
||||||
|
stmt.query_row(params![service], |row| row.get(0)).optional()?
|
||||||
|
};
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Some(sid) => Ok(sid),
|
||||||
|
None => {
|
||||||
|
if upsert {
|
||||||
|
self.0.lock().await.execute("INSERT INTO services(name) VALUES (?)", params![service])?;
|
||||||
|
self.sid(service, upsert).await
|
||||||
|
} else {
|
||||||
|
Err(rusqlite::Error::QueryReturnedNoRows)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn up(&self, sid: i64, since: i64) -> rusqlite::Result<Option<i64>> {
|
||||||
|
let db = self.0.lock().await;
|
||||||
|
let mut stmt = db.prepare("SELECT value FROM events WHERE service = :sid AND time > :time")?;
|
||||||
|
stmt.query_row(
|
||||||
|
named_params! { ":sid": sid, ":time": since },
|
||||||
|
|row| row.get::<usize, Option<i64>>(0)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
298
src/main.rs
298
src/main.rs
|
@ -1,8 +1,13 @@
|
||||||
use std::{collections::HashMap, sync::Arc};
|
mod db;
|
||||||
|
mod api;
|
||||||
|
mod config;
|
||||||
|
mod up;
|
||||||
|
|
||||||
|
use config::Config;
|
||||||
|
use db::Database;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use rusqlite::{named_params, params, Connection, OptionalExtension};
|
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
struct Cli {
|
struct Cli {
|
||||||
|
@ -18,273 +23,48 @@ struct Cli {
|
||||||
addr: String,
|
addr: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(serde::Deserialize)]
|
|
||||||
struct Config {
|
|
||||||
/// defined services, singular because makes more sense in toml
|
|
||||||
service: std::collections::BTreeMap<String, Service>,
|
|
||||||
|
|
||||||
/// service description shown in web page
|
|
||||||
description: Option<String>,
|
|
||||||
|
|
||||||
/// how many samples of history to keep
|
|
||||||
//history: usize,
|
|
||||||
|
|
||||||
/// poll services at this interval
|
|
||||||
interval_s: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(serde::Deserialize)]
|
|
||||||
struct Service {
|
|
||||||
/// url to query
|
|
||||||
endpoint: String,
|
|
||||||
|
|
||||||
/// override poll rate for this service
|
|
||||||
interval_s: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
|
|
||||||
let raw_config = std::fs::read_to_string(&cli.config)
|
let raw_config = match std::fs::read_to_string(&cli.config) {
|
||||||
.expect("could not open config file");
|
Ok(x) => x,
|
||||||
|
Err(e) => {
|
||||||
|
println!("could not read config: {e}");
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
let config = toml::from_str::<Config>(&raw_config)
|
let config = match toml::from_str::<Config>(&raw_config) {
|
||||||
.expect("invalid config format");
|
Ok(x) => x,
|
||||||
|
Err(e) => {
|
||||||
|
println!("invalid config file: {e}");
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
let db = Database::open(cli.storage.as_deref())
|
let db = match Database::open(cli.storage.as_deref()) {
|
||||||
.expect("failed instantiating database");
|
Ok(x) => x,
|
||||||
|
Err(e) => {
|
||||||
|
println!("could not connect do database: {e}");
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
tokio::runtime::Builder::new_current_thread()
|
if let Err(e) = tokio::runtime::Builder::new_current_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.build()
|
.build()
|
||||||
.expect("could not create tokio runtime")
|
.expect("could not create tokio runtime")
|
||||||
.block_on(entry(cli, config, db))
|
.block_on(async move {
|
||||||
.expect("event loop terminated with error");
|
up::work(config.clone(), db.clone()).await?; // <<-- this spawns background workers
|
||||||
}
|
|
||||||
|
|
||||||
async fn entry(cli: Cli, config: Config, db: Database) -> Result<(), Box<dyn std::error::Error>> {
|
api::serve(config, db, &cli.addr).await?; // <<-- this blocks!
|
||||||
let default_interval = config.interval_s;
|
|
||||||
|
|
||||||
for (key, service) in config.service {
|
// TODO it's a bit weird that these two work so differently, can we make them more similar?
|
||||||
let interval = service.interval_s.unwrap_or(default_interval);
|
|
||||||
let db = db.clone();
|
|
||||||
let sid = db.sid(&key, true).await?;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
Ok::<(), Box<dyn std::error::Error>>(()) // ughhh
|
||||||
loop {
|
})
|
||||||
let res = test_route(&service.endpoint).await;
|
{
|
||||||
let value = match res {
|
println!("event loop terminated with error: {e}");
|
||||||
Ok(rtt) => Some(rtt),
|
eprintln!("{e:?}");
|
||||||
Err(e) => {
|
|
||||||
eprintln!(" ? error polling service {key}: {e} -- {e:?}");
|
|
||||||
None
|
|
||||||
},
|
|
||||||
};
|
|
||||||
if let Err(e) = db.insert(sid, value).await {
|
|
||||||
eprintln!("[!] error inserting value in database: {e} -- {e:?}");
|
|
||||||
}
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(interval)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let index = include_str!("../index.html")
|
|
||||||
.replacen("%%DESCRIPTION%%", config.description.as_deref().unwrap_or("keeping track of your infra's up status"), 1);
|
|
||||||
|
|
||||||
// build our application with a single route
|
|
||||||
let app = axum::Router::new()
|
|
||||||
.route("/", axum::routing::get(|| async { Html(index) }))
|
|
||||||
.route("/api/status", axum::routing::get(api_status))
|
|
||||||
.route("/api/status/:service", axum::routing::get(api_status_service))
|
|
||||||
.with_state(db);
|
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind(&cli.addr).await?;
|
|
||||||
axum::serve(listener, app).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn test_route(url: &str) -> reqwest::Result<i64> {
|
|
||||||
let before = chrono::Utc::now();
|
|
||||||
reqwest::get(url)
|
|
||||||
.await?
|
|
||||||
.error_for_status()?;
|
|
||||||
let delta = chrono::Utc::now() - before;
|
|
||||||
Ok(delta.num_milliseconds())
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// ============= APIs
|
|
||||||
|
|
||||||
|
|
||||||
type ApiResult<T> = Result<Json<T>, ApiError>;
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
|
||||||
enum ApiError {
|
|
||||||
#[error("error interacting with database: {0}")]
|
|
||||||
Db(#[from] rusqlite::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IntoResponse for ApiError {
|
|
||||||
fn into_response(self) -> axum::response::Response {
|
|
||||||
match self {
|
|
||||||
ApiError::Db(error) => (
|
|
||||||
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
Json(serde_json::json!({
|
|
||||||
"error": "database",
|
|
||||||
"message": format!("{error}"),
|
|
||||||
"struct": format!("{error:?}"),
|
|
||||||
}))
|
|
||||||
).into_response(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
use axum::{extract::{Path, Query, State}, response::{Html, IntoResponse}, Json};
|
|
||||||
|
|
||||||
#[derive(serde::Deserialize)]
|
|
||||||
struct StatusQuery {
|
|
||||||
since: Option<i64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn api_status(
|
|
||||||
State(db): State<Database>,
|
|
||||||
Query(q): Query<StatusQuery>,
|
|
||||||
) -> ApiResult<HashMap<String, Option<i64>>> {
|
|
||||||
let mut state = HashMap::new();
|
|
||||||
let five_min_ago = (chrono::Utc::now() - chrono::Duration::minutes(5)).timestamp();
|
|
||||||
let since = q.since.unwrap_or(five_min_ago);
|
|
||||||
for (sid, name) in db.services().await? {
|
|
||||||
state.insert(
|
|
||||||
name,
|
|
||||||
db.up(sid, since).await?
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(Json(state))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(serde::Deserialize)]
|
|
||||||
struct ServiceStatusQuery {
|
|
||||||
limit: Option<i64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn api_status_service(
|
|
||||||
State(db): State<Database>,
|
|
||||||
Path(service): axum::extract::Path<String>,
|
|
||||||
Query(q): Query<ServiceStatusQuery>,
|
|
||||||
) -> ApiResult<Vec<(i64, Option<i64>)>> {
|
|
||||||
let limit = q.limit.unwrap_or(50).min(250);
|
|
||||||
let sid = db.sid(&service, false).await?;
|
|
||||||
Ok(Json(db.get(sid, limit).await?))
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// ============= DATABASE
|
|
||||||
|
|
||||||
|
|
||||||
type Event = (i64, Option<i64>);
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct Database(Arc<Mutex<Connection>>);
|
|
||||||
|
|
||||||
impl Database {
|
|
||||||
fn open(path: Option<&str>) -> rusqlite::Result<Self> {
|
|
||||||
let db = match path {
|
|
||||||
Some(p) => Connection::open(p)?,
|
|
||||||
None => Connection::open_in_memory()?,
|
|
||||||
};
|
|
||||||
db.execute(
|
|
||||||
"CREATE TABLE IF NOT EXISTS events (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
service INTEGER NOT NULL,
|
|
||||||
time BIGINT NOT NULL,
|
|
||||||
value BIGINT NULL
|
|
||||||
)", params![]
|
|
||||||
)?;
|
|
||||||
|
|
||||||
db.execute(
|
|
||||||
"CREATE INDEX IF NOT EXISTS event_per_service
|
|
||||||
ON events (service)",
|
|
||||||
params![],
|
|
||||||
)?;
|
|
||||||
|
|
||||||
db.execute(
|
|
||||||
"CREATE TABLE IF NOT EXISTS services (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
name STRING NOT NULL
|
|
||||||
)", params![]
|
|
||||||
)?;
|
|
||||||
|
|
||||||
db.execute(
|
|
||||||
"CREATE INDEX IF NOT EXISTS services_names_lookup
|
|
||||||
ON services (name)",
|
|
||||||
params![],
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(Self(Arc::new(Mutex::new(db))))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn services(&self) -> rusqlite::Result<Vec<(i64, String)>> {
|
|
||||||
let db = self.0.lock().await;
|
|
||||||
let mut stmt = db.prepare("SELECT id, name FROM services")?;
|
|
||||||
let res = stmt.query_map(
|
|
||||||
params![],
|
|
||||||
|row| Ok((row.get(0)?, row.get(1)?))
|
|
||||||
)?;
|
|
||||||
Ok(res.filter_map(|x| x.ok()).collect())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn insert(&self, sid: i64, value: Option<i64>) -> rusqlite::Result<()> {
|
|
||||||
self.0.lock().await.execute(
|
|
||||||
"INSERT INTO events(service, time, value) VALUES (:sid, :time, :value)",
|
|
||||||
named_params! { ":sid": sid, ":time": chrono::Utc::now().timestamp(), ":value": value }
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get(&self, sid: i64, limit: i64) -> rusqlite::Result<Vec<Event>> {
|
|
||||||
let db = self.0.lock().await;
|
|
||||||
let mut stmt = db.prepare("SELECT time, value FROM events WHERE service = :sid LIMIT :limit")?;
|
|
||||||
let results = stmt.query_map(
|
|
||||||
named_params! { ":sid": sid, ":limit": limit },
|
|
||||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(
|
|
||||||
results
|
|
||||||
.filter_map(|x| x.ok())
|
|
||||||
.collect()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_recursion::async_recursion]
|
|
||||||
async fn sid(&self, service: &str, upsert: bool) -> rusqlite::Result<i64> {
|
|
||||||
let res = {
|
|
||||||
let db = self.0.lock().await;
|
|
||||||
let mut stmt = db.prepare("SELECT id FROM services WHERE name = ?")?;
|
|
||||||
stmt.query_row(params![service], |row| row.get(0)).optional()?
|
|
||||||
};
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Some(sid) => Ok(sid),
|
|
||||||
None => {
|
|
||||||
if upsert {
|
|
||||||
self.0.lock().await.execute("INSERT INTO services(name) VALUES (?)", params![service])?;
|
|
||||||
self.sid(service, upsert).await
|
|
||||||
} else {
|
|
||||||
Err(rusqlite::Error::QueryReturnedNoRows)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn up(&self, sid: i64, since: i64) -> rusqlite::Result<Option<i64>> {
|
|
||||||
let db = self.0.lock().await;
|
|
||||||
let mut stmt = db.prepare("SELECT value FROM events WHERE service = :sid AND time > :time")?;
|
|
||||||
stmt.query_row(
|
|
||||||
named_params! { ":sid": sid, ":time": since },
|
|
||||||
|row| row.get::<usize, Option<i64>>(0)
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
40
src/up.rs
Normal file
40
src/up.rs
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
use crate::{config::Config, db::Database};
|
||||||
|
|
||||||
|
|
||||||
|
pub async fn work(config: Config, db: Database) -> Result<(), rusqlite::Error> {
|
||||||
|
let default_interval = config.interval_s;
|
||||||
|
for (key, service) in config.service {
|
||||||
|
let interval = service.interval_s.unwrap_or(default_interval);
|
||||||
|
let db = db.clone();
|
||||||
|
let sid = db.sid(&key, true).await?;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let res = test_route(&service.endpoint).await;
|
||||||
|
let value = match res {
|
||||||
|
Ok(rtt) => Some(rtt),
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!(" ? error polling service {key}: {e} -- {e:?}");
|
||||||
|
None
|
||||||
|
},
|
||||||
|
};
|
||||||
|
if let Err(e) = db.insert(sid, value).await {
|
||||||
|
eprintln!("[!] error inserting value in database: {e} -- {e:?}");
|
||||||
|
}
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(interval)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async fn test_route(url: &str) -> reqwest::Result<i64> {
|
||||||
|
let before = chrono::Utc::now();
|
||||||
|
reqwest::get(url)
|
||||||
|
.await?
|
||||||
|
.error_for_status()?;
|
||||||
|
let delta = chrono::Utc::now() - before;
|
||||||
|
Ok(delta.num_milliseconds())
|
||||||
|
}
|
Loading…
Reference in a new issue