feat: added generic db collector

This commit is contained in:
əlemi 2023-10-18 03:46:43 +02:00
parent 231d27e00f
commit 6d22dfb9fa
Signed by: alemi
GPG key ID: A4895B84D311642C

71
src/collector.rs Normal file
View file

@ -0,0 +1,71 @@
#![allow(unused)]
use std::{collections::{HashSet, HashMap}, sync::Arc, time::SystemTime};
use chrono::Utc;
use sea_orm::{DatabaseConnection, EntityTrait, ActiveValue::NotSet, QuerySelect};
use tokio::sync::{mpsc, RwLock};
use crate::nodeinfo::model::NodeInfoOwned;
use crate::entities;
#[derive(Debug, Clone)]
pub struct CollectorHandle {
known: Arc<RwLock<HashSet<String>>>,
tx: mpsc::UnboundedSender<(String, NodeInfoOwned)>,
}
impl CollectorHandle {
pub async fn new(mut db: DatabaseConnection) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
let existing_infos : HashSet<String> = entities::node_info::Entity::find()
.select_only()
.column(entities::node_info::Column::Domain)
.all(&db).await.expect("could not stream known domains")
.iter()
.map(|x| x.domain.clone())
.collect();
let worker = CollectorWorker::new(rx);
let known = Arc::new(RwLock::new(existing_infos));
tokio::spawn(async move { worker.collect(&mut db).await });
CollectorHandle { known, tx }
}
pub async fn already_scanned(&self, domain: &str) -> bool {
if self.known.read().await.contains(domain) { return true; }
self.known.write().await.insert(domain.to_string());
false
}
pub fn add_instance(&self, domain: &str, instance: NodeInfoOwned) {
self.tx.send((domain.to_string(), instance))
.expect("could not send instance to collector")
}
}
#[derive(Debug)]
struct CollectorWorker {
known_network: HashSet<String>,
rx: mpsc::UnboundedReceiver<(String, NodeInfoOwned)>,
}
impl CollectorWorker {
fn new(rx: mpsc::UnboundedReceiver<(String, NodeInfoOwned)>) -> Self {
CollectorWorker { known_network: HashSet::new(), rx }
}
async fn collect(mut self, db: &mut DatabaseConnection) {
while let Some((domain, info)) = self.rx.recv().await {
self.known_network.insert(domain.clone());
match entities::node_info::Entity::insert(entities::node_info::ActiveModel {
domain: sea_orm::ActiveValue::Set(domain.clone()),
data: sea_orm::ActiveValue::Set(serde_json::to_string(&info).expect("could not serialize node info")),
updated: sea_orm::ActiveValue::Set(Utc::now())
}).exec(db).await {
Ok(x) => tracing::debug!("inserted domain {} with id {}", domain, x.last_insert_id),
Err(e) => tracing::error!("could not insert info for domain {}: {}", domain, e),
}
}
}
}