feat: reworker worker task, allows to change db

now way more modularized and better error checked. allows receiving db
uris from a mpsc channel, and reconnects to them
This commit is contained in:
əlemi 2022-11-05 03:27:12 +01:00
parent f48d1e3682
commit 4345a9e9b9
Signed by: alemi
GPG key ID: A4895B84D311642C

View file

@ -1,7 +1,7 @@
use chrono::Utc;
use sea_orm::{TransactionTrait, DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set, QueryOrder, Order, ActiveModelTrait, ActiveValue::{NotSet, self}};
use sea_orm::{TransactionTrait, DatabaseConnection, EntityTrait, Condition, ColumnTrait, QueryFilter, Set, QueryOrder, Order, ActiveModelTrait, ActiveValue::{NotSet, self}, Database, DbErr};
use tokio::sync::{watch, mpsc};
use tracing::{info, error};
use tracing::{info, error, warn};
use std::collections::VecDeque;
use crate::data::{entities, FetchError};
@ -37,6 +37,8 @@ struct AppStateTransmitters {
pub struct AppState {
tx: AppStateTransmitters,
db_uri: mpsc::Receiver<String>,
panels: Vec<entities::panels::Model>,
sources: Vec<entities::sources::Model>,
metrics: Vec<entities::metrics::Model>,
@ -53,6 +55,7 @@ pub struct AppState {
cache_age: i64,
width: watch::Receiver<i64>,
last_width: i64,
view: AppStateView,
}
@ -66,6 +69,7 @@ async fn sleep(t:i64) {
impl AppState {
pub fn new(
width: watch::Receiver<i64>,
db_uri: mpsc::Receiver<String>,
interval: i64,
cache_age: i64,
) -> Result<AppState, FetchError> {
@ -86,6 +90,7 @@ impl AppState {
last_refresh: 0,
points: VecDeque::new(),
last_check: 0,
last_width: 0,
flush: flush_rx,
op: op_rx,
view: AppStateView {
@ -105,6 +110,7 @@ impl AppState {
panel_metric: panel_metric_tx,
},
width,
db_uri,
interval,
cache_age,
})
@ -143,7 +149,6 @@ impl AppState {
error!(target: "worker", "Could not send panel-metric update: {:?}", e);
}
info!(target: "worker", "Updated panels, sources and metrics");
self.last_refresh = chrono::Utc::now().timestamp();
Ok(())
}
@ -152,127 +157,210 @@ impl AppState {
chrono::Utc::now().timestamp() - self.last_refresh
}
pub async fn worker(mut self, db: DatabaseConnection, run:watch::Receiver<bool>) {
let mut width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
let mut last = Utc::now().timestamp() - width;
pub async fn parse_op(&mut self, op:BackgroundAction, db: &DatabaseConnection) -> Result<(), DbErr> {
match op {
BackgroundAction::UpdateAllPanels { panels } => {
// TODO this is kinda rough, can it be done better?
let pnls = panels.clone();
if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| {
Box::pin(async move {
entities::panels::Entity::delete_many().exec(txn).await?;
entities::panels::Entity::insert_many(
pnls.iter().map(|v| entities::panels::ActiveModel{
id: Set(v.id),
name: Set(v.name.clone()),
view_scroll: Set(v.view_scroll),
view_size: Set(v.view_size),
timeserie: Set(v.timeserie),
height: Set(v.height),
position: Set(v.position),
reduce_view: Set(v.reduce_view),
view_chunks: Set(v.view_chunks),
view_offset: Set(v.view_offset),
average_view: Set(v.average_view),
}).collect::<Vec<entities::panels::ActiveModel>>()
).exec(txn).await?;
Ok(())
})
}).await {
error!(target: "worker", "Could not update panels on database: {:?}", e);
} else {
if let Err(e) = self.tx.panels.send(panels.clone()) {
error!(target: "worker", "Could not send panels update: {:?}", e);
}
self.panels = panels;
}
},
BackgroundAction::UpdatePanel { panel, metrics } => {
let panel_id = match panel.id {
ActiveValue::Unchanged(pid) => Some(pid),
_ => None,
};
let op = if panel.id == NotSet { panel.insert(db) } else { panel.update(db) };
op.await?;
// TODO chained if is trashy
if let Some(panel_id) = panel_id {
if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| {
Box::pin(async move {
entities::panel_metric::Entity::delete_many()
.filter(
Condition::all()
.add(entities::panel_metric::Column::PanelId.eq(panel_id))
)
.exec(txn).await?;
entities::panel_metric::Entity::insert_many(metrics).exec(txn).await?;
Ok(())
})
}).await {
error!(target: "worker", "Could not update panels on database: {:?}", e);
}
} else {
self.view.request_flush().await;
}
},
BackgroundAction::UpdateSource { source } => {
let op = if source.id == NotSet { source.insert(db) } else { source.update(db) };
op.await?;
self.view.request_flush().await;
},
BackgroundAction::UpdateMetric { metric } => {
let op = if metric.id == NotSet { metric.insert(db) } else { metric.update(db) };
if let Err(e) = op.await {
error!(target: "worker", "Could not update metric: {:?}", e);
} else {
self.view.request_flush().await;
}
},
// _ => todo!(),
}
Ok(())
}
pub async fn flush_data(&mut self, db: &DatabaseConnection) -> Result<(), DbErr> {
let now = Utc::now().timestamp();
self.fetch(db).await?;
let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
self.points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte((now - new_width) as f64))
.add(entities::points::Column::X.lte(now as f64))
)
.order_by(entities::points::Column::X, Order::Asc)
.all(db)
.await?.into();
if let Err(e) = self.tx.points.send(self.points.clone().into()) {
warn!(target: "worker", "Could not send new points: {:?}", e); // TODO should be an err?
}
self.last_check = Utc::now().timestamp() - *self.width.borrow();
info!(target: "worker", "Reloaded points");
Ok(())
}
pub async fn update_points(&mut self, db: &DatabaseConnection) -> Result<(), DbErr> {
let mut changes = false;
let now = Utc::now().timestamp();
let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
// fetch previous points
if new_width != self.last_width {
let mut previous_points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(now - new_width))
.add(entities::points::Column::X.lte(now - self.last_width))
)
.order_by(entities::points::Column::X, Order::Asc)
.all(db)
.await?;
if previous_points.len() > 0 {
info!(target: "worker", "Fetched {} previous points", previous_points.len());
}
previous_points.reverse(); // TODO wasteful!
for p in previous_points {
self.points.push_front(p);
changes = true;
}
}
// fetch new points
let new_points = entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(self.last_check as f64))
.add(entities::points::Column::X.lte(now as f64))
)
.order_by(entities::points::Column::X, Order::Asc)
.all(db)
.await?;
for p in new_points {
self.points.push_back(p);
changes = true;
}
// remove old points
while let Some(p) = self.points.get(0) {
if (p.x as i64) >= now - (*self.width.borrow() * 60) {
break;
}
self.points.pop_front();
changes = true;
}
// update
self.last_width = new_width;
self.last_check = now;
if changes {
if let Err(e) = self.tx.points.send(self.points.clone().into()) {
warn!(target: "worker", "Could not send changes to main thread: {:?}", e);
}
}
Ok(())
}
pub async fn worker(mut self, run:watch::Receiver<bool>) {
let mut now;
let first_db_uri = self.db_uri.recv().await.unwrap();
let mut db = Database::connect(first_db_uri.clone()).await.unwrap();
info!(target: "worker", "Connected to '{}'", first_db_uri);
while *run.borrow() {
let now = Utc::now().timestamp();
now = Utc::now().timestamp();
tokio::select!{
op = self.op.recv() => {
match op {
Some(op) => {
match op {
BackgroundAction::UpdateAllPanels { panels } => {
// TODO this is kinda rough, can it be done better?
let pnls = panels.clone();
if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| {
Box::pin(async move {
entities::panels::Entity::delete_many().exec(txn).await?;
entities::panels::Entity::insert_many(
pnls.iter().map(|v| entities::panels::ActiveModel{
id: Set(v.id),
name: Set(v.name.clone()),
view_scroll: Set(v.view_scroll),
view_size: Set(v.view_size),
timeserie: Set(v.timeserie),
height: Set(v.height),
position: Set(v.position),
reduce_view: Set(v.reduce_view),
view_chunks: Set(v.view_chunks),
view_offset: Set(v.view_offset),
average_view: Set(v.average_view),
}).collect::<Vec<entities::panels::ActiveModel>>()
).exec(txn).await?;
Ok(())
})
}).await {
error!(target: "worker", "Could not update panels on database: {:?}", e);
} else {
if let Err(e) = self.tx.panels.send(panels.clone()) {
error!(target: "worker", "Could not send panels update: {:?}", e);
}
self.panels = panels;
}
res = self.db_uri.recv() => {
match res {
Some(uri) => {
match Database::connect(uri.clone()).await {
Ok(new_db) => {
info!("Connected to '{}'", uri);
db = new_db;
},
BackgroundAction::UpdatePanel { panel, metrics } => {
let panel_id = match panel.id {
ActiveValue::Unchanged(pid) => Some(pid),
_ => None,
};
let op = if panel.id == NotSet { panel.insert(&db) } else { panel.update(&db) };
// TODO chained if is trashy
if let Err(e) = op.await {
error!(target: "worker", "Could not update panel: {:?}", e);
} else {
if let Some(panel_id) = panel_id {
if let Err(e) = db.transaction::<_, (), sea_orm::DbErr>(|txn| {
Box::pin(async move {
entities::panel_metric::Entity::delete_many()
.filter(
Condition::all()
.add(entities::panel_metric::Column::PanelId.eq(panel_id))
)
.exec(txn).await?;
entities::panel_metric::Entity::insert_many(metrics).exec(txn).await?;
Ok(())
})
}).await {
error!(target: "worker", "Could not update panels on database: {:?}", e);
}
} else {
self.view.request_flush().await;
}
}
},
BackgroundAction::UpdateSource { source } => {
let op = if source.id == NotSet { source.insert(&db) } else { source.update(&db) };
if let Err(e) = op.await {
error!(target: "worker", "Could not update source: {:?}", e);
} else {
self.view.request_flush().await;
}
},
BackgroundAction::UpdateMetric { metric } => {
let op = if metric.id == NotSet { metric.insert(&db) } else { metric.update(&db) };
if let Err(e) = op.await {
error!(target: "worker", "Could not update metric: {:?}", e);
} else {
self.view.request_flush().await;
}
},
// _ => todo!(),
}
Err(e) => error!(target: "worker", "Could not connect to db: {:?}", e),
};
},
None => {},
None => { error!(target: "worker", "URI channel closed"); break; },
}
},
_ = self.flush.recv() => {
let now = Utc::now().timestamp();
if let Err(e) = self.fetch(&db).await {
error!(target: "worker", "Could not fetch from db: {:?}", e);
res = self.op.recv() => {
match res {
Some(op) => match self.parse_op(op, &db).await {
Ok(()) => { },
Err(e) => error!(target: "worker", "Failed executing operation: {:?}", e),
},
None => { error!(target: "worker", "Operations channel closed"); break; },
}
let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
self.points = match entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte((now - new_width) as f64))
.add(entities::points::Column::X.lte(now as f64))
)
.order_by(entities::points::Column::X, Order::Asc)
.all(&db)
.await {
Ok(p) => p.into(),
Err(e) => {
error!(target: "worker", "Could not fetch new points: {:?}", e);
continue;
}
};
if let Err(e) = self.tx.points.send(self.points.clone().into()) {
error!(target: "worker", "Could not send new points: {:?}", e);
}
res = self.flush.recv() => {
match res {
Some(()) => match self.flush_data(&db).await {
Ok(()) => { },
Err(e) => error!(target: "worker", "Could not flush away current data: {:?}", e),
},
None => { error!(target: "worker", "Flush channel closed"); break; },
}
last = Utc::now().timestamp();
info!(target: "worker", "Reloaded points");
},
_ = sleep(self.cache_age - (now - self.last_refresh)) => {
if let Err(e) = self.fetch(&db).await {
@ -280,82 +368,11 @@ impl AppState {
}
},
_ = sleep(self.interval - (now - self.last_check)) => {
let mut changes = false;
let now = Utc::now().timestamp();
let new_width = *self.width.borrow() * 60; // TODO it's in minutes somewhere...
// fetch previous points
if new_width != width {
let mut previous_points = match entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(now - new_width))
.add(entities::points::Column::X.lte(now - width))
)
.order_by(entities::points::Column::X, Order::Asc)
.all(&db)
.await {
Ok(p) => p,
Err(e) => {
error!(target: "worker", "Could not fetch previous points: {:?}", e);
continue;
}
};
if previous_points.len() > 0 {
info!(target: "worker", "Fetched {} previous points", previous_points.len());
}
previous_points.reverse(); // TODO wasteful!
for p in previous_points {
self.points.push_front(p);
changes = true;
}
if let Err(e) = self.update_points(&db).await {
error!(target: "worker", "Could not update points: {:?}", e);
}
// fetch new points
let new_points = match entities::points::Entity::find()
.filter(
Condition::all()
.add(entities::points::Column::X.gte(last as f64))
.add(entities::points::Column::X.lte(now as f64))
)
.order_by(entities::points::Column::X, Order::Asc)
.all(&db)
.await {
Ok(p) => p,
Err(e) => {
error!(target: "worker", "Could not fetch new points: {:?}", e);
continue;
}
};
if new_points.len() > 0 {
info!(target: "worker", "Fetched {} new points", new_points.len());
}
for p in new_points {
self.points.push_back(p);
changes = true;
}
// remove old points
while let Some(p) = self.points.get(0) {
if (p.x as i64) >= now - (*self.width.borrow() * 60) {
break;
}
self.points.pop_front();
changes = true;
}
// update
last = now;
width = new_width;
self.last_check = now;
if changes {
if let Err(e) = self.tx.points.send(self.points.clone().into()) {
error!(target: "worker", "Could not send changes to main thread: {:?}", e);
}
}
},
};
}
}
}
}
}