diff --git a/src/collector.rs b/src/collector.rs new file mode 100644 index 0000000..eff43ef --- /dev/null +++ b/src/collector.rs @@ -0,0 +1,71 @@ +#![allow(unused)] +use std::{collections::{HashSet, HashMap}, sync::Arc, time::SystemTime}; + +use chrono::Utc; +use sea_orm::{DatabaseConnection, EntityTrait, ActiveValue::NotSet, QuerySelect}; +use tokio::sync::{mpsc, RwLock}; + +use crate::nodeinfo::model::NodeInfoOwned; +use crate::entities; + +#[derive(Debug, Clone)] +pub struct CollectorHandle { + known: Arc>>, + tx: mpsc::UnboundedSender<(String, NodeInfoOwned)>, +} + +impl CollectorHandle { + pub async fn new(mut db: DatabaseConnection) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + let existing_infos : HashSet = entities::node_info::Entity::find() + .select_only() + .column(entities::node_info::Column::Domain) + .all(&db).await.expect("could not stream known domains") + .iter() + .map(|x| x.domain.clone()) + .collect(); + + let worker = CollectorWorker::new(rx); + let known = Arc::new(RwLock::new(existing_infos)); + tokio::spawn(async move { worker.collect(&mut db).await }); + CollectorHandle { known, tx } + } + + pub async fn already_scanned(&self, domain: &str) -> bool { + if self.known.read().await.contains(domain) { return true; } + self.known.write().await.insert(domain.to_string()); + false + } + + pub fn add_instance(&self, domain: &str, instance: NodeInfoOwned) { + self.tx.send((domain.to_string(), instance)) + .expect("could not send instance to collector") + } +} + +#[derive(Debug)] +struct CollectorWorker { + known_network: HashSet, + rx: mpsc::UnboundedReceiver<(String, NodeInfoOwned)>, +} + +impl CollectorWorker { + fn new(rx: mpsc::UnboundedReceiver<(String, NodeInfoOwned)>) -> Self { + CollectorWorker { known_network: HashSet::new(), rx } + } + + async fn collect(mut self, db: &mut DatabaseConnection) { + while let Some((domain, info)) = self.rx.recv().await { + self.known_network.insert(domain.clone()); + match entities::node_info::Entity::insert(entities::node_info::ActiveModel { + domain: sea_orm::ActiveValue::Set(domain.clone()), + data: sea_orm::ActiveValue::Set(serde_json::to_string(&info).expect("could not serialize node info")), + updated: sea_orm::ActiveValue::Set(Utc::now()) + }).exec(db).await { + Ok(x) => tracing::debug!("inserted domain {} with id {}", domain, x.last_insert_id), + Err(e) => tracing::error!("could not insert info for domain {}: {}", domain, e), + } + } + } +} +