feat!: async refactor

this mostly aims at splitting fetching from visualizing, so that the
fetching side can be daemonized. I swapped out ureq for reqwest, brought
in clap, replaced rusqlite with full-fledged sea_orm, and added tokio.
Created sea_orm migrations to reflect the current db schema. Moved data
structure definitions out of app and slimmed down the gui code.
Entity collections are mostly managed by background workers and exposed
to the main thread through watch channels, so it can always read the
data without blocking. This is cool but still has some sharp corners:
plots no longer refresh live, only at a fixed interval. I plan to add
synchronization channels later on, though.
əlemi 2022-10-31 02:54:42 +01:00
parent 23f77a5819
commit cbca9f99b3
Signed by: alemi
GPG key ID: A4895B84D311642C
32 changed files with 1502 additions and 1547 deletions
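A minimal sketch of the watch-channel pattern described in the message above, assuming a plain tokio runtime; the `Panel` struct, the `load_panels` helper, and the 5-second refresh interval are placeholders for illustration, not the actual entities or timings from this commit:

```rust
use std::time::Duration;
use tokio::sync::watch;

// Placeholder model, standing in for the sea_orm entity models.
#[derive(Clone, Debug, Default)]
struct Panel {
    id: i32,
    name: String,
}

// Placeholder refresh; the real worker would query the database or remote sources.
async fn load_panels() -> Vec<Panel> {
    vec![Panel { id: 1, name: "example".into() }]
}

#[tokio::main]
async fn main() {
    // The background worker owns the sender, the GUI thread keeps the receiver.
    let (tx, rx) = watch::channel(Vec::<Panel>::new());

    tokio::spawn(async move {
        loop {
            // Publish a fresh snapshot on a fixed interval.
            if tx.send(load_panels().await).is_err() {
                break; // every receiver was dropped, nothing left to update
            }
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    });

    // "GUI" side: borrow() never blocks, it just reads the latest published snapshot.
    for _ in 0..3 {
        for p in rx.borrow().iter() {
            println!("panel {}: {}", p.id, p.name);
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```

Unlike sharing a `RwLock`, the reader never waits on the writer; it only sees the most recently published snapshot, which is also why plots refresh at the worker's interval rather than live.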

1
.gitignore vendored

@ -1,6 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/
+/migration/target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html

Cargo.toml

@ -8,8 +8,6 @@ name = "dashboard_bin"
path = "src/main.rs"
[features]
-default = ["bundled"]
-bundled = ["rusqlite/bundled"]
web = ["chrono/wasmbind", "eframe/persistence"]
[dependencies]
@ -22,8 +20,14 @@ tracing-subscriber = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
csv = "1.1"
-rusqlite = "0.27"
jql = { version = "4", default-features = false }
-ureq = { version = "2", features = ["json"] }
rfd = "0.9"
-eframe = "0.18"
+eframe = "0.19"
+tokio = { version = "1", features = ["full"] }
+clap = { version = "4", features = ["derive"] }
+futures = "0.3"
+sea-orm = { version = "0.10", features = [ "runtime-tokio-rustls", "sqlx-sqlite", "macros" ] }
+reqwest = { version = "0.11", features = ["json"] }
+[profile.dev.package."*"]
+opt-level = 3

23
migration/Cargo.toml Normal file

@ -0,0 +1,23 @@
[package]
name = "migration"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "migration"
path = "src/lib.rs"
[dependencies]
async-std = { version = "^1", features = ["attributes", "tokio1"] }
chrono = "0.4.22"
[dependencies.sea-orm-migration]
version = "^0.10.0"
features = [
# Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI.
# View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime.
# e.g.
"runtime-tokio-rustls", # `ASYNC_RUNTIME` feature
"sqlx-sqlite", # `DATABASE_DRIVER` feature
]

41
migration/README.md Normal file

@ -0,0 +1,41 @@
# Running Migrator CLI
- Generate a new migration file
```sh
cargo run -- migrate generate MIGRATION_NAME
```
- Apply all pending migrations
```sh
cargo run
```
```sh
cargo run -- up
```
- Apply first 10 pending migrations
```sh
cargo run -- up -n 10
```
- Rollback last applied migrations
```sh
cargo run -- down
```
- Rollback last 10 applied migrations
```sh
cargo run -- down -n 10
```
- Drop all tables from the database, then reapply all migrations
```sh
cargo run -- fresh
```
- Rollback all applied migrations, then reapply all migrations
```sh
cargo run -- refresh
```
- Rollback all applied migrations
```sh
cargo run -- reset
```
- Check the status of all migrations
```sh
cargo run -- status
```

16
migration/src/lib.rs Normal file

@ -0,0 +1,16 @@
pub use sea_orm_migration::prelude::*;
mod m20220101_000001_create_table;
mod m20221030_192706_add_last_update;
pub struct Migrator;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20220101_000001_create_table::Migration),
Box::new(m20221030_192706_add_last_update::Migration),
]
}
}

migration/src/m20220101_000001_create_table.rs Normal file

@ -0,0 +1,165 @@
use sea_orm_migration::prelude::*;
// I wish I had used SeaOrm since the beginning:
// this first migration wouldn't be so beefy!
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Panels::Table)
.if_not_exists()
.col(
ColumnDef::new(Panels::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Panels::Name).string().not_null())
.col(ColumnDef::new(Panels::Position).integer().not_null())
.col(ColumnDef::new(Panels::Timeserie).boolean().not_null())
.col(ColumnDef::new(Panels::Height).integer().not_null())
.col(ColumnDef::new(Panels::ViewScroll).boolean().not_null())
.col(ColumnDef::new(Panels::LimitView).boolean().not_null())
.col(ColumnDef::new(Panels::ViewSize).integer().not_null())
.col(ColumnDef::new(Panels::ReduceView).boolean().not_null())
.col(ColumnDef::new(Panels::ViewChunks).integer().not_null())
.col(ColumnDef::new(Panels::ShiftView).boolean().not_null())
.col(ColumnDef::new(Panels::ViewOffset).integer().not_null())
.col(ColumnDef::new(Panels::AverageView).boolean().not_null())
.to_owned(),
).await?;
manager
.create_table(
Table::create()
.table(Sources::Table)
.if_not_exists()
.col(
ColumnDef::new(Sources::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Sources::Name).string().not_null())
.col(ColumnDef::new(Sources::Position).integer().not_null())
.col(ColumnDef::new(Sources::Enabled).boolean().not_null())
.col(ColumnDef::new(Sources::Url).string().not_null())
.col(ColumnDef::new(Sources::Interval).integer().not_null())
.to_owned(),
).await?;
manager
.create_table(
Table::create()
.table(Metrics::Table)
.if_not_exists()
.col(
ColumnDef::new(Metrics::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Metrics::Name).string().not_null())
.col(ColumnDef::new(Metrics::Position).integer().not_null())
.col(ColumnDef::new(Metrics::PanelId).integer().not_null())
.col(ColumnDef::new(Metrics::SourceId).integer().not_null())
.col(ColumnDef::new(Metrics::QueryX).string().not_null())
.col(ColumnDef::new(Metrics::QueryY).string().not_null())
.col(ColumnDef::new(Metrics::Color).integer().not_null())
.to_owned(),
).await?;
manager
.create_table(
Table::create()
.table(Points::Table)
.if_not_exists()
.col(
ColumnDef::new(Points::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Points::MetricId).integer().not_null())
.col(ColumnDef::new(Points::X).float().not_null())
.col(ColumnDef::new(Points::Y).float().not_null())
.to_owned(),
).await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Panels::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(Sources::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(Metrics::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(Points::Table).to_owned())
.await?;
Ok(())
}
}
#[derive(Iden)]
enum Panels {
Table,
Id,
Name,
Position,
Timeserie,
Height,
ViewScroll,
LimitView,
ViewSize,
ReduceView,
ViewChunks,
ShiftView,
ViewOffset,
AverageView,
}
#[derive(Iden)]
enum Sources {
Table,
Id,
Name,
Position,
Enabled,
Url,
Interval,
}
#[derive(Iden)]
enum Metrics {
Table,
Id,
Name,
Position,
PanelId,
SourceId,
QueryX,
QueryY,
Color,
}
#[derive(Iden)]
enum Points {
Table,
Id,
MetricId,
X,
Y,
}

migration/src/m20221030_192706_add_last_update.rs Normal file

@ -0,0 +1,40 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.
alter_table(
Table::alter()
.table(Sources::Table)
.add_column(
ColumnDef::new(Sources::LastUpdate)
.big_integer()
.not_null()
.default(0)
)
.to_owned()
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Sources::Table)
.drop_column(Sources::LastUpdate)
.to_owned()
)
.await
}
}
#[derive(Iden)]
enum Sources {
Table,
LastUpdate,
}

6
migration/src/main.rs Normal file

@ -0,0 +1,6 @@
use sea_orm_migration::prelude::*;
#[async_std::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
}


@ -1,144 +0,0 @@
pub mod source;
pub mod store;
use self::source::{Panel, Source, Metric};
use self::store::SQLiteDataStore;
use std::num::ParseFloatError;
use std::path::PathBuf;
use std::sync::{Mutex, RwLock};
#[derive(Debug)]
pub enum FetchError {
UreqError(ureq::Error),
IoError(std::io::Error),
// JqError(jq_rs::Error),
JQLError(String),
RusqliteError(rusqlite::Error),
ParseFloatError(ParseFloatError),
}
impl From<ureq::Error> for FetchError {
fn from(e: ureq::Error) -> Self {
FetchError::UreqError(e)
}
}
impl From<std::io::Error> for FetchError {
fn from(e: std::io::Error) -> Self {
FetchError::IoError(e)
}
}
impl From<String> for FetchError {
// TODO wtf? why does JQL error as a String?
fn from(e: String) -> Self {
FetchError::JQLError(e)
}
}
impl From<ParseFloatError> for FetchError {
fn from(e: ParseFloatError) -> Self {
FetchError::ParseFloatError(e)
}
}
impl From<rusqlite::Error> for FetchError {
fn from(e: rusqlite::Error) -> Self {
FetchError::RusqliteError(e)
}
}
pub struct ApplicationState {
pub run: bool,
pub file_path: PathBuf,
pub file_size: RwLock<u64>,
pub panels: RwLock<Vec<Panel>>,
pub sources: RwLock<Vec<Source>>,
pub metrics: RwLock<Vec<Metric>>,
pub storage: Mutex<SQLiteDataStore>,
pub diagnostics: RwLock<Vec<String>>,
}
impl ApplicationState {
pub fn new(path: PathBuf) -> Result<ApplicationState, FetchError> {
let storage = SQLiteDataStore::new(path.clone())?;
let panels = storage.load_panels()?;
let sources = storage.load_sources()?;
let metrics = storage.load_metrics()?;
return Ok(ApplicationState {
run: true,
file_size: RwLock::new(std::fs::metadata(path.clone())?.len()),
file_path: path,
panels: RwLock::new(panels),
sources: RwLock::new(sources),
metrics: RwLock::new(metrics),
storage: Mutex::new(storage),
diagnostics: RwLock::new(Vec::new()),
});
}
pub fn add_panel(&self, panel: &Panel) -> Result<(), FetchError> {
let verified_panel = self
.storage
.lock()
.expect("Storage Mutex poisoned")
.new_panel(
panel.name.as_str(),
false,
panel.view_size,
5,
0,
true,
panel.width,
panel.height,
false,
false,
false,
true,
self.panels.read().expect("Panels RwLock poisoned").len() as i32, // todo can this be made more compact and without acquisition?
)?; // TODO make values customizable and useful
self.panels
.write()
.expect("Panels RwLock poisoned")
.push(verified_panel);
Ok(())
}
pub fn add_source(&self, source: &Source) -> Result<(), FetchError> {
let verified_source = self
.storage
.lock()
.expect("Storage Mutex poisoned")
.new_source(
source.name.as_str(),
source.enabled,
source.url.as_str(),
source.interval,
self.sources.read().expect("Sources RwLock poisoned").len() as i32,
)?;
self.sources
.write()
.expect("Sources RwLock poisoned")
.push(verified_source);
return Ok(());
}
pub fn add_metric(&self, metric: &Metric, source: &Source) -> Result<(), FetchError> {
let verified_metric = self
.storage
.lock()
.expect("Storage Mutex poisoned")
.new_metric(
metric.name.as_str(),
source.id,
metric.query_x.as_str(),
metric.query_y.as_str(),
metric.panel_id,
metric.color,
self.metrics.read().expect("Sources RwLock poisoned").len() as i32, // TODO use source.metrics.len()
)?;
self.metrics
.write()
.expect("Sources RwLock poisoned")
.push(verified_metric);
return Ok(());
}
}


@ -1,161 +0,0 @@
use super::FetchError;
use chrono::{DateTime, Utc};
use eframe::egui::plot::PlotPoint;
use eframe::epaint::Color32;
use std::sync::RwLock;
#[derive(Debug, Clone)]
pub struct Panel {
pub(crate) id: i32,
pub name: String,
pub view_scroll: bool,
pub view_size: u32,
pub view_chunks: u32,
pub view_offset: u32,
pub timeserie: bool,
pub(crate) width: i32,
pub(crate) height: i32,
pub limit: bool,
pub reduce: bool,
pub shift: bool,
pub average: bool,
}
impl Default for Panel {
fn default() -> Self {
Panel {
id: -1,
name: "".to_string(),
view_scroll: true,
view_size: 300,
view_chunks: 5,
view_offset: 0,
timeserie: true,
width: 100,
height: 200,
limit: false,
reduce: false,
shift: false,
average: false,
}
}
}
#[derive(Debug)]
pub struct Source {
pub(crate) id: i32,
pub name: String,
pub enabled: bool,
pub url: String,
pub interval: i32,
pub(crate) last_fetch: RwLock<DateTime<Utc>>,
}
impl Default for Source {
fn default() -> Self {
Source {
id: -1,
name: "".to_string(),
enabled: false,
url: "".to_string(),
interval: 60,
last_fetch: RwLock::new(Utc::now()),
}
}
}
fn avg_value(values: &[PlotPoint]) -> PlotPoint {
let mut x = 0.0;
let mut y = 0.0;
for v in values {
x += v.x;
y += v.y;
}
return PlotPoint {
x: x / values.len() as f64,
y: y / values.len() as f64,
};
}
impl Source {
pub fn valid(&self) -> bool {
let last_fetch = self.last_fetch.read().expect("LastFetch RwLock poisoned");
return (Utc::now() - *last_fetch).num_seconds() < self.interval as i64;
}
// pub fn fetch(&self) -> Result<serde_json::Value, FetchError> {
// fetch(self.url.as_str())
// }
}
pub fn fetch(url: &str) -> Result<serde_json::Value, FetchError> {
return Ok(ureq::get(url).call()?.into_json()?);
}
#[derive(Debug)]
pub struct Metric {
pub(crate) id: i32,
pub name: String,
pub source_id: i32,
pub color: Color32,
pub query_x: String,
pub query_y: String,
pub(crate) panel_id: i32,
pub(crate) data: RwLock<Vec<PlotPoint>>,
}
impl Default for Metric {
fn default() -> Self {
Metric {
id: -1,
name: "".to_string(),
source_id: -1,
color: Color32::TRANSPARENT,
query_x: "".to_string(),
query_y: "".to_string(),
panel_id: -1,
data: RwLock::new(Vec::new()),
}
}
}
impl Metric {
pub fn extract(&self, value: &serde_json::Value) -> Result<PlotPoint, FetchError> {
let x: f64;
if self.query_x.len() > 0 {
x = jql::walker(value, self.query_x.as_str())?
.as_f64()
.ok_or(FetchError::JQLError("X query is null".to_string()))?; // TODO what if it's given to us as a string?
} else {
x = Utc::now().timestamp() as f64;
}
let y = jql::walker(value, self.query_y.as_str())?
.as_f64()
.ok_or(FetchError::JQLError("Y query is null".to_string()))?;
Ok(PlotPoint { x, y })
}
pub fn values(
&self,
min_x: Option<f64>,
max_x: Option<f64>,
chunk_size: Option<u32>,
average: bool,
) -> Vec<PlotPoint> {
let mut values = self.data.read().expect("PlotPoints RwLock poisoned").clone();
if let Some(min_x) = min_x {
values.retain(|x| x.x > min_x);
}
if let Some(max_x) = max_x {
values.retain(|x| x.x < max_x);
}
if let Some(chunk_size) = chunk_size {
if chunk_size > 0 {
// TODO make this nested if prettier
let iter = values.chunks(chunk_size as usize);
values = iter.map(|x| if average { avg_value(x) } else { if x.len() > 0 { x[x.len()-1] } else { PlotPoint {x: 0.0, y:0.0 }} }).collect();
}
}
values
}
}


@ -1,400 +0,0 @@
use crate::app::util::unpack_color;
use crate::app::{
data::source::{Panel, Source},
util::repack_color,
};
use chrono::{TimeZone, Utc};
use eframe::egui::{plot::Value, Color32};
use rusqlite::{params, Connection};
use std::sync::RwLock;
use super::source::Metric;
pub trait DataStorage {
fn add_panel(&self, name: &str);
}
pub struct SQLiteDataStore {
conn: Connection,
}
impl SQLiteDataStore {
pub fn new(path: std::path::PathBuf) -> Result<Self, rusqlite::Error> {
let conn = Connection::open(path)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS panels (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
view_scroll BOOL NOT NULL,
view_size INT NOT NULL,
timeserie BOOL NOT NULL,
width INT NOT NULL,
height INT NOT NULL,
limit_view BOOL NOT NULL,
position INT NOT NULL,
reduce_view BOOL NOT NULL,
view_chunks INT NOT NULL,
shift_view BOOL NOT NULL,
view_offset INT NOT NULL,
average_view BOOL NOT NULL
);",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS sources (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
enabled BOOL NOT NULL,
url TEXT NOT NULL,
interval INT NOT NULL,
position INT NOT NULL
);",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
source_id INT NOT NULL,
query_x TEXT NOT NULL,
query_y TEXT NOT NULL,
panel_id INT NOT NULL,
color INT NOT NULL,
position INT NOT NULL
);",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS points (
id INTEGER PRIMARY KEY,
metric_id INT NOT NULL,
x FLOAT NOT NULL,
y FLOAT NOT NULL
);",
[],
)?;
Ok(SQLiteDataStore { conn })
}
pub fn load_values(&self, metric_id: i32) -> rusqlite::Result<Vec<Value>> {
let mut values: Vec<Value> = Vec::new();
let mut statement = self
.conn
.prepare("SELECT x, y FROM points WHERE metric_id = ?")?;
let values_iter = statement.query_map(params![metric_id], |row| {
Ok(Value {
x: row.get(0)?,
y: row.get(1)?,
})
})?;
for value in values_iter {
if let Ok(v) = value {
values.push(v);
}
}
Ok(values)
}
pub fn put_value(&self, metric_id: i32, v: &Value) -> rusqlite::Result<usize> {
self.conn.execute(
"INSERT INTO points(metric_id, x, y) VALUES (?, ?, ?)",
params![metric_id, v.x, v.y],
)
}
pub fn put_values(&mut self, metric_id: i32, values: &Vec<Value>) -> rusqlite::Result<()> {
let tx = self.conn.transaction()?;
for v in values {
tx.execute(
"INSERT INTO points(metric_id, x, y) VALUES (?, ?, ?)",
params![metric_id, v.x, v.y],
)?;
}
tx.commit()?;
Ok(())
}
pub fn delete_values(&self, metric_id: i32) -> rusqlite::Result<usize> {
self.conn.execute(
"DELETE FROM points WHERE metric_id = ?",
params![metric_id]
)
}
pub fn load_sources(&self) -> rusqlite::Result<Vec<Source>> {
let mut sources: Vec<Source> = Vec::new();
let mut statement = self.conn.prepare("SELECT * FROM sources ORDER BY position")?;
let sources_iter = statement.query_map([], |row| {
Ok(Source {
id: row.get(0)?,
name: row.get(1)?,
enabled: row.get(2)?,
url: row.get(3)?,
interval: row.get(4)?,
last_fetch: RwLock::new(Utc.ymd(1970, 1, 1).and_hms(0, 0, 0)),
})
})?;
for source in sources_iter {
if let Ok(s) = source {
sources.push(s);
}
}
Ok(sources)
}
// jank! TODO make it not jank!
pub fn new_source(
&self,
name: &str,
enabled: bool,
url: &str,
interval: i32,
position: i32,
) -> rusqlite::Result<Source> {
self.conn.execute(
"INSERT INTO sources(name, enabled, url, interval, position) VALUES (?, ?, ?, ?, ?)",
params![name, enabled, url, interval, position],
)?;
let mut statement = self
.conn
.prepare("SELECT * FROM sources WHERE name = ? AND url = ? ORDER BY id DESC")?;
for panel in statement.query_map(params![name, url], |row| {
Ok(Source {
id: row.get(0)?,
name: row.get(1)?,
enabled: row.get(2)?,
url: row.get(3)?,
interval: row.get(4)?,
// position: row.get(5)?,
last_fetch: RwLock::new(Utc.ymd(1970, 1, 1).and_hms(0, 0, 0)),
})
})? {
if let Ok(p) = panel {
return Ok(p);
}
}
Err(rusqlite::Error::QueryReturnedNoRows)
}
pub fn update_source(
&self,
id: i32,
name: &str,
enabled: bool,
url: &str,
interval: i32,
position: i32,
) -> rusqlite::Result<usize> {
self.conn.execute(
"UPDATE sources SET name = ?, enabled = ?, url = ?, interval = ?, position = ? WHERE id = ?",
params![name, enabled, url, interval, position, id],
)
}
pub fn delete_source(&self, id:i32) -> rusqlite::Result<usize> {
self.conn.execute("DELETE FROM sources WHERE id = ?", params![id])
}
pub fn load_metrics(&self) -> rusqlite::Result<Vec<Metric>> {
let mut metrics: Vec<Metric> = Vec::new();
let mut statement = self.conn.prepare("SELECT * FROM metrics ORDER BY position")?;
let metrics_iter = statement.query_map([], |row| {
Ok(Metric {
id: row.get(0)?,
name: row.get(1)?,
source_id: row.get(2)?,
query_x: row.get(3)?,
query_y: row.get(4)?,
panel_id: row.get(5)?,
color: unpack_color(row.get(6).unwrap_or(0)),
// position: row.get(7)?,
data: RwLock::new(Vec::new()),
})
})?;
for metric in metrics_iter {
if let Ok(m) = metric {
*m.data.write().expect("Points RwLock poisoned") = self.load_values(m.id)?;
metrics.push(m);
}
}
Ok(metrics)
}
// jank! TODO make it not jank!
pub fn new_metric(
&self,
name: &str,
source_id: i32,
query_x: &str,
query_y: &str,
panel_id: i32,
color: Color32,
position: i32,
) -> rusqlite::Result<Metric> {
self.conn.execute(
"INSERT INTO metrics(name, source_id, query_x, query_y, panel_id, color, position) VALUES (?, ?, ?, ?, ?, ?, ?)",
params![name, source_id, query_x, query_y, panel_id, repack_color(color), position],
)?;
let mut statement = self
.conn
.prepare("SELECT * FROM metrics WHERE source_id = ? AND panel_id = ? AND name = ? ORDER BY id DESC")?;
for metric in statement.query_map(params![source_id, panel_id, name], |row| {
Ok(Metric {
id: row.get(0)?,
name: row.get(1)?,
source_id: row.get(2)?,
query_x: row.get(3)?,
query_y: row.get(4)?,
panel_id: row.get(5)?,
color: unpack_color(row.get(6).unwrap_or(0)),
// position: row.get(7)?,
data: RwLock::new(Vec::new()),
})
})? {
if let Ok(m) = metric {
return Ok(m);
}
}
Err(rusqlite::Error::QueryReturnedNoRows)
}
pub fn update_metric(
&self,
id: i32,
name: &str,
source_id: i32,
query_x: &str,
query_y: &str,
panel_id: i32,
color: Color32,
position: i32,
) -> rusqlite::Result<usize> {
self.conn.execute(
"UPDATE metrics SET name = ?, query_x = ?, query_y = ?, panel_id = ?, color = ?, position = ? WHERE id = ? AND source_id = ?",
params![name, query_x, query_y, panel_id, repack_color(color), position, id, source_id],
)
}
pub fn delete_metric(&self, id:i32) -> rusqlite::Result<usize> {
self.conn.execute("DELETE FROM metrics WHERE id = ?", params![id])
}
pub fn load_panels(&self) -> rusqlite::Result<Vec<Panel>> {
let mut panels: Vec<Panel> = Vec::new();
let mut statement = self
.conn
.prepare("SELECT * FROM panels ORDER BY position")?;
let panels_iter = statement.query_map([], |row| {
Ok(Panel {
id: row.get(0)?,
name: row.get(1)?,
view_scroll: row.get(2)?,
view_size: row.get(3)?,
timeserie: row.get(4)?,
width: row.get(5)?,
height: row.get(6)?,
limit: row.get(7)?,
// position: row.get(8)?,
reduce: row.get(9)?,
view_chunks: row.get(10)?,
shift: row.get(11)?,
view_offset: row.get(12)?,
average: row.get(13)?,
})
})?;
for panel in panels_iter {
if let Ok(p) = panel {
panels.push(p);
}
}
Ok(panels)
}
// jank! TODO make it not jank!
pub fn new_panel(
&self,
name: &str,
view_scroll: bool,
view_size: u32,
view_chunks: u32,
view_offset: u32,
timeserie: bool,
width: i32,
height: i32,
limit: bool,
reduce: bool,
shift: bool,
average: bool,
position: i32,
) -> rusqlite::Result<Panel> {
self.conn.execute(
"INSERT INTO panels (name, view_scroll, view_size, timeserie, width, height, limit_view, position, reduce_view, view_chunks, shift_view, view_offset, average_view) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params![name, view_scroll, view_size, timeserie, width, height, limit, position, reduce, view_chunks, shift, view_offset, average]
)?;
let mut statement = self.conn.prepare("SELECT * FROM panels WHERE name = ?")?;
for panel in statement.query_map(params![name], |row| {
Ok(Panel {
id: row.get(0)?,
name: row.get(1)?,
view_scroll: row.get(2)?,
view_size: row.get(3)?,
timeserie: row.get(4)?,
width: row.get(5)?,
height: row.get(6)?,
limit: row.get(7)?,
// position: row.get(8)?,
reduce: row.get(9)?,
view_chunks: row.get(10)?,
shift: row.get(11)?,
view_offset: row.get(12)?,
average: row.get(13)?,
})
})? {
if let Ok(p) = panel {
return Ok(p);
}
}
Err(rusqlite::Error::QueryReturnedNoRows)
}
pub fn update_panel(
&self,
id: i32,
name: &str,
view_scroll: bool,
view_size: u32,
view_chunks: u32,
view_offset: u32,
timeserie: bool,
width: i32,
height: i32,
limit: bool,
reduce: bool,
shift: bool,
position: i32,
) -> rusqlite::Result<usize> {
self.conn.execute(
"UPDATE panels SET name = ?, view_scroll = ?, view_size = ?, timeserie = ?, width = ?, height = ?, limit_view = ?, position = ?, reduce_view = ?, view_chunks = ?, shift_view = ?, view_offset = ? WHERE id = ?",
params![name, view_scroll, view_size, timeserie, width, height, limit, position, reduce, view_chunks, shift, view_offset, id],
)
}
pub fn delete_panel(&self, id:i32) -> rusqlite::Result<usize> {
self.conn.execute("DELETE FROM panels WHERE id = ?", params![id])
}
}


@ -1,69 +0,0 @@
use eframe::{egui::{Ui, TextEdit, ComboBox, Layout, Sense, color_picker::show_color_at}, emath::Align, epaint::Color32};
use crate::app::data::source::{Panel, Metric};
fn color_square(ui: &mut Ui, color:Color32) {
let size = ui.spacing().interact_size;
let (rect, response) = ui.allocate_exact_size(size, Sense::click());
if ui.is_rect_visible(rect) {
let visuals = ui.style().interact(&response);
let rect = rect.expand(visuals.expansion);
show_color_at(ui.painter(), color, rect);
let rounding = visuals.rounding.at_most(2.0);
ui.painter()
.rect_stroke(rect, rounding, (2.0, visuals.bg_fill)); // fill is intentional, because default style has no border
}
}
pub fn metric_display_ui(ui: &mut Ui, metric: &Metric, _width: f32) {
ui.horizontal(|ui| {
color_square(ui, metric.color);
ui.label(&metric.name);
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
if metric.panel_id >= 0 {
ui.label(format!("panel: {}", metric.panel_id));
}
if metric.query_y.len() > 0 {
ui.label(format!("y: {}", metric.query_y));
}
if metric.query_x.len() > 0 {
ui.label(format!("x: {}", metric.query_x));
}
})
});
});
}
pub fn metric_edit_ui(ui: &mut Ui, metric: &mut Metric, panels: Option<&Vec<Panel>>, width: f32) {
let text_width = width - 195.0;
ui.horizontal(|ui| {
ui.color_edit_button_srgba(&mut metric.color);
TextEdit::singleline(&mut metric.name)
.desired_width(text_width / 2.0)
.hint_text("name")
.show(ui);
ui.separator();
TextEdit::singleline(&mut metric.query_x)
.desired_width(text_width / 4.0)
.hint_text("x")
.show(ui);
TextEdit::singleline(&mut metric.query_y)
.desired_width(text_width / 4.0)
.hint_text("y")
.show(ui);
if let Some(panels) = panels {
ComboBox::from_id_source(format!("panel-selector-{}", metric.id))
.width(60.0)
.selected_text(format!("panel: {:02}", metric.panel_id))
.show_ui(ui, |ui| {
ui.selectable_value(&mut metric.panel_id, -1, "None");
for p in panels {
ui.selectable_value(&mut metric.panel_id, p.id, p.name.as_str());
}
});
}
});
}


@ -1,4 +0,0 @@
pub mod panel;
pub mod source;
pub mod metric;
pub mod scaffold;


@ -1,239 +0,0 @@
use chrono::{Local, Utc};
use eframe::{egui::{
plot::{Corner, GridMark, Legend, Line, Plot, PlotPoints},
DragValue, Layout, Ui, Slider, TextEdit, ScrollArea, collapsing_header::CollapsingState, Context,
}, emath::Vec2};
use tracing::error;
use crate::app::{
data::source::{Panel, Metric},
util::timestamp_to_str, App,
};
pub fn main_content(app: &mut App, ctx: &Context, ui: &mut Ui) {
let mut to_swap: Option<usize> = None;
let mut to_delete: Option<usize> = None;
ScrollArea::vertical().show(ui, |ui| {
let mut panels = app.data.panels.write().expect("Panels RwLock poisoned"); // TODO only lock as write when editing
let panels_count = panels.len();
let metrics = app.data.metrics.read().expect("Metrics RwLock poisoned"); // TODO only lock as write when editing
for (index, panel) in panels.iter_mut().enumerate() {
if index > 0 {
ui.separator();
}
CollapsingState::load_with_default_open(
ctx,
ui.make_persistent_id(format!("panel-{}-compressable", panel.id)),
true,
)
.show_header(ui, |ui| {
if app.edit {
if ui.small_button(" + ").clicked() {
if index > 0 {
to_swap = Some(index); // TODO kinda jank but is there a better way?
}
}
if ui.small_button(" ").clicked() {
if index < panels_count - 1 {
to_swap = Some(index + 1); // TODO kinda jank but is there a better way?
}
}
if ui.small_button(" × ").clicked() {
to_delete = Some(index); // TODO kinda jank but is there a better way?
}
ui.separator();
}
panel_title_ui(ui, panel, app.edit);
})
.body(|ui| panel_body_ui(ui, panel, &metrics));
}
});
if let Some(i) = to_delete {
// TODO can this be done in background? idk
let mut panels = app.data.panels.write().expect("Panels RwLock poisoned");
if let Err(e) = app
.data
.storage
.lock()
.expect("Storage Mutex poisoned")
.delete_panel(panels[i].id)
{
error!(target: "ui", "Could not delete panel : {:?}", e);
} else {
for metric in app
.data
.metrics
.write()
.expect("Sources RwLock poisoned")
.iter_mut()
{
if metric.panel_id == panels[i].id {
metric.panel_id = -1;
}
}
panels.remove(i);
}
} else if let Some(i) = to_swap {
// TODO can this be done in background? idk
let mut panels = app.data.panels.write().expect("Panels RwLock poisoned");
panels.swap(i - 1, i);
}
}
pub fn panel_edit_inline_ui(ui: &mut Ui, panel: &mut Panel) {
TextEdit::singleline(&mut panel.name)
.hint_text("name")
.desired_width(100.0)
.show(ui);
}
pub fn panel_title_ui(ui: &mut Ui, panel: &mut Panel, edit: bool) { // TODO make edit UI in separate func
ui.horizontal(|ui| {
if edit {
TextEdit::singleline(&mut panel.name)
.hint_text("name")
.desired_width(150.0)
.show(ui);
ui.separator();
ui.add(Slider::new(&mut panel.height, 0..=500).text("height"));
ui.separator();
ui.checkbox(&mut panel.timeserie, "timeserie");
} else {
ui.heading(panel.name.as_str());
}
ui.with_layout(Layout::right_to_left(eframe::emath::Align::Min), |ui| {
ui.horizontal(|ui| {
ui.toggle_value(&mut panel.view_scroll, "🔒");
ui.separator();
if panel.limit {
ui.add(
DragValue::new(&mut panel.view_size)
.speed(10)
.suffix(" min")
.clamp_range(0..=2147483647i32),
);
}
ui.toggle_value(&mut panel.limit, "limit");
ui.separator();
if panel.shift {
ui.add(
DragValue::new(&mut panel.view_offset)
.speed(10)
.suffix(" min")
.clamp_range(0..=2147483647i32),
);
}
ui.toggle_value(&mut panel.shift, "offset");
ui.separator();
if panel.reduce {
ui.add(
DragValue::new(&mut panel.view_chunks)
.speed(1)
.prefix("x")
.clamp_range(1..=1000), // TODO allow to average larger spans maybe?
);
ui.toggle_value(&mut panel.average, "avg");
}
ui.toggle_value(&mut panel.reduce, "reduce");
});
});
});
}
pub fn panel_body_ui(ui: &mut Ui, panel: &mut Panel, metrics: &Vec<Metric>) {
let mut p = Plot::new(format!("plot-{}", panel.name))
.height(panel.height as f32)
.allow_scroll(false)
.legend(Legend::default().position(Corner::LeftTop));
if panel.limit {
p = p.set_margin_fraction(Vec2 { x: 0.0, y: 0.1 });
}
if panel.timeserie {
if panel.view_scroll {
let _now = (Utc::now().timestamp() as f64) - (60.0 * panel.view_offset as f64);
p = p.include_x(_now);
if panel.limit {
p = p
.include_x(_now + (panel.view_size as f64 * 3.0))
.include_x(_now - (panel.view_size as f64 * 60.0)); // ??? TODO
}
}
p = p
.x_axis_formatter(|x, _range| timestamp_to_str(x as i64, true, false))
.label_formatter(|name, value| {
if !name.is_empty() {
return format!(
"{}\nx = {}\ny = {:.1}",
name,
timestamp_to_str(value.x as i64, false, true),
value.y
);
} else {
return format!(
"x = {}\ny = {:.1}",
timestamp_to_str(value.x as i64, false, true),
value.y
);
}
})
.x_grid_spacer(|grid| {
let offset = Local::now().offset().local_minus_utc() as i64;
let (start, end) = grid.bounds;
let mut counter = (start as i64) - ((start as i64) % 3600);
let mut out: Vec<GridMark> = Vec::new();
loop {
counter += 3600;
if counter > end as i64 {
break;
}
if (counter + offset) % 86400 == 0 {
out.push(GridMark {
value: counter as f64,
step_size: 86400 as f64,
})
} else if counter % 3600 == 0 {
out.push(GridMark {
value: counter as f64,
step_size: 3600 as f64,
});
}
}
return out;
});
}
let mut lines : Vec<Line> = Vec::new();
let _now = Utc::now().timestamp() as f64;
let _off = (panel.view_offset as f64) * 60.0; // TODO multiplying x60 makes sense only for timeseries
let _size = (panel.view_size as f64) * 60.0; // TODO multiplying x60 makes sense only for timeseries
let min_x = if panel.limit { Some(_now - _size - _off) } else { None };
let max_x = if panel.shift { Some(_now - _off) } else { None };
let chunk_size = if panel.reduce { Some(panel.view_chunks) } else { None };
for metric in metrics {
if metric.panel_id == panel.id {
let values = metric.values(min_x, max_x, chunk_size, panel.average);
// if !panel.timeserie && panel.view_scroll && values.len() > 0 {
// let l = values.len() - 1;
// p = p.include_x(values[0].x)
// .include_x(values[l].x)
// .include_y(values[0].y)
// .include_y(values[l].y);
// }
let values_splice : Vec<[f64;2]> = values.iter().map(|x| [x.x, x.y]).collect();
lines.push(
Line::new(values_splice)
.name(metric.name.as_str())
.color(metric.color)
);
}
}
p.show(ui, |plot_ui| {
for line in lines {
plot_ui.line(line);
}
});
}


@ -1,141 +0,0 @@
use std::sync::Arc;
use eframe::{Frame, egui::{collapsing_header::CollapsingState, Context, Ui, Layout, ScrollArea, global_dark_light_mode_switch}, emath::Align};
use tracing::error;
use crate::app::{data::ApplicationState, util::human_size, App, worker::native_save};
use super::panel::panel_edit_inline_ui;
// TODO make this not super specific!
pub fn confirmation_popup_delete_metric(app: &mut App, ui: &mut Ui, metric_index: usize) {
ui.heading("Are you sure you want to delete this metric?");
ui.label("This will remove all its metrics and delete all points from archive. This action CANNOT BE UNDONE!");
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
if ui.button("\n yes \n").clicked() {
let store = app.data.storage.lock().expect("Storage Mutex poisoned");
let mut metrics = app.data.metrics.write().expect("Metrics RwLock poisoned");
store.delete_metric(metrics[metric_index].id).expect("Failed deleting metric");
store.delete_values(metrics[metric_index].id).expect("Failed deleting values");
metrics.remove(metric_index);
app.deleting_metric = None;
}
if ui.button("\n no \n").clicked() {
app.deleting_metric = None;
}
});
});
}
// TODO make this not super specific!
pub fn confirmation_popup_delete_source(app: &mut App, ui: &mut Ui, source_index: usize) {
ui.heading("Are you sure you want to delete this source?");
ui.label("This will remove all its metrics and delete all points from archive. This action CANNOT BE UNDONE!");
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
if ui.button("\n yes \n").clicked() {
let store = app.data.storage.lock().expect("Storage Mutex poisoned");
let mut sources = app.data.sources.write().expect("sources RwLock poisoned");
let mut metrics = app.data.metrics.write().expect("Metrics RwLock poisoned");
let mut to_remove = Vec::new();
for j in 0..metrics.len() {
if metrics[j].source_id == app.input_source.id {
store.delete_values(metrics[j].id).expect("Failed deleting values");
store.delete_metric(metrics[j].id).expect("Failed deleting Metric");
to_remove.push(j);
}
}
for index in to_remove {
metrics.remove(index);
}
store.delete_source(sources[source_index].id).expect("Failed deleting source");
sources.remove(source_index);
app.deleting_source = None;
}
if ui.button("\n no \n").clicked() {
app.deleting_source = None;
}
});
});
}
pub fn header(app: &mut App, ui: &mut Ui, frame: &mut Frame) {
ui.horizontal(|ui| {
global_dark_light_mode_switch(ui);
ui.heading("dashboard");
ui.separator();
ui.checkbox(&mut app.sources, "sources");
ui.separator();
ui.checkbox(&mut app.edit, "edit");
if app.edit {
if ui.button("save").clicked() {
native_save(app.data.clone());
app.edit = false;
}
ui.separator();
ui.label("+ panel");
panel_edit_inline_ui(ui, &mut app.input_panel);
if ui.button("add").clicked() {
if let Err(e) = app.data.add_panel(&app.input_panel) {
error!(target: "ui", "Failed to add panel: {:?}", e);
};
}
ui.separator();
}
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
if ui.small_button("×").clicked() {
frame.close();
}
});
});
});
}
pub fn footer(data: Arc<ApplicationState>, ctx: &Context, ui: &mut Ui) {
CollapsingState::load_with_default_open(
ctx,
ui.make_persistent_id("footer-logs"),
false,
)
.show_header(ui, |ui| {
ui.horizontal(|ui| {
ui.separator();
ui.label(data.file_path.to_str().unwrap()); // TODO maybe calculate it just once?
ui.separator();
ui.label(human_size(
*data
.file_size
.read()
.expect("Filesize RwLock poisoned"),
));
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
ui.label(format!(
"v{}-{}",
env!("CARGO_PKG_VERSION"),
git_version::git_version!()
));
ui.separator();
ui.hyperlink_to("<me@alemi.dev>", "mailto:me@alemi.dev");
ui.label("alemi");
});
});
});
})
.body(|ui| {
ui.set_height(200.0);
ScrollArea::vertical().show(ui, |ui| {
let msgs = data
.diagnostics
.read()
.expect("Diagnostics RwLock poisoned");
ui.separator();
for msg in msgs.iter() {
ui.label(msg);
}
ui.separator();
});
});
}


@ -1,76 +0,0 @@
pub mod data;
pub mod gui;
pub mod util;
pub mod worker;
use eframe::egui::Window;
use eframe::egui::{CentralPanel, Context, SidePanel, TopBottomPanel};
use std::sync::Arc;
use self::data::source::{Metric, Panel, Source};
use self::data::ApplicationState;
use self::gui::panel::main_content;
use self::gui::scaffold::{
confirmation_popup_delete_metric, confirmation_popup_delete_source, footer, header,
};
use self::gui::source::source_panel;
pub struct App {
data: Arc<ApplicationState>,
input_metric: Metric,
input_source: Source,
input_panel: Panel,
deleting_metric: Option<usize>,
deleting_source: Option<usize>,
edit: bool,
sources: bool,
padding: bool,
}
impl App {
pub fn new(_cc: &eframe::CreationContext, data: Arc<ApplicationState>) -> Self {
Self {
data,
input_metric: Metric::default(),
input_panel: Panel::default(),
input_source: Source::default(),
deleting_metric: None,
deleting_source: None,
edit: false,
sources: true,
padding: false,
}
}
}
impl eframe::App for App {
fn update(&mut self, ctx: &Context, frame: &mut eframe::Frame) {
TopBottomPanel::top("header").show(ctx, |ui| {
header(self, ui, frame);
});
TopBottomPanel::bottom("footer").show(ctx, |ui| {
footer(self.data.clone(), ctx, ui);
});
if let Some(index) = self.deleting_metric {
Window::new(format!("Delete Metric #{}?", index))
.show(ctx, |ui| confirmation_popup_delete_metric(self, ui, index));
}
if let Some(index) = self.deleting_source {
Window::new(format!("Delete Source #{}?", index))
.show(ctx, |ui| confirmation_popup_delete_source(self, ui, index));
}
if self.sources {
SidePanel::left("sources-bar")
.width_range(if self.edit { 400.0..=1000.0 } else { 280.0..=680.0 })
.default_width(if self.edit { 450.0 } else { 330.0 })
.show(ctx, |ui| source_panel(self, ui));
}
CentralPanel::default().show(ctx, |ui| {
main_content(self, ctx, ui);
});
}
}


@ -1,146 +0,0 @@
use crate::app::data::{source::fetch, ApplicationState};
use chrono::Utc;
use eframe::egui::Context;
use std::sync::Arc;
use tracing::warn;
pub fn native_save(state: Arc<ApplicationState>) {
std::thread::spawn(move || {
let storage = state.storage.lock().expect("Storage Mutex poisoned");
let panels = state.panels.read().expect("Panels RwLock poisoned");
for (index, panel) in panels.iter().enumerate() {
if let Err(e) = storage.update_panel(
panel.id,
panel.name.as_str(),
panel.view_scroll,
panel.view_size,
panel.view_chunks,
panel.view_offset,
panel.timeserie,
panel.width,
panel.height,
panel.limit,
panel.reduce,
panel.shift,
index as i32,
) {
warn!(target: "native-save", "Could not update panel #{} : {:?}", panel.id, e);
}
}
let sources = state.sources.read().expect("Sources RwLock poisoned");
for (index, source) in sources.iter().enumerate() {
if let Err(e) = storage.update_source(
source.id,
source.name.as_str(),
source.enabled,
source.url.as_str(),
source.interval,
index as i32,
) {
warn!(target: "native-save", "Could not update source #{} : {:?}", source.id, e);
}
}
let metrics = state.metrics.read().expect("Metrics RwLock poisoned");
for (index, metric) in metrics.iter().enumerate() {
if let Err(e) = storage.update_metric(
metric.id,
metric.name.as_str(),
metric.source_id,
metric.query_x.as_str(),
metric.query_y.as_str(),
metric.panel_id,
metric.color,
index as i32,
) {
warn!(target: "native-save", "Could not update metric #{} : {:?}", metric.id, e);
}
}
});
}
pub(crate) trait BackgroundWorker {
fn start(state: Arc<ApplicationState>, ctx: Context) -> Self; // TODO make it return an error? Can we even do anything without a background worker
fn stop(self); // TODO make it return an error? Can we even do anything without a background worker
}
pub(crate) struct NativeBackgroundWorker {
worker: std::thread::JoinHandle<()>,
}
impl BackgroundWorker for NativeBackgroundWorker {
fn start(state: Arc<ApplicationState>, ctx: Context) -> Self {
let worker = std::thread::spawn(move || {
let mut last_check = 0;
while state.run {
let delta_time = 1000 - (Utc::now().timestamp_millis() - last_check);
if delta_time > 0 {
std::thread::sleep(std::time::Duration::from_millis(delta_time as u64));
}
last_check = Utc::now().timestamp_millis();
let sources = state.sources.read().expect("Sources RwLock poisoned");
for j in 0..sources.len() {
let s_id = sources[j].id;
if sources[j].enabled && !sources[j].valid() {
let mut last_update = sources[j]
.last_fetch
.write()
.expect("Sources RwLock poisoned");
*last_update = Utc::now();
let state2 = state.clone();
let source_name = sources[j].name.clone();
let url = sources[j].url.clone();
std::thread::spawn(move || {
// TODO this can overspawn if a request takes longer than the refresh interval!
match fetch(url.as_str()) {
Ok(res) => {
let store =
state2.storage.lock().expect("Storage mutex poisoned");
for metric in state2.metrics.read().expect("Metrics RwLock poisoned").iter() {
if metric.source_id == s_id {
match metric.extract(&res) {
Ok(v) => {
metric.data.write().expect("Data RwLock poisoned").push(v);
if let Err(e) = store.put_value(metric.id, &v) {
warn!(target:"background-worker", "Could not put sample for source #{} in db: {:?}", s_id, e);
}
}
Err(e) => {
warn!(target:"background-worker", "[{}|{}] Could not extract value from result: {:?}", source_name, metric.name, e); // TODO: more info!
}
}
}
}
let sources = state2.sources.read().expect("Sources RwLock poisoned");
let mut last_update = sources[j]
.last_fetch
.write()
.expect("Source last update RwLock poisoned");
*last_update = Utc::now(); // overwrite it so fetches comply with API slowdowns and get desynched among them
}
Err(e) => {
warn!(target:"background-worker", "Could not fetch value from {} : {:?}", url, e);
}
}
});
}
}
if let Ok(meta) = std::fs::metadata(state.file_path.clone()) {
let mut fsize = state.file_size.write().expect("File Size RwLock poisoned");
*fsize = meta.len();
} // ignore errors
ctx.request_repaint();
}
});
return NativeBackgroundWorker { worker };
}
fn stop(self) {
self.worker
.join()
.expect("Failed joining main worker thread");
}
}

src/data/entities/metrics.rs Normal file

@ -0,0 +1,66 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use chrono::Utc;
use eframe::egui::plot::PlotPoint;
use sea_orm::entity::prelude::*;
use crate::data::FetchError;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "metrics")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: i32,
pub name: String,
pub source_id: i32,
pub query_x: Option<String>,
pub query_y: String,
pub panel_id: i32,
pub color: u32,
pub position: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub fn extract(&self, value: &serde_json::Value) -> Result<PlotPoint, FetchError> {
let x: f64;
let fallback_query = "".into();
let q_x = self.query_x.as_ref().unwrap_or(&fallback_query);
// TODO because of bad design, empty queries are
// empty strings in my db. Rather than converting
// them right away, I'm putting this jank fix:
// checking len
if q_x.len() > 0 {
x = jql::walker(value, q_x.as_str())?
.as_f64()
.ok_or(FetchError::JQLError("X query is null".to_string()))?; // TODO what if it's given to us as a string?
} else {
x = Utc::now().timestamp() as f64;
}
let y = jql::walker(value, self.query_y.as_str())?
.as_f64()
.ok_or(FetchError::JQLError("Y query is null".to_string()))?;
Ok(PlotPoint { x, y })
}
}
impl Default for Model {
fn default() -> Self {
Model {
id: 0,
name: "".into(),
source_id: 0,
query_x: None,
query_y: "".into(),
panel_id: 0,
color: 0,
position: 0,
}
}
}

8
src/data/entities/mod.rs Normal file

@ -0,0 +1,8 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
pub mod prelude;
pub mod metrics;
pub mod panels;
pub mod points;
pub mod sources;

src/data/entities/panels.rs Normal file

@ -0,0 +1,48 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "panels")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = true)]
pub id: i32,
pub name: String,
pub view_scroll: bool,
pub view_size: i32,
pub timeserie: bool,
pub height: i32,
pub limit_view: bool,
pub position: i32,
pub reduce_view: bool,
pub view_chunks: i32,
pub shift_view: bool,
pub view_offset: i32,
pub average_view: bool,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
impl Default for Model {
fn default() -> Self {
Model {
id: 0,
name: "".into(),
view_scroll: true,
view_size: 1000,
timeserie: true,
height: 100,
limit_view: true,
position: 0,
reduce_view: false,
view_chunks: 10,
shift_view: false,
view_offset: 0,
average_view: true,
}
}
}

src/data/entities/points.rs Normal file

@ -0,0 +1,24 @@
use sea_orm::entity::prelude::*;
use eframe::egui::plot::PlotPoint;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "points")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: i32,
pub metric_id: i32,
pub x: f64,
pub y: f64,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
impl Into<PlotPoint> for Model {
fn into(self) -> PlotPoint {
PlotPoint { x: self.x, y: self.y }
}
}

src/data/entities/prelude.rs Normal file

@ -0,0 +1,6 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.10.1
pub use super::metrics::Entity as Metrics;
pub use super::panels::Entity as Panels;
pub use super::points::Entity as Points;
pub use super::sources::Entity as Sources;

src/data/entities/sources.rs Normal file

@ -0,0 +1,46 @@
use sea_orm::entity::prelude::*;
use chrono::Utc;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "sources")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: i32,
pub name: String,
pub enabled: bool,
pub url: String,
pub interval: i32,
pub last_update: i64,
pub position: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub fn cooldown(&self) -> i64 {
let elapsed = Utc::now().timestamp() - self.last_update;
(self.interval as i64) - elapsed
}
pub fn ready(&self) -> bool {
self.cooldown() <= 0
}
}
impl Default for Model {
fn default() -> Self {
Model {
id: 0,
name: "".into(),
enabled: false,
url: "".into(),
interval: 60,
last_update: 0,
position: 0,
}
}
}

77
src/data/mod.rs Normal file

@ -0,0 +1,77 @@
pub mod entities;
use std::num::ParseFloatError;
use sea_orm::{DatabaseConnection, EntityTrait};
#[derive(Debug)]
pub enum FetchError {
ReqwestError(reqwest::Error),
IoError(std::io::Error),
JQLError(String),
ParseFloatError(ParseFloatError),
DbError(sea_orm::DbErr),
}
impl From<reqwest::Error> for FetchError {
fn from(e: reqwest::Error) -> Self {
FetchError::ReqwestError(e)
}
}
impl From<std::io::Error> for FetchError {
fn from(e: std::io::Error) -> Self {
FetchError::IoError(e)
}
}
impl From<String> for FetchError {
// TODO wtf? why does JQL error as a String?
fn from(e: String) -> Self {
FetchError::JQLError(e)
}
}
impl From<ParseFloatError> for FetchError {
fn from(e: ParseFloatError) -> Self {
FetchError::ParseFloatError(e)
}
}
impl From<sea_orm::DbErr> for FetchError {
fn from(e: sea_orm::DbErr) -> Self {
FetchError::DbError(e)
}
}
#[allow(dead_code)]
pub struct ApplicationState {
// pub run: bool,
db: DatabaseConnection,
pub panels: Vec<entities::panels::Model>,
pub sources: Vec<entities::sources::Model>,
pub metrics: Vec<entities::metrics::Model>,
last_fetch: i64,
// pub diagnostics: RwLock<Vec<String>>,
}
#[allow(dead_code)]
impl ApplicationState {
pub fn new(db: DatabaseConnection) -> Result<ApplicationState, FetchError> {
return Ok(ApplicationState {
db,
panels: vec![],
sources: vec![],
metrics: vec![],
last_fetch: 0,
});
}
pub async fn fetch(&mut self) -> Result<(), sea_orm::DbErr> {
self.panels = entities::panels::Entity::find().all(&self.db).await?;
self.sources = entities::sources::Entity::find().all(&self.db).await?;
self.metrics = entities::metrics::Entity::find().all(&self.db).await?;
self.last_fetch = chrono::Utc::now().timestamp();
Ok(())
}
pub fn age(&self) -> i64 {
chrono::Utc::now().timestamp() - self.last_fetch
}
}

65
src/gui/metric.rs Normal file

@ -0,0 +1,65 @@
use eframe::{egui::{Ui, Layout, Sense, color_picker::show_color_at}, emath::Align, epaint::Color32};
use crate::{data::entities, util::unpack_color};
fn color_square(ui: &mut Ui, color:Color32) {
let size = ui.spacing().interact_size;
let (rect, response) = ui.allocate_exact_size(size, Sense::click());
if ui.is_rect_visible(rect) {
let visuals = ui.style().interact(&response);
let rect = rect.expand(visuals.expansion);
show_color_at(ui.painter(), color, rect);
let rounding = visuals.rounding.at_most(2.0);
ui.painter()
.rect_stroke(rect, rounding, (2.0, visuals.bg_fill)); // fill is intentional, because default style has no border
}
}
pub fn metric_display_ui(ui: &mut Ui, metric: &entities::metrics::Model, _width: f32) {
ui.horizontal(|ui| {
color_square(ui, unpack_color(metric.color));
ui.label(&metric.name);
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
ui.label(format!("panel: {}", metric.panel_id));
ui.label(format!("y: {}", metric.query_y));
// if let Some(query_x) = metric.query_x {
// ui.label(format!("x: {}", query_x));
// }
})
});
});
}
pub fn metric_edit_ui(ui: &mut Ui, metric: &entities::metrics::Model, panels: Option<&Vec<entities::panels::Model>>, width: f32) {
let _text_width = width - 195.0;
ui.horizontal(|ui| {
ui.color_edit_button_srgba(&mut unpack_color(metric.color));
// TextEdit::singleline(&mut metric.name)
// .desired_width(text_width / 2.0)
// .hint_text("name")
// .show(ui);
ui.separator();
// TextEdit::singleline(&mut metric.query_x.unwrap_or("".into()))
// .desired_width(text_width / 4.0)
// .hint_text("x")
// .show(ui);
// TextEdit::singleline(&mut metric.query_y)
// .desired_width(text_width / 4.0)
// .hint_text("y")
// .show(ui);
if let Some(_panels) = panels {
// ComboBox::from_id_source(format!("panel-selector-{}", metric.id))
// .width(60.0)
// .selected_text(format!("panel: {:02}", metric.panel_id))
// .show_ui(ui, |ui| {
// ui.selectable_value(&mut metric.panel_id, -1, "None");
// for p in panels {
// ui.selectable_value(&mut metric.panel_id, p.id, p.name.as_str());
// }
// });
}
});
}

93
src/gui/mod.rs Normal file

@ -0,0 +1,93 @@
pub mod panel;
pub mod source;
pub mod metric;
mod scaffold;
use eframe::egui::{CentralPanel, Context, SidePanel, TopBottomPanel};
use tokio::sync::watch;
use crate::data::entities;
use panel::main_content;
use scaffold::{
// confirmation_popup_delete_metric, confirmation_popup_delete_source, footer,
header,
};
use source::source_panel;
pub struct App {
panels_rx: watch::Receiver<Vec<entities::panels::Model>>,
panels: Vec<entities::panels::Model>,
view_tx: watch::Sender<i64>,
sources: watch::Receiver<Vec<entities::sources::Model>>,
metrics: watch::Receiver<Vec<entities::metrics::Model>>,
points: watch::Receiver<Vec<entities::points::Model>>,
buffer_panel: entities::panels::Model,
buffer_source: entities::sources::Model,
buffer_metric: entities::metrics::Model,
edit: bool,
sidebar: bool,
padding: bool,
}
impl App {
pub fn new(
_cc: &eframe::CreationContext,
panels_rx: watch::Receiver<Vec<entities::panels::Model>>,
sources: watch::Receiver<Vec<entities::sources::Model>>,
metrics: watch::Receiver<Vec<entities::metrics::Model>>,
points: watch::Receiver<Vec<entities::points::Model>>,
view_tx: watch::Sender<i64>,
) -> Self {
let panels = panels_rx.borrow().clone();
Self {
panels_rx, panels, view_tx,
sources, metrics, points,
buffer_panel: entities::panels::Model::default(),
buffer_source: entities::sources::Model::default(),
buffer_metric: entities::metrics::Model::default(),
edit: false,
sidebar: true,
padding: false,
}
}
}
impl eframe::App for App {
fn update(&mut self, ctx: &Context, frame: &mut eframe::Frame) {
TopBottomPanel::top("header").show(ctx, |ui| {
header(self, ui, frame);
});
TopBottomPanel::bottom("footer").show(ctx, |_ui| {
// footer(self.data.clone(), ctx, ui);
});
// if let Some(index) = self.deleting_metric {
// Window::new(format!("Delete Metric #{}?", index))
// .show(ctx, |ui| confirmation_popup_delete_metric(self, ui, index));
// }
// if let Some(index) = self.deleting_source {
// Window::new(format!("Delete Source #{}?", index))
// .show(ctx, |ui| confirmation_popup_delete_source(self, ui, index));
// }
if self.sidebar {
SidePanel::left("sources-bar")
.width_range(if self.edit { 400.0..=1000.0 } else { 280.0..=680.0 })
.default_width(if self.edit { 450.0 } else { 330.0 })
.show(ctx, |ui| source_panel(self, ui));
}
CentralPanel::default().show(ctx, |ui| {
main_content(self, ctx, ui);
});
if let Some(viewsize) = self.panels.iter().map(|p| p.view_size).max() {
self.view_tx.send(viewsize as i64).unwrap();
}
}
}

218
src/gui/panel.rs Normal file

@ -0,0 +1,218 @@
use chrono::{Local, Utc};
use eframe::{egui::{
plot::{Corner, GridMark, Legend, Line, Plot, PlotPoints, PlotPoint},
Ui, ScrollArea, collapsing_header::CollapsingState, Context, Layout, Slider, DragValue,
}, emath::Vec2};
use crate::util::{timestamp_to_str, unpack_color};
use crate::gui::App;
use crate::data::entities;
pub fn main_content(app: &mut App, ctx: &Context, ui: &mut Ui) {
let mut _to_swap: Option<usize> = None;
let mut _to_delete: Option<usize> = None;
ScrollArea::vertical().show(ui, |ui| {
let panels = &mut app.panels;
let _panels_count = panels.len();
let metrics = app.metrics.borrow();
for (index, panel) in panels.iter_mut().enumerate() {
if index > 0 {
ui.separator(); // only show this if there is at least one panel
}
CollapsingState::load_with_default_open(
ctx,
ui.make_persistent_id(format!("panel-{}-compressable", panel.id)),
true,
)
.show_header(ui, |ui| {
// if ui.small_button(" + ").clicked() {
// if index > 0 {
// to_swap = Some(index); // TODO kinda jank but is there a better way?
// }
// }
// if ui.small_button(" ").clicked() {
// if index < panels_count - 1 {
// to_swap = Some(index + 1); // TODO kinda jank but is there a better way?
// }
// }
// if ui.small_button(" × ").clicked() {
// to_delete = Some(index); // TODO kinda jank but is there a better way?
// }
// ui.separator();
panel_title_ui(ui, panel, app.edit);
})
.body(|ui| panel_body_ui(ui, panel, &metrics, &app.points.borrow()));
}
});
}
pub fn panel_edit_inline_ui(_ui: &mut Ui, _panel: &entities::panels::Model) {
// TextEdit::singleline(&mut panel.name)
// .hint_text("name")
// .desired_width(100.0)
// .show(ui);
}
pub fn panel_title_ui(ui: &mut Ui, panel: &mut entities::panels::Model, _edit: bool) { // TODO make edit UI in separate func
ui.horizontal(|ui| {
ui.heading(panel.name.as_str());
ui.separator();
ui.add(Slider::new(&mut panel.height, 0..=500).text("height"));
//ui.separator();
//ui.checkbox(&mut panel.timeserie, "timeserie");
ui.with_layout(Layout::right_to_left(eframe::emath::Align::Min), |ui| {
ui.horizontal(|ui| {
ui.toggle_value(&mut panel.view_scroll, "🔒");
ui.separator();
ui.add(
DragValue::new(&mut panel.view_size)
.speed(10)
.suffix(" min")
.clamp_range(0..=2147483647i32),
);
ui.separator();
ui.add(
DragValue::new(&mut panel.view_offset)
.speed(10)
.suffix(" min")
.clamp_range(0..=2147483647i32),
);
ui.separator();
if panel.reduce_view {
ui.add(
DragValue::new(&mut panel.view_chunks)
.speed(1)
.prefix("x")
.clamp_range(1..=1000), // TODO allow to average larger spans maybe?
);
ui.toggle_value(&mut panel.average_view, "avg");
}
ui.toggle_value(&mut panel.reduce_view, "reduce");
});
});
});
}
pub fn panel_body_ui(ui: &mut Ui, panel: &entities::panels::Model, metrics: &Vec<entities::metrics::Model>, points: &Vec<entities::points::Model>) {
let mut p = Plot::new(format!("plot-{}", panel.name))
.height(panel.height as f32)
.allow_scroll(false)
.legend(Legend::default().position(Corner::LeftTop));
if panel.limit_view {
p = p.set_margin_fraction(Vec2 { x: 0.0, y: 0.1 });
}
if panel.timeserie {
if panel.view_scroll {
let _now = (Utc::now().timestamp() as f64) - (60.0 * panel.view_offset as f64);
p = p.include_x(_now);
if panel.limit_view {
p = p
.include_x(_now + (panel.view_size as f64 * 3.0))
.include_x(_now - (panel.view_size as f64 * 60.0)); // ??? TODO
}
}
p = p
.x_axis_formatter(|x, _range| timestamp_to_str(x as i64, true, false))
.label_formatter(|name, value| {
if !name.is_empty() {
return format!(
"{}\nx = {}\ny = {:.1}",
name,
timestamp_to_str(value.x as i64, false, true),
value.y
);
} else {
return format!(
"x = {}\ny = {:.1}",
timestamp_to_str(value.x as i64, false, true),
value.y
);
}
})
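// custom grid: a mark every hour, plus a larger mark where local midnight falls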
.x_grid_spacer(|grid| {
let offset = Local::now().offset().local_minus_utc() as i64;
let (start, end) = grid.bounds;
let mut counter = (start as i64) - ((start as i64) % 3600);
let mut out: Vec<GridMark> = Vec::new();
loop {
counter += 3600;
if counter > end as i64 {
break;
}
if (counter + offset) % 86400 == 0 {
out.push(GridMark {
value: counter as f64,
step_size: 86400 as f64,
})
} else if counter % 3600 == 0 {
out.push(GridMark {
value: counter as f64,
step_size: 3600 as f64,
});
}
}
return out;
});
}
let mut lines : Vec<Line> = Vec::new();
let now = Utc::now().timestamp() as f64;
let off = (panel.view_offset as f64) * 60.0; // TODO multiplying x60 makes sense only for timeseries
let size = (panel.view_size as f64) * 60.0; // TODO multiplying x60 makes sense only for timeseries
let min_x = now - size - off;
let max_x = now - off;
let chunk_size = if panel.reduce_view { Some(panel.view_chunks) } else { None };
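// with "reduce" enabled, points are grouped into chunks of view_chunks samples and
// collapsed to either their average or their last value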
for metric in metrics {
if metric.panel_id == panel.id {
// let values = metric.values(min_x, max_x, chunk_size, panel.average_view);
let mut values : Vec<[f64;2]> = points
.iter()
.filter(|v| v.metric_id == metric.id)
.filter(|v| v.x > min_x as f64)
.filter(|v| v.x < max_x as f64)
.map(|v| [v.x, v.y])
.collect();
if let Some(chunk_size) = chunk_size { // TODO make this less of a mess
let iter = values.chunks(chunk_size as usize);
values = iter.map(|x|
if panel.average_view { avg_value(x) } else {
if x.len() > 0 { x[x.len()-1] } else { [0.0, 0.0 ]}
}).collect();
}
// if !panel.timeserie && panel.view_scroll && values.len() > 0 {
// let l = values.len() - 1;
// p = p.include_x(values[0].x)
// .include_x(values[l].x)
// .include_y(values[0].y)
// .include_y(values[l].y);
// }
lines.push(
Line::new(values)
.name(metric.name.as_str())
.color(unpack_color(metric.color))
);
}
}
p.show(ui, |plot_ui| {
for line in lines {
plot_ui.line(line);
}
});
}
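// component-wise mean of a chunk of [x, y] samples, used by the "avg" reduction above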
fn avg_value(values: &[[f64;2]]) -> [f64;2] {
let mut x = 0.0;
let mut y = 0.0;
for v in values {
x += v[0];
y += v[1];
}
return [
x / values.len() as f64,
y / values.len() as f64,
];
}

145
src/gui/scaffold.rs Normal file
View file

@ -0,0 +1,145 @@
use std::sync::Arc;
use eframe::{Frame, egui::{collapsing_header::CollapsingState, Context, Ui, Layout, ScrollArea, global_dark_light_mode_switch}, emath::Align};
use crate::data::ApplicationState;
use crate::gui::App;
use super::panel::panel_edit_inline_ui;
// TODO make this not super specific!
pub fn _confirmation_popup_delete_metric(_app: &mut App, ui: &mut Ui, _metric_index: usize) {
ui.heading("Are you sure you want to delete this metric?");
ui.label("This will remove all its metrics and delete all points from archive. This action CANNOT BE UNDONE!");
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
if ui.button("\n yes \n").clicked() {
// let store = app.data.storage.lock().expect("Storage Mutex poisoned");
// let mut metrics = app.data.metrics.write().expect("Metrics RwLock poisoned");
// store.delete_metric(metrics[metric_index].id).expect("Failed deleting metric");
// store.delete_values(metrics[metric_index].id).expect("Failed deleting values");
// metrics.remove(metric_index);
// app.deleting_metric = None;
}
if ui.button("\n no \n").clicked() {
// app.deleting_metric = None;
}
});
});
}
// TODO make this not super specific!
pub fn _confirmation_popup_delete_source(_app: &mut App, ui: &mut Ui, _source_index: usize) {
ui.heading("Are you sure you want to delete this source?");
ui.label("This will remove all its metrics and delete all points from archive. This action CANNOT BE UNDONE!");
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
if ui.button("\n yes \n").clicked() {
// let store = app.data.storage.lock().expect("Storage Mutex poisoned");
// let mut sources = app.data.sources.write().expect("sources RwLock poisoned");
// let mut metrics = app.data.metrics.write().expect("Metrics RwLock poisoned");
// let mut to_remove = Vec::new();
// for j in 0..metrics.len() {
// if metrics[j].source_id == app.input_source.id {
// store.delete_values(metrics[j].id).expect("Failed deleting values");
// store.delete_metric(metrics[j].id).expect("Failed deleting Metric");
// to_remove.push(j);
// }
// }
// for index in to_remove {
// metrics.remove(index);
// }
// store.delete_source(sources[source_index].id).expect("Failed deleting source");
// sources.remove(source_index);
// app.deleting_source = None;
}
if ui.button("\n no \n").clicked() {
// app.deleting_source = None;
}
});
});
}
pub fn header(app: &mut App, ui: &mut Ui, frame: &mut Frame) {
ui.horizontal(|ui| {
global_dark_light_mode_switch(ui);
ui.heading("dashboard");
ui.separator();
ui.checkbox(&mut app.sidebar, "sources");
ui.separator();
if ui.button("reset").clicked() {
app.panels = app.panels_rx.borrow().clone();
}
ui.separator();
ui.checkbox(&mut app.edit, "edit");
if app.edit {
if ui.button("save").clicked() {
// native_save(app.data.clone());
app.edit = false;
}
ui.separator();
ui.label("+ panel");
panel_edit_inline_ui(ui, &mut app.buffer_panel);
if ui.button("add").clicked() {
// if let Err(e) = app.data.add_panel(&app.input_panel) {
// error!(target: "ui", "Failed to add panel: {:?}", e);
// };
}
ui.separator();
}
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
if ui.small_button("×").clicked() {
frame.close();
}
});
});
});
}
pub fn _footer(_data: Arc<ApplicationState>, ctx: &Context, ui: &mut Ui) {
CollapsingState::load_with_default_open(
ctx,
ui.make_persistent_id("footer-logs"),
false,
)
.show_header(ui, |ui| {
ui.horizontal(|ui| {
ui.separator();
// ui.label(data.file_path.to_str().unwrap()); // TODO maybe calculate it just once?
ui.separator();
// ui.label(human_size(
// *data
// .file_size
// .read()
// .expect("Filesize RwLock poisoned"),
// ));
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
ui.label(format!(
"v{}-{}",
env!("CARGO_PKG_VERSION"),
git_version::git_version!()
));
ui.separator();
ui.hyperlink_to("<me@alemi.dev>", "mailto:me@alemi.dev");
ui.label("alemi");
});
});
});
})
.body(|ui| {
ui.set_height(200.0);
ScrollArea::vertical().show(ui, |ui| {
// let msgs = data
// .diagnostics
// .read()
// .expect("Diagnostics RwLock poisoned");
ui.separator();
//for msg in msgs.iter() {
// ui.label(msg);
//}
ui.separator();
});
});
}

View file

@ -1,35 +1,33 @@
use eframe::{
-egui::{Checkbox, DragValue, Layout, ScrollArea, TextEdit, Ui},
-emath::Align, epaint::Color32,
+egui::{Layout, ScrollArea, Ui},
+emath::Align,
};
use rfd::FileDialog;
use tracing::error;
-use crate::app::{
-data::source::{Metric, Source},
-util::{deserialize_values, serialize_values},
-App,
-};
+use crate::util::deserialize_values;
+use crate::gui::App;
+use crate::data::entities;
use super::metric::{metric_display_ui, metric_edit_ui};
pub fn source_panel(app: &mut App, ui: &mut Ui) {
let mut source_to_put_metric_on : Option<i32> = None;
let mut to_swap: Option<usize> = None;
-let mut to_insert: Vec<Metric> = Vec::new();
+let _to_insert: Vec<entities::metrics::Model> = Vec::new();
// let mut to_delete: Option<usize> = None;
-let panels = app.data.panels.read().expect("Panels RwLock poisoned");
+let panels = &app.panels;
let panel_width = ui.available_width();
ScrollArea::vertical()
.max_width(panel_width)
.show(ui, |ui| {
// TODO only vertical!
{
-let mut sources = app.data.sources.write().expect("Sources RwLock poisoned");
+let sources = app.sources.borrow();
let sources_count = sources.len();
ui.heading("Sources");
ui.separator();
-for (i, source) in sources.iter_mut().enumerate() {
+for (i, source) in sources.iter().enumerate() {
ui.horizontal(|ui| {
if app.edit { // show buttons to move sources up and down
ui.vertical(|ui| {
@ -53,16 +51,14 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
ui.horizontal(|ui| {
source_edit_ui(ui, source, remaining_width - 34.0);
if ui.small_button("×").clicked() {
-app.deleting_metric = None;
-app.deleting_source = Some(i);
+// app.deleting_metric = None;
+// app.deleting_source = Some(i);
}
});
-let mut metrics = app
-.data
+let metrics = app
.metrics
-.write()
-.expect("Metrics RwLock poisoned");
-for (j, metric) in metrics.iter_mut().enumerate() {
+.borrow();
+for (_j, metric) in metrics.iter().enumerate() {
if metric.source_id == source.id {
ui.horizontal(|ui| {
metric_edit_ui(
@ -76,21 +72,21 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
.add_filter("csv", &["csv"])
.set_file_name(format!("{}-{}.csv", source.name, metric.name).as_str())
.save_file();
-if let Some(path) = path {
-serialize_values(
-&*metric
-.data
-.read()
-.expect("Values RwLock poisoned"),
-metric,
-path,
-)
-.expect("Could not serialize data");
+if let Some(_path) = path {
+// serialize_values(
+// &*metric
+// .data
+// .read()
+// .expect("Values RwLock poisoned"),
+// metric,
+// path,
+// )
+// .expect("Could not serialize data");
}
}
if ui.small_button("×").clicked() {
-app.deleting_source = None;
-app.deleting_metric = Some(j);
+// app.deleting_source = None;
+// app.deleting_metric = Some(j);
}
});
}
@ -98,7 +94,7 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
ui.horizontal(|ui| {
metric_edit_ui(
ui,
-&mut app.input_metric,
+&mut app.buffer_metric,
None,
remaining_width - 53.0,
);
@ -113,30 +109,30 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
.pick_file();
if let Some(path) = path {
match deserialize_values(path) {
-Ok((name, query_x, query_y, data)) => {
-let mut store = app
-.data
-.storage
-.lock()
-.expect("Storage Mutex poisoned");
-match store.new_metric(
-name.as_str(),
-source.id,
-query_x.as_str(),
-query_y.as_str(),
--1,
-Color32::TRANSPARENT,
-metrics.len() as i32,
-) {
-Ok(verified_metric) => {
-store.put_values(verified_metric.id, &data).unwrap();
-*verified_metric.data.write().expect("Values RwLock poisoned") = data;
-to_insert.push(verified_metric);
-}
-Err(e) => {
-error!(target: "ui", "could not save metric into archive : {:?}", e);
-}
-}
+Ok((_name, _query_x, _query_y, _data)) => {
+// let mut store = app
+// .data
+// .storage
+// .lock()
+// .expect("Storage Mutex poisoned");
+// match store.new_metric(
+// name.as_str(),
+// source.id,
+// query_x.as_str(),
+// query_y.as_str(),
+// -1,
+// Color32::TRANSPARENT,
+// metrics.len() as i32,
+// ) {
+// Ok(verified_metric) => {
+// store.put_values(verified_metric.id, &data).unwrap();
+// *verified_metric.data.write().expect("Values RwLock poisoned") = data;
+// to_insert.push(verified_metric);
+// }
+// Err(e) => {
+// error!(target: "ui", "could not save metric into archive : {:?}", e);
+// }
+// }
}
Err(e) => {
error!(target: "ui", "Could not deserialize metric from file : {:?}", e);
@ -145,13 +141,12 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
}
}
if ui.small_button("×").clicked() {
-app.input_metric = Metric::default();
+app.buffer_metric = entities::metrics::Model::default();
}
})
});
} else {
-let metrics =
-app.data.metrics.read().expect("Metrics RwLock poisoned");
+let metrics = app.metrics.borrow();
source_display_ui(ui, source, remaining_width);
for metric in metrics.iter() {
if metric.source_id == source.id {
@ -171,17 +166,17 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
ui.with_layout(Layout::top_down(Align::RIGHT), |ui| {
ui.horizontal(|ui| {
if ui.button("add").clicked() {
-if let Err(e) = app.data.add_source(&app.input_source) {
-error!(target: "ui", "Error adding source : {:?}", e);
-} else {
-app.input_source.id += 1;
-}
+// if let Err(e) = app.data.add_source(&app.input_source) {
+// error!(target: "ui", "Error adding source : {:?}", e);
+// } else {
+// app.input_source.id += 1;
+// }
}
ui.toggle_value(&mut app.padding, "#");
});
});
});
-source_edit_ui(ui, &mut app.input_source, panel_width - 10.0);
+source_edit_ui(ui, &mut app.buffer_source, panel_width - 10.0);
ui.add_space(5.0);
if app.padding {
ui.add_space(300.0);
@ -193,53 +188,53 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
// let mut panels = app.data.panels.write().expect("Panels RwLock poisoned");
// panels.remove(i);
// } else
-if let Some(i) = to_swap {
-// TODO can this be done in background? idk
-let mut sources = app.data.sources.write().expect("Sources RwLock poisoned");
-sources.swap(i - 1, i);
-}
-if to_insert.len() > 0 {
-let mut metrics = app.data.metrics.write().expect("Metrics RwLock poisoned");
-for m in to_insert {
-metrics.push(m);
-}
-}
+// if let Some(i) = to_swap {
+// // TODO can this be done in background? idk
+// let mut sources = app.sources.borrow();
+// sources.swap(i - 1, i);
+// }
+// if to_insert.len() > 0 {
+// let mut metrics = app.metrics.borrow();
+// for m in to_insert {
+// metrics.push(m);
+// }
+// }
if let Some(s) = source_to_put_metric_on {
-for source in app.data.sources.read().expect("Sources RwLock poisoned").iter() {
+for source in app.sources.borrow().iter() {
if source.id == s {
-if let Err(e) =
-app.data.add_metric(&app.input_metric, &source)
-{
-error!(target: "ui", "Error adding metric : {:?}", e);
-}
+// if let Err(e) =
+// app.data.add_metric(&app.input_metric, &source)
+// {
+// error!(target: "ui", "Error adding metric : {:?}", e);
+// }
}
}
}
}
-pub fn source_display_ui(ui: &mut Ui, source: &mut Source, _width: f32) {
-ui.horizontal(|ui| {
-ui.add_enabled(false, Checkbox::new(&mut source.enabled, ""));
-ui.add_enabled(
-false,
-DragValue::new(&mut source.interval).clamp_range(1..=120),
-);
-ui.heading(&source.name).on_hover_text(&source.url);
-});
+pub fn source_display_ui(_ui: &mut Ui, _source: &entities::sources::Model, _width: f32) {
+// ui.horizontal(|ui| {
+// ui.add_enabled(false, Checkbox::new(&mut source.enabled, ""));
+// ui.add_enabled(
+// false,
+// DragValue::new(&mut source.interval).clamp_range(1..=120),
+// );
+// ui.heading(&source.name).on_hover_text(&source.url);
+// });
}
-pub fn source_edit_ui(ui: &mut Ui, source: &mut Source, width: f32) {
-ui.horizontal(|ui| {
-let text_width = width - 100.0;
-ui.checkbox(&mut source.enabled, "");
-ui.add(DragValue::new(&mut source.interval).clamp_range(1..=3600));
-TextEdit::singleline(&mut source.name)
-.desired_width(text_width / 4.0)
-.hint_text("name")
-.show(ui);
-TextEdit::singleline(&mut source.url)
-.desired_width(text_width * 3.0 / 4.0)
-.hint_text("url")
-.show(ui);
-});
+pub fn source_edit_ui(_ui: &mut Ui, _source: &entities::sources::Model, _width: f32) {
+// ui.horizontal(|ui| {
+// let text_width = width - 100.0;
+// ui.checkbox(&mut source.enabled, "");
+// ui.add(DragValue::new(&mut source.interval).clamp_range(1..=3600));
+// TextEdit::singleline(&mut source.name)
+// .desired_width(text_width / 4.0)
+// .hint_text("name")
+// .show(ui);
+// TextEdit::singleline(&mut source.url)
+// .desired_width(text_width * 3.0 / 4.0)
+// .hint_text("url")
+// .show(ui);
+// });
}

View file

@ -1,56 +1,139 @@
-mod app;
+mod gui;
+mod data;
+mod util;
+mod worker;
-use crate::app::{
-data::ApplicationState,
-util::InternalLogger,
-worker::{BackgroundWorker, NativeBackgroundWorker},
-App,
-};
-use std::path::PathBuf;
-use std::sync::Arc;
use tracing::metadata::LevelFilter;
use tracing_subscriber::prelude::*;
+use tracing::info;
+use tracing_subscriber::filter::filter_fn;
+use clap::Parser;
+use tokio::sync::watch;
+use sea_orm::Database;
+use worker::{surveyor_loop, visualizer_loop};
+use gui::{
+// util::InternalLogger,
+App
+};
+/// Data gatherer and visualization tool
+#[derive(Parser, Debug)]
+#[command(author, version, about)]
+struct CliArgs {
+/// Connection string for database to use
+db: String,
+/// Run background worker
+#[arg(long, default_value_t = false)]
+worker: bool,
+/// Run user interface
+#[arg(long, default_value_t = false)]
+gui: bool,
+/// Check interval for background worker
+#[arg(short, long, default_value_t = 5)]
+interval: u64,
+/// How often sources and metrics are refreshed
+#[arg(short, long, default_value_t = 300)]
+cache_time: u64,
+}
// When compiling natively:
#[cfg(not(target_arch = "wasm32"))]
-fn main() -> ! {
-let native_options = eframe::NativeOptions::default();
-let args: Vec<String> = std::env::args().collect();
-// Set default file location
-let mut store_path = dirs::data_dir().unwrap_or(PathBuf::from(".")); // TODO get cwd more consistently?
-store_path.push("dashboard.db");
-// Can be overruled from argv
-for (index, arg) in args.iter().enumerate() {
-if index <= 0 || arg.eq("--") {
-continue;
-}
-store_path = PathBuf::from(arg.as_str());
-break;
-}
-println!("path: {}", store_path.to_str().unwrap());
-let store =
-Arc::new(ApplicationState::new(store_path).expect("Failed creating application state"));
+fn main() {
+let args = CliArgs::parse();
tracing_subscriber::registry()
.with(LevelFilter::INFO)
+.with(filter_fn(|x| x.target() != "sqlx::query"))
.with(tracing_subscriber::fmt::Layer::new())
-.with(InternalLogger::new(store.clone()))
+// .with(InternalLogger::new(store.clone()))
.init();
+// // Set default file location
+// let mut store_path = dirs::data_dir().unwrap_or(PathBuf::from(".")); // TODO get cwd more consistently?
+// store_path.push("dashboard.db");
+// let store =
+// Arc::new(ApplicationState::new(store_path).expect("Failed creating application state"));
+let (panel_tx, panel_rx) = watch::channel(vec![]);
+let (source_tx, source_rx) = watch::channel(vec![]);
+let (metric_tx, metric_rx) = watch::channel(vec![]);
+let (point_tx, point_rx) = watch::channel(vec![]);
+let (view_tx, view_rx) = watch::channel(1440);
+let worker = std::thread::spawn(move || {
+tokio::runtime::Builder::new_current_thread()
+.enable_all()
+.build()
+.unwrap()
+.block_on(async {
+let db = Database::connect(args.db.clone()).await.unwrap();
+info!(target: "launcher", "Connected to '{}'", args.db);
+let mut jobs = vec![];
+if args.worker {
+jobs.push(
+tokio::spawn(
+surveyor_loop(
+db.clone(),
+args.interval as i64,
+args.cache_time as i64,
+)
+)
+);
+}
+if args.gui {
+jobs.push(
+tokio::spawn(
+visualizer_loop(
+db.clone(),
+args.interval,
+args.cache_time as i64,
+panel_tx,
+source_tx,
+metric_tx,
+point_tx,
+view_rx,
+)
+)
+);
+}
+for job in jobs { job.await.unwrap() }
+info!(target: "launcher", "Stopping background worker");
+})
+});
+if args.gui {
+let native_options = eframe::NativeOptions::default();
eframe::run_native(
// TODO replace this with a loop that ends so we can cleanly exit the background worker
"dashboard",
native_options,
-Box::new(move |cc| {
-let _worker = NativeBackgroundWorker::start(store.clone(), cc.egui_ctx.clone());
-Box::new(App::new(cc, store))
-}),
+Box::new(
+move |cc| Box::new(
+App::new(
+cc,
+panel_rx,
+source_rx,
+metric_rx,
+point_rx,
+view_tx,
+)
+)
+),
);
+}
-// worker.stop();
+worker.join().unwrap();
}

View file

@ -3,14 +3,20 @@ use eframe::egui::{Color32, plot::PlotPoint};
use std::{sync::Arc, error::Error, path::PathBuf};
use tracing_subscriber::Layer;
-use super::data::{ApplicationState, source::Metric};
+use super::data::{ApplicationState, entities};
// if you're handling more than terabytes of data, it's the future and you ought to update this code!
-const PREFIXES: &'static [&'static str] = &["", "k", "M", "G", "T"];
-pub fn serialize_values(values: &Vec<PlotPoint>, metric: &Metric, path: PathBuf) -> Result<(), Box<dyn Error>> {
+const _PREFIXES: &'static [&'static str] = &["", "k", "M", "G", "T"];
+pub fn _serialize_values(values: &Vec<PlotPoint>, metric: &entities::metrics::Model, path: PathBuf) -> Result<(), Box<dyn Error>> {
let mut wtr = csv::Writer::from_writer(std::fs::File::create(path)?);
-wtr.write_record(&[metric.name.as_str(), metric.query_x.as_str(), metric.query_y.as_str()])?;
+// DAMN! VVVVV
+let def_q_x = "".into();
+let name = metric.name.as_str();
+let q_x = metric.query_x.as_ref().unwrap_or(&def_q_x).as_str();
+let q_y = metric.query_y.as_str();
+wtr.write_record(&[name, q_x, q_y])?;
+// DAMN! AAAAA
for v in values {
wtr.serialize(("", v.x, v.y))?;
}
@ -45,15 +51,15 @@ pub fn deserialize_values(path: PathBuf) -> Result<(String, String, String, Vec<
))
}
-pub fn human_size(size: u64) -> String {
+pub fn _human_size(size: u64) -> String {
let mut buf: f64 = size as f64;
let mut prefix: usize = 0;
-while buf > 1024.0 && prefix < PREFIXES.len() - 1 {
+while buf > 1024.0 && prefix < _PREFIXES.len() - 1 {
buf /= 1024.0;
prefix += 1;
}
-return format!("{:.3} {}B", buf, PREFIXES[prefix]);
+return format!("{:.3} {}B", buf, _PREFIXES[prefix]);
}
pub fn timestamp_to_str(t: i64, date: bool, time: bool) -> String {
@ -95,12 +101,12 @@ pub fn repack_color(c: Color32) -> u32 {
}
pub struct InternalLogger {
-state: Arc<ApplicationState>,
+_state: Arc<ApplicationState>,
}
impl InternalLogger {
-pub fn new(state: Arc<ApplicationState>) -> Self {
-InternalLogger { state }
+pub fn _new(state: Arc<ApplicationState>) -> Self {
+InternalLogger { _state: state }
}
}
@ -117,18 +123,18 @@ where
msg: "".to_string(),
};
event.record(&mut msg_visitor);
-let out = format!(
+let _out = format!(
"{} [{}] {}: {}",
Local::now().format("%H:%M:%S"),
event.metadata().level(),
event.metadata().target(),
msg_visitor.msg
);
-self.state
-.diagnostics
-.write()
-.expect("Diagnostics RwLock poisoned")
-.push(out);
+// self.state
+// .diagnostics
+// .write()
+// .expect("Diagnostics RwLock poisoned")
+// .push(out);
}
}

159
src/worker.rs Normal file
View file

@ -0,0 +1,159 @@
use chrono::Utc;
use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait, Condition, ColumnTrait, QueryFilter};
use tokio::sync::watch;
use tracing::{error, info};
use std::collections::VecDeque;
use super::data::{entities, FetchError};
async fn fetch(url: &str) -> Result<serde_json::Value, FetchError> {
Ok(reqwest::get(url).await?.json().await?)
}
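// periodically polls each enabled source (when source.ready() says it is due),
// bumps its last_update and inserts one point per metric extracted from the JSON response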
pub async fn surveyor_loop(
db: DatabaseConnection,
interval:i64,
cache_time:i64,
) {
let mut last_check = Utc::now().timestamp();
let mut last_fetch = 0;
let mut sources = vec![];
let mut metrics = vec![];
loop {
// sleep until next activation
let delta_time = (interval as i64) - (Utc::now().timestamp() - last_check);
if delta_time > 0 {
std::thread::sleep(std::time::Duration::from_secs(delta_time as u64));
}
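// note: std::thread::sleep blocks the whole current-thread runtime here, so any other
// task on it (e.g. visualizer_loop) pauses too; tokio::time::sleep(..).await would yield instead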
last_check = Utc::now().timestamp();
if Utc::now().timestamp() - last_fetch > cache_time {
// TODO do both concurrently
let res = tokio::join!(
entities::sources::Entity::find().all(&db),
entities::metrics::Entity::find().all(&db)
);
sources = res.0.unwrap();
metrics = res.1.unwrap();
last_fetch = Utc::now().timestamp();
}
for source in sources.iter_mut() {
if !source.enabled || !source.ready() {
continue;
}
// source.last_fetch = Utc::now(); // TODO! do it in background threads again!
// tokio::spawn(async move {
match fetch(&source.url).await {
Ok(res) => {
let now = Utc::now().timestamp();
entities::sources::Entity::update(
entities::sources::ActiveModel{id: Set(source.id), last_update: Set(now), ..Default::default()}
).exec(&db).await.unwrap();
source.last_update = now;
for metric in metrics.iter().filter(|x| source.id == x.source_id) {
match metric.extract(&res) {
Ok(v) => {
entities::points::Entity::insert(
entities::points::ActiveModel {
id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y),
}).exec(&db).await.unwrap();
},
Err(e) => error!(target: "worker", "Failed extracting '{}' from {}: {:?}", metric.name, source.name, e),
}
}
},
Err(e) => error!(target: "worker", "Failed fetching {}: {:?}", source.name, e),
}
// source.last_fetch = Utc::now(); // TODO!
// });
}
// if let Ok(meta) = std::fs::metadata(state.file_path.clone()) {
// let mut fsize = state.file_size.write().expect("File Size RwLock poisoned");
// *fsize = meta.len();
// } // ignore errors
// ctx.request_repaint();
}
}
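// feeds the GUI's watch channels: panels, sources and metrics are re-read every
// cache_time seconds, while points are kept as a sliding window sized by view_rx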
pub async fn visualizer_loop(
db: DatabaseConnection,
interval: u64,
cache_time: i64,
panels_tx: watch::Sender<Vec<entities::panels::Model>>,
sources_tx: watch::Sender<Vec<entities::sources::Model>>,
metrics_tx: watch::Sender<Vec<entities::metrics::Model>>,
points_tx: watch::Sender<Vec<entities::points::Model>>,
view_rx: watch::Receiver<i64>,
) {
let mut points : VecDeque<entities::points::Model> = VecDeque::new();
let mut last_fetch = 0;
let mut width = *view_rx.borrow() * 60; // TODO it's in minutes somewhere...
let mut lower = Utc::now().timestamp() - width;
let mut changes;
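// each iteration: backfill older points at the front if the requested window grew,
// append freshly inserted points, drop points that fell out of the window, and only
// publish a new snapshot when something changed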
loop {
if Utc::now().timestamp() - last_fetch >= cache_time {
panels_tx.send(entities::panels::Entity::find().all(&db).await.unwrap()).unwrap();
sources_tx.send(entities::sources::Entity::find().all(&db).await.unwrap()).unwrap();
metrics_tx.send(entities::metrics::Entity::find().all(&db).await.unwrap()).unwrap();
last_fetch = Utc::now().timestamp();
info!(target: "worker", "Updated panels, sources and metrics");
}
changes = false;
let now = Utc::now().timestamp();
let new_width = *view_rx.borrow() * 60; // TODO it's in minutes somewhere...
if new_width != width {
let mut lower_points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(now - new_width))
.add(entities::points::Column::X.lte(now - width))
)
.all(&db)
.await.unwrap();
lower_points.reverse(); // TODO wasteful!
for p in lower_points {
points.push_front(p);
changes = true;
}
}
width = new_width;
let new_points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(lower as f64))
)
.all(&db)
.await.unwrap();
lower = Utc::now().timestamp();
while let Some(p) = points.get(0) {
if (p.x as i64) >= lower - (*view_rx.borrow() * 60) {
break;
}
points.pop_front();
changes = true;
}
for p in new_points {
points.push_back(p);
changes = true;
}
if changes {
points_tx.send(points.clone().into()).unwrap();
}
tokio::time::sleep(std::time::Duration::from_secs(interval)).await;
}
}