From 08ed95d74c981020aabfe19721731b277e1eb381 Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 5 Nov 2022 16:18:24 +0100 Subject: [PATCH] feat: allow processing multiple dbs from one surveyor --- Cargo.toml | 1 + src/main.rs | 116 +++++++++++++++++++++------------------ src/worker/visualizer.rs | 42 +++++++------- 3 files changed, 86 insertions(+), 73 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 637d67e..399e9e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ reqwest = { version = "0.11", features = ["json"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tracing-subscriber = "0.3" +ctrlc = "3.2.3" [target.'cfg(target_arch = "wasm32")'.dependencies] console_error_panic_hook = "0.1.6" diff --git a/src/main.rs b/src/main.rs index 93505cd..af38a23 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,7 @@ use gui::{ #[derive(Parser, Debug)] #[command(author, version, about)] struct CliArgs { + /// Which mode to run in #[clap(subcommand)] mode: Mode, @@ -46,7 +47,8 @@ enum Mode { /// Run as background service fetching sources from db Worker { /// Connection string for database to use - db_uri: String, + #[arg(required = true)] + db_uris: Vec, }, /// Run as foreground user interface displaying collected data GUI { @@ -56,7 +58,7 @@ enum Mode { // When compiling for web: #[cfg(target_arch = "wasm32")] -fn setup_tracing(_layer: InternalLoggerLayer) { +fn setup_tracing(_layer: Option) { // Make sure panics are logged using `console.error`. console_error_panic_hook::set_once(); // Redirect tracing to console.log and friends: @@ -65,13 +67,17 @@ fn setup_tracing(_layer: InternalLoggerLayer) { // When compiling natively: #[cfg(not(target_arch = "wasm32"))] -fn setup_tracing(layer: InternalLoggerLayer) { - tracing_subscriber::registry() +fn setup_tracing(layer: Option) { + let sub = tracing_subscriber::registry() .with(LevelFilter::INFO) .with(filter_fn(|x| x.target() != "sqlx::query")) - .with(tracing_subscriber::fmt::Layer::new()) - .with(layer) - .init(); + .with(tracing_subscriber::fmt::Layer::new()); + + if let Some(layer) = layer { + sub.with(layer).init(); + } else { + sub.init(); + } } fn main() { @@ -80,63 +86,74 @@ fn main() { // TODO is there an alternative to this ugly botch? let (ctx_tx, ctx_rx) = watch::channel::>(None); - let (width_tx, width_rx) = watch::channel(0); let (run_tx, run_rx) = watch::channel(true); - let logger = InternalLogger::new(args.log_size as usize); - let logger_view = logger.view(); - - setup_tracing(logger.layer()); - match args.mode { - Mode::Worker { db_uri } => { - let run_rx_clone = run_rx.clone(); + Mode::Worker { db_uris } => { + setup_tracing(None); + let worker = std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() .block_on(async { - let db = match Database::connect(db_uri.clone()).await { - Ok(v) => v, - Err(e) => { - error!(target: "launcher", "Could not connect to db: {:?}", e); - return; - } - }; - info!(target: "launcher", "Connected to '{}'", db_uri); - let mut jobs = vec![]; - jobs.push( - tokio::spawn(logger.worker(run_rx_clone.clone())) - ); + for db_uri in db_uris { + let db = match Database::connect(db_uri.clone()).await { + Ok(v) => v, + Err(e) => { + error!(target: "worker", "Could not connect to db: {:?}", e); + return; + } + }; - jobs.push( - tokio::spawn( - surveyor_loop( - db.clone(), - args.interval as i64, - args.cache_time as i64, - run_rx_clone.clone(), + info!(target: "worker", "Connected to '{}'", db_uri); + + jobs.push( + tokio::spawn( + surveyor_loop( + db.clone(), + args.interval as i64, + args.cache_time as i64, + run_rx.clone(), + ) ) - ) - ); + ); + } for (i, job) in jobs.into_iter().enumerate() { if let Err(e) = job.await { - error!(target: "launcher", "Could not join task #{}: {:?}", i, e); + error!(target: "worker", "Could not join task #{}: {:?}", i, e); } } - info!(target: "launcher", "Stopping background worker"); + info!(target: "worker", "Stopping background worker"); }) }); - worker.join().unwrap(); + let (sigint_tx, sigint_rx) = std::sync::mpsc::channel(); // TODO can I avoid using a std channel? + ctrlc::set_handler(move || + sigint_tx.send(()).expect("Could not send signal on channel") + ).expect("Could not set SIGINT handler"); + + sigint_rx.recv().expect("Could not receive signal from channel"); + info!(target: "launcher", "Received SIGINT, stopping..."); + + run_tx.send(false).unwrap_or(()); // ignore errors + worker.join().expect("Failed joining worker thread"); }, + Mode::GUI { } => { let (uri_tx, uri_rx) = mpsc::channel(10); + let (width_tx, width_rx) = watch::channel(0); + + let logger = InternalLogger::new(args.log_size as usize); + let logger_view = logger.view(); + + setup_tracing(Some(logger.layer())); + let state = match AppState::new( width_rx, uri_rx, @@ -150,7 +167,6 @@ fn main() { } }; let view = state.view(); - let run_rx_clone = run_rx.clone(); let worker = std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread() @@ -160,7 +176,7 @@ fn main() { .block_on(async { let mut jobs = vec![]; - let run_rx_clone_clone = run_rx_clone.clone(); + let run_rx_clone_clone = run_rx.clone(); jobs.push( tokio::spawn(async move { @@ -174,22 +190,22 @@ fn main() { ); jobs.push( - tokio::spawn(logger.worker(run_rx_clone.clone())) + tokio::spawn(logger.worker(run_rx.clone())) ); jobs.push( tokio::spawn( - state.worker(run_rx_clone.clone()) + state.worker(run_rx.clone()) ) ); for (i, job) in jobs.into_iter().enumerate() { if let Err(e) = job.await { - error!(target: "launcher", "Could not join task #{}: {:?}", i, e); + error!(target: "worker", "Could not join task #{}: {:?}", i, e); } } - info!(target: "launcher", "Stopping background worker"); + info!(target: "worker", "Stopping background worker"); }) }); @@ -220,15 +236,11 @@ fn main() { ), ); - info!(target: "launcher", "Stopping native GUI"); + info!(target: "launcher", "GUI quit, stopping background worker..."); - if let Err(e) = run_tx.send(false) { - error!(target: "launcher", "Error signaling end to workers: {:?}", e); - } + run_tx.send(false).unwrap_or(()); // ignore errors - if let Err(e) = worker.join() { - error!(target: "launcher", "Error joining background thread : {:?}", e); - } + worker.join().expect("Failed joining worker thread"); } } diff --git a/src/worker/visualizer.rs b/src/worker/visualizer.rs index d50d534..806afbc 100644 --- a/src/worker/visualizer.rs +++ b/src/worker/visualizer.rs @@ -126,27 +126,27 @@ impl AppState { .order_by(entities::panels::Column::Position, Order::Asc) .all(db).await?; if let Err(e) = self.tx.panels.send(self.panels.clone()) { - error!(target: "worker", "Could not send panels update: {:?}", e); + error!(target: "state-manager", "Could not send panels update: {:?}", e); } self.sources = entities::sources::Entity::find() .order_by(entities::sources::Column::Position, Order::Asc) .all(db).await?; if let Err(e) = self.tx.sources.send(self.sources.clone()) { - error!(target: "worker", "Could not send sources update: {:?}", e); + error!(target: "state-manager", "Could not send sources update: {:?}", e); } self.metrics = entities::metrics::Entity::find() .order_by(entities::metrics::Column::Position, Order::Asc) .all(db).await?; if let Err(e) = self.tx.metrics.send(self.metrics.clone()) { - error!(target: "worker", "Could not send metrics update: {:?}", e); + error!(target: "state-manager", "Could not send metrics update: {:?}", e); } self.panel_metric = entities::panel_metric::Entity::find() .all(db).await?; if let Err(e) = self.tx.panel_metric.send(self.panel_metric.clone()) { - error!(target: "worker", "Could not send panel-metric update: {:?}", e); + error!(target: "state-manager", "Could not send panel-metric update: {:?}", e); } self.last_refresh = chrono::Utc::now().timestamp(); @@ -183,10 +183,10 @@ impl AppState { Ok(()) }) }).await { - error!(target: "worker", "Could not update panels on database: {:?}", e); + error!(target: "state-manager", "Could not update panels on database: {:?}", e); } else { if let Err(e) = self.tx.panels.send(panels.clone()) { - error!(target: "worker", "Could not send panels update: {:?}", e); + error!(target: "state-manager", "Could not send panels update: {:?}", e); } self.panels = panels; } @@ -212,7 +212,7 @@ impl AppState { Ok(()) }) }).await { - error!(target: "worker", "Could not update panels on database: {:?}", e); + error!(target: "state-manager", "Could not update panels on database: {:?}", e); } } else { self.view.request_flush().await; @@ -226,7 +226,7 @@ impl AppState { BackgroundAction::UpdateMetric { metric } => { let op = if metric.id == NotSet { metric.insert(db) } else { metric.update(db) }; if let Err(e) = op.await { - error!(target: "worker", "Could not update metric: {:?}", e); + error!(target: "state-manager", "Could not update metric: {:?}", e); } else { self.view.request_flush().await; } @@ -250,10 +250,10 @@ impl AppState { .all(db) .await?.into(); if let Err(e) = self.tx.points.send(self.points.clone().into()) { - warn!(target: "worker", "Could not send new points: {:?}", e); // TODO should be an err? + warn!(target: "state-manager", "Could not send new points: {:?}", e); // TODO should be an err? } self.last_check = Utc::now().timestamp() - *self.width.borrow(); - info!(target: "worker", "Reloaded points"); + info!(target: "state-manager", "Reloaded points"); Ok(()) } @@ -274,7 +274,7 @@ impl AppState { .all(db) .await?; if previous_points.len() > 0 { - info!(target: "worker", "Fetched {} previous points", previous_points.len()); + info!(target: "state-manager", "Fetched {} previous points", previous_points.len()); } previous_points.reverse(); // TODO wasteful! for p in previous_points { @@ -313,7 +313,7 @@ impl AppState { self.last_check = now; if changes { if let Err(e) = self.tx.points.send(self.points.clone().into()) { - warn!(target: "worker", "Could not send changes to main thread: {:?}", e); + warn!(target: "state-manager", "Could not send changes to main thread: {:?}", e); } } Ok(()) @@ -325,7 +325,7 @@ impl AppState { let mut db = Database::connect(first_db_uri.clone()).await.unwrap(); - info!(target: "worker", "Connected to '{}'", first_db_uri); + info!(target: "state-manager", "Connected to '{}'", first_db_uri); while *run.borrow() { now = Utc::now().timestamp(); @@ -338,38 +338,38 @@ impl AppState { info!("Connected to '{}'", uri); db = new_db; }, - Err(e) => error!(target: "worker", "Could not connect to db: {:?}", e), + Err(e) => error!(target: "state-manager", "Could not connect to db: {:?}", e), }; }, - None => { error!(target: "worker", "URI channel closed"); break; }, + None => { error!(target: "state-manager", "URI channel closed"); break; }, } }, res = self.op.recv() => { match res { Some(op) => match self.parse_op(op, &db).await { Ok(()) => { }, - Err(e) => error!(target: "worker", "Failed executing operation: {:?}", e), + Err(e) => error!(target: "state-manager", "Failed executing operation: {:?}", e), }, - None => { error!(target: "worker", "Operations channel closed"); break; }, + None => { error!(target: "state-manager", "Operations channel closed"); break; }, } } res = self.flush.recv() => { match res { Some(()) => match self.flush_data(&db).await { Ok(()) => { }, - Err(e) => error!(target: "worker", "Could not flush away current data: {:?}", e), + Err(e) => error!(target: "state-manager", "Could not flush away current data: {:?}", e), }, - None => { error!(target: "worker", "Flush channel closed"); break; }, + None => { error!(target: "state-manager", "Flush channel closed"); break; }, } }, _ = sleep(self.cache_age - (now - self.last_refresh)) => { if let Err(e) = self.fetch(&db).await { - error!(target: "worker", "Could not fetch from db: {:?}", e); + error!(target: "state-manager", "Could not fetch from db: {:?}", e); } }, _ = sleep(self.interval - (now - self.last_check)) => { if let Err(e) = self.update_points(&db).await { - error!(target: "worker", "Could not update points: {:?}", e); + error!(target: "state-manager", "Could not update points: {:?}", e); } } }