diff --git a/Cargo.toml b/Cargo.toml index 835a7aa..1a8c8c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,11 @@ version = "0.1.0" edition = "2021" [dependencies] -rocket = { version = "0.5.0-rc.2", features = ["json"] } systemstat = "0.2" -serde = "1" +serde_json = "1" +serde = { version = "1", features = ["derive"] } +tokio = { version = "1.33", features = ["macros", "rt-multi-thread"] } +clap = { version = "4.4", features = ["derive"] } +axum = "0.6" +tracing-subscriber = "0.3.17" +tracing = "0.1.40" diff --git a/src/main.rs b/src/main.rs index 6fde423..806d923 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,189 +1,267 @@ -#[macro_use] -extern crate rocket; +use std::{collections::HashMap, sync::Arc}; -use std::collections::HashMap; -use std::time::Duration; +use axum::{response::{IntoResponse, Response}, http::StatusCode, extract::{State, Json, Query}}; +use clap::Parser; -use rocket::{ - response::status::Unauthorized, - serde::{json::Json, Serialize}, - tokio::{sync::Mutex, time::sleep}, - State, -}; -use systemstat::{CPULoad, DelayedMeasurement, Platform, System, BTreeMap, BlockDeviceStats}; +use serde::{Serialize, Deserialize}; +use systemstat::{CPULoad, DelayedMeasurement, Platform, System, Duration}; +use tokio::sync::Mutex; -fn to_403_err(e: T) -> Unauthorized -where - T: std::error::Error, -{ - Unauthorized(Some(e.to_string())) + +// [==========] Error mapping to responses +pub enum AppError { + IoError(std::io::Error), + NoSuchDisk, } -#[derive(Serialize)] -struct SystemInfoView { +impl From for AppError { + fn from(inner: std::io::Error) -> Self { + AppError::IoError(inner) + } +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + let (status, error_message) = match self { + AppError::IoError(e) => { + (StatusCode::INTERNAL_SERVER_ERROR, format!("I/O error: {}", e)) + } + AppError::NoSuchDisk => { + (StatusCode::NOT_FOUND, "Requested block device doesn't exist".into()) + } + }; + + let body = Json(serde_json::json!({ + "error": error_message, + })); + + (status, body).into_response() + } +} + +pub type AppResult = Result; + + + +// [==========] CLI args +#[derive(Parser)] +/// simple web server exposing server cpu, memory, disk and network load +struct CliArgs { + /// address to bind onto + #[arg(default_value = "127.0.0.1:8484")] + addr: String, + + /// block device to monitor for read and writes + #[arg(long, short, default_value = "sda2")] + disk: String, + + /// network interface to monitor for traffic + #[arg(long, short, default_value = "eth0")] + net: String, + + /// run axum server with debug logging + #[arg(long, default_value_t = false)] + debug: bool, +} + + + +// [==========] App State +struct Tuple(u64, u64); + +impl From<(u64, u64)> for Tuple { + fn from(value: (u64, u64)) -> Self { + Tuple(value.0, value.1) + } +} + +impl Tuple { + fn delta(&mut self, first: u64, second: u64) -> (u64, u64) { + let delta = (first - self.0, second - self.1); + self.0 = first; + self.1 = second; + delta + } +} + + +pub struct StateDeltaStoreInner { + args: CliArgs, + store_disk: HashMap, + store_net: HashMap, + store_cpu: HashMap>, +} + +impl From:: for StateDeltaStore { + fn from(args: CliArgs) -> Self { + Arc::new(Mutex::new(StateDeltaStoreInner { + args, + store_disk: HashMap::default(), + store_net: HashMap::default(), + store_cpu: HashMap::default() + })) + } +} + +pub type StateDeltaStore = Arc>; + + + +// [==========] App types +#[derive(Debug, Clone, Serialize)] +pub struct SystemInfoView { session: String, mem: f32, cpu: f32, + net: NetworkView, + disk: DiskView, +} + +#[derive(Debug, Clone, Serialize)] +pub struct NetworkView { tx: f32, rx: f32, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DiskView { r: f32, w: f32, } -struct DiskDeltaTracker { - r: u64, - w: u64, +#[derive(Deserialize)] +pub struct SystemInfoControls { + session: Option, + delay: Option, } -impl Default for DiskDeltaTracker { - fn default() -> Self { - DiskDeltaTracker { r: 0, w: 0 } - } -} -impl DiskDeltaTracker { - pub fn delta(&mut self, r: u64, w: u64) -> (u64, u64) { - let out = (r - self.r, w - self.w); - self.r = r; - self.w = w; - return out; - } -} -struct NetDeltaTracker { - tx: u64, - rx: u64, -} - -impl Default for NetDeltaTracker { - fn default() -> Self { - NetDeltaTracker { tx: 0, rx: 0 } - } -} - -impl NetDeltaTracker { - pub fn delta(&mut self, tx: u64, rx: u64) -> (u64, u64) { - let out = (tx - self.tx, rx - self.rx); - self.tx = tx; - self.rx = rx; - return out; - } -} - -struct CpuDeltaTracker { - load: DelayedMeasurement, -} - -struct StateDeltaStore { - pub store_disk: Mutex>, - pub store_net: Mutex>, - pub store_cpu: Mutex>, -} - -#[get("/")] -async fn stats_delta( - session: String, - state: &State, -) -> Result, Unauthorized> { +// [==========] Routes +pub async fn stats_info( + Query(query): Query, + State(state): State, +) -> AppResult> { let sys = System::new(); - let (delta_tx, delta_rx); - let (net_tx, net_rx) = match sys.network_stats(&"enp5s0".to_string()) { - Ok(stats) => (stats.tx_bytes.as_u64(), stats.rx_bytes.as_u64()), - Err(_e) => (0, 0), + if let Some(session) = &query.session { + tracing::info!("serving session '{}'", session); + } else { + tracing::info!("serving session anonymous with {}ms delay", query.delay.unwrap_or(1000)); + } + + let (net, disk) = { + let args = &state.lock().await.args; + (args.net.clone(), args.disk.clone()) }; - { - let mut store_net = state.inner().store_net.lock().await; - if !store_net.contains_key(&session) { - store_net.insert(session.clone(), NetDeltaTracker::default()); + let (net_tx, net_rx) = { + let (tx, rx) = { + let stats = sys.network_stats(&net)?; + (stats.tx_bytes.as_u64(), stats.rx_bytes.as_u64()) + }; + + if let Some(session) = &query.session { + let store_net = &mut state.lock().await.store_net; + if !store_net.contains_key(session) { + store_net.insert(session.clone(), (0, 0).into()); + } + + let net_tracker = store_net.get_mut(session).unwrap(); + + net_tracker.delta(tx, rx) + } else { + (tx, rx) } - - let net_tracker = store_net.get_mut(&session).unwrap(); - - (delta_tx, delta_rx) = net_tracker.delta(net_tx, net_rx); - } - - let cpu; - - { - let mut store_cpu = state.inner().store_cpu.lock().await; - if !store_cpu.contains_key(&session) { - let cpu = sys.cpu_load_aggregate().map_err(to_403_err)?; - store_cpu.insert(session.clone(), CpuDeltaTracker { load: cpu }); - sleep(Duration::from_secs(1)).await; - } - - let cpu_tracker = store_cpu.get_mut(&session).unwrap(); - - let cpu_stats = cpu_tracker.load.done().map_err(to_403_err)?; - cpu = 100.0 * (1.0 - cpu_stats.idle); - - let new_cpu = sys.cpu_load_aggregate().map_err(to_403_err)?; - store_cpu.insert(session.clone(), CpuDeltaTracker { load: new_cpu }); - } - - let mem = match sys.memory() { - Ok(mem) => 100.0 * (1.0 - ((mem.free.as_u64() as f32) / (mem.total.as_u64() as f32))), - Err(_e) => -1.0, }; - let (disk_r, disk_w); + let cpu_usage = { + if let Some(session) = &query.session { + let store_cpu = &mut state.lock().await.store_cpu; + if !store_cpu.contains_key(session) { + let cpu = sys.cpu_load_aggregate()?; + store_cpu.insert(session.clone(), cpu); + tokio::time::sleep(Duration::from_millis(query.delay.unwrap_or(1000))).await; + } - { - let mut store_disk = state.inner().store_disk.lock().await; - if !store_disk.contains_key(&session) { - store_disk.insert(session.clone(), DiskDeltaTracker::default()); + let cpu_tracker = store_cpu.get_mut(session).unwrap(); + + let cpu_stats = cpu_tracker.done()?; + + let new_cpu = sys.cpu_load_aggregate()?; + store_cpu.insert(session.clone(), new_cpu); + + 100.0 * (1.0 - cpu_stats.idle) + } else { + let cpu = sys.cpu_load_aggregate()?; + tokio::time::sleep(Duration::from_millis(query.delay.unwrap_or(1000))).await; + 100.0 * (1.0 - cpu.done()?.idle) } + }; - let net_tracker = store_disk.get_mut(&session).unwrap(); + let mem_usage = { + let mem = sys.memory()?; + 100.0 * (1.0 - ((mem.free.as_u64() as f32) / (mem.total.as_u64() as f32))) + }; - let disks = sys.block_device_statistics().map_err(to_403_err)?; - let disk = disks.get("sda2").ok_or(Unauthorized(Some("not found".to_string())))?; - (disk_r, disk_w) = net_tracker.delta(disk.read_sectors as u64, disk.write_sectors as u64); - } + let (disk_r, disk_w) = { + let (r, w) = { + let disks = sys.block_device_statistics()?; + let disk_stats = disks.get(&disk).ok_or(AppError::NoSuchDisk)?; + (disk_stats.read_sectors as u64, disk_stats.write_sectors as u64) + }; + + if let Some(session) = &query.session { + let store_disk = &mut state.lock().await.store_disk; + if !store_disk.contains_key(session) { + store_disk.insert(session.clone(), (0, 0).into()); + } + + let disk_tracker = store_disk.get_mut(session).unwrap(); + disk_tracker.delta(r, w) + } else { + (r, w) + } + }; Ok(Json(SystemInfoView { - session, - mem, - cpu, - tx: delta_tx as f32 / (1024.0 * 1024.0), - rx: delta_rx as f32 / (1024.0 * 1024.0), - r: disk_r as f32 / (1024.0 * 2.0), - w: disk_w as f32 / (1024.0 * 2.0), + session: query.session.unwrap_or("".into()), + mem: mem_usage, + cpu: cpu_usage, + net: NetworkView { + tx: net_tx as f32 / (1024.0 * 1024.0), // given in bytes + rx: net_rx as f32 / (1024.0 * 1024.0), // convert to MB + }, + disk: DiskView { + r: disk_r as f32 / (1024.0 * 2.0), // given in sectors, thus 512byte blocks + w: disk_w as f32 / (1024.0 * 2.0), // convert to MB + } })) } -#[get("/")] -async fn stats_total() -> Json { - let sys = System::new(); - let (net_tx, net_rx) = match sys.network_stats(&"eth0".to_string()) { - Ok(stats) => (stats.tx_bytes.as_u64(), stats.rx_bytes.as_u64()), - Err(_e) => (0, 0), - }; - Json(SystemInfoView { - session: "".to_string(), - mem: match sys.memory() { - Ok(mem) => 1.0 - ((mem.free.as_u64() as f32) / (mem.total.as_u64() as f32)), - Err(_e) => -1.0, - }, - cpu: 0.0, - tx: net_tx as f32 / (1024.0 * 1024.0), - rx: net_rx as f32 / (1024.0 * 1024.0), - r: 0.0, - w: 0.0, - }) -} - -#[launch] -fn rocket() -> _ { - rocket::build() - .manage(StateDeltaStore { - store_disk: Mutex::new(HashMap::new()), - store_net: Mutex::new(HashMap::new()), - store_cpu: Mutex::new(HashMap::new()), - }) - .mount("/", routes![stats_delta, stats_total]) +// [==========] main +#[tokio::main] +async fn main() { + let args = CliArgs::parse(); + + tracing_subscriber::fmt() + .with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO }) + .init(); + + let addr = args.addr.parse().expect("error parsing addr"); + + tracing::info!("serving on {}", addr); + + let state = StateDeltaStore::from(args); + + let app = axum::Router::new() + .route("/", axum::routing::get(stats_info)) + .with_state(state); + + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .expect("error serving app"); }