diff --git a/src/gui/mod.rs b/src/gui/mod.rs index 1cb0217..9bb41b0 100644 --- a/src/gui/mod.rs +++ b/src/gui/mod.rs @@ -7,6 +7,7 @@ mod scaffold; use chrono::Utc; use eframe::egui::{CentralPanel, Context, SidePanel, TopBottomPanel, Window}; use tokio::sync::watch; +use tracing::error; use crate::{data::entities, worker::visualizer::AppStateView}; use panel::main_content; @@ -62,13 +63,17 @@ impl App { } pub fn save_all_panels(&self) { - self.view.op.blocking_send( + if let Err(e) = self.view.op.blocking_send( crate::worker::visualizer::BackgroundAction::UpdateAllPanels { panels: self.panels.clone() } - ).unwrap(); + ) { + error!(target: "app", "Could not save panels: {:?}", e); + } } pub fn refresh_data(&self) { - self.view.flush.blocking_send(()).unwrap(); + if let Err(e) = self.view.flush.blocking_send(()) { + error!(target: "app", "Could not request flush: {:?}", e); + } } } @@ -82,7 +87,7 @@ impl eframe::App for App { footer(ctx, ui, self.logger_view.clone(), self.db_path.clone(), self.view.points.borrow().len()); }); - let w = Window::new("a"); + let _w = Window::new("a"); // if let Some(index) = self.deleting_metric { // Window::new(format!("Delete Metric #{}?", index)) @@ -109,7 +114,9 @@ impl eframe::App for App { }); if let Some(viewsize) = self.panels.iter().map(|p| p.view_size).max() { - self.width_tx.send(viewsize as i64).unwrap(); + if let Err(e) = self.width_tx.send(viewsize as i64) { + error!(target: "app", "Could not update fetch size: {:?}", e); + } } if Utc::now().timestamp() > self.last_redraw + self.interval { diff --git a/src/gui/source.rs b/src/gui/source.rs index aaf0365..9b5cf39 100644 --- a/src/gui/source.rs +++ b/src/gui/source.rs @@ -123,7 +123,7 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) { // metrics.len() as i32, // ) { // Ok(verified_metric) => { - // store.put_values(verified_metric.id, &data).unwrap(); + // store.put_values(verified_metric.id, &data).unwrap (); // *verified_metric.data.write().expect("Values RwLock 
poisoned") = data; // to_insert.push(verified_metric); // } diff --git a/src/main.rs b/src/main.rs index ace264e..450d1d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -83,11 +83,17 @@ fn main() { setup_tracing(logger.layer()); - let state = AppState::new( + let state = match AppState::new( width_rx, args.interval as i64, args.cache_time as i64, - ).unwrap(); + ) { + Ok(s) => s, + Err(e) => { + error!(target: "launcher", "Could not create application state: {:?}", e); + return; + } + }; let view = state.view(); let run_rx_clone = run_rx.clone(); @@ -99,7 +105,13 @@ fn main() { .build() .unwrap() .block_on(async { - let db = Database::connect(db_uri.clone()).await.unwrap(); + let db = match Database::connect(db_uri.clone()).await { + Ok(v) => v, + Err(e) => { + error!(target: "launcher", "Could not connect to db: {:?}", e); + return; + } + }; info!(target: "launcher", "Connected to '{}'", db_uri); let mut jobs = vec![]; @@ -142,7 +154,11 @@ fn main() { ); } - for job in jobs { job.await.unwrap() } + for (i, job) in jobs.into_iter().enumerate() { + if let Err(e) = job.await { + error!(target: "launcher", "Could not join task #{}: {:?}", i, e); + } + } info!(target: "launcher", "Stopping background worker"); }) @@ -178,8 +194,12 @@ fn main() { info!(target: "launcher", "Stopping native GUI"); - run_tx.send(false).unwrap(); + if let Err(e) = run_tx.send(false) { + error!(target: "launcher", "Error signaling end to workers: {:?}", e); + } } - worker.join().unwrap(); + if let Err(e) = worker.join() { + error!(target: "launcher", "Error joining background thread: {:?}", e); + } } diff --git a/src/util.rs b/src/util.rs index d167849..82aaa72 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Local, NaiveDateTime, Utc}; use eframe::egui::{Color32, plot::PlotPoint}; use tokio::sync::{watch, mpsc}; +use tracing::error; use std::{error::Error, path::PathBuf, collections::VecDeque}; use tracing_subscriber::Layer; @@ -136,7 +137,9 @@ impl 
InternalLogger { while messages.len() > self.size { messages.pop_front(); } - self.view_tx.send(messages.clone().into()).unwrap(); + if let Err(e) = self.view_tx.send(messages.clone().into()) { + error!(target: "internal-logger", "Failed sending log line: {:?}", e); + } }, None => break, } diff --git a/src/worker/surveyor.rs b/src/worker/surveyor.rs index 0c5908e..b08b6b3 100644 --- a/src/worker/surveyor.rs +++ b/src/worker/surveyor.rs @@ -32,8 +32,20 @@ pub async fn surveyor_loop( if Utc::now().timestamp() - last_fetch > cache_time { // TODO do both concurrently - sources = entities::sources::Entity::find().all(&db).await.unwrap(); - metrics = Arc::new(entities::metrics::Entity::find().all(&db).await.unwrap()); + match entities::sources::Entity::find().all(&db).await { + Ok(srcs) => sources = srcs, + Err(e) => { + error!(target: "surveyor", "Could not fetch sources: {:?}", e); + continue; + } + } + match entities::metrics::Entity::find().all(&db).await { + Ok(mtrcs) => metrics = Arc::new(mtrcs), + Err(e) => { + error!(target: "surveyor", "Could not fetch metrics: {:?}", e); + continue; + } + } info!(target: "surveyor", "Reloaded sources and metrics"); last_fetch = Utc::now().timestamp(); } diff --git a/src/worker/visualizer.rs b/src/worker/visualizer.rs index c587784..78fb398 100644 --- a/src/worker/visualizer.rs +++ b/src/worker/visualizer.rs @@ -1,7 +1,7 @@ use chrono::Utc; -use sea_orm::{DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set, QueryOrder, Order}; +use sea_orm::{TransactionTrait, DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set, QueryOrder, Order}; use tokio::sync::{watch, mpsc}; -use tracing::info; +use tracing::{info, error}; use std::collections::VecDeque; use crate::data::{entities, FetchError}; @@ -110,11 +110,20 @@ impl AppState { pub async fn fetch(&mut self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> { // TODO parallelize all this stuff self.panels = 
entities::panels::Entity::find().all(db).await?; - self.tx.panels.send(self.panels.clone()).unwrap(); + if let Err(e) = self.tx.panels.send(self.panels.clone()) { + error!(target: "worker", "Could not send panels update: {:?}", e); + } + self.sources = entities::sources::Entity::find().all(db).await?; - self.tx.sources.send(self.sources.clone()).unwrap(); + if let Err(e) = self.tx.sources.send(self.sources.clone()) { + error!(target: "worker", "Could not send sources update: {:?}", e); + } + self.metrics = entities::metrics::Entity::find().all(db).await?; - self.tx.metrics.send(self.metrics.clone()).unwrap(); + if let Err(e) = self.tx.metrics.send(self.metrics.clone()) { + error!(target: "worker", "Could not send metrics update: {:?}", e); + } + info!(target: "worker", "Updated panels, sources and metrics"); self.last_refresh = chrono::Utc::now().timestamp(); Ok(()) @@ -137,26 +146,37 @@ impl AppState { match op { BackgroundAction::UpdateAllPanels { panels } => { // TODO this is kinda rough, can it be done better? 
- entities::panels::Entity::delete_many().exec(&db).await.unwrap(); - entities::panels::Entity::insert_many( - panels.iter().map(|v| entities::panels::ActiveModel{ - id: Set(v.id), - name: Set(v.name.clone()), - view_scroll: Set(v.view_scroll), - view_size: Set(v.view_size), - timeserie: Set(v.timeserie), - height: Set(v.height), - limit_view: Set(v.limit_view), - position: Set(v.position), - reduce_view: Set(v.reduce_view), - view_chunks: Set(v.view_chunks), - shift_view: Set(v.shift_view), - view_offset: Set(v.view_offset), - average_view: Set(v.average_view), - }).collect::<Vec<_>>() - ).exec(&db).await.unwrap(); - self.tx.panels.send(panels.clone()).unwrap(); - self.panels = panels; + let pnls = panels.clone(); + if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| { + Box::pin(async move { + entities::panels::Entity::delete_many().exec(txn).await?; + entities::panels::Entity::insert_many( + pnls.iter().map(|v| entities::panels::ActiveModel{ + id: Set(v.id), + name: Set(v.name.clone()), + view_scroll: Set(v.view_scroll), + view_size: Set(v.view_size), + timeserie: Set(v.timeserie), + height: Set(v.height), + limit_view: Set(v.limit_view), + position: Set(v.position), + reduce_view: Set(v.reduce_view), + view_chunks: Set(v.view_chunks), + shift_view: Set(v.shift_view), + view_offset: Set(v.view_offset), + average_view: Set(v.average_view), + }).collect::<Vec<_>>() + ).exec(txn).await?; + Ok(()) + }) + }).await { + error!(target: "worker", "Could not update panels on database: {:?}", e); + } else { + if let Err(e) = self.tx.panels.send(panels.clone()) { + error!(target: "worker", "Could not send panels update: {:?}", e); + } + self.panels = panels; + } }, // _ => todo!(), } @@ -166,9 +186,11 @@ impl AppState { }, _ = self.flush.recv() => { let now = Utc::now().timestamp(); - self.fetch(&db).await.unwrap(); + if let Err(e) = self.fetch(&db).await { + error!(target: "worker", "Could not fetch from db: {:?}", e); + } let new_width = *self.width.borrow() * 60; // TODO 
it's in minutes somewhere... - self.points = entities::points::Entity::find() + self.points = match entities::points::Entity::find() .filter( Condition::all() .add(entities::points::Column::X.gte((now - new_width) as f64)) @@ -176,12 +198,24 @@ impl AppState { ) .order_by(entities::points::Column::X, Order::Asc) .all(&db) - .await.unwrap().into(); - self.tx.points.send(self.points.clone().into()).unwrap(); + .await { + Ok(p) => p.into(), + Err(e) => { + error!(target: "worker", "Could not fetch new points: {:?}", e); + continue; + } + }; + if let Err(e) = self.tx.points.send(self.points.clone().into()) { + error!(target: "worker", "Could not send new points: {:?}", e); + } last = Utc::now().timestamp(); info!(target: "worker", "Reloaded points"); }, - _ = sleep(self.cache_age - (now - self.last_refresh)) => self.fetch(&db).await.unwrap(), + _ = sleep(self.cache_age - (now - self.last_refresh)) => { + if let Err(e) = self.fetch(&db).await { + error!(target: "worker", "Could not fetch from db: {:?}", e); + } + }, _ = sleep(self.interval - (now - self.last_check)) => { let mut changes = false; let now = Utc::now().timestamp(); @@ -189,7 +223,7 @@ impl AppState { // fetch previous points if new_width != width { - let mut previous_points = entities::points::Entity::find() + let mut previous_points = match entities::points::Entity::find() .filter( Condition::all() .add(entities::points::Column::X.gte(now - new_width)) @@ -197,8 +231,16 @@ impl AppState { ) .order_by(entities::points::Column::X, Order::Asc) .all(&db) - .await.unwrap(); - info!(target: "worker", "Fetched {} previous points", previous_points.len()); + .await { + Ok(p) => p, + Err(e) => { + error!(target: "worker", "Could not fetch previous points: {:?}", e); + continue; + } + }; + if previous_points.len() > 0 { + info!(target: "worker", "Fetched {} previous points", previous_points.len()); + } previous_points.reverse(); // TODO wasteful! 
for p in previous_points { self.points.push_front(p); @@ -207,7 +249,7 @@ impl AppState { } // fetch new points - let new_points = entities::points::Entity::find() + let new_points = match entities::points::Entity::find() .filter( Condition::all() .add(entities::points::Column::X.gte(last as f64)) @@ -215,8 +257,16 @@ impl AppState { ) .order_by(entities::points::Column::X, Order::Asc) .all(&db) - .await.unwrap(); - info!(target: "worker", "Fetched {} new points", new_points.len()); + .await { + Ok(p) => p, + Err(e) => { + error!(target: "worker", "Could not fetch new points: {:?}", e); + continue; + } + }; + if new_points.len() > 0 { + info!(target: "worker", "Fetched {} new points", new_points.len()); + } for p in new_points { self.points.push_back(p); @@ -237,7 +287,9 @@ impl AppState { width = new_width; self.last_check = now; if changes { - self.tx.points.send(self.points.clone().into()).unwrap(); + if let Err(e) = self.tx.points.send(self.points.clone().into()) { + error!(target: "worker", "Could not send changes to main thread: {:?}", e); + } } }, };