diff --git a/src/worker/visualizer.rs b/src/worker/visualizer.rs index 669ffe0..d50d534 100644 --- a/src/worker/visualizer.rs +++ b/src/worker/visualizer.rs @@ -1,7 +1,7 @@ use chrono::Utc; -use sea_orm::{TransactionTrait, DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set, QueryOrder, Order, ActiveModelTrait, ActiveValue::{NotSet, self}}; +use sea_orm::{TransactionTrait, DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set, QueryOrder, Order, ActiveModelTrait, ActiveValue::{NotSet, self}, Database, DbErr}; use tokio::sync::{watch, mpsc}; -use tracing::{info, error}; +use tracing::{info, error, warn}; use std::collections::VecDeque; use crate::data::{entities, FetchError}; @@ -37,6 +37,8 @@ struct AppStateTransmitters { pub struct AppState { tx: AppStateTransmitters, + db_uri: mpsc::Receiver, + panels: Vec, sources: Vec, metrics: Vec, @@ -53,6 +55,7 @@ pub struct AppState { cache_age: i64, width: watch::Receiver, + last_width: i64, view: AppStateView, } @@ -66,6 +69,7 @@ async fn sleep(t:i64) { impl AppState { pub fn new( width: watch::Receiver, + db_uri: mpsc::Receiver, interval: i64, cache_age: i64, ) -> Result { @@ -86,6 +90,7 @@ impl AppState { last_refresh: 0, points: VecDeque::new(), last_check: 0, + last_width: 0, flush: flush_rx, op: op_rx, view: AppStateView { @@ -105,6 +110,7 @@ impl AppState { panel_metric: panel_metric_tx, }, width, + db_uri, interval, cache_age, }) @@ -143,7 +149,6 @@ impl AppState { error!(target: "worker", "Could not send panel-metric update: {:?}", e); } - info!(target: "worker", "Updated panels, sources and metrics"); self.last_refresh = chrono::Utc::now().timestamp(); Ok(()) } @@ -152,127 +157,210 @@ impl AppState { chrono::Utc::now().timestamp() - self.last_refresh } - pub async fn worker(mut self, db: DatabaseConnection, run:watch::Receiver) { - let mut width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... - let mut last = Utc::now().timestamp() - width; + pub async fn parse_op(&mut self, op:BackgroundAction, db: &DatabaseConnection) -> Result<(), DbErr> { + match op { + BackgroundAction::UpdateAllPanels { panels } => { + // TODO this is kinda rough, can it be done better? + let pnls = panels.clone(); + if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| { + Box::pin(async move { + entities::panels::Entity::delete_many().exec(txn).await?; + entities::panels::Entity::insert_many( + pnls.iter().map(|v| entities::panels::ActiveModel{ + id: Set(v.id), + name: Set(v.name.clone()), + view_scroll: Set(v.view_scroll), + view_size: Set(v.view_size), + timeserie: Set(v.timeserie), + height: Set(v.height), + position: Set(v.position), + reduce_view: Set(v.reduce_view), + view_chunks: Set(v.view_chunks), + view_offset: Set(v.view_offset), + average_view: Set(v.average_view), + }).collect::>() + ).exec(txn).await?; + Ok(()) + }) + }).await { + error!(target: "worker", "Could not update panels on database: {:?}", e); + } else { + if let Err(e) = self.tx.panels.send(panels.clone()) { + error!(target: "worker", "Could not send panels update: {:?}", e); + } + self.panels = panels; + } + }, + BackgroundAction::UpdatePanel { panel, metrics } => { + let panel_id = match panel.id { + ActiveValue::Unchanged(pid) => Some(pid), + _ => None, + }; + let op = if panel.id == NotSet { panel.insert(db) } else { panel.update(db) }; + op.await?; + // TODO chained if is trashy + if let Some(panel_id) = panel_id { + if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| { + Box::pin(async move { + entities::panel_metric::Entity::delete_many() + .filter( + Condition::all() + .add(entities::panel_metric::Column::PanelId.eq(panel_id)) + ) + .exec(txn).await?; + entities::panel_metric::Entity::insert_many(metrics).exec(txn).await?; + Ok(()) + }) + }).await { + error!(target: "worker", "Could not update panels on database: {:?}", e); + } + } else { + self.view.request_flush().await; + } + }, + BackgroundAction::UpdateSource { source } => { + let op = if source.id == NotSet { source.insert(db) } else { source.update(db) }; + op.await?; + self.view.request_flush().await; + }, + BackgroundAction::UpdateMetric { metric } => { + let op = if metric.id == NotSet { metric.insert(db) } else { metric.update(db) }; + if let Err(e) = op.await { + error!(target: "worker", "Could not update metric: {:?}", e); + } else { + self.view.request_flush().await; + } + }, + // _ => todo!(), + } + Ok(()) + } + + pub async fn flush_data(&mut self, db: &DatabaseConnection) -> Result<(), DbErr> { + let now = Utc::now().timestamp(); + self.fetch(db).await?; + let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... + self.points = entities::points::Entity::find() + .filter( + Condition::all() + .add(entities::points::Column::X.gte((now - new_width) as f64)) + .add(entities::points::Column::X.lte(now as f64)) + ) + .order_by(entities::points::Column::X, Order::Asc) + .all(db) + .await?.into(); + if let Err(e) = self.tx.points.send(self.points.clone().into()) { + warn!(target: "worker", "Could not send new points: {:?}", e); // TODO should be an err? + } + self.last_check = Utc::now().timestamp() - *self.width.borrow(); + info!(target: "worker", "Reloaded points"); + Ok(()) + } + + pub async fn update_points(&mut self, db: &DatabaseConnection) -> Result<(), DbErr> { + let mut changes = false; + let now = Utc::now().timestamp(); + let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... + // fetch previous points + if new_width != self.last_width { + let mut previous_points = entities::points::Entity::find() + .filter( + Condition::all() + .add(entities::points::Column::X.gte(now - new_width)) + .add(entities::points::Column::X.lte(now - self.last_width)) + ) + .order_by(entities::points::Column::X, Order::Asc) + .all(db) + .await?; + if previous_points.len() > 0 { + info!(target: "worker", "Fetched {} previous points", previous_points.len()); + } + previous_points.reverse(); // TODO wasteful! + for p in previous_points { + self.points.push_front(p); + changes = true; + } + } + + // fetch new points + let new_points = entities::points::Entity::find() + .filter( + Condition::all() + .add(entities::points::Column::X.gte(self.last_check as f64)) + .add(entities::points::Column::X.lte(now as f64)) + ) + .order_by(entities::points::Column::X, Order::Asc) + .all(db) + .await?; + + for p in new_points { + self.points.push_back(p); + changes = true; + } + + // remove old points + while let Some(p) = self.points.get(0) { + if (p.x as i64) >= now - (*self.width.borrow() * 60) { + break; + } + self.points.pop_front(); + changes = true; + } + + // update + self.last_width = new_width; + self.last_check = now; + if changes { + if let Err(e) = self.tx.points.send(self.points.clone().into()) { + warn!(target: "worker", "Could not send changes to main thread: {:?}", e); + } + } + Ok(()) + } + + pub async fn worker(mut self, run:watch::Receiver) { + let mut now; + let first_db_uri = self.db_uri.recv().await.unwrap(); + + let mut db = Database::connect(first_db_uri.clone()).await.unwrap(); + + info!(target: "worker", "Connected to '{}'", first_db_uri); + while *run.borrow() { - let now = Utc::now().timestamp(); + now = Utc::now().timestamp(); tokio::select!{ - op = self.op.recv() => { - match op { - Some(op) => { - match op { - BackgroundAction::UpdateAllPanels { panels } => { - // TODO this is kinda rough, can it be done better? - let pnls = panels.clone(); - if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| { - Box::pin(async move { - entities::panels::Entity::delete_many().exec(txn).await?; - entities::panels::Entity::insert_many( - pnls.iter().map(|v| entities::panels::ActiveModel{ - id: Set(v.id), - name: Set(v.name.clone()), - view_scroll: Set(v.view_scroll), - view_size: Set(v.view_size), - timeserie: Set(v.timeserie), - height: Set(v.height), - position: Set(v.position), - reduce_view: Set(v.reduce_view), - view_chunks: Set(v.view_chunks), - view_offset: Set(v.view_offset), - average_view: Set(v.average_view), - }).collect::>() - ).exec(txn).await?; - Ok(()) - }) - }).await { - error!(target: "worker", "Could not update panels on database: {:?}", e); - } else { - if let Err(e) = self.tx.panels.send(panels.clone()) { - error!(target: "worker", "Could not send panels update: {:?}", e); - } - self.panels = panels; - } + res = self.db_uri.recv() => { + match res { + Some(uri) => { + match Database::connect(uri.clone()).await { + Ok(new_db) => { + info!("Connected to '{}'", uri); + db = new_db; }, - BackgroundAction::UpdatePanel { panel, metrics } => { - let panel_id = match panel.id { - ActiveValue::Unchanged(pid) => Some(pid), - _ => None, - }; - let op = if panel.id == NotSet { panel.insert(&db) } else { panel.update(&db) }; - // TODO chained if is trashy - if let Err(e) = op.await { - error!(target: "worker", "Could not update panel: {:?}", e); - } else { - if let Some(panel_id) = panel_id { - if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| { - Box::pin(async move { - entities::panel_metric::Entity::delete_many() - .filter( - Condition::all() - .add(entities::panel_metric::Column::PanelId.eq(panel_id)) - ) - .exec(txn).await?; - entities::panel_metric::Entity::insert_many(metrics).exec(txn).await?; - Ok(()) - }) - }).await { - error!(target: "worker", "Could not update panels on database: {:?}", e); - } - } else { - self.view.request_flush().await; - } - } - }, - BackgroundAction::UpdateSource { source } => { - let op = if source.id == NotSet { source.insert(&db) } else { source.update(&db) }; - if let Err(e) = op.await { - error!(target: "worker", "Could not update source: {:?}", e); - } else { - self.view.request_flush().await; - } - }, - BackgroundAction::UpdateMetric { metric } => { - let op = if metric.id == NotSet { metric.insert(&db) } else { metric.update(&db) }; - if let Err(e) = op.await { - error!(target: "worker", "Could not update metric: {:?}", e); - } else { - self.view.request_flush().await; - } - }, - // _ => todo!(), - } + Err(e) => error!(target: "worker", "Could not connect to db: {:?}", e), + }; }, - None => {}, + None => { error!(target: "worker", "URI channel closed"); break; }, } }, - _ = self.flush.recv() => { - let now = Utc::now().timestamp(); - if let Err(e) = self.fetch(&db).await { - error!(target: "worker", "Could not fetch from db: {:?}", e); + res = self.op.recv() => { + match res { + Some(op) => match self.parse_op(op, &db).await { + Ok(()) => { }, + Err(e) => error!(target: "worker", "Failed executing operation: {:?}", e), + }, + None => { error!(target: "worker", "Operations channel closed"); break; }, } - let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... - self.points = match entities::points::Entity::find() - .filter( - Condition::all() - .add(entities::points::Column::X.gte((now - new_width) as f64)) - .add(entities::points::Column::X.lte(now as f64)) - ) - .order_by(entities::points::Column::X, Order::Asc) - .all(&db) - .await { - Ok(p) => p.into(), - Err(e) => { - error!(target: "worker", "Could not fetch new points: {:?}", e); - continue; - } - }; - if let Err(e) = self.tx.points.send(self.points.clone().into()) { - error!(target: "worker", "Could not send new points: {:?}", e); + } + res = self.flush.recv() => { + match res { + Some(()) => match self.flush_data(&db).await { + Ok(()) => { }, + Err(e) => error!(target: "worker", "Could not flush away current data: {:?}", e), + }, + None => { error!(target: "worker", "Flush channel closed"); break; }, } - last = Utc::now().timestamp(); - info!(target: "worker", "Reloaded points"); }, _ = sleep(self.cache_age - (now - self.last_refresh)) => { if let Err(e) = self.fetch(&db).await { @@ -280,82 +368,11 @@ impl AppState { } }, _ = sleep(self.interval - (now - self.last_check)) => { - let mut changes = false; - let now = Utc::now().timestamp(); - let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere... - - // fetch previous points - if new_width != width { - let mut previous_points = match entities::points::Entity::find() - .filter( - Condition::all() - .add(entities::points::Column::X.gte(now - new_width)) - .add(entities::points::Column::X.lte(now - width)) - ) - .order_by(entities::points::Column::X, Order::Asc) - .all(&db) - .await { - Ok(p) => p, - Err(e) => { - error!(target: "worker", "Could not fetch previous points: {:?}", e); - continue; - } - }; - if previous_points.len() > 0 { - info!(target: "worker", "Fetched {} previous points", previous_points.len()); - } - previous_points.reverse(); // TODO wasteful! - for p in previous_points { - self.points.push_front(p); - changes = true; - } + if let Err(e) = self.update_points(&db).await { + error!(target: "worker", "Could not update points: {:?}", e); } - - // fetch new points - let new_points = match entities::points::Entity::find() - .filter( - Condition::all() - .add(entities::points::Column::X.gte(last as f64)) - .add(entities::points::Column::X.lte(now as f64)) - ) - .order_by(entities::points::Column::X, Order::Asc) - .all(&db) - .await { - Ok(p) => p, - Err(e) => { - error!(target: "worker", "Could not fetch new points: {:?}", e); - continue; - } - }; - if new_points.len() > 0 { - info!(target: "worker", "Fetched {} new points", new_points.len()); - } - - for p in new_points { - self.points.push_back(p); - changes = true; - } - - // remove old points - while let Some(p) = self.points.get(0) { - if (p.x as i64) >= now - (*self.width.borrow() * 60) { - break; - } - self.points.pop_front(); - changes = true; - } - - // update - last = now; - width = new_width; - self.last_check = now; - if changes { - if let Err(e) = self.tx.points.send(self.points.clone().into()) { - error!(target: "worker", "Could not send changes to main thread: {:?}", e); - } - } - }, - }; + } + } } } }