feat: crawl past known instances using data from db

This commit is contained in:
əlemi 2023-10-19 06:12:45 +02:00
parent 35d7829162
commit 4e401ed173
2 changed files with 38 additions and 11 deletions

View file

@ -3,20 +3,26 @@ use super::collector::CollectorHandle;
#[async_recursion::async_recursion] #[async_recursion::async_recursion]
pub async fn crawl_bubble(domain: &str, handle: CollectorHandle) -> Result<(), reqwest::Error> { pub async fn crawl_bubble(domain: &str, handle: CollectorHandle) -> Result<(), reqwest::Error> {
if handle.already_scanned(domain).await { return Ok(()) } if handle.already_scanned(domain).await { return Ok(()) }
let info = match crate::nodeinfo::fetch(domain).await { let (info, new) = match handle.get_node_info(domain).await {
Ok(i) => i, Ok(i) => i,
Err(e) => { Err(e) => {
tracing::error!("failed fetching nodeinfo for {}: {}", domain, e); tracing::error!("failed fetching nodeinfo for {}: {}", domain, e);
return Err(e); return Err(e);
} }
}; };
if new {
tracing::info!("disovered {} : {}, {} users", domain, info.software.name, info.usage.users.active_month.unwrap_or(-1)); tracing::info!("disovered {} : {}, {} users", domain, info.software.name, info.usage.users.active_month.unwrap_or(-1));
} else {
tracing::debug!("passing known instance {} : {}, {} users", domain, info.software.name, info.usage.users.active_month.unwrap_or(-1));
}
let local_bubble = info.metadata let local_bubble = info.metadata
.get("localBubbleInstances") .get("localBubbleInstances")
.map(|x| x.as_array().cloned()); .map(|x| x.as_array().cloned());
if new {
handle.add_instance(domain, info); handle.add_instance(domain, info);
}
if let Some(Some(local_bubble)) = local_bubble { if let Some(Some(local_bubble)) = local_bubble {
let mut tasks = Vec::new(); let mut tasks = Vec::new();

View file

@ -5,17 +5,19 @@ use chrono::Utc;
use sea_orm::{DatabaseConnection, EntityTrait, ActiveValue::NotSet, QuerySelect}; use sea_orm::{DatabaseConnection, EntityTrait, ActiveValue::NotSet, QuerySelect};
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};
use crate::nodeinfo::model::NodeInfoOwned; use crate::nodeinfo::{model::NodeInfoOwned, self};
use crate::entities::{node_info, domain}; use crate::entities::{node_info, domain};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CollectorHandle { pub struct CollectorHandle {
known: Arc<RwLock<HashSet<String>>>, known: Arc<HashSet<String>>,
already_scanned: Arc<RwLock<HashSet<String>>>,
tx: mpsc::UnboundedSender<(String, NodeInfoOwned)>, tx: mpsc::UnboundedSender<(String, NodeInfoOwned)>,
db: DatabaseConnection,
} }
impl CollectorHandle { impl CollectorHandle {
pub async fn new(mut db: DatabaseConnection) -> Self { pub async fn new(db: DatabaseConnection) -> Self {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let existing_infos : HashSet<String> = domain::Entity::find() let existing_infos : HashSet<String> = domain::Entity::find()
.all(&db).await.expect("could not stream known domains") .all(&db).await.expect("could not stream known domains")
@ -24,14 +26,33 @@ impl CollectorHandle {
.collect(); .collect();
let worker = CollectorWorker::new(rx); let worker = CollectorWorker::new(rx);
let known = Arc::new(RwLock::new(existing_infos)); let known = Arc::new(existing_infos);
tokio::spawn(async move { worker.collect(&mut db).await }); let already_scanned = Arc::new(RwLock::new(HashSet::new()));
CollectorHandle { known, tx } let mut _db = db.clone();
tokio::spawn(async move { worker.collect(&mut _db).await });
CollectorHandle { known, already_scanned, tx, db }
}
pub async fn get_node_info(&self, domain: &str) -> Result<(NodeInfoOwned, bool), reqwest::Error> {
if self.known.contains(domain) {
let data_raw = domain::Entity::find_by_id(domain)
.find_also_related(node_info::Entity)
.one(&self.db)
.await
.expect("error getting data from db")
.expect("it was there when i created the handle but now not anymore??")
.1
.expect("no node info associated to this domain, yet it should")
.data;
Ok((serde_json::from_str(&data_raw).expect("could not deserialize nodeinfo from db"), false))
} else {
Ok((nodeinfo::fetch(domain).await?, true))
}
} }
pub async fn already_scanned(&self, domain: &str) -> bool { pub async fn already_scanned(&self, domain: &str) -> bool {
if self.known.read().await.contains(domain) { return true; } if self.already_scanned.read().await.contains(domain) { return true; }
self.known.write().await.insert(domain.to_string()); self.already_scanned.write().await.insert(domain.to_string());
false false
} }