chore: removed a ton of .unwrap()

maybe too much? now code is mostly error catching in some spots
This commit is contained in:
əlemi 2022-11-02 00:45:40 +01:00
parent 9b794fa6b1
commit 132975f59a
Signed by: alemi
GPG key ID: A4895B84D311642C
6 changed files with 146 additions and 52 deletions

View file

@ -7,6 +7,7 @@ mod scaffold;
use chrono::Utc; use chrono::Utc;
use eframe::egui::{CentralPanel, Context, SidePanel, TopBottomPanel, Window}; use eframe::egui::{CentralPanel, Context, SidePanel, TopBottomPanel, Window};
use tokio::sync::watch; use tokio::sync::watch;
use tracing::error;
use crate::{data::entities, worker::visualizer::AppStateView}; use crate::{data::entities, worker::visualizer::AppStateView};
use panel::main_content; use panel::main_content;
@ -62,13 +63,17 @@ impl App {
} }
pub fn save_all_panels(&self) { pub fn save_all_panels(&self) {
self.view.op.blocking_send( if let Err(e) = self.view.op.blocking_send(
crate::worker::visualizer::BackgroundAction::UpdateAllPanels { panels: self.panels.clone() } crate::worker::visualizer::BackgroundAction::UpdateAllPanels { panels: self.panels.clone() }
).unwrap(); ) {
error!(target: "app", "Could not save panels: {:?}", e);
}
} }
pub fn refresh_data(&self) { pub fn refresh_data(&self) {
self.view.flush.blocking_send(()).unwrap(); if let Err(e) = self.view.flush.blocking_send(()) {
error!(target: "app", "Could not request flush: {:?}", e);
}
} }
} }
@ -82,7 +87,7 @@ impl eframe::App for App {
footer(ctx, ui, self.logger_view.clone(), self.db_path.clone(), self.view.points.borrow().len()); footer(ctx, ui, self.logger_view.clone(), self.db_path.clone(), self.view.points.borrow().len());
}); });
let w = Window::new("a"); let _w = Window::new("a");
// if let Some(index) = self.deleting_metric { // if let Some(index) = self.deleting_metric {
// Window::new(format!("Delete Metric #{}?", index)) // Window::new(format!("Delete Metric #{}?", index))
@ -109,7 +114,9 @@ impl eframe::App for App {
}); });
if let Some(viewsize) = self.panels.iter().map(|p| p.view_size).max() { if let Some(viewsize) = self.panels.iter().map(|p| p.view_size).max() {
self.width_tx.send(viewsize as i64).unwrap(); if let Err(e) = self.width_tx.send(viewsize as i64) {
error!(target: "app", "Could not update fetch size : {:?}", e);
}
} }
if Utc::now().timestamp() > self.last_redraw + self.interval { if Utc::now().timestamp() > self.last_redraw + self.interval {

View file

@ -123,7 +123,7 @@ pub fn source_panel(app: &mut App, ui: &mut Ui) {
// metrics.len() as i32, // metrics.len() as i32,
// ) { // ) {
// Ok(verified_metric) => { // Ok(verified_metric) => {
// store.put_values(verified_metric.id, &data).unwrap(); // store.put_values(verified_metric.id, &data).unwrap ();
// *verified_metric.data.write().expect("Values RwLock poisoned") = data; // *verified_metric.data.write().expect("Values RwLock poisoned") = data;
// to_insert.push(verified_metric); // to_insert.push(verified_metric);
// } // }

View file

@ -83,11 +83,17 @@ fn main() {
setup_tracing(logger.layer()); setup_tracing(logger.layer());
let state = AppState::new( let state = match AppState::new(
width_rx, width_rx,
args.interval as i64, args.interval as i64,
args.cache_time as i64, args.cache_time as i64,
).unwrap(); ) {
Ok(s) => s,
Err(e) => {
error!(target: "launcher", "Could not create application state: {:?}", e);
return;
}
};
let view = state.view(); let view = state.view();
let run_rx_clone = run_rx.clone(); let run_rx_clone = run_rx.clone();
@ -99,7 +105,13 @@ fn main() {
.build() .build()
.unwrap() .unwrap()
.block_on(async { .block_on(async {
let db = Database::connect(db_uri.clone()).await.unwrap(); let db = match Database::connect(db_uri.clone()).await {
Ok(v) => v,
Err(e) => {
error!(target: "launcher", "Could not connect to db: {:?}", e);
return;
}
};
info!(target: "launcher", "Connected to '{}'", db_uri); info!(target: "launcher", "Connected to '{}'", db_uri);
let mut jobs = vec![]; let mut jobs = vec![];
@ -142,7 +154,11 @@ fn main() {
); );
} }
for job in jobs { job.await.unwrap() } for (i, job) in jobs.into_iter().enumerate() {
if let Err(e) = job.await {
error!(target: "launcher", "Could not join task #{}: {:?}", i, e);
}
}
info!(target: "launcher", "Stopping background worker"); info!(target: "launcher", "Stopping background worker");
}) })
@ -178,8 +194,12 @@ fn main() {
info!(target: "launcher", "Stopping native GUI"); info!(target: "launcher", "Stopping native GUI");
run_tx.send(false).unwrap(); if let Err(e) = run_tx.send(false) {
error!(target: "launcher", "Error signaling end to workers: {:?}", e);
}
} }
worker.join().unwrap(); if let Err(e) = worker.join() {
error!(target: "launcher", "Error joining background thread : {:?}", e);
}
} }

View file

@ -1,6 +1,7 @@
use chrono::{DateTime, Local, NaiveDateTime, Utc}; use chrono::{DateTime, Local, NaiveDateTime, Utc};
use eframe::egui::{Color32, plot::PlotPoint}; use eframe::egui::{Color32, plot::PlotPoint};
use tokio::sync::{watch, mpsc}; use tokio::sync::{watch, mpsc};
use tracing::error;
use std::{error::Error, path::PathBuf, collections::VecDeque}; use std::{error::Error, path::PathBuf, collections::VecDeque};
use tracing_subscriber::Layer; use tracing_subscriber::Layer;
@ -136,7 +137,9 @@ impl InternalLogger {
while messages.len() > self.size { while messages.len() > self.size {
messages.pop_front(); messages.pop_front();
} }
self.view_tx.send(messages.clone().into()).unwrap(); if let Err(e) = self.view_tx.send(messages.clone().into()) {
error!(target: "internal-logger", "Failed sending log line: {:?}", e);
}
}, },
None => break, None => break,
} }

View file

@ -32,8 +32,20 @@ pub async fn surveyor_loop(
if Utc::now().timestamp() - last_fetch > cache_time { if Utc::now().timestamp() - last_fetch > cache_time {
// TODO do both concurrently // TODO do both concurrently
sources = entities::sources::Entity::find().all(&db).await.unwrap(); match entities::sources::Entity::find().all(&db).await {
metrics = Arc::new(entities::metrics::Entity::find().all(&db).await.unwrap()); Ok(srcs) => sources = srcs,
Err(e) => {
error!(target: "surveyor", "Could not fetch sources: {:?}", e);
continue;
}
}
match entities::metrics::Entity::find().all(&db).await {
Ok(mtrcs) => metrics = Arc::new(mtrcs),
Err(e) => {
error!(target: "surveyor", "Could not fetch metrics: {:?}", e);
continue;
}
}
info!(target: "surveyor", "Reloaded sources and metrics"); info!(target: "surveyor", "Reloaded sources and metrics");
last_fetch = Utc::now().timestamp(); last_fetch = Utc::now().timestamp();
} }

View file

@ -1,7 +1,7 @@
use chrono::Utc; use chrono::Utc;
use sea_orm::{DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set, QueryOrder, Order}; use sea_orm::{TransactionTrait, DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set, QueryOrder, Order};
use tokio::sync::{watch, mpsc}; use tokio::sync::{watch, mpsc};
use tracing::info; use tracing::{info, error};
use std::collections::VecDeque; use std::collections::VecDeque;
use crate::data::{entities, FetchError}; use crate::data::{entities, FetchError};
@ -110,11 +110,20 @@ impl AppState {
pub async fn fetch(&mut self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> { pub async fn fetch(&mut self, db: &DatabaseConnection) -> Result<(), sea_orm::DbErr> {
// TODO parallelize all this stuff // TODO parallelize all this stuff
self.panels = entities::panels::Entity::find().all(db).await?; self.panels = entities::panels::Entity::find().all(db).await?;
self.tx.panels.send(self.panels.clone()).unwrap(); if let Err(e) = self.tx.panels.send(self.panels.clone()) {
error!(target: "worker", "Could not send panels update: {:?}", e);
}
self.sources = entities::sources::Entity::find().all(db).await?; self.sources = entities::sources::Entity::find().all(db).await?;
self.tx.sources.send(self.sources.clone()).unwrap(); if let Err(e) = self.tx.sources.send(self.sources.clone()) {
error!(target: "worker", "Could not send sources update: {:?}", e);
}
self.metrics = entities::metrics::Entity::find().all(db).await?; self.metrics = entities::metrics::Entity::find().all(db).await?;
self.tx.metrics.send(self.metrics.clone()).unwrap(); if let Err(e) = self.tx.metrics.send(self.metrics.clone()) {
error!(target: "worker", "Could not send metrics update: {:?}", e);
}
info!(target: "worker", "Updated panels, sources and metrics"); info!(target: "worker", "Updated panels, sources and metrics");
self.last_refresh = chrono::Utc::now().timestamp(); self.last_refresh = chrono::Utc::now().timestamp();
Ok(()) Ok(())
@ -137,26 +146,37 @@ impl AppState {
match op { match op {
BackgroundAction::UpdateAllPanels { panels } => { BackgroundAction::UpdateAllPanels { panels } => {
// TODO this is kinda rough, can it be done better? // TODO this is kinda rough, can it be done better?
entities::panels::Entity::delete_many().exec(&db).await.unwrap(); let pnls = panels.clone();
entities::panels::Entity::insert_many( if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| {
panels.iter().map(|v| entities::panels::ActiveModel{ Box::pin(async move {
id: Set(v.id), entities::panels::Entity::delete_many().exec(txn).await?;
name: Set(v.name.clone()), entities::panels::Entity::insert_many(
view_scroll: Set(v.view_scroll), pnls.iter().map(|v| entities::panels::ActiveModel{
view_size: Set(v.view_size), id: Set(v.id),
timeserie: Set(v.timeserie), name: Set(v.name.clone()),
height: Set(v.height), view_scroll: Set(v.view_scroll),
limit_view: Set(v.limit_view), view_size: Set(v.view_size),
position: Set(v.position), timeserie: Set(v.timeserie),
reduce_view: Set(v.reduce_view), height: Set(v.height),
view_chunks: Set(v.view_chunks), limit_view: Set(v.limit_view),
shift_view: Set(v.shift_view), position: Set(v.position),
view_offset: Set(v.view_offset), reduce_view: Set(v.reduce_view),
average_view: Set(v.average_view), view_chunks: Set(v.view_chunks),
}).collect::<Vec<entities::panels::ActiveModel>>() shift_view: Set(v.shift_view),
).exec(&db).await.unwrap(); view_offset: Set(v.view_offset),
self.tx.panels.send(panels.clone()).unwrap(); average_view: Set(v.average_view),
self.panels = panels; }).collect::<Vec<entities::panels::ActiveModel>>()
).exec(txn).await?;
Ok(())
})
}).await {
error!(target: "worker", "Could not update panels on database: {:?}", e);
} else {
if let Err(e) = self.tx.panels.send(panels.clone()) {
error!(target: "worker", "Could not send panels update: {:?}", e);
}
self.panels = panels;
}
}, },
// _ => todo!(), // _ => todo!(),
} }
@ -166,9 +186,11 @@ impl AppState {
}, },
_ = self.flush.recv() => { _ = self.flush.recv() => {
let now = Utc::now().timestamp(); let now = Utc::now().timestamp();
self.fetch(&db).await.unwrap(); if let Err(e) = self.fetch(&db).await {
error!(target: "worker", "Could not fetch from db: {:?}", e);
}
let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
self.points = entities::points::Entity::find() self.points = match entities::points::Entity::find()
.filter( .filter(
Condition::all() Condition::all()
.add(entities::points::Column::X.gte((now - new_width) as f64)) .add(entities::points::Column::X.gte((now - new_width) as f64))
@ -176,12 +198,24 @@ impl AppState {
) )
.order_by(entities::points::Column::X, Order::Asc) .order_by(entities::points::Column::X, Order::Asc)
.all(&db) .all(&db)
.await.unwrap().into(); .await {
self.tx.points.send(self.points.clone().into()).unwrap(); Ok(p) => p.into(),
Err(e) => {
error!(target: "worker", "Could not fetch new points: {:?}", e);
continue;
}
};
if let Err(e) = self.tx.points.send(self.points.clone().into()) {
error!(target: "worker", "Could not send new points: {:?}", e);
}
last = Utc::now().timestamp(); last = Utc::now().timestamp();
info!(target: "worker", "Reloaded points"); info!(target: "worker", "Reloaded points");
}, },
_ = sleep(self.cache_age - (now - self.last_refresh)) => self.fetch(&db).await.unwrap(), _ = sleep(self.cache_age - (now - self.last_refresh)) => {
if let Err(e) = self.fetch(&db).await {
error!(target: "worker", "Could not fetch from db: {:?}", e);
}
},
_ = sleep(self.interval - (now - self.last_check)) => { _ = sleep(self.interval - (now - self.last_check)) => {
let mut changes = false; let mut changes = false;
let now = Utc::now().timestamp(); let now = Utc::now().timestamp();
@ -189,7 +223,7 @@ impl AppState {
// fetch previous points // fetch previous points
if new_width != width { if new_width != width {
let mut previous_points = entities::points::Entity::find() let mut previous_points = match entities::points::Entity::find()
.filter( .filter(
Condition::all() Condition::all()
.add(entities::points::Column::X.gte(now - new_width)) .add(entities::points::Column::X.gte(now - new_width))
@ -197,8 +231,16 @@ impl AppState {
) )
.order_by(entities::points::Column::X, Order::Asc) .order_by(entities::points::Column::X, Order::Asc)
.all(&db) .all(&db)
.await.unwrap(); .await {
info!(target: "worker", "Fetched {} previous points", previous_points.len()); Ok(p) => p,
Err(e) => {
error!(target: "worker", "Could not fetch previous points: {:?}", e);
continue;
}
};
if previous_points.len() > 0 {
info!(target: "worker", "Fetched {} previous points", previous_points.len());
}
previous_points.reverse(); // TODO wasteful! previous_points.reverse(); // TODO wasteful!
for p in previous_points { for p in previous_points {
self.points.push_front(p); self.points.push_front(p);
@ -207,7 +249,7 @@ impl AppState {
} }
// fetch new points // fetch new points
let new_points = entities::points::Entity::find() let new_points = match entities::points::Entity::find()
.filter( .filter(
Condition::all() Condition::all()
.add(entities::points::Column::X.gte(last as f64)) .add(entities::points::Column::X.gte(last as f64))
@ -215,8 +257,16 @@ impl AppState {
) )
.order_by(entities::points::Column::X, Order::Asc) .order_by(entities::points::Column::X, Order::Asc)
.all(&db) .all(&db)
.await.unwrap(); .await {
info!(target: "worker", "Fetched {} new points", new_points.len()); Ok(p) => p,
Err(e) => {
error!(target: "worker", "Could not fetch new points: {:?}", e);
continue;
}
};
if new_points.len() > 0 {
info!(target: "worker", "Fetched {} new points", new_points.len());
}
for p in new_points { for p in new_points {
self.points.push_back(p); self.points.push_back(p);
@ -237,7 +287,9 @@ impl AppState {
width = new_width; width = new_width;
self.last_check = now; self.last_check = now;
if changes { if changes {
self.tx.points.send(self.points.clone().into()).unwrap(); if let Err(e) = self.tx.points.send(self.points.clone().into()) {
error!(target: "worker", "Could not send changes to main thread: {:?}", e);
}
} }
}, },
}; };