diff --git a/src/main.rs b/src/main.rs index 2c82e12..c0af1e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -100,11 +100,11 @@ fn main() { .block_on(async { let mut jobs = vec![]; - for db_uri in db_uris { + for (i, db_uri) in db_uris.iter().enumerate() { let db = match Database::connect(db_uri.clone()).await { Ok(v) => v, Err(e) => { - error!(target: "worker", "Could not connect to db: {:?}", e); + error!(target: "worker", "Could not connect to db #{}: {:?}", i, e); return; } }; @@ -118,6 +118,7 @@ fn main() { args.interval as i64, args.cache_time as i64, run_rx.clone(), + i, ) ) ); diff --git a/src/worker/surveyor.rs b/src/worker/surveyor.rs index 786cab8..5f9ddac 100644 --- a/src/worker/surveyor.rs +++ b/src/worker/surveyor.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use chrono::Utc; use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait}; use tokio::sync::watch; -use tracing::{error, info}; +use tracing::error; use crate::data::{entities, FetchError}; @@ -16,6 +16,7 @@ pub async fn surveyor_loop( interval:i64, cache_time:i64, run: watch::Receiver, + index: usize, ) { let mut last_activation = Utc::now().timestamp(); let mut last_fetch = 0; @@ -35,14 +36,14 @@ pub async fn surveyor_loop( match entities::sources::Entity::find().all(&db).await { Ok(srcs) => sources = srcs, Err(e) => { - error!(target: "surveyor", "Could not fetch sources: {:?}", e); + error!(target: "surveyor", "[{}] Could not fetch sources: {:?}", index, e); continue; } } match entities::metrics::Entity::find().all(&db).await { Ok(mtrcs) => metrics = Arc::new(mtrcs), Err(e) => { - error!(target: "surveyor", "Could not fetch metrics: {:?}", e); + error!(target: "surveyor", "[{}] Could not fetch metrics: {:?}", index, e); continue; } } @@ -70,7 +71,7 @@ pub async fn surveyor_loop( if let Err(e) = entities::sources::Entity::update( entities::sources::ActiveModel{id: Set(source_clone.id), last_update: Set(now), ..Default::default()} ).exec(&db_clone).await { - error!(target: "surveyor", "Failed setting last_update ({:?}) for source {:?} but successfully fetched '{}', aborting", e, source_clone, res); + error!(target: "surveyor", "[{}] Failed setting last_update ({:?}) for source {:?} but successfully fetched '{}', aborting", index, e, source_clone, res); return; } for metric in metrics_snapshot.iter().filter(|x| source_clone.id == x.source_id) { @@ -80,14 +81,14 @@ pub async fn surveyor_loop( entities::points::ActiveModel { id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y), }).exec(&db_clone).await { - error!(target: "surveyor", "Could not insert record {:?} : {:?}", v, e); + error!(target: "surveyor", "[{}] Could not insert record {:?} : {:?}", index, v, e); } }, - Err(e) => error!(target: "surveyor", "Failed extracting '{}' from {}: {:?}", metric.name, source_clone.name, e), + Err(e) => error!(target: "surveyor", "[{}] Failed extracting '{}' from {}: {:?}", index, metric.name, source_clone.name, e), } } }, - Err(e) => error!(target: "surveyor", "Failed fetching {}: {:?}", source_clone.name, e), + Err(e) => error!(target: "surveyor", "[{}] Failed fetching {}: {:?}", index, source_clone.name, e), } }); }