diff --git a/src/crawl/bubble.rs b/src/crawl/bubble.rs index 70883d0..6b9851f 100644 --- a/src/crawl/bubble.rs +++ b/src/crawl/bubble.rs @@ -3,20 +3,26 @@ use super::collector::CollectorHandle; #[async_recursion::async_recursion] pub async fn crawl_bubble(domain: &str, handle: CollectorHandle) -> Result<(), reqwest::Error> { if handle.already_scanned(domain).await { return Ok(()) } - let info = match crate::nodeinfo::fetch(domain).await { + let (info, new) = match handle.get_node_info(domain).await { Ok(i) => i, Err(e) => { tracing::error!("failed fetching nodeinfo for {}: {}", domain, e); return Err(e); } }; - tracing::info!("disovered {} : {}, {} users", domain, info.software.name, info.usage.users.active_month.unwrap_or(-1)); + if new { + tracing::info!("disovered {} : {}, {} users", domain, info.software.name, info.usage.users.active_month.unwrap_or(-1)); + } else { + tracing::debug!("passing known instance {} : {}, {} users", domain, info.software.name, info.usage.users.active_month.unwrap_or(-1)); + } let local_bubble = info.metadata .get("localBubbleInstances") .map(|x| x.as_array().cloned()); - handle.add_instance(domain, info); + if new { + handle.add_instance(domain, info); + } if let Some(Some(local_bubble)) = local_bubble { let mut tasks = Vec::new(); diff --git a/src/crawl/collector.rs b/src/crawl/collector.rs index 6670075..e3457bf 100644 --- a/src/crawl/collector.rs +++ b/src/crawl/collector.rs @@ -5,17 +5,19 @@ use chrono::Utc; use sea_orm::{DatabaseConnection, EntityTrait, ActiveValue::NotSet, QuerySelect}; use tokio::sync::{mpsc, RwLock}; -use crate::nodeinfo::model::NodeInfoOwned; +use crate::nodeinfo::{model::NodeInfoOwned, self}; use crate::entities::{node_info, domain}; #[derive(Debug, Clone)] pub struct CollectorHandle { - known: Arc>>, + known: Arc>, + already_scanned: Arc>>, tx: mpsc::UnboundedSender<(String, NodeInfoOwned)>, + db: DatabaseConnection, } impl CollectorHandle { - pub async fn new(mut db: DatabaseConnection) -> Self { + pub async fn new(db: DatabaseConnection) -> Self { let (tx, rx) = mpsc::unbounded_channel(); let existing_infos : HashSet = domain::Entity::find() .all(&db).await.expect("could not stream known domains") @@ -24,14 +26,33 @@ impl CollectorHandle { .collect(); let worker = CollectorWorker::new(rx); - let known = Arc::new(RwLock::new(existing_infos)); - tokio::spawn(async move { worker.collect(&mut db).await }); - CollectorHandle { known, tx } + let known = Arc::new(existing_infos); + let already_scanned = Arc::new(RwLock::new(HashSet::new())); + let mut _db = db.clone(); + tokio::spawn(async move { worker.collect(&mut _db).await }); + CollectorHandle { known, already_scanned, tx, db } + } + + pub async fn get_node_info(&self, domain: &str) -> Result<(NodeInfoOwned, bool), reqwest::Error> { + if self.known.contains(domain) { + let data_raw = domain::Entity::find_by_id(domain) + .find_also_related(node_info::Entity) + .one(&self.db) + .await + .expect("error getting data from db") + .expect("it was there when i created the handle but now not anymore??") + .1 + .expect("no node info associated to this domain, yet it should") + .data; + Ok((serde_json::from_str(&data_raw).expect("could not deserialize nodeinfo from db"), false)) + } else { + Ok((nodeinfo::fetch(domain).await?, true)) + } } pub async fn already_scanned(&self, domain: &str) -> bool { - if self.known.read().await.contains(domain) { return true; } - self.known.write().await.insert(domain.to_string()); + if self.already_scanned.read().await.contains(domain) { return true; } + self.already_scanned.write().await.insert(domain.to_string()); false }