feat: track workers by index

This commit is contained in:
əlemi 2022-11-05 19:05:31 +01:00
parent 372e17caad
commit a2e353d18f
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 11 additions and 9 deletions

View file

@ -100,11 +100,11 @@ fn main() {
.block_on(async { .block_on(async {
let mut jobs = vec![]; let mut jobs = vec![];
for db_uri in db_uris { for (i, db_uri) in db_uris.iter().enumerate() {
let db = match Database::connect(db_uri.clone()).await { let db = match Database::connect(db_uri.clone()).await {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
error!(target: "worker", "Could not connect to db: {:?}", e); error!(target: "worker", "Could not connect to db #{}: {:?}", i, e);
return; return;
} }
}; };
@ -118,6 +118,7 @@ fn main() {
args.interval as i64, args.interval as i64,
args.cache_time as i64, args.cache_time as i64,
run_rx.clone(), run_rx.clone(),
i,
) )
) )
); );

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use chrono::Utc; use chrono::Utc;
use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait}; use sea_orm::{DatabaseConnection, ActiveValue::NotSet, Set, EntityTrait};
use tokio::sync::watch; use tokio::sync::watch;
use tracing::{error, info}; use tracing::error;
use crate::data::{entities, FetchError}; use crate::data::{entities, FetchError};
@ -16,6 +16,7 @@ pub async fn surveyor_loop(
interval:i64, interval:i64,
cache_time:i64, cache_time:i64,
run: watch::Receiver<bool>, run: watch::Receiver<bool>,
index: usize,
) { ) {
let mut last_activation = Utc::now().timestamp(); let mut last_activation = Utc::now().timestamp();
let mut last_fetch = 0; let mut last_fetch = 0;
@ -35,14 +36,14 @@ pub async fn surveyor_loop(
match entities::sources::Entity::find().all(&db).await { match entities::sources::Entity::find().all(&db).await {
Ok(srcs) => sources = srcs, Ok(srcs) => sources = srcs,
Err(e) => { Err(e) => {
error!(target: "surveyor", "Could not fetch sources: {:?}", e); error!(target: "surveyor", "[{}] Could not fetch sources: {:?}", index, e);
continue; continue;
} }
} }
match entities::metrics::Entity::find().all(&db).await { match entities::metrics::Entity::find().all(&db).await {
Ok(mtrcs) => metrics = Arc::new(mtrcs), Ok(mtrcs) => metrics = Arc::new(mtrcs),
Err(e) => { Err(e) => {
error!(target: "surveyor", "Could not fetch metrics: {:?}", e); error!(target: "surveyor", "[{}] Could not fetch metrics: {:?}", index, e);
continue; continue;
} }
} }
@ -70,7 +71,7 @@ pub async fn surveyor_loop(
if let Err(e) = entities::sources::Entity::update( if let Err(e) = entities::sources::Entity::update(
entities::sources::ActiveModel{id: Set(source_clone.id), last_update: Set(now), ..Default::default()} entities::sources::ActiveModel{id: Set(source_clone.id), last_update: Set(now), ..Default::default()}
).exec(&db_clone).await { ).exec(&db_clone).await {
error!(target: "surveyor", "Failed setting last_update ({:?}) for source {:?} but successfully fetched '{}', aborting", e, source_clone, res); error!(target: "surveyor", "[{}] Failed setting last_update ({:?}) for source {:?} but successfully fetched '{}', aborting", index, e, source_clone, res);
return; return;
} }
for metric in metrics_snapshot.iter().filter(|x| source_clone.id == x.source_id) { for metric in metrics_snapshot.iter().filter(|x| source_clone.id == x.source_id) {
@ -80,14 +81,14 @@ pub async fn surveyor_loop(
entities::points::ActiveModel { entities::points::ActiveModel {
id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y), id: NotSet, metric_id: Set(metric.id), x: Set(v.x), y: Set(v.y),
}).exec(&db_clone).await { }).exec(&db_clone).await {
error!(target: "surveyor", "Could not insert record {:?} : {:?}", v, e); error!(target: "surveyor", "[{}] Could not insert record {:?} : {:?}", index, v, e);
} }
}, },
Err(e) => error!(target: "surveyor", "Failed extracting '{}' from {}: {:?}", metric.name, source_clone.name, e), Err(e) => error!(target: "surveyor", "[{}] Failed extracting '{}' from {}: {:?}", index, metric.name, source_clone.name, e),
} }
} }
}, },
Err(e) => error!(target: "surveyor", "Failed fetching {}: {:?}", source_clone.name, e), Err(e) => error!(target: "surveyor", "[{}] Failed fetching {}: {:?}", index, source_clone.name, e),
} }
}); });
} }