From a4e26d153306c115987f24f30b9c00b5e8279f21 Mon Sep 17 00:00:00 2001 From: alemidev Date: Tue, 7 Jun 2022 00:05:45 +0200 Subject: [PATCH] feat: arbitrary sources, background worker implemented a reliable sqlite storage, a way to save and load values, a native-only background worker to fetch data and update values --- Cargo.toml | 10 +- src/app/data/mod.rs | 85 ++++++++++++--- src/app/data/store.rs | 241 +++++++++++++++++++++++++----------------- src/app/mod.rs | 173 +++++++++++++++++++++--------- src/app/worker.rs | 94 ++++++++++++++++ src/main.rs | 25 +---- src/util/mod.rs | 1 - src/util/worker.rs | 47 -------- 8 files changed, 439 insertions(+), 237 deletions(-) create mode 100644 src/app/worker.rs delete mode 100644 src/util/mod.rs delete mode 100644 src/util/worker.rs diff --git a/Cargo.toml b/Cargo.toml index 77b64f0..a4a4e72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,20 +13,18 @@ path = "src/main.rs" crate-type = ["cdylib", "rlib"] [dependencies] -dirs = "4" rand = "0.8" chrono = { version = "0.4", features = ["wasmbind"] } eframe = { version = "0.18", features = ["persistence"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -jq-rs = "0.4" -rusqlite = { version = "0.27" } -ehttp = "0.2.0" -reqwest = { version = "0.11", features = ["json"] } # native only dependancies: [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -tokio = { version = "1", features = ["full"] } # TODO do we need full features? +dirs = "4" +rusqlite = { version = "0.27" } +jq-rs = "0.4" +ureq = { version = "2", features = ["json"] } # web only dependancies: [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/src/app/data/mod.rs b/src/app/data/mod.rs index 54b8923..77eb50d 100644 --- a/src/app/data/mod.rs +++ b/src/app/data/mod.rs @@ -1,21 +1,29 @@ -pub mod source; +// pub mod source; pub mod store; -use std::sync::{Arc, Mutex}; +use std::path::PathBuf; +use std::sync::{RwLock, Mutex, Arc}; use std::num::ParseFloatError; use chrono::{DateTime, Utc}; use eframe::egui::plot::{Values, Value}; +use self::store::SQLiteDataStore; + #[derive(Debug)] pub enum FetchError { - ReqwestError(reqwest::Error), + UreqError(ureq::Error), + IoError(std::io::Error), JqError(jq_rs::Error), RusqliteError(rusqlite::Error), ParseFloatError(ParseFloatError), + NoPanelWithThatIdError, } -impl From:: for FetchError { - fn from(e: reqwest::Error) -> Self { FetchError::ReqwestError(e) } +impl From:: for FetchError { + fn from(e: ureq::Error) -> Self { FetchError::UreqError(e) } +} +impl From:: for FetchError { + fn from(e: std::io::Error) -> Self { FetchError::IoError(e) } } impl From:: for FetchError { fn from(e: jq_rs::Error) -> Self { FetchError::JqError(e) } @@ -27,16 +35,54 @@ impl From:: for FetchError { fn from(e: rusqlite::Error) -> Self { FetchError::RusqliteError(e) } } +pub struct ApplicationState { + pub run: bool, + pub panels: RwLock>, + pub storage: Mutex, +} +impl ApplicationState { + pub fn new(path:PathBuf) -> Self { + let storage = SQLiteDataStore::new(path).unwrap(); + + let panels = storage.load_panels().unwrap(); + + return ApplicationState{ + run: true, + panels: RwLock::new(panels), + storage: Mutex::new(storage), + }; + } + + pub fn add_panel(&self, name:&str) -> Result<(), FetchError> { + let panel = self.storage.lock().unwrap().new_panel(name, 100, 200, 280)?; // TODO make values customizable and useful + self.panels.write().unwrap().push(panel); + Ok(()) + } + + pub fn add_source(&self, panel_id:i32, name:&str, url:&str, query_x:&str, query_y:&str) -> Result<(), FetchError> { + let source = self.storage.lock().unwrap().new_source(panel_id, name, url, query_x, query_y)?; + let panels = self.panels.read().unwrap(); + for panel in &*panels { + if panel.id == panel_id { + panel.sources.write().unwrap().push(source); + return Ok(()); + } + } + Err(FetchError::NoPanelWithThatIdError) + } +} pub struct Panel { pub(crate) id: i32, pub name: String, pub view_scroll: bool, pub view_size: i32, + pub timeserie: bool, pub(crate) width: i32, pub(crate) height: i32, - pub(crate) sources: Mutex>, + pub(crate) sources: RwLock>, + } impl Panel { @@ -47,29 +93,36 @@ pub struct Source { pub name: String, pub url: String, pub interval: i32, - pub(crate) last_fetch: DateTime, + pub(crate) last_fetch: RwLock>, pub query_x: String, // pub(crate) compiled_query_x: Arc>, pub query_y: String, // pub(crate) compiled_query_y: Arc>, pub(crate) panel_id: i32, - pub(crate) data: Mutex>, + pub(crate) data: RwLock>, } impl Source { pub fn valid(&self) -> bool { - return (Utc::now() - self.last_fetch).num_seconds() < self.interval as i64; + let last_fetch = self.last_fetch.read().unwrap(); + return (Utc::now() - *last_fetch).num_seconds() < self.interval as i64; } pub fn values(&self) -> Values { - Values::from_values(self.data.lock().unwrap().clone()) + Values::from_values(self.data.read().unwrap().clone()) } - pub async fn fetch(&self) -> Result { - let res = reqwest::get(&self.url).await?; - let body = res.text().await?; - let x = jq_rs::compile(&self.query_x)?.run(&body)?.parse::()?; - let y = jq_rs::compile(&self.query_y)?.run(&body)?.parse::()?; - return Ok( Value { x, y } ); +} + +pub fn fetch(url:&str, query_x:&str, query_y:&str) -> Result { + let res = ureq::get(url).call()?; + let body = res.into_string()?; + let x : f64; + if query_x.len() > 0 { + x = jq_rs::compile(query_x)?.run(&body)?.trim().parse::()?; // TODO precompile and guard with a mutex + } else { + x = Utc::now().timestamp() as f64; } + let y = jq_rs::compile(query_y)?.run(&body)?.trim().parse::()?; + return Ok( Value { x, y } ); } \ No newline at end of file diff --git a/src/app/data/store.rs b/src/app/data/store.rs index 78a6fb7..567c374 100644 --- a/src/app/data/store.rs +++ b/src/app/data/store.rs @@ -1,30 +1,30 @@ -use std::sync::{Arc, Mutex}; -use chrono::{DateTime, TimeZone, NaiveDateTime, Utc}; -use rusqlite::{Connection, params}; -use eframe::egui::plot::Value; use crate::app::data::{Panel, Source}; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; +use eframe::egui::plot::Value; +use rusqlite::{params, Connection}; +use std::sync::{Arc, RwLock}; use super::FetchError; pub trait DataStorage { - fn add_panel(&self, name:&str); + fn add_panel(&self, name: &str); } pub struct SQLiteDataStore { conn: Connection, - pub(crate) panels: Mutex>, } impl SQLiteDataStore { pub fn new(path: std::path::PathBuf) -> Result { let conn = Connection::open(path)?; - + conn.execute( "CREATE TABLE IF NOT EXISTS panels ( id INTEGER PRIMARY KEY, name TEXT UNIQUE, view_scroll BOOL, view_size INT, + timeserie BOOL, width INT, height INT );", @@ -34,7 +34,7 @@ impl SQLiteDataStore { conn.execute( "CREATE TABLE IF NOT EXISTS sources ( id INTEGER PRIMARY KEY, - name TEXT UNIQUE, + name TEXT, url TEXT, interval INT, query_x TEXT, @@ -55,21 +55,21 @@ impl SQLiteDataStore { [], )?; - let mut store = SQLiteDataStore { - conn, - panels: Mutex::new(Vec::new()), - }; - - store.load_panels()?; - - return Ok(store); + Ok(SQLiteDataStore { conn }) } - fn load_values(&self, panel_id:i32, source_id:i32) -> rusqlite::Result> { - let mut values : Vec = Vec::new(); - let mut statement = self.conn.prepare("SELECT x, y FROM points WHERE panel_id = ? AND source_id = ?")?; + + + pub fn load_values(&self, panel_id: i32, source_id: i32) -> rusqlite::Result> { + let mut values: Vec = Vec::new(); + let mut statement = self + .conn + .prepare("SELECT x, y FROM points WHERE panel_id = ? AND source_id = ?")?; let values_iter = statement.query_map(params![panel_id, source_id], |row| { - Ok(Value{ x: row.get(0)?, y: row.get(1)? }) + Ok(Value { + x: row.get(0)?, + y: row.get(1)?, + }) })?; for value in values_iter { @@ -81,35 +81,39 @@ impl SQLiteDataStore { Ok(values) } - fn put_value(&self, panel_id:i32, source_id:i32, v:Value) -> rusqlite::Result { + pub fn put_value(&self, panel_id: i32, source_id: i32, v: Value) -> rusqlite::Result { self.conn.execute( "INSERT INTO points(panel_id, source_id, x, y) VALUES (?, ?, ?, ?)", params![panel_id, source_id, v.x, v.y], ) } - fn load_sources(&self, panel_id:i32) -> rusqlite::Result> { - let mut sources : Vec = Vec::new(); - let mut statement = self.conn.prepare("SELECT * FROM sources WHERE panel_id = ?")?; + + + pub fn load_sources(&self, panel_id: i32) -> rusqlite::Result> { + let mut sources: Vec = Vec::new(); + let mut statement = self + .conn + .prepare("SELECT * FROM sources WHERE panel_id = ?")?; let sources_iter = statement.query_map(params![panel_id], |row| { - Ok(Source{ + Ok(Source { id: row.get(0)?, name: row.get(1)?, url: row.get(2)?, interval: row.get(3)?, - last_fetch: Utc.ymd(1970, 1, 1).and_hms(0, 0, 0), + last_fetch: RwLock::new(Utc.ymd(1970, 1, 1).and_hms(0, 0, 0)), query_x: row.get(4)?, // compiled_query_x: Arc::new(Mutex::new(jq_rs::compile(row.get::(4)?.as_str()).unwrap())), query_y: row.get(5)?, // compiled_query_y: Arc::new(Mutex::new(jq_rs::compile(row.get::(5)?.as_str()).unwrap())), panel_id: row.get(6)?, - data: Mutex::new(Vec::new()), + data: RwLock::new(Vec::new()), }) })?; for source in sources_iter { if let Ok(mut s) = source { - s.data = Mutex::new(self.load_values(panel_id, s.id)?); + s.data = RwLock::new(self.load_values(panel_id, s.id)?); sources.push(s); } } @@ -117,58 +121,33 @@ impl SQLiteDataStore { Ok(sources) } - fn put_source(&self, panel_id:i32, s:Source) -> rusqlite::Result { - self.conn.execute( - "INSERT INTO sources(id, name, url, interval, query_x, query_y, panel_id) VALUES (?, ?, ?, ?, ?, ?, ?)", - params![s.id, s.name, s.url, s.interval, s.query_x, s.query_y, panel_id], - ) - } - - fn load_panels(&self) -> rusqlite::Result> { - let mut panels : Vec = Vec::new(); - let mut statement = self.conn.prepare("SELECT * FROM panels")?; - let panels_iter = statement.query_map([], |row| { - Ok(Panel{ - id: row.get(0)?, - name: row.get(1)?, - view_scroll: row.get(2)?, - view_size: row.get(3)?, - width: row.get(4)?, - height: row.get(5)?, - sources: Mutex::new(Vec::new()), - }) - })?; - - for panel in panels_iter { - if let Ok(mut p) = panel { - p.sources = Mutex::new(self.load_sources(p.id)?); - panels.push(p); - } - } - - Ok(panels) - } - - fn put_panel(&self, name:&str, view_scroll:bool, view_size:i32, width:i32, height:i32) -> rusqlite::Result { - self.conn.execute( - "INSERT INTO panels (name, view_scroll, view_size, width, height) VALUES (?, ?, ?, ?, ?)", - params![name, view_scroll, view_size, width, height] - ) - } - // jank! TODO make it not jank! - fn new_panel(&self, name:&str) -> rusqlite::Result { - self.put_panel(name, true, 100, 400, 280)?; - let mut statement = self.conn.prepare("SELECT * FROM panels WHERE name = ?")?; - for panel in statement.query_map(params![name], |row| { - Ok(Panel{ + pub fn new_source( + &self, + panel_id: i32, + name: &str, + url: &str, + query_x: &str, + query_y: &str, + ) -> rusqlite::Result { + self.conn.execute( + "INSERT INTO sources(name, url, interval, query_x, query_y, panel_id) VALUES (?, ?, ?, ?, ?, ?)", + params![name, url, 60, query_x, query_y, panel_id], + )?; + let mut statement = self + .conn + .prepare("SELECT * FROM sources WHERE name = ? AND panel_id = ?")?; + for panel in statement.query_map(params![name, panel_id], |row| { + Ok(Source { id: row.get(0)?, name: row.get(1)?, - view_scroll: row.get(2)?, - view_size: row.get(3)?, - width: row.get(4)?, - height: row.get(5)?, - sources: Mutex::new(Vec::new()), + url: row.get(2)?, + interval: row.get(3)?, + query_x: row.get(4)?, + query_y: row.get(5)?, + panel_id: row.get(6)?, + last_fetch: RwLock::new(Utc.ymd(1970, 1, 1).and_hms(0, 0, 0)), + data: RwLock::new(Vec::new()), }) })? { if let Ok(p) = panel { @@ -181,27 +160,99 @@ impl SQLiteDataStore { Err(rusqlite::Error::QueryReturnedNoRows) } - pub async fn fetch_all(&self) -> Result<(), FetchError> { - let panels = &*self.panels.lock().unwrap(); - for i in 0..panels.len() { - let sources = &*panels[i].sources.lock().unwrap(); - for j in 0..sources.len() { - if !sources[j].valid() { - let v = sources[j].fetch().await?; - self.put_value(panels[i].id, sources[j].id, v)?; - sources[j].data.lock().unwrap().push(v); - } + pub fn update_source( + &self, + source_id: i32, + name: &str, + url: &str, + interval: i32, + query_x: &str, + query_y: &str, + ) -> rusqlite::Result { + self.conn.execute( + "UPDATE sources SET name = ?, url = ?, interval = ?, query_x = ?, query_y = ? WHERE id = ?", + params![name, url, interval, query_x, query_y, source_id], + ) + } + + pub fn delete_source(&self, id:i32) -> rusqlite::Result { + self.conn.execute("DELETE FROM sources WHERE id = ?", params![id]) + } + + + + pub fn load_panels(&self) -> rusqlite::Result> { + let mut panels: Vec = Vec::new(); + let mut statement = self.conn.prepare("SELECT * FROM panels")?; + let panels_iter = statement.query_map([], |row| { + Ok(Panel { + id: row.get(0)?, + name: row.get(1)?, + view_scroll: row.get(2)?, + view_size: row.get(3)?, + timeserie: row.get(4)?, + width: row.get(5)?, + height: row.get(6)?, + sources: RwLock::new(Vec::new()), + }) + })?; + + for panel in panels_iter { + if let Ok(mut p) = panel { + p.sources = RwLock::new(self.load_sources(p.id)?); + panels.push(p); } } - Ok(()) + Ok(panels) } + // jank! TODO make it not jank! + pub fn new_panel(&self, name: &str, view_size:i32, width: i32, height: i32) -> rusqlite::Result { + self.conn.execute( + "INSERT INTO panels (name, view_scroll, view_size, timeserie, width, height) VALUES (?, ?, ?, ?, ?, ?)", + params![name, true, view_size, true, width, height] + )?; + let mut statement = self.conn.prepare("SELECT * FROM panels WHERE name = ?")?; + for panel in statement.query_map(params![name], |row| { + Ok(Panel { + id: row.get(0)?, + name: row.get(1)?, + view_scroll: row.get(2)?, + view_size: row.get(3)?, + timeserie: row.get(4)?, + width: row.get(5)?, + height: row.get(6)?, + sources: RwLock::new(Vec::new()), + }) + })? { + if let Ok(p) = panel { + return Ok(p); + } + } + Err(rusqlite::Error::QueryReturnedNoRows) + } + + pub fn update_panel( + &self, + id: i32, + name: &str, + view_scroll: bool, + view_size: i32, + timeserie: bool, + width: i32, + height: i32, + ) -> rusqlite::Result { + self.conn.execute( + "UPDATE panels SET name = ?, view_scroll = ?, view_size = ?, timeserie = ?, width = ?, height = ? WHERE id = ?", + params![name, view_scroll, view_size, timeserie, width, height, id], + ) + } + + pub fn delete_panel(&self, id:i32) -> rusqlite::Result { + self.conn.execute("DELETE FROM panels WHERE id = ?", params![id]) + } + + + } - -impl DataStorage for SQLiteDataStore { - fn add_panel(&self, name:&str) { - let panel = self.new_panel(name).unwrap(); - self.panels.lock().unwrap().push(panel); - } -} \ No newline at end of file diff --git a/src/app/mod.rs b/src/app/mod.rs index 469e9ea..9f20be9 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -1,32 +1,47 @@ pub mod data; +pub mod worker; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use chrono::{DateTime, NaiveDateTime, Utc}; -use data::source::{ChatData, PlayerCountData, TpsData, Data, native_save}; use eframe::egui; -use eframe::egui::plot::{Line, Plot, Values}; -use crate::app::data::store::DataStorage; +use eframe::egui::{plot::{Line, Plot}}; -use self::data::store::SQLiteDataStore; +use self::data::ApplicationState; +use self::worker::native_save; -pub struct App { - // data : SQLiteDataStore, - data : Arc, +struct InputBuffer { + panel_name: String, + name: String, + url: String, + interval: i32, + query_x: String, + query_y: String, + panel_id: i32, } -struct ServerOptions { - title: String, - url: String, - player_count: PlayerCountData, - tps: TpsData, - chat: ChatData, - sync_time:bool, +impl Default for InputBuffer { + fn default() -> Self { + InputBuffer { + panel_name: "".to_string(), + name: "".to_string(), + url: "".to_string(), + interval: 60, + query_x: "".to_string(), + query_y: "".to_string(), + panel_id: 0, + } + } +} + +pub struct App { + data: Arc, + input: InputBuffer, + edit: bool, } impl App { - // pub fn new(_cc: &eframe::CreationContext, data: SQLiteDataStore) -> Self { - pub fn new(_cc: &eframe::CreationContext, data: Arc) -> Self { - Self { data } + pub fn new(_cc: &eframe::CreationContext, data: Arc) -> Self { + Self { data, input: InputBuffer::default(), edit: false } } } @@ -36,48 +51,104 @@ impl eframe::App for App { ui.horizontal(|ui| { egui::widgets::global_dark_light_mode_switch(ui); ui.heading("dashboard"); - if ui.button("test add").clicked() { - self.data.add_panel("test panel"); + ui.checkbox(&mut self.edit, "edit"); + if self.edit { + if ui.button("save").clicked() { + native_save(self.data.clone()); + } } ui.with_layout(egui::Layout::top_down(egui::Align::RIGHT), |ui| { - if ui.button("x").clicked() { + if ui.button("×").clicked() { frame.quit(); } }); }); - }); - egui::CentralPanel::default().show(ctx, |ui| { - let panels = &*self.data.panels.lock().unwrap(); - for i in 0..panels.len() { - // for panel in self.data.view() { - ui.group(|ui| { - ui.vertical(|ui| { - ui.horizontal(|ui| { - ui.heading(panels[i].name.as_str()); - // ui.checkbox(&mut panel.view_scroll, "autoscroll"); - }); - let mut p = Plot::new(format!("plot-{}", panels[i].name)).x_axis_formatter(|x, _range| { - format!( - "{}", - DateTime::::from_utc(NaiveDateTime::from_timestamp(x as i64, 0), Utc) - .format("%Y/%m/%d %H:%M:%S") - ) - }).center_x_axis(false).height(panels[i].height as f32); // TODO make it fucking reactive! It fills the whole screen with 1 plot no matter what I do... - - if panels[i].view_scroll { - p = p.include_x(Utc::now().timestamp() as f64); - } - - p.show(ui, |plot_ui| { - let sources = &*panels[i].sources.lock().unwrap(); - for j in 0..sources.len() { - plot_ui.line(Line::new(sources[j].values()).name(sources[j].name.as_str())); + if self.edit { + ui.horizontal(|ui| { + eframe::egui::TextEdit::singleline(&mut self.input.panel_name).hint_text("panel").desired_width(50.0).show(ui); + if ui.button("add panel").clicked() { + self.data.add_panel(self.input.panel_name.as_str()).unwrap(); + } + eframe::egui::TextEdit::singleline(&mut self.input.name).hint_text("name").desired_width(30.0).show(ui); + eframe::egui::TextEdit::singleline(&mut self.input.url).hint_text("url").desired_width(80.0).show(ui); + eframe::egui::TextEdit::singleline(&mut self.input.query_x).hint_text("x query").desired_width(25.0).show(ui); + eframe::egui::TextEdit::singleline(&mut self.input.query_y).hint_text("y query").desired_width(25.0).show(ui); + egui::ComboBox::from_label("panel") + .selected_text(format!("[{}]", self.input.panel_id)) + .show_ui(ui, |ui| { + let pnls = self.data.panels.write().unwrap(); + for p in &*pnls { + ui.selectable_value(&mut self.input.panel_id, p.id, p.name.as_str()); } - }); - }); + } + ); + if ui.button("add source").clicked() { + self.data.add_source( + self.input.panel_id, + self.input.name.as_str(), + self.input.url.as_str(), + self.input.query_x.as_str(), + self.input.query_y.as_str(), + ).unwrap(); + } + ui.add(egui::Slider::new(&mut self.input.interval, 1..=600).text("interval")); }); } }); - ctx.request_repaint(); // TODO super jank way to sorta keep drawing + egui::CentralPanel::default().show(ctx, |ui| { + egui::ScrollArea::vertical().show(ui, |ui| { + let mut panels = self.data.panels.write().unwrap(); + for panel in &mut *panels { + // for panel in self.data.view() { + ui.group(|ui| { + ui.vertical(|ui| { + ui.horizontal(|ui| { + ui.heading(panel.name.as_str()); + ui.checkbox(&mut panel.view_scroll, "autoscroll"); + ui.checkbox(&mut panel.timeserie, "timeserie"); + ui.add(egui::Slider::new(&mut panel.height, 0..=500).text("height")); + }); + + let mut sources = panel.sources.write().unwrap(); + + if self.edit { + for source in &mut *sources { + ui.horizontal(|ui| { + ui.heading(source.name.as_str()); + eframe::egui::TextEdit::singleline(&mut source.url).hint_text("url").show(ui); + eframe::egui::TextEdit::singleline(&mut source.query_x).hint_text("x query").show(ui); + eframe::egui::TextEdit::singleline(&mut source.query_y).hint_text("y query").show(ui); + ui.add(egui::Slider::new(&mut source.interval, 1..=600).text("interval")); + }); + } + } + + let mut p = Plot::new(format!("plot-{}", panel.name)) + .height(panel.height as f32); // TODO make it fucking reactive! It fills the whole screen with 1 plot no matter what I do... + + if panel.view_scroll { + p = p.include_x(Utc::now().timestamp() as f64); + } + + if panel.timeserie { + p = p.x_axis_formatter(|x, _range| { + format!( + "{}", + DateTime::::from_utc(NaiveDateTime::from_timestamp(x as i64, 0), Utc) + .format("%Y/%m/%d %H:%M:%S") + ) + }); + } + + p.show(ui, |plot_ui| { + for source in &mut *sources { + plot_ui.line(Line::new(source.values()).name(source.name.as_str())); + } + }); + }); + }); + } + }); + }); } } diff --git a/src/app/worker.rs b/src/app/worker.rs new file mode 100644 index 0000000..3faa34e --- /dev/null +++ b/src/app/worker.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; +use chrono::Utc; +use eframe::egui::Context; +use crate::app::data::{fetch, ApplicationState}; + +pub fn native_save(state:Arc) { + std::thread::spawn(move || { + let storage = state.storage.lock().unwrap(); + let panels = state.panels.read().unwrap(); + for panel in &*panels { + storage.update_panel( + panel.id, + panel.name.as_str(), + panel.view_scroll, + panel.view_size, + panel.timeserie, + panel.width, + panel.height + ).unwrap(); + let sources = panel.sources.read().unwrap(); + for source in &*sources { + storage.update_source( + source.id, + source.name.as_str(), + source.url.as_str(), + source.interval, + source.query_x.as_str(), + source.query_y.as_str(), + ).unwrap(); + } + } + }); +} + +pub(crate) trait BackgroundWorker { + fn start(state:Arc, ctx:Context) -> Self; // TODO make it return an error? Can we even do anything without a background worker + fn stop(self); // TODO make it return an error? Can we even do anything without a background worker +} + +pub(crate) struct NativeBackgroundWorker { + worker : std::thread::JoinHandle<()>, +} + +impl BackgroundWorker for NativeBackgroundWorker { + fn start(state:Arc, ctx:Context) -> Self { + let worker = std::thread::spawn(move || { + let mut last_check = 0; + while state.run { + let delta_time = 1000 - (Utc::now().timestamp_millis() - last_check); + if delta_time > 0 { + std::thread::sleep(std::time::Duration::from_millis(delta_time as u64)); + } + last_check = Utc::now().timestamp_millis(); + + let panels = state.panels.read().unwrap(); + for i in 0..panels.len() { + let sources = panels[i].sources.read().unwrap(); + let p_id = panels[i].id; + for j in 0..sources.len() { + let s_id = sources[j].id; + if !sources[j].valid() { + let mut last_update = sources[j].last_fetch.write().unwrap(); + *last_update = Utc::now(); + let state2 = state.clone(); + let url = sources[j].url.clone(); + let query_x = sources[j].query_x.clone(); + let query_y = sources[j].query_y.clone(); + std::thread::spawn(move || { + let v = fetch(url.as_str(), query_x.as_str(), query_y.as_str()).unwrap(); + let store = state2.storage.lock().unwrap(); + store.put_value(p_id, s_id, v).unwrap(); + let panels = state2.panels.read().unwrap(); + let sources = panels[i].sources.read().unwrap(); + sources[j].data.write().unwrap().push(v); + let mut last_update = sources[j].last_fetch.write().unwrap(); + *last_update = Utc::now(); // overwrite it so fetches comply with API slowdowns and get desynched among them + }); + } + } + } + + ctx.request_repaint(); + } + }); + + return NativeBackgroundWorker { + worker + }; + } + + fn stop(self) { + self.worker.join().expect("Failed joining main worker thread"); + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index c38b3db..16cdd2d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,7 @@ mod app; -mod util; -use std::sync::{Arc, Mutex}; -use tokio::time::{sleep, Duration}; -use crate::util::worker::{BackgroundWorker, NativeBackgroundWorker}; -use crate::app::{App, data::store::{SQLiteDataStore, DataStorage}}; +use std::sync::Arc; +use crate::app::{App, data::ApplicationState, worker::{BackgroundWorker, NativeBackgroundWorker}}; // When compiling natively: #[cfg(not(target_arch = "wasm32"))] @@ -14,27 +11,13 @@ fn main() -> ! { let mut store_path = dirs::data_dir().unwrap_or(std::path::PathBuf::from(".")); // TODO get cwd more consistently? store_path.push("dashboard.db"); - println!("{}", store_path.as_path().to_str().unwrap()); - - let store = Arc::new( - SQLiteDataStore::new(store_path) - .unwrap() - ); + let store = Arc::new(ApplicationState::new(store_path)); eframe::run_native( // TODO replace this with a loop that ends so we can cleanly exit the background worker "dashboard", native_options, Box::new(move |cc| { - let worker = NativeBackgroundWorker::start(); - let ctx = cc.egui_ctx.clone(); - worker.task(async move { - loop { - sleep(Duration::from_secs(1)).await; - ctx.request_repaint(); - // tokio::spawn(async move {store2.fetch_all().await}); - } - }); - + let _worker = NativeBackgroundWorker::start(store.clone(), cc.egui_ctx.clone()); Box::new(App::new(cc, store)) }), ); diff --git a/src/util/mod.rs b/src/util/mod.rs deleted file mode 100644 index 4ff7f34..0000000 --- a/src/util/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub(crate) mod worker; \ No newline at end of file diff --git a/src/util/worker.rs b/src/util/worker.rs deleted file mode 100644 index e7f34b7..0000000 --- a/src/util/worker.rs +++ /dev/null @@ -1,47 +0,0 @@ -use tokio::{runtime::{Handle, Runtime, Builder}, sync::oneshot::Sender, time::{sleep, Duration}}; - -pub(crate) trait BackgroundWorker { - fn start() -> Self; // TODO make it return an error? Can we even do anything without a background worker - fn task(&self, task:T) where T : std::future::Future + core::marker::Send + 'static; - fn stop(self); // TODO make it return an error? Can we even do anything without a background worker -} - -pub(crate) struct NativeBackgroundWorker { - runtime : Handle, - // end_tx : Sender, - worker : std::thread::JoinHandle, -} - -impl BackgroundWorker for NativeBackgroundWorker { - fn start() -> Self { - let (rt_tx, rt_rx) = tokio::sync::oneshot::channel::(); - let worker = std::thread::spawn(|| { - let runtime = Builder::new_multi_thread() - .worker_threads(1) - .enable_all() - .build() - .unwrap(); - rt_tx.send(runtime.handle().clone()).unwrap(); - runtime.block_on(async { - loop { - println!("keepalive loop"); - sleep(Duration::from_secs(1)).await; - } - }) - }); - NativeBackgroundWorker { - runtime : rt_rx.blocking_recv().unwrap(), - // end_tx : end_tx, - worker : worker, - } - } - - fn task(&self, task:T) where T : std::future::Future + core::marker::Send + 'static { - self.runtime.spawn(task); - } - - fn stop(self) { - // self.end_tx.send(true).expect("Failed signaling termination"); - // self.worker.join().expect("Failed joining main worker thread"); - } -} \ No newline at end of file