feat: surveyor fetches concurrently

also catch more errors and log more
əlemi 2022-11-01 20:47:40 +01:00
parent adf2812dfc
commit 9b794fa6b1
Signed by: alemi
GPG key ID: A4895B84D311642C

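The feature here is per-source concurrency: instead of awaiting each fetch inline, the surveyor loop now clones whatever a task needs (the database handle, the source row, an Arc snapshot of the metrics list) and hands the fetch to tokio::spawn, so one slow or failing source no longer blocks the rest of the loop. Below is a minimal, self-contained sketch of that pattern, not the project's code: the Source type, the fetch() stub and the URLs are illustrative stand-ins, and it assumes only the tokio crate (rt, macros and time features).

use std::sync::Arc;
use std::time::Duration;

// Hypothetical stand-ins for the crate's source entity and fetch() helper.
#[derive(Clone, Debug)]
struct Source {
	id: i64,
	url: String,
}

async fn fetch(url: &str) -> Result<String, String> {
	// placeholder for the real HTTP request
	tokio::time::sleep(Duration::from_millis(50)).await;
	Ok(format!("payload from {url}"))
}

#[tokio::main]
async fn main() {
	let sources = vec![
		Source { id: 1, url: "https://example.com/a".into() },
		Source { id: 2, url: "https://example.com/b".into() },
	];
	let shared = Arc::new(vec!["some metric".to_string()]); // shared read-only snapshot

	let mut handles = Vec::new();
	for source in &sources {
		// clone what the task needs so it can outlive this loop iteration
		let source = source.clone();
		let shared = shared.clone();
		handles.push(tokio::spawn(async move {
			match fetch(&source.url).await {
				Ok(res) => println!("source {} got {} bytes ({} metrics to extract)", source.id, res.len(), shared.len()),
				Err(e) => eprintln!("failed fetching source {}: {:?}", source.id, e),
			}
		}));
	}

	// the real loop is fire-and-forget; awaiting here just lets the example exit cleanly
	for h in handles {
		let _ = h.await;
	}
}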

@@ -1,7 +1,9 @@
+use std::sync::Arc;
+
 use chrono::Utc;
 use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait};
 use tokio::sync::watch;
-use tracing::error;
+use tracing::{error, info};
 
 use crate::data::{entities, FetchError};
@@ -15,26 +17,24 @@ pub async fn surveyor_loop(
 	cache_time:i64,
 	run: watch::Receiver<bool>,
 ) {
-	let mut last_check = Utc::now().timestamp();
+	let mut last_activation = Utc::now().timestamp();
 	let mut last_fetch = 0;
 	let mut sources = vec![];
-	let mut metrics = vec![];
+	let mut metrics = Arc::new(vec![]);
 	while *run.borrow() {
 		// sleep until next activation
-		let delta_time = (interval as i64) - (Utc::now().timestamp() - last_check);
+		let delta_time = (interval as i64) - (Utc::now().timestamp() - last_activation);
 		if delta_time > 0 {
 			tokio::time::sleep(std::time::Duration::from_secs(delta_time as u64)).await;
 		}
-		last_check = Utc::now().timestamp();
+		last_activation = Utc::now().timestamp();
 
 		if Utc::now().timestamp() - last_fetch > cache_time {
 			// TODO do both concurrently
-			let res = tokio::join!(
-				entities::sources::Entity::find().all(&db),
-				entities::metrics::Entity::find().all(&db)
-			);
-			sources = res.0.unwrap();
-			metrics = res.1.unwrap();
+			sources = entities::sources::Entity::find().all(&db).await.unwrap();
+			metrics = Arc::new(entities::metrics::Entity::find().all(&db).await.unwrap());
+			info!(target: "surveyor", "Reloaded sources and metrics");
 			last_fetch = Utc::now().timestamp();
 		}
@@ -43,39 +43,42 @@ pub async fn surveyor_loop(
 				continue;
 			}
-			// source.last_fetch = Utc::now(); // TODO! do it in background threads again!
-			// tokio::spawn(async move {
-			match fetch(&source.url).await {
+			let metrics_snapshot = metrics.clone();
+			let db_clone = db.clone();
+			let source_clone = source.clone();
+			let now = Utc::now().timestamp();
+			source.last_update = now; // TODO kinda meh
+			// we set this before knowing about fetch result, to avoid re-running a fetch
+			// next time this loop runs. But the task only sets last_update on db if fetch succeeds,
+			// so if an error happens the client and server last_update fields will differ until fetched
+			// again. This could be avoided by keeping track of which threads are trying which sources,
+			// but also only trying to fetch at certain intervals to stay aligned might be desirable.
+			tokio::spawn(async move {
+				match fetch(&source_clone.url).await {
 					Ok(res) => {
-						let now = Utc::now().timestamp();
-						entities::sources::Entity::update(
-							entities::sources::ActiveModel{id: Set(source.id), last_update: Set(now), ..Default::default()}
-						).exec(&db).await.unwrap();
-						source.last_update = now;
-						for metric in metrics.iter().filter(|x| source.id == x.source_id) {
+						if let Err(e) = entities::sources::Entity::update(
+							entities::sources::ActiveModel{id: Set(source_clone.id), last_update: Set(now), ..Default::default()}
+						).exec(&db_clone).await {
+							error!(target: "surveyor", "Failed setting last_update ({:?}) for source {:?} but successfully fetched '{}', aborting", e, source_clone, res);
+							return;
+						}
+						for metric in metrics_snapshot.iter().filter(|x| source_clone.id == x.source_id) {
 							match metric.extract(&res) {
 								Ok(v) => {
-									entities::points::Entity::insert(
+									if let Err(e) = entities::points::Entity::insert(
 										entities::points::ActiveModel {
 											id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y),
-										}).exec(&db).await.unwrap();
+										}).exec(&db_clone).await {
+										error!(target: "surveyor", "Could not insert record {:?} : {:?}", v, e);
+									}
 								},
-								Err(e) => error!(target: "worker", "Failed extracting '{}' from {}: {:?}", metric.name, source.name, e),
+								Err(e) => error!(target: "surveyor", "Failed extracting '{}' from {}: {:?}", metric.name, source_clone.name, e),
 							}
 						}
 					},
-					Err(e) => error!(target: "worker", "Failed fetching {}: {:?}", source.name, e),
+					Err(e) => error!(target: "surveyor", "Failed fetching {}: {:?}", source_clone.name, e),
 				}
-			// source.last_fetch = Utc::now(); // TODO!
-			// });
+			});
 		}
-		// if let Ok(meta) = std::fs::metadata(state.file_path.clone()) {
-		// 	let mut fsize = state.file_size.write().expect("File Size RwLock poisoned");
-		// 	*fsize = meta.len();
-		// } // ignore errors
-		// ctx.request_repaint();
 	}
 }
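The "catch more errors and log more" half of the commit is the shift from .unwrap() on the database calls to if let Err(e) = … { error!(…) } inside the spawned task, plus targeted error!/info! lines, so a failed update or insert is logged and aborts only that source's work instead of panicking. A rough sketch of that shape, assuming the tokio, tracing and tracing-subscriber crates; the Metric type and insert_point() are hypothetical stand-ins for the sea-orm entities, and only the control flow mirrors the change.

use std::sync::Arc;
use tracing::{error, info};

// Hypothetical record type and fallible insert standing in for the sea-orm calls.
#[derive(Debug)]
struct Metric { id: i64, source_id: i64, name: String }

async fn insert_point(_metric_id: i64, y: f64) -> Result<(), String> {
	if y.is_nan() { Err("NaN not allowed".into()) } else { Ok(()) }
}

#[tokio::main]
async fn main() {
	tracing_subscriber::fmt::init();

	// one shared, read-only snapshot; cloning the Arc per spawned task is cheap
	let metrics = Arc::new(vec![
		Metric { id: 1, source_id: 7, name: "cpu".into() },
		Metric { id: 2, source_id: 7, name: "ram".into() },
	]);

	let metrics_snapshot = metrics.clone();
	let task = tokio::spawn(async move {
		for metric in metrics_snapshot.iter().filter(|m| m.source_id == 7) {
			// log and bail out of this task only, rather than unwrapping and panicking
			if let Err(e) = insert_point(metric.id, 0.42).await {
				error!(target: "surveyor", "Could not insert point for '{}': {:?}", metric.name, e);
				return;
			}
			info!(target: "surveyor", "Stored point for '{}'", metric.name);
		}
	});

	let _ = task.await;
}

A panic inside a spawned task would only kill that task, not the surveyor loop, so the explicit error! lines are what keep these failures visible in the logs.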