diff --git a/src/data/mod.rs b/src/data/mod.rs index d5b648f..7b092f3 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -2,8 +2,6 @@ pub mod entities; use std::num::ParseFloatError; -use sea_orm::{DatabaseConnection, EntityTrait}; - #[derive(Debug)] pub enum FetchError { ReqwestError(reqwest::Error), @@ -39,39 +37,3 @@ impl From for FetchError { FetchError::DbError(e) } } - -#[allow(dead_code)] -pub struct ApplicationState { - // pub run: bool, - db: DatabaseConnection, - pub panels: Vec, - pub sources: Vec, - pub metrics: Vec, - last_fetch: i64, - // pub diagnostics: RwLock>, -} - -#[allow(dead_code)] -impl ApplicationState { - pub fn new(db: DatabaseConnection) -> Result { - return Ok(ApplicationState { - db, - panels: vec![], - sources: vec![], - metrics: vec![], - last_fetch: 0, - }); - } - - pub async fn fetch(&mut self) -> Result<(), sea_orm::DbErr> { - self.panels = entities::panels::Entity::find().all(&self.db).await?; - self.sources = entities::sources::Entity::find().all(&self.db).await?; - self.metrics = entities::metrics::Entity::find().all(&self.db).await?; - self.last_fetch = chrono::Utc::now().timestamp(); - Ok(()) - } - - pub fn age(&self) -> i64 { - chrono::Utc::now().timestamp() - self.last_fetch - } -} diff --git a/src/gui/metric.rs b/src/gui/metric.rs index 63b981f..4778a50 100644 --- a/src/gui/metric.rs +++ b/src/gui/metric.rs @@ -1,8 +1,8 @@ -use eframe::{egui::{Ui, Layout, Sense, color_picker::show_color_at}, emath::Align, epaint::Color32}; +use eframe::{egui::{Ui, Layout, Sense, color_picker::show_color_at, ComboBox, TextEdit}, emath::Align, epaint::Color32}; use crate::{data::entities, util::unpack_color}; -fn color_square(ui: &mut Ui, color:Color32) { +fn _color_square(ui: &mut Ui, color:Color32) { let size = ui.spacing().interact_size; let (rect, response) = ui.allocate_exact_size(size, Sense::click()); if ui.is_rect_visible(rect) { @@ -17,9 +17,9 @@ fn color_square(ui: &mut Ui, color:Color32) { } } -pub fn metric_display_ui(ui: &mut Ui, metric: &entities::metrics::Model, _width: f32) { +pub fn _metric_display_ui(ui: &mut Ui, metric: &entities::metrics::Model, _width: f32) { ui.horizontal(|ui| { - color_square(ui, unpack_color(metric.color)); + _color_square(ui, unpack_color(metric.color)); ui.label(&metric.name); ui.with_layout(Layout::top_down(Align::RIGHT), |ui| { ui.horizontal(|ui| { @@ -34,32 +34,42 @@ pub fn metric_display_ui(ui: &mut Ui, metric: &entities::metrics::Model, _width: } pub fn metric_edit_ui(ui: &mut Ui, metric: &entities::metrics::Model, panels: Option<&Vec>, width: f32) { - let _text_width = width - 195.0; + let text_width = width - 195.0; + let mut name = metric.name.clone(); + let def_str = "".into(); + let mut query_x = metric.query_x.as_ref().unwrap_or(&def_str).clone(); + let mut query_y = metric.query_y.clone(); + let mut panel_id = 0; ui.horizontal(|ui| { ui.color_edit_button_srgba(&mut unpack_color(metric.color)); - // TextEdit::singleline(&mut metric.name) - // .desired_width(text_width / 2.0) - // .hint_text("name") - // .show(ui); + TextEdit::singleline(&mut name) + .interactive(false) + .desired_width(text_width / 2.0) + .hint_text("name") + .show(ui); ui.separator(); - // TextEdit::singleline(&mut metric.query_x.unwrap_or("".into())) - // .desired_width(text_width / 4.0) - // .hint_text("x") - // .show(ui); - // TextEdit::singleline(&mut metric.query_y) - // .desired_width(text_width / 4.0) - // .hint_text("y") - // .show(ui); - if let Some(_panels) = panels { - // ComboBox::from_id_source(format!("panel-selector-{}", metric.id)) - // .width(60.0) - // .selected_text(format!("panel: {:02}", metric.panel_id)) - // .show_ui(ui, |ui| { - // ui.selectable_value(&mut metric.panel_id, -1, "None"); - // for p in panels { - // ui.selectable_value(&mut metric.panel_id, p.id, p.name.as_str()); - // } - // }); + if query_x.len() > 0 { + TextEdit::singleline(&mut query_x) + .interactive(false) + .desired_width(text_width / 4.0) + .hint_text("x") + .show(ui); + } + TextEdit::singleline(&mut query_y) + .interactive(false) + .desired_width(if query_x.len() > 0 { 0.0 } else { 15.0 } + (text_width / if query_x.len() > 0 { 4.0 } else { 2.0 })) + .hint_text("y") + .show(ui); + if let Some(panels) = panels { + ComboBox::from_id_source(format!("panel-selector-{}", metric.id)) + .width(60.0) + .selected_text(format!("panel: {:02}", metric.panel_id)) + .show_ui(ui, |ui| { + ui.selectable_value(&mut panel_id, -1, "None"); + for p in panels { + ui.selectable_value(&mut panel_id, p.id, p.name.as_str()); + } + }); } }); } diff --git a/src/gui/mod.rs b/src/gui/mod.rs index 5f4482e..72f76e7 100644 --- a/src/gui/mod.rs +++ b/src/gui/mod.rs @@ -4,10 +4,11 @@ pub mod metric; mod scaffold; +use chrono::Utc; use eframe::egui::{CentralPanel, Context, SidePanel, TopBottomPanel}; use tokio::sync::watch; -use crate::data::entities; +use crate::{data::entities, worker::visualizer::AppStateView}; use panel::main_content; use scaffold::{ // confirmation_popup_delete_metric, confirmation_popup_delete_source, footer, @@ -15,14 +16,17 @@ use scaffold::{ }; use source::source_panel; -pub struct App { - panels_rx: watch::Receiver>, - panels: Vec, - view_tx: watch::Sender, +use self::scaffold::footer; - sources: watch::Receiver>, - metrics: watch::Receiver>, - points: watch::Receiver>, +pub struct App { + view: AppStateView, + db_path: String, + interval: i64, + last_redraw: i64, + + panels: Vec, + width_tx: watch::Sender, + logger_view: watch::Receiver>, buffer_panel: entities::panels::Model, buffer_source: entities::sources::Model, @@ -36,24 +40,34 @@ pub struct App { impl App { pub fn new( _cc: &eframe::CreationContext, - panels_rx: watch::Receiver>, - sources: watch::Receiver>, - metrics: watch::Receiver>, - points: watch::Receiver>, - view_tx: watch::Sender, + db_path: String, + interval: i64, + view: AppStateView, + width_tx: watch::Sender, + logger_view: watch::Receiver>, ) -> Self { - let panels = panels_rx.borrow().clone(); + let panels = view.panels.borrow().clone(); Self { - panels_rx, panels, view_tx, - sources, metrics, points, + db_path, interval, panels, width_tx, view, logger_view, buffer_panel: entities::panels::Model::default(), buffer_source: entities::sources::Model::default(), buffer_metric: entities::metrics::Model::default(), + last_redraw: 0, edit: false, sidebar: true, padding: false, } } + + pub fn save_all_panels(&self) { + self.view.op.blocking_send( + crate::worker::visualizer::BackgroundAction::UpdateAllPanels { panels: self.panels.clone() } + ).unwrap(); + } + + pub fn refresh_data(&self) { + self.view.flush.blocking_send(()).unwrap(); + } } impl eframe::App for App { @@ -62,8 +76,8 @@ impl eframe::App for App { header(self, ui, frame); }); - TopBottomPanel::bottom("footer").show(ctx, |_ui| { - // footer(self.data.clone(), ctx, ui); + TopBottomPanel::bottom("footer").show(ctx, |ui| { + footer(ctx, ui, self.logger_view.clone(), self.db_path.clone(), self.view.points.borrow().len()); }); // if let Some(index) = self.deleting_metric { @@ -75,6 +89,10 @@ impl eframe::App for App { // .show(ctx, |ui| confirmation_popup_delete_source(self, ui, index)); // } + // for window in self.windows { + + // } + if self.sidebar { SidePanel::left("sources-bar") .width_range(if self.edit { 400.0..=1000.0 } else { 280.0..=680.0 }) @@ -87,7 +105,12 @@ impl eframe::App for App { }); if let Some(viewsize) = self.panels.iter().map(|p| p.view_size).max() { - self.view_tx.send(viewsize as i64).unwrap(); + self.width_tx.send(viewsize as i64).unwrap(); + } + + if Utc::now().timestamp() > self.last_redraw + self.interval { + ctx.request_repaint(); + self.last_redraw = Utc::now().timestamp(); } } } diff --git a/src/gui/panel.rs b/src/gui/panel.rs index 9c14e94..9d151c6 100644 --- a/src/gui/panel.rs +++ b/src/gui/panel.rs @@ -1,6 +1,6 @@ use chrono::{Local, Utc}; use eframe::{egui::{ - plot::{Corner, GridMark, Legend, Line, Plot, PlotPoints, PlotPoint}, + plot::{Corner, GridMark, Legend, Line, Plot}, Ui, ScrollArea, collapsing_header::CollapsingState, Context, Layout, Slider, DragValue, }, emath::Vec2}; @@ -14,7 +14,7 @@ pub fn main_content(app: &mut App, ctx: &Context, ui: &mut Ui) { ScrollArea::vertical().show(ui, |ui| { let panels = &mut app.panels; let _panels_count = panels.len(); - let metrics = app.metrics.borrow(); + let metrics = app.view.metrics.borrow(); for (index, panel) in panels.iter_mut().enumerate() { if index > 0 { ui.separator(); // only show this if there is at least one panel @@ -41,7 +41,7 @@ pub fn main_content(app: &mut App, ctx: &Context, ui: &mut Ui) { // ui.separator(); panel_title_ui(ui, panel, app.edit); }) - .body(|ui| panel_body_ui(ui, panel, &metrics, &app.points.borrow())); + .body(|ui| panel_body_ui(ui, panel, &metrics, &app.view.points.borrow())); } }); } diff --git a/src/gui/scaffold.rs b/src/gui/scaffold.rs index e4d57b1..e995ffd 100644 --- a/src/gui/scaffold.rs +++ b/src/gui/scaffold.rs @@ -1,8 +1,6 @@ -use std::sync::Arc; - use eframe::{Frame, egui::{collapsing_header::CollapsingState, Context, Ui, Layout, ScrollArea, global_dark_light_mode_switch}, emath::Align}; +use tokio::sync::watch; -use crate::data::ApplicationState; use crate::gui::App; use super::panel::panel_edit_inline_ui; @@ -68,16 +66,19 @@ pub fn header(app: &mut App, ui: &mut Ui, frame: &mut Frame) { ui.checkbox(&mut app.sidebar, "sources"); ui.separator(); if ui.button("reset").clicked() { - app.panels = app.panels_rx.borrow().clone(); + app.panels = app.view.panels.borrow().clone(); + } + ui.separator(); + if ui.button("save").clicked() { + app.save_all_panels(); + } + ui.separator(); + if ui.button("refresh").clicked() { + app.refresh_data(); } ui.separator(); ui.checkbox(&mut app.edit, "edit"); if app.edit { - if ui.button("save").clicked() { - // native_save(app.data.clone()); - app.edit = false; - } - ui.separator(); ui.label("+ panel"); panel_edit_inline_ui(ui, &mut app.buffer_panel); if ui.button("add").clicked() { @@ -97,7 +98,7 @@ pub fn header(app: &mut App, ui: &mut Ui, frame: &mut Frame) { }); } -pub fn _footer(_data: Arc, ctx: &Context, ui: &mut Ui) { +pub fn footer(ctx: &Context, ui: &mut Ui, diagnostics: watch::Receiver>, db_path: String, records: usize) { CollapsingState::load_with_default_open( ctx, ui.make_persistent_id("footer-logs"), @@ -106,8 +107,9 @@ pub fn _footer(_data: Arc, ctx: &Context, ui: &mut Ui) { .show_header(ui, |ui| { ui.horizontal(|ui| { ui.separator(); - // ui.label(data.file_path.to_str().unwrap()); // TODO maybe calculate it just once? + ui.label(db_path); // TODO maybe calculate it just once? ui.separator(); + ui.label(format!("{} records loaded", records)); // TODO put thousands separator // ui.label(human_size( // *data // .file_size @@ -131,14 +133,10 @@ pub fn _footer(_data: Arc, ctx: &Context, ui: &mut Ui) { .body(|ui| { ui.set_height(200.0); ScrollArea::vertical().show(ui, |ui| { - // let msgs = data - // .diagnostics - // .read() - // .expect("Diagnostics RwLock poisoned"); ui.separator(); - //for msg in msgs.iter() { - // ui.label(msg); - //} + for msg in diagnostics.borrow().iter() { + ui.label(msg); + } ui.separator(); }); }); diff --git a/src/gui/source.rs b/src/gui/source.rs index 6a73d57..11cad32 100644 --- a/src/gui/source.rs +++ b/src/gui/source.rs @@ -1,5 +1,5 @@ use eframe::{ - egui::{Layout, ScrollArea, Ui}, + egui::{Layout, ScrollArea, Ui, DragValue, TextEdit, Checkbox}, emath::Align, }; use rfd::FileDialog; @@ -9,7 +9,7 @@ use crate::util::deserialize_values; use crate::gui::App; use crate::data::entities; -use super::metric::{metric_display_ui, metric_edit_ui}; +use super::metric::metric_edit_ui; pub fn source_panel(app: &mut App, ui: &mut Ui) { let mut source_to_put_metric_on : Option = None; @@ -23,138 +23,126 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) { .show(ui, |ui| { // TODO only vertical! { - let sources = app.sources.borrow(); + let sources = app.view.sources.borrow(); let sources_count = sources.len(); ui.heading("Sources"); ui.separator(); for (i, source) in sources.iter().enumerate() { ui.horizontal(|ui| { - if app.edit { // show buttons to move sources up and down - ui.vertical(|ui| { - ui.add_space(10.0); - if ui.small_button("+").clicked() { - if i > 0 { - to_swap = Some(i); // TODO kinda jank but is there a better way? - } + ui.vertical(|ui| { + ui.add_space(10.0); + if ui.small_button("+").clicked() { + if i > 0 { + to_swap = Some(i); // TODO kinda jank but is there a better way? } - if ui.small_button("−").clicked() { - if i < sources_count - 1 { - to_swap = Some(i + 1); // TODO kinda jank but is there a better way? - } + } + if ui.small_button("−").clicked() { + if i < sources_count - 1 { + to_swap = Some(i + 1); // TODO kinda jank but is there a better way? } - }); - } + } + }); ui.vertical(|ui| { // actual sources list container let remaining_width = ui.available_width(); - if app.edit { - ui.group(|ui| { - ui.horizontal(|ui| { - source_edit_ui(ui, source, remaining_width - 34.0); - if ui.small_button("×").clicked() { - // app.deleting_metric = None; - // app.deleting_source = Some(i); - } - }); - let metrics = app - .metrics - .borrow(); - for (_j, metric) in metrics.iter().enumerate() { - if metric.source_id == source.id { - ui.horizontal(|ui| { - metric_edit_ui( - ui, - metric, - Some(&panels), - remaining_width - 53.0, - ); - if ui.small_button("s").clicked() { - let path = FileDialog::new() - .add_filter("csv", &["csv"]) - .set_file_name(format!("{}-{}.csv", source.name, metric.name).as_str()) - .save_file(); - if let Some(_path) = path { - // serialize_values( - // &*metric - // .data - // .read() - // .expect("Values RwLock poisoned"), - // metric, - // path, - // ) - // .expect("Could not serialize data"); - } - } - if ui.small_button("×").clicked() { - // app.deleting_source = None; - // app.deleting_metric = Some(j); - } - }); - } + ui.group(|ui| { + ui.horizontal(|ui| { + source_edit_ui(ui, source, remaining_width - 34.0); + if ui.small_button("×").clicked() { + // app.deleting_metric = None; + // app.deleting_source = Some(i); } - ui.horizontal(|ui| { - metric_edit_ui( - ui, - &mut app.buffer_metric, - None, - remaining_width - 53.0, - ); - ui.add_space(2.0); - if ui.small_button(" + ").clicked() { - source_to_put_metric_on = Some(source.id); - } - ui.add_space(1.0); // DAMN! - if ui.small_button("o").clicked() { - let path = FileDialog::new() - .add_filter("csv", &["csv"]) - .pick_file(); - if let Some(path) = path { - match deserialize_values(path) { - Ok((_name, _query_x, _query_y, _data)) => { - // let mut store = app - // .data - // .storage - // .lock() - // .expect("Storage Mutex poisoned"); - // match store.new_metric( - // name.as_str(), - // source.id, - // query_x.as_str(), - // query_y.as_str(), - // -1, - // Color32::TRANSPARENT, - // metrics.len() as i32, - // ) { - // Ok(verified_metric) => { - // store.put_values(verified_metric.id, &data).unwrap(); - // *verified_metric.data.write().expect("Values RwLock poisoned") = data; - // to_insert.push(verified_metric); - // } - // Err(e) => { - // error!(target: "ui", "could not save metric into archive : {:?}", e); - // } - // } - } - Err(e) => { - error!(target: "ui", "Could not deserialize metric from file : {:?}", e); - } + }); + let metrics = app + .view + .metrics + .borrow(); + for (_j, metric) in metrics.iter().enumerate() { + if metric.source_id == source.id { + ui.horizontal(|ui| { + metric_edit_ui( + ui, + metric, + Some(&panels), + remaining_width - 53.0, + ); + if ui.small_button("s").clicked() { + let path = FileDialog::new() + .add_filter("csv", &["csv"]) + .set_file_name(format!("{}-{}.csv", source.name, metric.name).as_str()) + .save_file(); + if let Some(_path) = path { + // serialize_values( + // &*metric + // .data + // .read() + // .expect("Values RwLock poisoned"), + // metric, + // path, + // ) + // .expect("Could not serialize data"); + } + } + if ui.small_button("×").clicked() { + // app.deleting_source = None; + // app.deleting_metric = Some(j); + } + }); + } + } + ui.horizontal(|ui| { + metric_edit_ui( + ui, + &mut app.buffer_metric, + None, + remaining_width - 53.0, + ); + ui.add_space(2.0); + if ui.small_button(" + ").clicked() { + source_to_put_metric_on = Some(source.id); + } + ui.add_space(1.0); // DAMN! + if ui.small_button("o").clicked() { + let path = FileDialog::new() + .add_filter("csv", &["csv"]) + .pick_file(); + if let Some(path) = path { + match deserialize_values(path) { + Ok((_name, _query_x, _query_y, _data)) => { + // let mut store = app + // .data + // .storage + // .lock() + // .expect("Storage Mutex poisoned"); + // match store.new_metric( + // name.as_str(), + // source.id, + // query_x.as_str(), + // query_y.as_str(), + // -1, + // Color32::TRANSPARENT, + // metrics.len() as i32, + // ) { + // Ok(verified_metric) => { + // store.put_values(verified_metric.id, &data).unwrap(); + // *verified_metric.data.write().expect("Values RwLock poisoned") = data; + // to_insert.push(verified_metric); + // } + // Err(e) => { + // error!(target: "ui", "could not save metric into archive : {:?}", e); + // } + // } + } + Err(e) => { + error!(target: "ui", "Could not deserialize metric from file : {:?}", e); } } } - if ui.small_button("×").clicked() { - app.buffer_metric = entities::metrics::Model::default(); - } - }) - }); - } else { - let metrics = app.metrics.borrow(); - source_display_ui(ui, source, remaining_width); - for metric in metrics.iter() { - if metric.source_id == source.id { - metric_display_ui(ui, metric, ui.available_width()); } - } - ui.separator(); - } + if ui.small_button("×").clicked() { + app.buffer_metric = entities::metrics::Model::default(); + } + }) + }); }); }); } @@ -200,7 +188,7 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) { // } // } if let Some(s) = source_to_put_metric_on { - for source in app.sources.borrow().iter() { + for source in app.view.sources.borrow().iter() { if source.id == s { // if let Err(e) = // app.data.add_metric(&app.input_metric, &source) @@ -212,29 +200,31 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) { } } -pub fn source_display_ui(_ui: &mut Ui, _source: &entities::sources::Model, _width: f32) { - // ui.horizontal(|ui| { +pub fn _source_display_ui(ui: &mut Ui, source: &entities::sources::Model, _width: f32) { + ui.horizontal(|ui| { + ui.heading(&source.name).on_hover_text(&source.url); // ui.add_enabled(false, Checkbox::new(&mut source.enabled, "")); - // ui.add_enabled( - // false, - // DragValue::new(&mut source.interval).clamp_range(1..=120), - // ); - // ui.heading(&source.name).on_hover_text(&source.url); - // }); + }); } -pub fn source_edit_ui(_ui: &mut Ui, _source: &entities::sources::Model, _width: f32) { - // ui.horizontal(|ui| { - // let text_width = width - 100.0; - // ui.checkbox(&mut source.enabled, ""); - // ui.add(DragValue::new(&mut source.interval).clamp_range(1..=3600)); - // TextEdit::singleline(&mut source.name) - // .desired_width(text_width / 4.0) - // .hint_text("name") - // .show(ui); - // TextEdit::singleline(&mut source.url) - // .desired_width(text_width * 3.0 / 4.0) - // .hint_text("url") - // .show(ui); - // }); +pub fn source_edit_ui(ui: &mut Ui, source: &entities::sources::Model, width: f32) { + let mut interval = source.interval.clone(); + let mut name = source.name.clone(); + let mut url = source.url.clone(); + let mut enabled = source.enabled.clone(); + ui.horizontal(|ui| { + let text_width = width - 100.0; + ui.add_enabled(false, Checkbox::new(&mut enabled, "")); + ui.add_enabled(false, DragValue::new(&mut interval).clamp_range(1..=3600)); + TextEdit::singleline(&mut name) + .interactive(false) + .desired_width(text_width / 4.0) + .hint_text("name") + .show(ui); + TextEdit::singleline(&mut url) + .interactive(false) + .desired_width(text_width * 3.0 / 4.0) + .hint_text("url") + .show(ui); + }); } diff --git a/src/main.rs b/src/main.rs index 18aee13..3eb0cc2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,14 +5,17 @@ mod worker; use tracing::metadata::LevelFilter; use tracing_subscriber::prelude::*; -use tracing::info; +use tracing::{info, error}; use tracing_subscriber::filter::filter_fn; +use eframe::egui::Context; use clap::Parser; use tokio::sync::watch; use sea_orm::Database; -use worker::{surveyor_loop, visualizer_loop}; +use worker::visualizer::AppState; +use worker::surveyor_loop; +use util::InternalLogger; use gui::{ // util::InternalLogger, App @@ -34,12 +37,16 @@ struct CliArgs { gui: bool, /// Check interval for background worker - #[arg(short, long, default_value_t = 5)] + #[arg(short, long, default_value_t = 10)] interval: u64, /// How often sources and metrics are refreshed #[arg(short, long, default_value_t = 300)] cache_time: u64, + + /// How many log lines to keep in memory + #[arg(short, long, default_value_t = 1000)] + log_size: u64, } // When compiling natively: @@ -47,25 +54,31 @@ struct CliArgs { fn main() { let args = CliArgs::parse(); + // TODO is there an alternative to this ugly botch? + let (ctx_tx, ctx_rx) = watch::channel::>(None); + + let (width_tx, width_rx) = watch::channel(0); + let (run_tx, run_rx) = watch::channel(true); + + let logger = InternalLogger::new(args.log_size as usize); + let logger_view = logger.view(); + tracing_subscriber::registry() .with(LevelFilter::INFO) .with(filter_fn(|x| x.target() != "sqlx::query")) .with(tracing_subscriber::fmt::Layer::new()) - // .with(InternalLogger::new(store.clone())) + .with(logger.layer()) .init(); - // // Set default file location - // let mut store_path = dirs::data_dir().unwrap_or(PathBuf::from(".")); // TODO get cwd more consistently? - // store_path.push("dashboard.db"); - // let store = - // Arc::new(ApplicationState::new(store_path).expect("Failed creating application state")); - + let state = AppState::new( + width_rx, + args.interval as i64, + args.cache_time as i64, + ).unwrap(); - let (panel_tx, panel_rx) = watch::channel(vec![]); - let (source_tx, source_rx) = watch::channel(vec![]); - let (metric_tx, metric_rx) = watch::channel(vec![]); - let (point_tx, point_rx) = watch::channel(vec![]); - let (view_tx, view_rx) = watch::channel(1440); + let view = state.view(); + let run_rx_clone = run_rx.clone(); + let db_uri = args.db.clone(); let worker = std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread() @@ -73,11 +86,28 @@ fn main() { .build() .unwrap() .block_on(async { - let db = Database::connect(args.db.clone()).await.unwrap(); - info!(target: "launcher", "Connected to '{}'", args.db); + let db = Database::connect(db_uri.clone()).await.unwrap(); + info!(target: "launcher", "Connected to '{}'", db_uri); let mut jobs = vec![]; + let run_rx_clone_clone = run_rx_clone.clone(); + + jobs.push( + tokio::spawn(async move { + while *run_rx_clone_clone.borrow() { + if let Some(ctx) = &*ctx_rx.borrow() { + ctx.request_repaint(); + } + tokio::time::sleep(std::time::Duration::from_secs(args.interval)).await; + } + }) + ); + + jobs.push( + tokio::spawn(logger.worker(run_rx_clone.clone())) + ); + if args.worker { jobs.push( tokio::spawn( @@ -85,6 +115,7 @@ fn main() { db.clone(), args.interval as i64, args.cache_time as i64, + run_rx_clone.clone(), ) ) ); @@ -93,16 +124,7 @@ fn main() { if args.gui { jobs.push( tokio::spawn( - visualizer_loop( - db.clone(), - args.interval, - args.cache_time as i64, - panel_tx, - source_tx, - metric_tx, - point_tx, - view_rx, - ) + state.worker(db, run_rx_clone.clone()) ) ); } @@ -116,23 +138,34 @@ fn main() { if args.gui { let native_options = eframe::NativeOptions::default(); + info!(target: "launcher", "Starting native GUI"); + eframe::run_native( // TODO replace this with a loop that ends so we can cleanly exit the background worker "dashboard", native_options, Box::new( - move |cc| Box::new( - App::new( - cc, - panel_rx, - source_rx, - metric_rx, - point_rx, - view_tx, + move |cc| { + ctx_tx.send(Some(cc.egui_ctx.clone())).unwrap_or_else(|_| { + error!(target: "launcher", "Could not share reference to egui context (won't be able to periodically refresh window)"); + }); + Box::new( + App::new( + cc, + args.db, + args.interval as i64, + view, + width_tx, + logger_view, + ) ) - ) + } ), ); + + info!(target: "launcher", "Stopping native GUI"); + + run_tx.send(false).unwrap(); } worker.join().unwrap(); diff --git a/src/util.rs b/src/util.rs index 273e759..67b36d3 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,9 +1,10 @@ use chrono::{DateTime, Local, NaiveDateTime, Utc}; use eframe::egui::{Color32, plot::PlotPoint}; -use std::{sync::Arc, error::Error, path::PathBuf}; +use tokio::sync::{watch, mpsc}; +use std::{error::Error, path::PathBuf, collections::VecDeque}; use tracing_subscriber::Layer; -use super::data::{ApplicationState, entities}; +use super::data::entities; // if you're handling more than terabytes of data, it's the future and you ought to update this code! const _PREFIXES: &'static [&'static str] = &["", "k", "M", "G", "T"]; @@ -51,7 +52,8 @@ pub fn deserialize_values(path: PathBuf) -> Result<(String, String, String, Vec< )) } -pub fn _human_size(size: u64) -> String { +#[allow(dead_code)] +pub fn human_size(size: u64) -> String { let mut buf: f64 = size as f64; let mut prefix: usize = 0; while buf > 1024.0 && prefix < _PREFIXES.len() - 1 { @@ -101,16 +103,62 @@ pub fn repack_color(c: Color32) -> u32 { } pub struct InternalLogger { - _state: Arc, + size: usize, + view_tx: watch::Sender>, + view_rx: watch::Receiver>, + msg_tx : mpsc::UnboundedSender, + msg_rx : mpsc::UnboundedReceiver, } impl InternalLogger { - pub fn _new(state: Arc) -> Self { - InternalLogger { _state: state } + pub fn new(size: usize) -> Self { + let (view_tx, view_rx) = watch::channel(vec![]); + let (msg_tx, msg_rx) = mpsc::unbounded_channel(); + InternalLogger { size, view_tx, view_rx, msg_tx, msg_rx } + } + + pub fn view(&self) -> watch::Receiver> { + self.view_rx.clone() + } + + pub fn layer(&self) -> InternalLoggerLayer { + InternalLoggerLayer::new(self.msg_tx.clone()) + } + + pub async fn worker(mut self, run: watch::Receiver) { + let mut messages = VecDeque::new(); + while *run.borrow() { + tokio::select!{ + msg = self.msg_rx.recv() => { + match msg { + Some(msg) => { + messages.push_back(msg); + while messages.len() > self.size { + messages.pop_front(); + } + self.view_tx.send(messages.clone().into()).unwrap(); + }, + None => break, + } + }, + _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, + // unblock so it checks again run and exits cleanly + } + } } } -impl Layer for InternalLogger +pub struct InternalLoggerLayer { + msg_tx: mpsc::UnboundedSender, +} + +impl InternalLoggerLayer { + pub fn new(msg_tx: mpsc::UnboundedSender) -> Self { + InternalLoggerLayer { msg_tx } + } +} + +impl Layer for InternalLoggerLayer where S: tracing::Subscriber, { @@ -123,18 +171,15 @@ where msg: "".to_string(), }; event.record(&mut msg_visitor); - let _out = format!( + let out = format!( "{} [{}] {}: {}", Local::now().format("%H:%M:%S"), event.metadata().level(), event.metadata().target(), msg_visitor.msg ); - // self.state - // .diagnostics - // .write() - // .expect("Diagnostics RwLock poisoned") - // .push(out); + + self.msg_tx.send(out).unwrap_or_default(); } } diff --git a/src/worker.rs b/src/worker.rs deleted file mode 100644 index 5045901..0000000 --- a/src/worker.rs +++ /dev/null @@ -1,159 +0,0 @@ -use chrono::Utc; -use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait, Condition, ColumnTrait, QueryFilter}; -use tokio::sync::watch; -use tracing::{error, info}; -use std::collections::VecDeque; - -use super::data::{entities, FetchError}; - -async fn fetch(url: &str) -> Result { - Ok(reqwest::get(url).await?.json().await?) -} - -pub async fn surveyor_loop( - db: DatabaseConnection, - interval:i64, - cache_time:i64, -) { - let mut last_check = Utc::now().timestamp(); - let mut last_fetch = 0; - let mut sources = vec![]; - let mut metrics = vec![]; - loop { - // sleep until next activation - let delta_time = (interval as i64) - (Utc::now().timestamp() - last_check); - if delta_time > 0 { - std::thread::sleep(std::time::Duration::from_secs(delta_time as u64)); - } - last_check = Utc::now().timestamp(); - - if Utc::now().timestamp() - last_fetch > cache_time { - // TODO do both concurrently - let res = tokio::join!( - entities::sources::Entity::find().all(&db), - entities::metrics::Entity::find().all(&db) - ); - sources = res.0.unwrap(); - metrics = res.1.unwrap(); - last_fetch = Utc::now().timestamp(); - } - - for source in sources.iter_mut() { - if !source.enabled || !source.ready() { - continue; - } - - // source.last_fetch = Utc::now(); // TODO! do it in background threads again! - // tokio::spawn(async move { - match fetch(&source.url).await { - Ok(res) => { - let now = Utc::now().timestamp(); - entities::sources::Entity::update( - entities::sources::ActiveModel{id: Set(source.id), last_update: Set(now), ..Default::default()} - ).exec(&db).await.unwrap(); - source.last_update = now; - for metric in metrics.iter().filter(|x| source.id == x.source_id) { - match metric.extract(&res) { - Ok(v) => { - entities::points::Entity::insert( - entities::points::ActiveModel { - id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y), - }).exec(&db).await.unwrap(); - }, - Err(e) => error!(target: "worker", "Failed extracting '{}' from {}: {:?}", metric.name, source.name, e), - } - } - }, - Err(e) => error!(target: "worker", "Failed fetching {}: {:?}", source.name, e), - } - // source.last_fetch = Utc::now(); // TODO! - // }); - - } - - // if let Ok(meta) = std::fs::metadata(state.file_path.clone()) { - // let mut fsize = state.file_size.write().expect("File Size RwLock poisoned"); - // *fsize = meta.len(); - // } // ignore errors - - // ctx.request_repaint(); - } -} - -pub async fn visualizer_loop( - db: DatabaseConnection, - interval: u64, - cache_time: i64, - panels_tx: watch::Sender>, - sources_tx: watch::Sender>, - metrics_tx: watch::Sender>, - points_tx: watch::Sender>, - view_rx: watch::Receiver, -) { - let mut points : VecDeque = VecDeque::new(); - let mut last_fetch = 0; - - let mut width = *view_rx.borrow() * 60; // TODO it's in minutes somewhere... - let mut lower = Utc::now().timestamp() - width; - - let mut changes; - - loop { - if Utc::now().timestamp() - last_fetch >= cache_time { - panels_tx.send(entities::panels::Entity::find().all(&db).await.unwrap()).unwrap(); - sources_tx.send(entities::sources::Entity::find().all(&db).await.unwrap()).unwrap(); - metrics_tx.send(entities::metrics::Entity::find().all(&db).await.unwrap()).unwrap(); - last_fetch = Utc::now().timestamp(); - info!(target: "worker", "Updated panels, sources and metrics"); - } - - changes = false; - let now = Utc::now().timestamp(); - let new_width = *view_rx.borrow() * 60; // TODO it's in minutes somewhere... - - if new_width != width { - let mut lower_points = entities::points::Entity::find() - .filter( - Condition::all() - .add(entities::points::Column::X.gte(now - new_width)) - .add(entities::points::Column::X.lte(now - width)) - ) - .all(&db) - .await.unwrap(); - lower_points.reverse(); // TODO wasteful! - for p in lower_points { - points.push_front(p); - changes = true; - } - } - - width = new_width; - - let new_points = entities::points::Entity::find() - .filter( - Condition::all() - .add(entities::points::Column::X.gte(lower as f64)) - ) - .all(&db) - .await.unwrap(); - - lower = Utc::now().timestamp(); - while let Some(p) = points.get(0) { - if (p.x as i64) >= lower - (*view_rx.borrow() * 60) { - break; - } - points.pop_front(); - changes = true; - } - for p in new_points { - points.push_back(p); - changes = true; - } - - if changes { - points_tx.send(points.clone().into()).unwrap(); - } - - tokio::time::sleep(std::time::Duration::from_secs(interval)).await; - } -} diff --git a/src/worker/mod.rs b/src/worker/mod.rs new file mode 100644 index 0000000..cabb281 --- /dev/null +++ b/src/worker/mod.rs @@ -0,0 +1,5 @@ +pub mod surveyor; +pub mod visualizer; + +pub use surveyor::surveyor_loop; +pub use visualizer::{AppState, AppStateView, BackgroundAction}; diff --git a/src/worker/surveyor.rs b/src/worker/surveyor.rs new file mode 100644 index 0000000..3955cff --- /dev/null +++ b/src/worker/surveyor.rs @@ -0,0 +1,81 @@ +use chrono::Utc; +use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait}; +use tokio::sync::watch; +use tracing::error; + +use crate::data::{entities, FetchError}; + +async fn fetch(url: &str) -> Result { + Ok(reqwest::get(url).await?.json().await?) +} + +pub async fn surveyor_loop( + db: DatabaseConnection, + interval:i64, + cache_time:i64, + run: watch::Receiver, +) { + let mut last_check = Utc::now().timestamp(); + let mut last_fetch = 0; + let mut sources = vec![]; + let mut metrics = vec![]; + while *run.borrow() { + // sleep until next activation + let delta_time = (interval as i64) - (Utc::now().timestamp() - last_check); + if delta_time > 0 { + tokio::time::sleep(std::time::Duration::from_secs(delta_time as u64)).await; + } + last_check = Utc::now().timestamp(); + + if Utc::now().timestamp() - last_fetch > cache_time { + // TODO do both concurrently + let res = tokio::join!( + entities::sources::Entity::find().all(&db), + entities::metrics::Entity::find().all(&db) + ); + sources = res.0.unwrap(); + metrics = res.1.unwrap(); + last_fetch = Utc::now().timestamp(); + } + + for source in sources.iter_mut() { + if !source.enabled || !source.ready() { + continue; + } + + // source.last_fetch = Utc::now(); // TODO! do it in background threads again! + // tokio::spawn(async move { + match fetch(&source.url).await { + Ok(res) => { + let now = Utc::now().timestamp(); + entities::sources::Entity::update( + entities::sources::ActiveModel{id: Set(source.id), last_update: Set(now), ..Default::default()} + ).exec(&db).await.unwrap(); + source.last_update = now; + for metric in metrics.iter().filter(|x| source.id == x.source_id) { + match metric.extract(&res) { + Ok(v) => { + entities::points::Entity::insert( + entities::points::ActiveModel { + id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y), + }).exec(&db).await.unwrap(); + }, + Err(e) => error!(target: "worker", "Failed extracting '{}' from {}: {:?}", metric.name, source.name, e), + } + } + }, + Err(e) => error!(target: "worker", "Failed fetching {}: {:?}", source.name, e), + } + // source.last_fetch = Utc::now(); // TODO! + // }); + + } + + // if let Ok(meta) = std::fs::metadata(state.file_path.clone()) { + // let mut fsize = state.file_size.write().expect("File Size RwLock poisoned"); + // *fsize = meta.len(); + // } // ignore errors + + // ctx.request_repaint(); + } +} diff --git a/src/worker/visualizer.rs b/src/worker/visualizer.rs new file mode 100644 index 0000000..9139be3 --- /dev/null +++ b/src/worker/visualizer.rs @@ -0,0 +1,251 @@ +use chrono::Utc; +use sea_orm::{DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set}; +use tokio::sync::{watch, mpsc}; +use tracing::info; +use std::collections::VecDeque; + +use crate::data::{entities, FetchError}; + +#[derive(Clone)] +pub struct AppStateView { + pub panels: watch::Receiver>, + pub sources: watch::Receiver>, + pub metrics: watch::Receiver>, + pub points: watch::Receiver>, + pub flush: mpsc::Sender<()>, + pub op: mpsc::Sender, +} + +impl AppStateView { + pub async fn _request_flush(&self) -> bool { + match self.flush.send(()).await { + Ok(_) => true, + Err(_) => false, + } + } +} + +struct AppStateTransmitters { + panels: watch::Sender>, + sources: watch::Sender>, + metrics: watch::Sender>, + points: watch::Sender>, +} + +pub struct AppState { + tx: AppStateTransmitters, + + panels: Vec, + sources: Vec, + metrics: Vec, + last_refresh: i64, + + points: VecDeque, + last_check: i64, + + flush: mpsc::Receiver<()>, + op: mpsc::Receiver, + + interval: i64, + cache_age: i64, + + width: watch::Receiver, + + view: AppStateView, +} + +async fn sleep(t:i64) { + if t > 0 { + tokio::time::sleep(std::time::Duration::from_secs(t as u64)).await + } +} + +impl AppState { + pub fn new( + width: watch::Receiver, + interval: i64, + cache_age: i64, + ) -> Result { + let (panel_tx, panel_rx) = watch::channel(vec![]); + let (source_tx, source_rx) = watch::channel(vec![]); + let (metric_tx, metric_rx) = watch::channel(vec![]); + let (point_tx, point_rx) = watch::channel(vec![]); + // let (view_tx, view_rx) = watch::channel(0); + let (flush_tx, flush_rx) = mpsc::channel(10); + let (op_tx, op_rx) = mpsc::channel(100); + + Ok(AppState { + panels: vec![], + sources: vec![], + metrics: vec![], + last_refresh: 0, + points: VecDeque::new(), + last_check: 0, + flush: flush_rx, + op: op_rx, + view: AppStateView { + panels: panel_rx, + sources: source_rx, + metrics: metric_rx, + points: point_rx, + flush: flush_tx, + op: op_tx, + }, + tx: AppStateTransmitters { + panels: panel_tx, + sources: source_tx, + metrics: metric_tx, + points: point_tx, + }, + width, + interval, + cache_age, + }) + } + + pub fn view(&self) -> AppStateView { + self.view.clone() + } + + pub async fn fetch(&mut self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> { + // TODO parallelize all this stuff + self.panels = entities::panels::Entity::find().all(db).await?; + self.tx.panels.send(self.panels.clone()).unwrap(); + self.sources = entities::sources::Entity::find().all(db).await?; + self.tx.sources.send(self.sources.clone()).unwrap(); + self.metrics = entities::metrics::Entity::find().all(db).await?; + self.tx.metrics.send(self.metrics.clone()).unwrap(); + info!(target: "worker", "Updated panels, sources and metrics"); + self.last_refresh = chrono::Utc::now().timestamp(); + Ok(()) + } + + pub fn _cache_age(&self) -> i64 { + chrono::Utc::now().timestamp() - self.last_refresh + } + + pub async fn worker(mut self, db: DatabaseConnection, run:watch::Receiver) { + let mut width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... + let mut last = Utc::now().timestamp() - width; + + while *run.borrow() { + let now = Utc::now().timestamp(); + tokio::select!{ + op = self.op.recv() => { + match op { + Some(op) => { + match op { + BackgroundAction::UpdateAllPanels { panels } => { + // TODO this is kinda rough, can it be done better? + entities::panels::Entity::delete_many().exec(&db).await.unwrap(); + entities::panels::Entity::insert_many( + panels.iter().map(|v| entities::panels::ActiveModel{ + id: Set(v.id), + name: Set(v.name.clone()), + view_scroll: Set(v.view_scroll), + view_size: Set(v.view_size), + timeserie: Set(v.timeserie), + height: Set(v.height), + limit_view: Set(v.limit_view), + position: Set(v.position), + reduce_view: Set(v.reduce_view), + view_chunks: Set(v.view_chunks), + shift_view: Set(v.shift_view), + view_offset: Set(v.view_offset), + average_view: Set(v.average_view), + }).collect::>() + ).exec(&db).await.unwrap(); + self.tx.panels.send(panels.clone()).unwrap(); + self.panels = panels; + }, + // _ => todo!(), + } + }, + None => {}, + } + }, + _ = self.flush.recv() => { + let now = Utc::now().timestamp(); + self.fetch(&db).await.unwrap(); + let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... + self.points = entities::points::Entity::find() + .filter( + Condition::all() + .add(entities::points::Column::X.gte((now - new_width) as f64)) + .add(entities::points::Column::X.lte(now as f64)) + ) + .all(&db) + .await.unwrap().into(); + self.tx.points.send(self.points.clone().into()).unwrap(); + last = Utc::now().timestamp(); + info!(target: "worker", "Reloaded points"); + }, + _ = sleep(self.cache_age - (now - self.last_refresh)) => self.fetch(&db).await.unwrap(), + _ = sleep(self.interval - (now - self.last_check)) => { + let mut changes = false; + let now = Utc::now().timestamp(); + let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... + + // fetch previous points + if new_width != width { + let mut previous_points = entities::points::Entity::find() + .filter( + Condition::all() + .add(entities::points::Column::X.gte(now - new_width)) + .add(entities::points::Column::X.lte(now - width)) + ) + .all(&db) + .await.unwrap(); + info!(target: "worker", "Fetched {} previous points", previous_points.len()); + previous_points.reverse(); // TODO wasteful! + for p in previous_points { + self.points.push_front(p); + changes = true; + } + } + + // fetch new points + let new_points = entities::points::Entity::find() + .filter( + Condition::all() + .add(entities::points::Column::X.gte(last as f64)) + .add(entities::points::Column::X.lte(now as f64)) + ) + .all(&db) + .await.unwrap(); + info!(target: "worker", "Fetched {} new points", new_points.len()); + + for p in new_points { + self.points.push_back(p); + changes = true; + } + + // remove old points + while let Some(p) = self.points.get(0) { + if (p.x as i64) >= now - (*self.width.borrow() * 60) { + break; + } + self.points.pop_front(); + changes = true; + } + + // update + last = now; + width = new_width; + self.last_check = now; + if changes { + self.tx.points.send(self.points.clone().into()).unwrap(); + } + }, + }; + } + } +} + +#[derive(Debug)] +pub enum BackgroundAction { + UpdateAllPanels { panels: Vec }, + // UpdatePanel { panel : entities::panels::ActiveModel }, + // UpdateSource { source: entities::sources::ActiveModel }, + // UpdateMetric { metric: entities::metrics::ActiveModel }, +}