feat: allow processing multiple dbs from one surveyor

This commit is contained in:
əlemi 2022-11-05 16:18:24 +01:00
parent c59c4b0009
commit 08ed95d74c
Signed by: alemi
GPG key ID: A4895B84D311642C
3 changed files with 86 additions and 73 deletions

View file

@ -26,6 +26,7 @@ reqwest = { version = "0.11", features = ["json"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tracing-subscriber = "0.3"
ctrlc = "3.2.3"
[target.'cfg(target_arch = "wasm32")'.dependencies]
console_error_panic_hook = "0.1.6"

View file

@ -25,6 +25,7 @@ use gui::{
#[derive(Parser, Debug)]
#[command(author, version, about)]
struct CliArgs {
/// Which mode to run in
#[clap(subcommand)]
mode: Mode,
@ -46,7 +47,8 @@ enum Mode {
/// Run as background service fetching sources from db
Worker {
/// Connection string for database to use
db_uri: String,
#[arg(required = true)]
db_uris: Vec<String>,
},
/// Run as foreground user interface displaying collected data
GUI {
@ -56,7 +58,7 @@ enum Mode {
// When compiling for web:
#[cfg(target_arch = "wasm32")]
fn setup_tracing(_layer: InternalLoggerLayer) {
fn setup_tracing(_layer: Option<InternalLoggerLayer>) {
// Make sure panics are logged using `console.error`.
console_error_panic_hook::set_once();
// Redirect tracing to console.log and friends:
@ -65,13 +67,17 @@ fn setup_tracing(_layer: InternalLoggerLayer) {
// When compiling natively:
#[cfg(not(target_arch = "wasm32"))]
fn setup_tracing(layer: InternalLoggerLayer) {
tracing_subscriber::registry()
fn setup_tracing(layer: Option<InternalLoggerLayer>) {
let sub = tracing_subscriber::registry()
.with(LevelFilter::INFO)
.with(filter_fn(|x| x.target() != "sqlx::query"))
.with(tracing_subscriber::fmt::Layer::new())
.with(layer)
.init();
.with(tracing_subscriber::fmt::Layer::new());
if let Some(layer) = layer {
sub.with(layer).init();
} else {
sub.init();
}
}
fn main() {
@ -80,63 +86,74 @@ fn main() {
// TODO is there an alternative to this ugly botch?
let (ctx_tx, ctx_rx) = watch::channel::<Option<Context>>(None);
let (width_tx, width_rx) = watch::channel(0);
let (run_tx, run_rx) = watch::channel(true);
let logger = InternalLogger::new(args.log_size as usize);
let logger_view = logger.view();
setup_tracing(logger.layer());
match args.mode {
Mode::Worker { db_uri } => {
let run_rx_clone = run_rx.clone();
Mode::Worker { db_uris } => {
setup_tracing(None);
let worker = std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let db = match Database::connect(db_uri.clone()).await {
Ok(v) => v,
Err(e) => {
error!(target: "launcher", "Could not connect to db: {:?}", e);
return;
}
};
info!(target: "launcher", "Connected to '{}'", db_uri);
let mut jobs = vec![];
jobs.push(
tokio::spawn(logger.worker(run_rx_clone.clone()))
);
for db_uri in db_uris {
let db = match Database::connect(db_uri.clone()).await {
Ok(v) => v,
Err(e) => {
error!(target: "worker", "Could not connect to db: {:?}", e);
return;
}
};
jobs.push(
tokio::spawn(
surveyor_loop(
db.clone(),
args.interval as i64,
args.cache_time as i64,
run_rx_clone.clone(),
info!(target: "worker", "Connected to '{}'", db_uri);
jobs.push(
tokio::spawn(
surveyor_loop(
db.clone(),
args.interval as i64,
args.cache_time as i64,
run_rx.clone(),
)
)
)
);
);
}
for (i, job) in jobs.into_iter().enumerate() {
if let Err(e) = job.await {
error!(target: "launcher", "Could not join task #{}: {:?}", i, e);
error!(target: "worker", "Could not join task #{}: {:?}", i, e);
}
}
info!(target: "launcher", "Stopping background worker");
info!(target: "worker", "Stopping background worker");
})
});
worker.join().unwrap();
let (sigint_tx, sigint_rx) = std::sync::mpsc::channel(); // TODO can I avoid using a std channel?
ctrlc::set_handler(move ||
sigint_tx.send(()).expect("Could not send signal on channel")
).expect("Could not set SIGINT handler");
sigint_rx.recv().expect("Could not receive signal from channel");
info!(target: "launcher", "Received SIGINT, stopping...");
run_tx.send(false).unwrap_or(()); // ignore errors
worker.join().expect("Failed joining worker thread");
},
Mode::GUI { } => {
let (uri_tx, uri_rx) = mpsc::channel(10);
let (width_tx, width_rx) = watch::channel(0);
let logger = InternalLogger::new(args.log_size as usize);
let logger_view = logger.view();
setup_tracing(Some(logger.layer()));
let state = match AppState::new(
width_rx,
uri_rx,
@ -150,7 +167,6 @@ fn main() {
}
};
let view = state.view();
let run_rx_clone = run_rx.clone();
let worker = std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
@ -160,7 +176,7 @@ fn main() {
.block_on(async {
let mut jobs = vec![];
let run_rx_clone_clone = run_rx_clone.clone();
let run_rx_clone_clone = run_rx.clone();
jobs.push(
tokio::spawn(async move {
@ -174,22 +190,22 @@ fn main() {
);
jobs.push(
tokio::spawn(logger.worker(run_rx_clone.clone()))
tokio::spawn(logger.worker(run_rx.clone()))
);
jobs.push(
tokio::spawn(
state.worker(run_rx_clone.clone())
state.worker(run_rx.clone())
)
);
for (i, job) in jobs.into_iter().enumerate() {
if let Err(e) = job.await {
error!(target: "launcher", "Could not join task #{}: {:?}", i, e);
error!(target: "worker", "Could not join task #{}: {:?}", i, e);
}
}
info!(target: "launcher", "Stopping background worker");
info!(target: "worker", "Stopping background worker");
})
});
@ -220,15 +236,11 @@ fn main() {
),
);
info!(target: "launcher", "Stopping native GUI");
info!(target: "launcher", "GUI quit, stopping background worker...");
if let Err(e) = run_tx.send(false) {
error!(target: "launcher", "Error signaling end to workers: {:?}", e);
}
run_tx.send(false).unwrap_or(()); // ignore errors
if let Err(e) = worker.join() {
error!(target: "launcher", "Error joining background thread : {:?}", e);
}
worker.join().expect("Failed joining worker thread");
}
}

View file

@ -126,27 +126,27 @@ impl AppState {
.order_by(entities::panels::Column::Position, Order::Asc)
.all(db).await?;
if let Err(e) = self.tx.panels.send(self.panels.clone()) {
error!(target: "worker", "Could not send panels update: {:?}", e);
error!(target: "state-manager", "Could not send panels update: {:?}", e);
}
self.sources = entities::sources::Entity::find()
.order_by(entities::sources::Column::Position, Order::Asc)
.all(db).await?;
if let Err(e) = self.tx.sources.send(self.sources.clone()) {
error!(target: "worker", "Could not send sources update: {:?}", e);
error!(target: "state-manager", "Could not send sources update: {:?}", e);
}
self.metrics = entities::metrics::Entity::find()
.order_by(entities::metrics::Column::Position, Order::Asc)
.all(db).await?;
if let Err(e) = self.tx.metrics.send(self.metrics.clone()) {
error!(target: "worker", "Could not send metrics update: {:?}", e);
error!(target: "state-manager", "Could not send metrics update: {:?}", e);
}
self.panel_metric = entities::panel_metric::Entity::find()
.all(db).await?;
if let Err(e) = self.tx.panel_metric.send(self.panel_metric.clone()) {
error!(target: "worker", "Could not send panel-metric update: {:?}", e);
error!(target: "state-manager", "Could not send panel-metric update: {:?}", e);
}
self.last_refresh = chrono::Utc::now().timestamp();
@ -183,10 +183,10 @@ impl AppState {
Ok(())
})
}).await {
error!(target: "worker", "Could not update panels on database: {:?}", e);
error!(target: "state-manager", "Could not update panels on database: {:?}", e);
} else {
if let Err(e) = self.tx.panels.send(panels.clone()) {
error!(target: "worker", "Could not send panels update: {:?}", e);
error!(target: "state-manager", "Could not send panels update: {:?}", e);
}
self.panels = panels;
}
@ -212,7 +212,7 @@ impl AppState {
Ok(())
})
}).await {
error!(target: "worker", "Could not update panels on database: {:?}", e);
error!(target: "state-manager", "Could not update panels on database: {:?}", e);
}
} else {
self.view.request_flush().await;
@ -226,7 +226,7 @@ impl AppState {
BackgroundAction::UpdateMetric { metric } => {
let op = if metric.id == NotSet { metric.insert(db) } else { metric.update(db) };
if let Err(e) = op.await {
error!(target: "worker", "Could not update metric: {:?}", e);
error!(target: "state-manager", "Could not update metric: {:?}", e);
} else {
self.view.request_flush().await;
}
@ -250,10 +250,10 @@ impl AppState {
.all(db)
.await?.into();
if let Err(e) = self.tx.points.send(self.points.clone().into()) {
warn!(target: "worker", "Could not send new points: {:?}", e); // TODO should be an err?
warn!(target: "state-manager", "Could not send new points: {:?}", e); // TODO should be an err?
}
self.last_check = Utc::now().timestamp() - *self.width.borrow();
info!(target: "worker", "Reloaded points");
info!(target: "state-manager", "Reloaded points");
Ok(())
}
@ -274,7 +274,7 @@ impl AppState {
.all(db)
.await?;
if previous_points.len() > 0 {
info!(target: "worker", "Fetched {} previous points", previous_points.len());
info!(target: "state-manager", "Fetched {} previous points", previous_points.len());
}
previous_points.reverse(); // TODO wasteful!
for p in previous_points {
@ -313,7 +313,7 @@ impl AppState {
self.last_check = now;
if changes {
if let Err(e) = self.tx.points.send(self.points.clone().into()) {
warn!(target: "worker", "Could not send changes to main thread: {:?}", e);
warn!(target: "state-manager", "Could not send changes to main thread: {:?}", e);
}
}
Ok(())
@ -325,7 +325,7 @@ impl AppState {
let mut db = Database::connect(first_db_uri.clone()).await.unwrap();
info!(target: "worker", "Connected to '{}'", first_db_uri);
info!(target: "state-manager", "Connected to '{}'", first_db_uri);
while *run.borrow() {
now = Utc::now().timestamp();
@ -338,38 +338,38 @@ impl AppState {
info!("Connected to '{}'", uri);
db = new_db;
},
Err(e) => error!(target: "worker", "Could not connect to db: {:?}", e),
Err(e) => error!(target: "state-manager", "Could not connect to db: {:?}", e),
};
},
None => { error!(target: "worker", "URI channel closed"); break; },
None => { error!(target: "state-manager", "URI channel closed"); break; },
}
},
res = self.op.recv() => {
match res {
Some(op) => match self.parse_op(op, &db).await {
Ok(()) => { },
Err(e) => error!(target: "worker", "Failed executing operation: {:?}", e),
Err(e) => error!(target: "state-manager", "Failed executing operation: {:?}", e),
},
None => { error!(target: "worker", "Operations channel closed"); break; },
None => { error!(target: "state-manager", "Operations channel closed"); break; },
}
}
res = self.flush.recv() => {
match res {
Some(()) => match self.flush_data(&db).await {
Ok(()) => { },
Err(e) => error!(target: "worker", "Could not flush away current data: {:?}", e),
Err(e) => error!(target: "state-manager", "Could not flush away current data: {:?}", e),
},
None => { error!(target: "worker", "Flush channel closed"); break; },
None => { error!(target: "state-manager", "Flush channel closed"); break; },
}
},
_ = sleep(self.cache_age - (now - self.last_refresh)) => {
if let Err(e) = self.fetch(&db).await {
error!(target: "worker", "Could not fetch from db: {:?}", e);
error!(target: "state-manager", "Could not fetch from db: {:?}", e);
}
},
_ = sleep(self.interval - (now - self.last_check)) => {
if let Err(e) = self.update_points(&db).await {
error!(target: "worker", "Could not update points: {:?}", e);
error!(target: "state-manager", "Could not update points: {:?}", e);
}
}
}