diff --git a/src/worker/surveyor.rs b/src/worker/surveyor.rs index 3955cff..0c5908e 100644 --- a/src/worker/surveyor.rs +++ b/src/worker/surveyor.rs @@ -1,7 +1,9 @@ +use std::sync::Arc; + use chrono::Utc; use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait}; use tokio::sync::watch; -use tracing::error; +use tracing::{error, info}; use crate::data::{entities, FetchError}; @@ -15,26 +17,24 @@ pub async fn surveyor_loop( cache_time:i64, run: watch::Receiver, ) { - let mut last_check = Utc::now().timestamp(); + let mut last_activation = Utc::now().timestamp(); let mut last_fetch = 0; let mut sources = vec![]; - let mut metrics = vec![]; + let mut metrics = Arc::new(vec![]); + while *run.borrow() { // sleep until next activation - let delta_time = (interval as i64) - (Utc::now().timestamp() - last_check); + let delta_time = (interval as i64) - (Utc::now().timestamp() - last_activation); if delta_time > 0 { tokio::time::sleep(std::time::Duration::from_secs(delta_time as u64)).await; } - last_check = Utc::now().timestamp(); + last_activation = Utc::now().timestamp(); if Utc::now().timestamp() - last_fetch > cache_time { // TODO do both concurrently - let res = tokio::join!( - entities::sources::Entity::find().all(&db), - entities::metrics::Entity::find().all(&db) - ); - sources = res.0.unwrap(); - metrics = res.1.unwrap(); + sources = entities::sources::Entity::find().all(&db).await.unwrap(); + metrics = Arc::new(entities::metrics::Entity::find().all(&db).await.unwrap()); + info!(target: "surveyor", "Reloaded sources and metrics"); last_fetch = Utc::now().timestamp(); } @@ -43,39 +43,42 @@ pub async fn surveyor_loop( continue; } - // source.last_fetch = Utc::now(); // TODO! do it in background threads again! - // tokio::spawn(async move { - match fetch(&source.url).await { + let metrics_snapshot = metrics.clone(); + let db_clone = db.clone(); + let source_clone = source.clone(); + let now = Utc::now().timestamp(); + source.last_update = now; // TODO kinda meh + // we set this before knowing about fetch result, to avoid re-running a fetch + // next time this loop runs. But the task only sets last_update on db if fetch succeeds, + // so if an error happens the client and server last_update fields will differ until fetched + // again. This could be avoided by keeping track of which threads are trying which sources, + // but also only trying to fetch at certain intervals to stay aligned might be desirable. + tokio::spawn(async move { + match fetch(&source_clone.url).await { Ok(res) => { - let now = Utc::now().timestamp(); - entities::sources::Entity::update( - entities::sources::ActiveModel{id: Set(source.id), last_update: Set(now), ..Default::default()} - ).exec(&db).await.unwrap(); - source.last_update = now; - for metric in metrics.iter().filter(|x| source.id == x.source_id) { + if let Err(e) = entities::sources::Entity::update( + entities::sources::ActiveModel{id: Set(source_clone.id), last_update: Set(now), ..Default::default()} + ).exec(&db_clone).await { + error!(target: "surveyor", "Failed setting last_update ({:?}) for source {:?} but successfully fetched '{}', aborting", e, source_clone, res); + return; + } + for metric in metrics_snapshot.iter().filter(|x| source_clone.id == x.source_id) { match metric.extract(&res) { Ok(v) => { - entities::points::Entity::insert( + if let Err(e) = entities::points::Entity::insert( entities::points::ActiveModel { id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y), - }).exec(&db).await.unwrap(); + }).exec(&db_clone).await { + error!(target: "surveyor", "Could not insert record {:?} : {:?}", v, e); + } }, - Err(e) => error!(target: "worker", "Failed extracting '{}' from {}: {:?}", metric.name, source.name, e), + Err(e) => error!(target: "surveyor", "Failed extracting '{}' from {}: {:?}", metric.name, source_clone.name, e), } } }, - Err(e) => error!(target: "worker", "Failed fetching {}: {:?}", source.name, e), + Err(e) => error!(target: "surveyor", "Failed fetching {}: {:?}", source_clone.name, e), } - // source.last_fetch = Utc::now(); // TODO! - // }); - + }); } - - // if let Ok(meta) = std::fs::metadata(state.file_path.clone()) { - // let mut fsize = state.file_size.write().expect("File Size RwLock poisoned"); - // *fsize = meta.len(); - // } // ignore errors - - // ctx.request_repaint(); } }