From a4fb5470a6ffd5afd47890b3b9e9cfa93d4dce4a Mon Sep 17 00:00:00 2001
From: alemi
Date: Wed, 18 Oct 2023 03:47:08 +0200
Subject: [PATCH] feat: initial work on proper restructure

basically serve should only use db data, and crawlers can be periodically
started when necessary to update db contents
---
 src/{model.rs => bubble.rs} | 19 +++----------------
 src/main.rs                 | 23 ++++++++++++++---------
 2 files changed, 17 insertions(+), 25 deletions(-)
 rename src/{model.rs => bubble.rs} (84%)

diff --git a/src/model.rs b/src/bubble.rs
similarity index 84%
rename from src/model.rs
rename to src/bubble.rs
index 1a6b9f9..309eceb 100644
--- a/src/model.rs
+++ b/src/bubble.rs
@@ -3,23 +3,10 @@ use std::{collections::HashMap, sync::Arc};
 use serde::Serialize;
 use tokio::sync::{mpsc, RwLock};
 
-pub struct MapCollector {
-	nodes_rx: mpsc::UnboundedReceiver,
-	vertices_rx: mpsc::UnboundedReceiver,
-}
+pub struct BubbleCharter {}
 
-pub fn create_map_collector() -> (MapCollector, MapHandle) {
-	let (nodes_tx, nodes_rx) = mpsc::unbounded_channel();
-	let (vertices_tx, vertices_rx) = mpsc::unbounded_channel();
-	let scanned = Arc::new(RwLock::new(Vec::new()));
-	(
-		MapCollector { nodes_rx, vertices_rx },
-		MapHandle { nodes_tx, vertices_tx, scanned },
-	)
-}
-
-impl MapCollector {
-	pub async fn collect(mut self) -> MapResult {
+impl BubbleCharter {
+	pub async fn collect(self) -> MapResult {
 		let mut nodes_domains = Vec::new();
 		let mut vertices_domains = Vec::new();
 		loop {
diff --git a/src/main.rs b/src/main.rs
index 13d0a94..96a5c3a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,6 @@
 use std::{net::SocketAddr, collections::HashSet};
 
+use sea_orm::{DatabaseConnection, EntityTrait, Database};
 use serde::Deserialize;
 use clap::{Parser, Subcommand};
 
@@ -7,12 +8,13 @@ use clap::{Parser, Subcommand};
 use axum::{routing::get, extract::Query, Json, Router};
 use set::{SetHandle, InstanceRelation, create_set_collector};
 
-use crate::{model::{MapResult, MapHandle, create_map_collector}, cache::CACHE};
+use crate::{bubble::{MapResult, MapHandle}, cache::CACHE, entities::node_info::Entity, nodeinfo::model::NodeInfoOwned};
 
-mod model;
+mod bubble;
 mod set;
 mod cache;
 mod nodeinfo;
+mod collector;
 mod entities;
 
 
@@ -93,8 +95,9 @@ struct Params {
 
 async fn route_crawl_domain(Query(params): Query<Params>) -> Json<MapResult> {
 	tracing::info!("starting new crawl from {}", params.domain);
+	let db = Database::connect("sqlite://./test.db").await.expect("could not connect to database");
 	let (collector, handle) = create_map_collector();
-	bubble_crawl_instance(&params.domain, handle).await;
+	bubble_crawl_instance(&params.domain, handle, db).await;
 	axum::Json(collector.collect().await)
 }
 
@@ -105,7 +108,7 @@ async fn peers_crawl_instance(domain: &str, set: SetHandle, depth: usize, maxdep
 	if set.already_scanned(domain).await { return };
 
 	match reqwest::get(format!("https://{}/api/v1/instance/peers", domain)).await {
-		Err(e) => return tracing::error!("could not fetch peer list for {} : {}", domain, e),
+		Err(e) => tracing::error!("could not fetch peer list for {} : {}", domain, e),
 		Ok(x) => {
 			let peers : Vec<String> = x.json().await.unwrap_or(vec![]);
 			set.add_instance(InstanceRelation {
@@ -125,13 +128,14 @@
 }
 
 #[async_recursion::async_recursion]
-async fn bubble_crawl_instance(domain: &str, map: MapHandle) {
+async fn bubble_crawl_instance(domain: &str, map: MapHandle, db: DatabaseConnection) {
 	if map.already_scanned(domain).await { return };
 	tracing::debug!("scanning instance {}", domain);
 
-	let info = match CACHE.instance_metadata(domain).await {
-		Ok(r) => r,
-		Err(e) => return tracing::warn!("could not fetch metadata for {}: {}", domain, e),
+	let info : NodeInfoOwned = match Entity::find_by_id(domain).one(&db).await {
+		Ok(Some(x)) => serde_json::from_str(&x.data).expect("could not deserialize nodeinfo in database"),
+		Ok(None) => return tracing::warn!("could not find data for instance {}", domain),
+		Err(e) => return tracing::error!("could not fetch data for instance {}: {}", domain, e),
 	};
 
 	let node_name = info.metadata
@@ -155,7 +159,8 @@
 
 	for bubble_instance in local_bubble.iter().filter_map(|x| x.as_str().map(|x| x.to_string())) {
 		let _map = map.clone();
+		let _db = db.clone();
 		map.add_vertex(domain.to_string(), bubble_instance.clone());
-		tasks.push(tokio::spawn(async move { bubble_crawl_instance(&bubble_instance, _map).await; }));
+		tasks.push(tokio::spawn(async move { bubble_crawl_instance(&bubble_instance, _map, _db).await; }));
 	}
 }
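
The restructure described in the commit message implies a background job that periodically refreshes the database the serve routes read from. Below is a minimal sketch of what that loop could look like; it is not part of this patch. The crawl_and_store helper, the seed domain, and the one-hour interval are placeholder assumptions for illustration; only Database::connect("sqlite://./test.db") mirrors what the patch actually uses.

use std::time::Duration;

use sea_orm::{Database, DatabaseConnection};

// Hypothetical stand-in for the actual crawler: fetch nodeinfo for `seed` and its
// bubble, then insert/update the corresponding rows so serve routes never hit the network.
async fn crawl_and_store(_db: &DatabaseConnection, seed: &str) {
	tracing::info!("refreshing db contents starting from {}", seed);
	// ... reqwest fetches + sea-orm upserts would go here ...
}

// Run the crawler on a fixed schedule; the one-hour period is arbitrary.
async fn periodic_crawler(db: DatabaseConnection, seed: String) {
	let mut ticker = tokio::time::interval(Duration::from_secs(60 * 60));
	loop {
		ticker.tick().await;
		crawl_and_store(&db, &seed).await;
	}
}

#[tokio::main]
async fn main() {
	tracing_subscriber::fmt::init();
	let db = Database::connect("sqlite://./test.db")
		.await
		.expect("could not connect to database");
	// In the real service this task would run alongside the axum server,
	// which only ever reads from `db`.
	let crawler = tokio::spawn(periodic_crawler(db.clone(), "example.social".into()));
	// ... Router setup and axum serving, reading exclusively from `db`, would go here ...
	crawler.await.expect("crawler task panicked");
}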