chore: boilerplate over async stuff

This commit is contained in:
əlemi 2022-11-01 04:32:46 +01:00
parent cbca9f99b3
commit bc94398e49
Signed by: alemi
GPG key ID: A4895B84D311642C
12 changed files with 695 additions and 456 deletions

View file

@ -2,8 +2,6 @@ pub mod entities;
use std::num::ParseFloatError;
use sea_orm::{DatabaseConnection, EntityTrait};
#[derive(Debug)]
pub enum FetchError {
ReqwestError(reqwest::Error),
@ -39,39 +37,3 @@ impl From<sea_orm::DbErr> for FetchError {
FetchError::DbError(e)
}
}
#[allow(dead_code)]
pub struct ApplicationState {
// pub run: bool,
db: DatabaseConnection,
pub panels: Vec<entities::panels::Model>,
pub sources: Vec<entities::sources::Model>,
pub metrics: Vec<entities::metrics::Model>,
last_fetch: i64,
// pub diagnostics: RwLock<Vec<String>>,
}
#[allow(dead_code)]
impl ApplicationState {
pub fn new(db: DatabaseConnection) -> Result<ApplicationState, FetchError> {
return Ok(ApplicationState {
db,
panels: vec![],
sources: vec![],
metrics: vec![],
last_fetch: 0,
});
}
pub async fn fetch(&mut self) -> Result<(), sea_orm::DbErr> {
self.panels = entities::panels::Entity::find().all(&self.db).await?;
self.sources = entities::sources::Entity::find().all(&self.db).await?;
self.metrics = entities::metrics::Entity::find().all(&self.db).await?;
self.last_fetch = chrono::Utc::now().timestamp();
Ok(())
}
pub fn age(&self) -> i64 {
chrono::Utc::now().timestamp() - self.last_fetch
}
}

View file

@ -1,8 +1,8 @@
use eframe::{egui::{Ui, Layout, Sense, color_picker::show_color_at}, emath::Align, epaint::Color32};
use eframe::{egui::{Ui, Layout, Sense, color_picker::show_color_at, ComboBox, TextEdit}, emath::Align, epaint::Color32};
use crate::{data::entities, util::unpack_color};
fn color_square(ui: &mut Ui, color:Color32) {
fn _color_square(ui: &mut Ui, color:Color32) {
let size = ui.spacing().interact_size;
let (rect, response) = ui.allocate_exact_size(size, Sense::click());
if ui.is_rect_visible(rect) {
@ -17,9 +17,9 @@ fn color_square(ui: &mut Ui, color:Color32) {
}
}
pub fn metric_display_ui(ui: &mut Ui, metric: &entities::metrics::Model, _width: f32) {
pub fn _metric_display_ui(ui: &mut Ui, metric: &entities::metrics::Model, _width: f32) {
ui.horizontal(|ui| {
color_square(ui, unpack_color(metric.color));
_color_square(ui, unpack_color(metric.color));
ui.label(&metric.name);
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
@ -34,32 +34,42 @@ pub fn metric_display_ui(ui: &mut Ui, metric: &entities::metrics::Model, _width:
}
pub fn metric_edit_ui(ui: &mut Ui, metric: &entities::metrics::Model, panels: Option<&Vec<entities::panels::Model>>, width: f32) {
let _text_width = width - 195.0;
let text_width = width - 195.0;
let mut name = metric.name.clone();
let def_str = "".into();
let mut query_x = metric.query_x.as_ref().unwrap_or(&def_str).clone();
let mut query_y = metric.query_y.clone();
let mut panel_id = 0;
ui.horizontal(|ui| {
ui.color_edit_button_srgba(&mut unpack_color(metric.color));
// TextEdit::singleline(&mut metric.name)
// .desired_width(text_width / 2.0)
// .hint_text("name")
// .show(ui);
TextEdit::singleline(&mut name)
.interactive(false)
.desired_width(text_width / 2.0)
.hint_text("name")
.show(ui);
ui.separator();
// TextEdit::singleline(&mut metric.query_x.unwrap_or("".into()))
// .desired_width(text_width / 4.0)
// .hint_text("x")
// .show(ui);
// TextEdit::singleline(&mut metric.query_y)
// .desired_width(text_width / 4.0)
// .hint_text("y")
// .show(ui);
if let Some(_panels) = panels {
// ComboBox::from_id_source(format!("panel-selector-{}", metric.id))
// .width(60.0)
// .selected_text(format!("panel: {:02}", metric.panel_id))
// .show_ui(ui, |ui| {
// ui.selectable_value(&mut metric.panel_id, -1, "None");
// for p in panels {
// ui.selectable_value(&mut metric.panel_id, p.id, p.name.as_str());
// }
// });
if query_x.len() > 0 {
TextEdit::singleline(&mut query_x)
.interactive(false)
.desired_width(text_width / 4.0)
.hint_text("x")
.show(ui);
}
TextEdit::singleline(&mut query_y)
.interactive(false)
.desired_width(if query_x.len() > 0 { 0.0 } else { 15.0 } + (text_width / if query_x.len() > 0 { 4.0 } else { 2.0 }))
.hint_text("y")
.show(ui);
if let Some(panels) = panels {
ComboBox::from_id_source(format!("panel-selector-{}", metric.id))
.width(60.0)
.selected_text(format!("panel: {:02}", metric.panel_id))
.show_ui(ui, |ui| {
ui.selectable_value(&mut panel_id, -1, "None");
for p in panels {
ui.selectable_value(&mut panel_id, p.id, p.name.as_str());
}
});
}
});
}

View file

@ -4,10 +4,11 @@ pub mod metric;
mod scaffold;
use chrono::Utc;
use eframe::egui::{CentralPanel, Context, SidePanel, TopBottomPanel};
use tokio::sync::watch;
use crate::data::entities;
use crate::{data::entities, worker::visualizer::AppStateView};
use panel::main_content;
use scaffold::{
// confirmation_popup_delete_metric, confirmation_popup_delete_source, footer,
@ -15,14 +16,17 @@ use scaffold::{
};
use source::source_panel;
pub struct App {
panels_rx: watch::Receiver<Vec<entities::panels::Model>>,
panels: Vec<entities::panels::Model>,
view_tx: watch::Sender<i64>,
use self::scaffold::footer;
sources: watch::Receiver<Vec<entities::sources::Model>>,
metrics: watch::Receiver<Vec<entities::metrics::Model>>,
points: watch::Receiver<Vec<entities::points::Model>>,
pub struct App {
view: AppStateView,
db_path: String,
interval: i64,
last_redraw: i64,
panels: Vec<entities::panels::Model>,
width_tx: watch::Sender<i64>,
logger_view: watch::Receiver<Vec<String>>,
buffer_panel: entities::panels::Model,
buffer_source: entities::sources::Model,
@ -36,24 +40,34 @@ pub struct App {
impl App {
pub fn new(
_cc: &eframe::CreationContext,
panels_rx: watch::Receiver<Vec<entities::panels::Model>>,
sources: watch::Receiver<Vec<entities::sources::Model>>,
metrics: watch::Receiver<Vec<entities::metrics::Model>>,
points: watch::Receiver<Vec<entities::points::Model>>,
view_tx: watch::Sender<i64>,
db_path: String,
interval: i64,
view: AppStateView,
width_tx: watch::Sender<i64>,
logger_view: watch::Receiver<Vec<String>>,
) -> Self {
let panels = panels_rx.borrow().clone();
let panels = view.panels.borrow().clone();
Self {
panels_rx, panels, view_tx,
sources, metrics, points,
db_path, interval, panels, width_tx, view, logger_view,
buffer_panel: entities::panels::Model::default(),
buffer_source: entities::sources::Model::default(),
buffer_metric: entities::metrics::Model::default(),
last_redraw: 0,
edit: false,
sidebar: true,
padding: false,
}
}
pub fn save_all_panels(&self) {
self.view.op.blocking_send(
crate::worker::visualizer::BackgroundAction::UpdateAllPanels { panels: self.panels.clone() }
).unwrap();
}
pub fn refresh_data(&self) {
self.view.flush.blocking_send(()).unwrap();
}
}
impl eframe::App for App {
@ -62,8 +76,8 @@ impl eframe::App for App {
header(self, ui, frame);
});
TopBottomPanel::bottom("footer").show(ctx, |_ui| {
// footer(self.data.clone(), ctx, ui);
TopBottomPanel::bottom("footer").show(ctx, |ui| {
footer(ctx, ui, self.logger_view.clone(), self.db_path.clone(), self.view.points.borrow().len());
});
// if let Some(index) = self.deleting_metric {
@ -75,6 +89,10 @@ impl eframe::App for App {
// .show(ctx, |ui| confirmation_popup_delete_source(self, ui, index));
// }
// for window in self.windows {
// }
if self.sidebar {
SidePanel::left("sources-bar")
.width_range(if self.edit { 400.0..=1000.0 } else { 280.0..=680.0 })
@ -87,7 +105,12 @@ impl eframe::App for App {
});
if let Some(viewsize) = self.panels.iter().map(|p| p.view_size).max() {
self.view_tx.send(viewsize as i64).unwrap();
self.width_tx.send(viewsize as i64).unwrap();
}
if Utc::now().timestamp() > self.last_redraw + self.interval {
ctx.request_repaint();
self.last_redraw = Utc::now().timestamp();
}
}
}

View file

@ -1,6 +1,6 @@
use chrono::{Local, Utc};
use eframe::{egui::{
plot::{Corner, GridMark, Legend, Line, Plot, PlotPoints, PlotPoint},
plot::{Corner, GridMark, Legend, Line, Plot},
Ui, ScrollArea, collapsing_header::CollapsingState, Context, Layout, Slider, DragValue,
}, emath::Vec2};
@ -14,7 +14,7 @@ pub fn main_content(app: &mut App, ctx: &Context, ui: &mut Ui) {
ScrollArea::vertical().show(ui, |ui| {
let panels = &mut app.panels;
let _panels_count = panels.len();
let metrics = app.metrics.borrow();
let metrics = app.view.metrics.borrow();
for (index, panel) in panels.iter_mut().enumerate() {
if index > 0 {
ui.separator(); // only show this if there is at least one panel
@ -41,7 +41,7 @@ pub fn main_content(app: &mut App, ctx: &Context, ui: &mut Ui) {
// ui.separator();
panel_title_ui(ui, panel, app.edit);
})
.body(|ui| panel_body_ui(ui, panel, &metrics, &app.points.borrow()));
.body(|ui| panel_body_ui(ui, panel, &metrics, &app.view.points.borrow()));
}
});
}

View file

@ -1,8 +1,6 @@
use std::sync::Arc;
use eframe::{Frame, egui::{collapsing_header::CollapsingState, Context, Ui, Layout, ScrollArea, global_dark_light_mode_switch}, emath::Align};
use tokio::sync::watch;
use crate::data::ApplicationState;
use crate::gui::App;
use super::panel::panel_edit_inline_ui;
@ -68,16 +66,19 @@ pub fn header(app: &mut App, ui: &mut Ui, frame: &mut Frame) {
ui.checkbox(&mut app.sidebar, "sources");
ui.separator();
if ui.button("reset").clicked() {
app.panels = app.panels_rx.borrow().clone();
app.panels = app.view.panels.borrow().clone();
}
ui.separator();
if ui.button("save").clicked() {
app.save_all_panels();
}
ui.separator();
if ui.button("refresh").clicked() {
app.refresh_data();
}
ui.separator();
ui.checkbox(&mut app.edit, "edit");
if app.edit {
if ui.button("save").clicked() {
// native_save(app.data.clone());
app.edit = false;
}
ui.separator();
ui.label("+ panel");
panel_edit_inline_ui(ui, &mut app.buffer_panel);
if ui.button("add").clicked() {
@ -97,7 +98,7 @@ pub fn header(app: &mut App, ui: &mut Ui, frame: &mut Frame) {
});
}
pub fn _footer(_data: Arc<ApplicationState>, ctx: &Context, ui: &mut Ui) {
pub fn footer(ctx: &Context, ui: &mut Ui, diagnostics: watch::Receiver<Vec<String>>, db_path: String, records: usize) {
CollapsingState::load_with_default_open(
ctx,
ui.make_persistent_id("footer-logs"),
@ -106,8 +107,9 @@ pub fn _footer(_data: Arc<ApplicationState>, ctx: &Context, ui: &mut Ui) {
.show_header(ui, |ui| {
ui.horizontal(|ui| {
ui.separator();
// ui.label(data.file_path.to_str().unwrap()); // TODO maybe calculate it just once?
ui.label(db_path); // TODO maybe calculate it just once?
ui.separator();
ui.label(format!("{} records loaded", records)); // TODO put thousands separator
// ui.label(human_size(
// *data
// .file_size
@ -131,14 +133,10 @@ pub fn _footer(_data: Arc<ApplicationState>, ctx: &Context, ui: &mut Ui) {
.body(|ui| {
ui.set_height(200.0);
ScrollArea::vertical().show(ui, |ui| {
// let msgs = data
// .diagnostics
// .read()
// .expect("Diagnostics RwLock poisoned");
ui.separator();
//for msg in msgs.iter() {
// ui.label(msg);
//}
for msg in diagnostics.borrow().iter() {
ui.label(msg);
}
ui.separator();
});
});

View file

@ -1,5 +1,5 @@
use eframe::{
egui::{Layout, ScrollArea, Ui},
egui::{Layout, ScrollArea, Ui, DragValue, TextEdit, Checkbox},
emath::Align,
};
use rfd::FileDialog;
@ -9,7 +9,7 @@ use crate::util::deserialize_values;
use crate::gui::App;
use crate::data::entities;
use super::metric::{metric_display_ui, metric_edit_ui};
use super::metric::metric_edit_ui;
pub fn source_panel(app: &mut App, ui: &mut Ui) {
let mut source_to_put_metric_on : Option<i32> = None;
@ -23,13 +23,12 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
.show(ui, |ui| {
// TODO only vertical!
{
let sources = app.sources.borrow();
let sources = app.view.sources.borrow();
let sources_count = sources.len();
ui.heading("Sources");
ui.separator();
for (i, source) in sources.iter().enumerate() {
ui.horizontal(|ui| {
if app.edit { // show buttons to move sources up and down
ui.vertical(|ui| {
ui.add_space(10.0);
if ui.small_button("+").clicked() {
@ -43,10 +42,8 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
}
}
});
}
ui.vertical(|ui| { // actual sources list container
let remaining_width = ui.available_width();
if app.edit {
ui.group(|ui| {
ui.horizontal(|ui| {
source_edit_ui(ui, source, remaining_width - 34.0);
@ -56,6 +53,7 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
}
});
let metrics = app
.view
.metrics
.borrow();
for (_j, metric) in metrics.iter().enumerate() {
@ -145,16 +143,6 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
}
})
});
} else {
let metrics = app.metrics.borrow();
source_display_ui(ui, source, remaining_width);
for metric in metrics.iter() {
if metric.source_id == source.id {
metric_display_ui(ui, metric, ui.available_width());
}
}
ui.separator();
}
});
});
}
@ -200,7 +188,7 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
// }
// }
if let Some(s) = source_to_put_metric_on {
for source in app.sources.borrow().iter() {
for source in app.view.sources.borrow().iter() {
if source.id == s {
// if let Err(e) =
// app.data.add_metric(&app.input_metric, &source)
@ -212,29 +200,31 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
}
}
pub fn source_display_ui(_ui: &mut Ui, _source: &entities::sources::Model, _width: f32) {
// ui.horizontal(|ui| {
pub fn _source_display_ui(ui: &mut Ui, source: &entities::sources::Model, _width: f32) {
ui.horizontal(|ui| {
ui.heading(&source.name).on_hover_text(&source.url);
// ui.add_enabled(false, Checkbox::new(&mut source.enabled, ""));
// ui.add_enabled(
// false,
// DragValue::new(&mut source.interval).clamp_range(1..=120),
// );
// ui.heading(&source.name).on_hover_text(&source.url);
// });
});
}
pub fn source_edit_ui(_ui: &mut Ui, _source: &entities::sources::Model, _width: f32) {
// ui.horizontal(|ui| {
// let text_width = width - 100.0;
// ui.checkbox(&mut source.enabled, "");
// ui.add(DragValue::new(&mut source.interval).clamp_range(1..=3600));
// TextEdit::singleline(&mut source.name)
// .desired_width(text_width / 4.0)
// .hint_text("name")
// .show(ui);
// TextEdit::singleline(&mut source.url)
// .desired_width(text_width * 3.0 / 4.0)
// .hint_text("url")
// .show(ui);
// });
pub fn source_edit_ui(ui: &mut Ui, source: &entities::sources::Model, width: f32) {
let mut interval = source.interval.clone();
let mut name = source.name.clone();
let mut url = source.url.clone();
let mut enabled = source.enabled.clone();
ui.horizontal(|ui| {
let text_width = width - 100.0;
ui.add_enabled(false, Checkbox::new(&mut enabled, ""));
ui.add_enabled(false, DragValue::new(&mut interval).clamp_range(1..=3600));
TextEdit::singleline(&mut name)
.interactive(false)
.desired_width(text_width / 4.0)
.hint_text("name")
.show(ui);
TextEdit::singleline(&mut url)
.interactive(false)
.desired_width(text_width * 3.0 / 4.0)
.hint_text("url")
.show(ui);
});
}

View file

@ -5,14 +5,17 @@ mod worker;
use tracing::metadata::LevelFilter;
use tracing_subscriber::prelude::*;
use tracing::info;
use tracing::{info, error};
use tracing_subscriber::filter::filter_fn;
use eframe::egui::Context;
use clap::Parser;
use tokio::sync::watch;
use sea_orm::Database;
use worker::{surveyor_loop, visualizer_loop};
use worker::visualizer::AppState;
use worker::surveyor_loop;
use util::InternalLogger;
use gui::{
// util::InternalLogger,
App
@ -34,12 +37,16 @@ struct CliArgs {
gui: bool,
/// Check interval for background worker
#[arg(short, long, default_value_t = 5)]
#[arg(short, long, default_value_t = 10)]
interval: u64,
/// How often sources and metrics are refreshed
#[arg(short, long, default_value_t = 300)]
cache_time: u64,
/// How many log lines to keep in memory
#[arg(short, long, default_value_t = 1000)]
log_size: u64,
}
// When compiling natively:
@ -47,25 +54,31 @@ struct CliArgs {
fn main() {
let args = CliArgs::parse();
// TODO is there an alternative to this ugly botch?
let (ctx_tx, ctx_rx) = watch::channel::<Option<Context>>(None);
let (width_tx, width_rx) = watch::channel(0);
let (run_tx, run_rx) = watch::channel(true);
let logger = InternalLogger::new(args.log_size as usize);
let logger_view = logger.view();
tracing_subscriber::registry()
.with(LevelFilter::INFO)
.with(filter_fn(|x| x.target() != "sqlx::query"))
.with(tracing_subscriber::fmt::Layer::new())
// .with(InternalLogger::new(store.clone()))
.with(logger.layer())
.init();
// // Set default file location
// let mut store_path = dirs::data_dir().unwrap_or(PathBuf::from(".")); // TODO get cwd more consistently?
// store_path.push("dashboard.db");
// let store =
// Arc::new(ApplicationState::new(store_path).expect("Failed creating application state"));
let state = AppState::new(
width_rx,
args.interval as i64,
args.cache_time as i64,
).unwrap();
let (panel_tx, panel_rx) = watch::channel(vec![]);
let (source_tx, source_rx) = watch::channel(vec![]);
let (metric_tx, metric_rx) = watch::channel(vec![]);
let (point_tx, point_rx) = watch::channel(vec![]);
let (view_tx, view_rx) = watch::channel(1440);
let view = state.view();
let run_rx_clone = run_rx.clone();
let db_uri = args.db.clone();
let worker = std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
@ -73,11 +86,28 @@ fn main() {
.build()
.unwrap()
.block_on(async {
let db = Database::connect(args.db.clone()).await.unwrap();
info!(target: "launcher", "Connected to '{}'", args.db);
let db = Database::connect(db_uri.clone()).await.unwrap();
info!(target: "launcher", "Connected to '{}'", db_uri);
let mut jobs = vec![];
let run_rx_clone_clone = run_rx_clone.clone();
jobs.push(
tokio::spawn(async move {
while *run_rx_clone_clone.borrow() {
if let Some(ctx) = &*ctx_rx.borrow() {
ctx.request_repaint();
}
tokio::time::sleep(std::time::Duration::from_secs(args.interval)).await;
}
})
);
jobs.push(
tokio::spawn(logger.worker(run_rx_clone.clone()))
);
if args.worker {
jobs.push(
tokio::spawn(
@ -85,6 +115,7 @@ fn main() {
db.clone(),
args.interval as i64,
args.cache_time as i64,
run_rx_clone.clone(),
)
)
);
@ -93,16 +124,7 @@ fn main() {
if args.gui {
jobs.push(
tokio::spawn(
visualizer_loop(
db.clone(),
args.interval,
args.cache_time as i64,
panel_tx,
source_tx,
metric_tx,
point_tx,
view_rx,
)
state.worker(db, run_rx_clone.clone())
)
);
}
@ -116,23 +138,34 @@ fn main() {
if args.gui {
let native_options = eframe::NativeOptions::default();
info!(target: "launcher", "Starting native GUI");
eframe::run_native(
// TODO replace this with a loop that ends so we can cleanly exit the background worker
"dashboard",
native_options,
Box::new(
move |cc| Box::new(
move |cc| {
ctx_tx.send(Some(cc.egui_ctx.clone())).unwrap_or_else(|_| {
error!(target: "launcher", "Could not share reference to egui context (won't be able to periodically refresh window)");
});
Box::new(
App::new(
cc,
panel_rx,
source_rx,
metric_rx,
point_rx,
view_tx,
args.db,
args.interval as i64,
view,
width_tx,
logger_view,
)
)
}
),
);
info!(target: "launcher", "Stopping native GUI");
run_tx.send(false).unwrap();
}
worker.join().unwrap();

View file

@ -1,9 +1,10 @@
use chrono::{DateTime, Local, NaiveDateTime, Utc};
use eframe::egui::{Color32, plot::PlotPoint};
use std::{sync::Arc, error::Error, path::PathBuf};
use tokio::sync::{watch, mpsc};
use std::{error::Error, path::PathBuf, collections::VecDeque};
use tracing_subscriber::Layer;
use super::data::{ApplicationState, entities};
use super::data::entities;
// if you're handling more than terabytes of data, it's the future and you ought to update this code!
const _PREFIXES: &'static [&'static str] = &["", "k", "M", "G", "T"];
@ -51,7 +52,8 @@ pub fn deserialize_values(path: PathBuf) -> Result<(String, String, String, Vec<
))
}
pub fn _human_size(size: u64) -> String {
#[allow(dead_code)]
pub fn human_size(size: u64) -> String {
let mut buf: f64 = size as f64;
let mut prefix: usize = 0;
while buf > 1024.0 && prefix < _PREFIXES.len() - 1 {
@ -101,16 +103,62 @@ pub fn repack_color(c: Color32) -> u32 {
}
pub struct InternalLogger {
_state: Arc<ApplicationState>,
size: usize,
view_tx: watch::Sender<Vec<String>>,
view_rx: watch::Receiver<Vec<String>>,
msg_tx : mpsc::UnboundedSender<String>,
msg_rx : mpsc::UnboundedReceiver<String>,
}
impl InternalLogger {
pub fn _new(state: Arc<ApplicationState>) -> Self {
InternalLogger { _state: state }
pub fn new(size: usize) -> Self {
let (view_tx, view_rx) = watch::channel(vec![]);
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
InternalLogger { size, view_tx, view_rx, msg_tx, msg_rx }
}
pub fn view(&self) -> watch::Receiver<Vec<String>> {
self.view_rx.clone()
}
pub fn layer(&self) -> InternalLoggerLayer {
InternalLoggerLayer::new(self.msg_tx.clone())
}
pub async fn worker(mut self, run: watch::Receiver<bool>) {
let mut messages = VecDeque::new();
while *run.borrow() {
tokio::select!{
msg = self.msg_rx.recv() => {
match msg {
Some(msg) => {
messages.push_back(msg);
while messages.len() > self.size {
messages.pop_front();
}
self.view_tx.send(messages.clone().into()).unwrap();
},
None => break,
}
},
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {},
// unblock so it checks again run and exits cleanly
}
}
}
}
impl<S> Layer<S> for InternalLogger
pub struct InternalLoggerLayer {
msg_tx: mpsc::UnboundedSender<String>,
}
impl InternalLoggerLayer {
pub fn new(msg_tx: mpsc::UnboundedSender<String>) -> Self {
InternalLoggerLayer { msg_tx }
}
}
impl<S> Layer<S> for InternalLoggerLayer
where
S: tracing::Subscriber,
{
@ -123,18 +171,15 @@ where
msg: "".to_string(),
};
event.record(&mut msg_visitor);
let _out = format!(
let out = format!(
"{} [{}] {}: {}",
Local::now().format("%H:%M:%S"),
event.metadata().level(),
event.metadata().target(),
msg_visitor.msg
);
// self.state
// .diagnostics
// .write()
// .expect("Diagnostics RwLock poisoned")
// .push(out);
self.msg_tx.send(out).unwrap_or_default();
}
}

View file

@ -1,159 +0,0 @@
use chrono::Utc;
use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait, Condition, ColumnTrait, QueryFilter};
use tokio::sync::watch;
use tracing::{error, info};
use std::collections::VecDeque;
use super::data::{entities, FetchError};
async fn fetch(url: &str) -> Result<serde_json::Value, FetchError> {
Ok(reqwest::get(url).await?.json().await?)
}
pub async fn surveyor_loop(
db: DatabaseConnection,
interval:i64,
cache_time:i64,
) {
let mut last_check = Utc::now().timestamp();
let mut last_fetch = 0;
let mut sources = vec![];
let mut metrics = vec![];
loop {
// sleep until next activation
let delta_time = (interval as i64) - (Utc::now().timestamp() - last_check);
if delta_time > 0 {
std::thread::sleep(std::time::Duration::from_secs(delta_time as u64));
}
last_check = Utc::now().timestamp();
if Utc::now().timestamp() - last_fetch > cache_time {
// TODO do both concurrently
let res = tokio::join!(
entities::sources::Entity::find().all(&db),
entities::metrics::Entity::find().all(&db)
);
sources = res.0.unwrap();
metrics = res.1.unwrap();
last_fetch = Utc::now().timestamp();
}
for source in sources.iter_mut() {
if !source.enabled || !source.ready() {
continue;
}
// source.last_fetch = Utc::now(); // TODO! do it in background threads again!
// tokio::spawn(async move {
match fetch(&source.url).await {
Ok(res) => {
let now = Utc::now().timestamp();
entities::sources::Entity::update(
entities::sources::ActiveModel{id: Set(source.id), last_update: Set(now), ..Default::default()}
).exec(&db).await.unwrap();
source.last_update = now;
for metric in metrics.iter().filter(|x| source.id == x.source_id) {
match metric.extract(&res) {
Ok(v) => {
entities::points::Entity::insert(
entities::points::ActiveModel {
id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y),
}).exec(&db).await.unwrap();
},
Err(e) => error!(target: "worker", "Failed extracting '{}' from {}: {:?}", metric.name, source.name, e),
}
}
},
Err(e) => error!(target: "worker", "Failed fetching {}: {:?}", source.name, e),
}
// source.last_fetch = Utc::now(); // TODO!
// });
}
// if let Ok(meta) = std::fs::metadata(state.file_path.clone()) {
// let mut fsize = state.file_size.write().expect("File Size RwLock poisoned");
// *fsize = meta.len();
// } // ignore errors
// ctx.request_repaint();
}
}
pub async fn visualizer_loop(
db: DatabaseConnection,
interval: u64,
cache_time: i64,
panels_tx: watch::Sender<Vec<entities::panels::Model>>,
sources_tx: watch::Sender<Vec<entities::sources::Model>>,
metrics_tx: watch::Sender<Vec<entities::metrics::Model>>,
points_tx: watch::Sender<Vec<entities::points::Model>>,
view_rx: watch::Receiver<i64>,
) {
let mut points : VecDeque<entities::points::Model> = VecDeque::new();
let mut last_fetch = 0;
let mut width = *view_rx.borrow() * 60; // TODO it's in minutes somewhere...
let mut lower = Utc::now().timestamp() - width;
let mut changes;
loop {
if Utc::now().timestamp() - last_fetch >= cache_time {
panels_tx.send(entities::panels::Entity::find().all(&db).await.unwrap()).unwrap();
sources_tx.send(entities::sources::Entity::find().all(&db).await.unwrap()).unwrap();
metrics_tx.send(entities::metrics::Entity::find().all(&db).await.unwrap()).unwrap();
last_fetch = Utc::now().timestamp();
info!(target: "worker", "Updated panels, sources and metrics");
}
changes = false;
let now = Utc::now().timestamp();
let new_width = *view_rx.borrow() * 60; // TODO it's in minutes somewhere...
if new_width != width {
let mut lower_points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(now - new_width))
.add(entities::points::Column::X.lte(now - width))
)
.all(&db)
.await.unwrap();
lower_points.reverse(); // TODO wasteful!
for p in lower_points {
points.push_front(p);
changes = true;
}
}
width = new_width;
let new_points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(lower as f64))
)
.all(&db)
.await.unwrap();
lower = Utc::now().timestamp();
while let Some(p) = points.get(0) {
if (p.x as i64) >= lower - (*view_rx.borrow() * 60) {
break;
}
points.pop_front();
changes = true;
}
for p in new_points {
points.push_back(p);
changes = true;
}
if changes {
points_tx.send(points.clone().into()).unwrap();
}
tokio::time::sleep(std::time::Duration::from_secs(interval)).await;
}
}

5
src/worker/mod.rs Normal file
View file

@ -0,0 +1,5 @@
pub mod surveyor;
pub mod visualizer;
pub use surveyor::surveyor_loop;
pub use visualizer::{AppState, AppStateView, BackgroundAction};

81
src/worker/surveyor.rs Normal file
View file

@ -0,0 +1,81 @@
use chrono::Utc;
use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait};
use tokio::sync::watch;
use tracing::error;
use crate::data::{entities, FetchError};
async fn fetch(url: &str) -> Result<serde_json::Value, FetchError> {
Ok(reqwest::get(url).await?.json().await?)
}
pub async fn surveyor_loop(
db: DatabaseConnection,
interval:i64,
cache_time:i64,
run: watch::Receiver<bool>,
) {
let mut last_check = Utc::now().timestamp();
let mut last_fetch = 0;
let mut sources = vec![];
let mut metrics = vec![];
while *run.borrow() {
// sleep until next activation
let delta_time = (interval as i64) - (Utc::now().timestamp() - last_check);
if delta_time > 0 {
tokio::time::sleep(std::time::Duration::from_secs(delta_time as u64)).await;
}
last_check = Utc::now().timestamp();
if Utc::now().timestamp() - last_fetch > cache_time {
// TODO do both concurrently
let res = tokio::join!(
entities::sources::Entity::find().all(&db),
entities::metrics::Entity::find().all(&db)
);
sources = res.0.unwrap();
metrics = res.1.unwrap();
last_fetch = Utc::now().timestamp();
}
for source in sources.iter_mut() {
if !source.enabled || !source.ready() {
continue;
}
// source.last_fetch = Utc::now(); // TODO! do it in background threads again!
// tokio::spawn(async move {
match fetch(&source.url).await {
Ok(res) => {
let now = Utc::now().timestamp();
entities::sources::Entity::update(
entities::sources::ActiveModel{id: Set(source.id), last_update: Set(now), ..Default::default()}
).exec(&db).await.unwrap();
source.last_update = now;
for metric in metrics.iter().filter(|x| source.id == x.source_id) {
match metric.extract(&res) {
Ok(v) => {
entities::points::Entity::insert(
entities::points::ActiveModel {
id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y),
}).exec(&db).await.unwrap();
},
Err(e) => error!(target: "worker", "Failed extracting '{}' from {}: {:?}", metric.name, source.name, e),
}
}
},
Err(e) => error!(target: "worker", "Failed fetching {}: {:?}", source.name, e),
}
// source.last_fetch = Utc::now(); // TODO!
// });
}
// if let Ok(meta) = std::fs::metadata(state.file_path.clone()) {
// let mut fsize = state.file_size.write().expect("File Size RwLock poisoned");
// *fsize = meta.len();
// } // ignore errors
// ctx.request_repaint();
}
}

251
src/worker/visualizer.rs Normal file
View file

@ -0,0 +1,251 @@
use chrono::Utc;
use sea_orm::{DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set};
use tokio::sync::{watch, mpsc};
use tracing::info;
use std::collections::VecDeque;
use crate::data::{entities, FetchError};
#[derive(Clone)]
pub struct AppStateView {
pub panels: watch::Receiver<Vec<entities::panels::Model>>,
pub sources: watch::Receiver<Vec<entities::sources::Model>>,
pub metrics: watch::Receiver<Vec<entities::metrics::Model>>,
pub points: watch::Receiver<Vec<entities::points::Model>>,
pub flush: mpsc::Sender<()>,
pub op: mpsc::Sender<BackgroundAction>,
}
impl AppStateView {
pub async fn _request_flush(&self) -> bool {
match self.flush.send(()).await {
Ok(_) => true,
Err(_) => false,
}
}
}
struct AppStateTransmitters {
panels: watch::Sender<Vec<entities::panels::Model>>,
sources: watch::Sender<Vec<entities::sources::Model>>,
metrics: watch::Sender<Vec<entities::metrics::Model>>,
points: watch::Sender<Vec<entities::points::Model>>,
}
pub struct AppState {
tx: AppStateTransmitters,
panels: Vec<entities::panels::Model>,
sources: Vec<entities::sources::Model>,
metrics: Vec<entities::metrics::Model>,
last_refresh: i64,
points: VecDeque<entities::points::Model>,
last_check: i64,
flush: mpsc::Receiver<()>,
op: mpsc::Receiver<BackgroundAction>,
interval: i64,
cache_age: i64,
width: watch::Receiver<i64>,
view: AppStateView,
}
async fn sleep(t:i64) {
if t > 0 {
tokio::time::sleep(std::time::Duration::from_secs(t as u64)).await
}
}
impl AppState {
pub fn new(
width: watch::Receiver<i64>,
interval: i64,
cache_age: i64,
) -> Result<AppState, FetchError> {
let (panel_tx, panel_rx) = watch::channel(vec![]);
let (source_tx, source_rx) = watch::channel(vec![]);
let (metric_tx, metric_rx) = watch::channel(vec![]);
let (point_tx, point_rx) = watch::channel(vec![]);
// let (view_tx, view_rx) = watch::channel(0);
let (flush_tx, flush_rx) = mpsc::channel(10);
let (op_tx, op_rx) = mpsc::channel(100);
Ok(AppState {
panels: vec![],
sources: vec![],
metrics: vec![],
last_refresh: 0,
points: VecDeque::new(),
last_check: 0,
flush: flush_rx,
op: op_rx,
view: AppStateView {
panels: panel_rx,
sources: source_rx,
metrics: metric_rx,
points: point_rx,
flush: flush_tx,
op: op_tx,
},
tx: AppStateTransmitters {
panels: panel_tx,
sources: source_tx,
metrics: metric_tx,
points: point_tx,
},
width,
interval,
cache_age,
})
}
pub fn view(&self) -> AppStateView {
self.view.clone()
}
pub async fn fetch(&mut self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
// TODO parallelize all this stuff
self.panels = entities::panels::Entity::find().all(db).await?;
self.tx.panels.send(self.panels.clone()).unwrap();
self.sources = entities::sources::Entity::find().all(db).await?;
self.tx.sources.send(self.sources.clone()).unwrap();
self.metrics = entities::metrics::Entity::find().all(db).await?;
self.tx.metrics.send(self.metrics.clone()).unwrap();
info!(target: "worker", "Updated panels, sources and metrics");
self.last_refresh = chrono::Utc::now().timestamp();
Ok(())
}
pub fn _cache_age(&self) -> i64 {
chrono::Utc::now().timestamp() - self.last_refresh
}
pub async fn worker(mut self, db: DatabaseConnection, run:watch::Receiver<bool>) {
let mut width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
let mut last = Utc::now().timestamp() - width;
while *run.borrow() {
let now = Utc::now().timestamp();
tokio::select!{
op = self.op.recv() => {
match op {
Some(op) => {
match op {
BackgroundAction::UpdateAllPanels { panels } => {
// TODO this is kinda rough, can it be done better?
entities::panels::Entity::delete_many().exec(&db).await.unwrap();
entities::panels::Entity::insert_many(
panels.iter().map(|v| entities::panels::ActiveModel{
id: Set(v.id),
name: Set(v.name.clone()),
view_scroll: Set(v.view_scroll),
view_size: Set(v.view_size),
timeserie: Set(v.timeserie),
height: Set(v.height),
limit_view: Set(v.limit_view),
position: Set(v.position),
reduce_view: Set(v.reduce_view),
view_chunks: Set(v.view_chunks),
shift_view: Set(v.shift_view),
view_offset: Set(v.view_offset),
average_view: Set(v.average_view),
}).collect::<Vec<entities::panels::ActiveModel>>()
).exec(&db).await.unwrap();
self.tx.panels.send(panels.clone()).unwrap();
self.panels = panels;
},
// _ => todo!(),
}
},
None => {},
}
},
_ = self.flush.recv() => {
let now = Utc::now().timestamp();
self.fetch(&db).await.unwrap();
let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
self.points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte((now - new_width) as f64))
.add(entities::points::Column::X.lte(now as f64))
)
.all(&db)
.await.unwrap().into();
self.tx.points.send(self.points.clone().into()).unwrap();
last = Utc::now().timestamp();
info!(target: "worker", "Reloaded points");
},
_ = sleep(self.cache_age - (now - self.last_refresh)) => self.fetch(&db).await.unwrap(),
_ = sleep(self.interval - (now - self.last_check)) => {
let mut changes = false;
let now = Utc::now().timestamp();
let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
// fetch previous points
if new_width != width {
let mut previous_points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(now - new_width))
.add(entities::points::Column::X.lte(now - width))
)
.all(&db)
.await.unwrap();
info!(target: "worker", "Fetched {} previous points", previous_points.len());
previous_points.reverse(); // TODO wasteful!
for p in previous_points {
self.points.push_front(p);
changes = true;
}
}
// fetch new points
let new_points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(last as f64))
.add(entities::points::Column::X.lte(now as f64))
)
.all(&db)
.await.unwrap();
info!(target: "worker", "Fetched {} new points", new_points.len());
for p in new_points {
self.points.push_back(p);
changes = true;
}
// remove old points
while let Some(p) = self.points.get(0) {
if (p.x as i64) >= now - (*self.width.borrow() * 60) {
break;
}
self.points.pop_front();
changes = true;
}
// update
last = now;
width = new_width;
self.last_check = now;
if changes {
self.tx.points.send(self.points.clone().into()).unwrap();
}
},
};
}
}
}
#[derive(Debug)]
pub enum BackgroundAction {
UpdateAllPanels { panels: Vec<entities::panels::Model> },
// UpdatePanel { panel : entities::panels::ActiveModel },
// UpdateSource { source: entities::sources::ActiveModel },
// UpdateMetric { metric: entities::metrics::ActiveModel },
}