From 84467f0141f86e6f739bcda992412fd182f1083d Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 19 Oct 2023 04:56:26 +0200 Subject: [PATCH] forgive yourself --- Cargo.toml | 1 + .../src/m20220101_000001_create_table.rs | 37 +++- src/bubble.rs | 127 ------------- src/cache.rs | 56 ------ src/crawl/bubble.rs | 36 ++++ src/{ => crawl}/collector.rs | 23 ++- src/crawl/mod.rs | 4 + src/entities/domain.rs | 31 ++++ src/entities/mod.rs | 1 + src/entities/node_info.rs | 5 +- src/entities/prelude.rs | 1 + src/main.rs | 175 +++++------------- src/nodeinfo/fetcher.rs | 2 +- src/nodeinfo/mod.rs | 2 + src/nodeinfo/model.rs | 4 + src/serve/bubble.rs | 136 ++++++++++++++ src/serve/mod.rs | 4 + src/serve/router.rs | 38 ++++ src/set.rs | 68 ------- 19 files changed, 364 insertions(+), 387 deletions(-) delete mode 100644 src/bubble.rs delete mode 100644 src/cache.rs create mode 100644 src/crawl/bubble.rs rename src/{ => crawl}/collector.rs (73%) create mode 100644 src/crawl/mod.rs create mode 100644 src/entities/domain.rs create mode 100644 src/serve/bubble.rs create mode 100644 src/serve/mod.rs create mode 100644 src/serve/router.rs delete mode 100644 src/set.rs diff --git a/Cargo.toml b/Cargo.toml index f35186c..d16a732 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ reqwest = { version = "0.11.20", features = ["json"] } sea-orm = { version = "0.12.3", features = ["runtime-tokio-native-tls", "sqlx-sqlite", "sqlx-postgres"] } serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.107" +thiserror = "1.0.49" tokio = { version = "1.32.0", features = ["full"] } tracing = "0.1.37" tracing-subscriber = "0.3.17" diff --git a/migration/src/m20220101_000001_create_table.rs b/migration/src/m20220101_000001_create_table.rs index 246720d..77f27dc 100644 --- a/migration/src/m20220101_000001_create_table.rs +++ b/migration/src/m20220101_000001_create_table.rs @@ -23,13 +23,38 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(NodeInfo::Updated).date_time().not_null()) .to_owned(), ) - .await + .await?; + + manager + .create_table( + Table::create() + .table(Domain::Table) + .if_not_exists() + .col( + ColumnDef::new(Domain::Id) + .string() + .not_null() + .primary_key(), + ) + .col(ColumnDef::new(Domain::InfoId).integer().not_null()) + .col(ColumnDef::new(Domain::Updated).date_time().not_null()) + .to_owned(), + ) + .await?; + + Ok(()) } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .drop_table(Table::drop().table(NodeInfo::Table).to_owned()) - .await + .await?; + + manager + .drop_table(Table::drop().table(Domain::Table).to_owned()) + .await?; + + Ok(()) } } @@ -41,3 +66,11 @@ enum NodeInfo { Data, Updated, } + +#[derive(DeriveIden)] +enum Domain { + Table, + Id, + InfoId, + Updated, +} diff --git a/src/bubble.rs b/src/bubble.rs deleted file mode 100644 index 309eceb..0000000 --- a/src/bubble.rs +++ /dev/null @@ -1,127 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use serde::Serialize; -use tokio::sync::{mpsc, RwLock}; - -pub struct BubbleCharter {} - -impl BubbleCharter { - pub async fn collect(self) -> MapResult { - let mut nodes_domains = Vec::new(); - let mut vertices_domains = Vec::new(); - loop { - tokio::select! { - Some(node) = self.nodes_rx.recv() => nodes_domains.push(node), - Some(vertex) = self.vertices_rx.recv() => vertices_domains.push(vertex), - else => break, - } - } - - tracing::info!("received all nodes and vertices, processing"); - let mut nodes_map : HashMap = HashMap::new(); - let mut nodes = Vec::new(); - let mut vertices = Vec::new(); - - for (i, node) in nodes_domains.iter().enumerate() { - nodes_map.insert( - node.domain.clone(), - Node { id: i, label: node.domain.clone(), title: node.name.clone(), value: 1, mass: 1. } - ); - } - - for vertex in vertices_domains { - let from = { - if let Some(node) = nodes_map.get_mut(&vertex.from) { - node.value += 1; - node.mass += 0.1; - node.id - } else { - tracing::warn!("vertex from nonexisting node {}", vertex.from); - continue; - } - }; - - let to = { - if let Some(node) = nodes_map.get_mut(&vertex.to) { - node.value += 10; - node.mass += 1.; - node.id - } else { - tracing::warn!("vertex to nonexisting node {}", vertex.to); - continue; - } - }; - - vertices.push(Vertex { from, to }); - } - - for (_, node) in nodes_map { - nodes.push(node); - } - - MapResult { nodes, vertices } - } -} - -#[derive(Clone)] -pub struct MapHandle { - scanned: Arc>>, - nodes_tx: mpsc::UnboundedSender, - vertices_tx: mpsc::UnboundedSender, -} - -impl MapHandle { - pub async fn already_scanned(&self, domain: &str) -> bool { - for scanned_domain in self.scanned.read().await.iter() { - if scanned_domain.ends_with(domain) || domain.ends_with(scanned_domain) { - return true; - } - } - self.scanned.write().await.push(domain.to_string()); - false - } - - pub fn add_node(&self, domain: String, name: String) { - self.nodes_tx.send(NodeDomain { domain, name }) - .expect("could not send node to collector") - } - - pub fn add_vertex(&self, from: String, to: String) { - self.vertices_tx.send(VertexDomain { from, to }) - .expect("could not send vertex to collector") - } -} - -#[derive(Clone, Debug)] -pub struct NodeDomain { - pub domain: String, - pub name: String, -} - -#[derive(Clone, Debug)] -pub struct VertexDomain { - pub from: String, - pub to: String, -} - -#[derive(Serialize, Clone, Debug)] -pub struct Node { - pub id: usize, - pub label: String, - pub value: usize, - pub mass: f32, - pub title: String, -} - -#[derive(Serialize, Clone, Debug)] -pub struct Vertex { - pub from: usize, - pub to: usize, -} - - -#[derive(Serialize, Clone, Debug)] -pub struct MapResult { - pub nodes: Vec, - pub vertices: Vec, -} diff --git a/src/cache.rs b/src/cache.rs deleted file mode 100644 index 3b41de4..0000000 --- a/src/cache.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::{sync::Arc, collections::HashMap}; - -use chrono::Utc; -use crate::nodeinfo::model::{NodeInfoOwned, Software, Services}; -use tokio::sync::RwLock; - -use crate::nodeinfo::fetcher::node_info; - -lazy_static::lazy_static! { - pub static ref CACHE : Arc = Arc::new(InstanceCache::default()); -} - -const MAX_CACHE_AGE : i64 = 86400; - -#[derive(Default)] -pub struct InstanceCache { - store: RwLock>, -} - -impl InstanceCache { - pub async fn instance_metadata(&self, domain: &str) -> reqwest::Result { - let now = Utc::now().timestamp(); - - if let Some((age, value)) = self.store.read().await.get(domain) { - if now - age < MAX_CACHE_AGE { - return Ok(clone_node_info(value)); - } - } - - let info = node_info(domain).await?; - - self.store.write().await.insert(domain.to_string(), (now, clone_node_info(&info))); - - Ok(info) - } -} - -fn clone_node_info(node: &NodeInfoOwned) -> NodeInfoOwned { - NodeInfoOwned { - version: node.version.clone(), - software: Software { - name: node.software.name.clone(), - version: node.software.version.clone(), - repository: node.software.repository.clone(), - homepage: node.software.homepage.clone(), - }, - protocols: node.protocols.clone(), - services: Services { - inbound: node.services.inbound.clone(), - outbound: node.services.outbound.clone(), - }, - open_registrations: node.open_registrations, - usage: node.usage.clone(), - metadata: node.metadata.clone(), - } -} diff --git a/src/crawl/bubble.rs b/src/crawl/bubble.rs new file mode 100644 index 0000000..70883d0 --- /dev/null +++ b/src/crawl/bubble.rs @@ -0,0 +1,36 @@ +use super::collector::CollectorHandle; + +#[async_recursion::async_recursion] +pub async fn crawl_bubble(domain: &str, handle: CollectorHandle) -> Result<(), reqwest::Error> { + if handle.already_scanned(domain).await { return Ok(()) } + let info = match crate::nodeinfo::fetch(domain).await { + Ok(i) => i, + Err(e) => { + tracing::error!("failed fetching nodeinfo for {}: {}", domain, e); + return Err(e); + } + }; + tracing::info!("disovered {} : {}, {} users", domain, info.software.name, info.usage.users.active_month.unwrap_or(-1)); + + let local_bubble = info.metadata + .get("localBubbleInstances") + .map(|x| x.as_array().cloned()); + + handle.add_instance(domain, info); + + if let Some(Some(local_bubble)) = local_bubble { + let mut tasks = Vec::new(); + for instance in local_bubble { + if let Some(d) = instance.as_str() { + let _h = handle.clone(); + let _domain = d.to_string(); + tasks.push(tokio::spawn(async move { crawl_bubble(&_domain, _h).await })); + } + } + for t in tasks { + let _ = t.await; + } + } + + Ok(()) +} diff --git a/src/collector.rs b/src/crawl/collector.rs similarity index 73% rename from src/collector.rs rename to src/crawl/collector.rs index eff43ef..6670075 100644 --- a/src/collector.rs +++ b/src/crawl/collector.rs @@ -6,7 +6,7 @@ use sea_orm::{DatabaseConnection, EntityTrait, ActiveValue::NotSet, QuerySelect} use tokio::sync::{mpsc, RwLock}; use crate::nodeinfo::model::NodeInfoOwned; -use crate::entities; +use crate::entities::{node_info, domain}; #[derive(Debug, Clone)] pub struct CollectorHandle { @@ -17,12 +17,10 @@ pub struct CollectorHandle { impl CollectorHandle { pub async fn new(mut db: DatabaseConnection) -> Self { let (tx, rx) = mpsc::unbounded_channel(); - let existing_infos : HashSet = entities::node_info::Entity::find() - .select_only() - .column(entities::node_info::Column::Domain) + let existing_infos : HashSet = domain::Entity::find() .all(&db).await.expect("could not stream known domains") .iter() - .map(|x| x.domain.clone()) + .map(|x| x.id.clone()) .collect(); let worker = CollectorWorker::new(rx); @@ -57,13 +55,24 @@ impl CollectorWorker { async fn collect(mut self, db: &mut DatabaseConnection) { while let Some((domain, info)) = self.rx.recv().await { self.known_network.insert(domain.clone()); - match entities::node_info::Entity::insert(entities::node_info::ActiveModel { + match node_info::Entity::insert(node_info::ActiveModel { + id: NotSet, domain: sea_orm::ActiveValue::Set(domain.clone()), data: sea_orm::ActiveValue::Set(serde_json::to_string(&info).expect("could not serialize node info")), updated: sea_orm::ActiveValue::Set(Utc::now()) }).exec(db).await { - Ok(x) => tracing::debug!("inserted domain {} with id {}", domain, x.last_insert_id), Err(e) => tracing::error!("could not insert info for domain {}: {}", domain, e), + Ok(x) => { + tracing::debug!("inserted nodeinfo {} with id {}", domain, x.last_insert_id); + match domain::Entity::insert(domain::ActiveModel { + id: sea_orm::ActiveValue::Set(domain.to_string()), + info_id: sea_orm::ActiveValue::Set(x.last_insert_id), + updated: sea_orm::ActiveValue::Set(Utc::now()) + }).exec(db).await { + Ok(_) => tracing::debug!("inserted domain {}", domain), + Err(e) => tracing::error!("could not insert domain {}: {}", domain, e), + } + }, } } } diff --git a/src/crawl/mod.rs b/src/crawl/mod.rs new file mode 100644 index 0000000..070c73e --- /dev/null +++ b/src/crawl/mod.rs @@ -0,0 +1,4 @@ +pub mod collector; +pub mod bubble; + +pub use bubble::crawl_bubble as bubble; diff --git a/src/entities/domain.rs b/src/entities/domain.rs new file mode 100644 index 0000000..4a83b67 --- /dev/null +++ b/src/entities/domain.rs @@ -0,0 +1,31 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.3 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "domain")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: String, + pub info_id: i32, + pub updated: ChronoDateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::node_info::Entity", + from = "Column::InfoId", + to = "super::node_info::Column::Id" + )] + NodeInfo +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::NodeInfo.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} + diff --git a/src/entities/mod.rs b/src/entities/mod.rs index 5fba073..1d1cbad 100644 --- a/src/entities/mod.rs +++ b/src/entities/mod.rs @@ -3,3 +3,4 @@ pub mod prelude; pub mod node_info; +pub mod domain; diff --git a/src/entities/node_info.rs b/src/entities/node_info.rs index f66992c..3a899b5 100644 --- a/src/entities/node_info.rs +++ b/src/entities/node_info.rs @@ -13,6 +13,9 @@ pub struct Model { } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation {} +pub enum Relation { + #[sea_orm(has_one = "super::domain::Entity")] + Domain +} impl ActiveModelBehavior for ActiveModel {} diff --git a/src/entities/prelude.rs b/src/entities/prelude.rs index 1935a68..152ab2f 100644 --- a/src/entities/prelude.rs +++ b/src/entities/prelude.rs @@ -1,3 +1,4 @@ //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.3 pub use super::node_info::Entity as NodeInfo; +pub use super::domain::Entity as Domain; diff --git a/src/main.rs b/src/main.rs index 96a5c3a..9e7fb8f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,29 +1,37 @@ -use std::{net::SocketAddr, collections::HashSet}; +use std::net::SocketAddr; -use sea_orm::{DatabaseConnection, EntityTrait, Database}; -use serde::Deserialize; +use crawl::collector::CollectorHandle; +use sea_orm::Database; use clap::{Parser, Subcommand}; +use tracing_subscriber::{prelude::*, filter::{LevelFilter, filter_fn}}; -use axum::{routing::get, extract::Query, Json, Router}; -use set::{SetHandle, InstanceRelation, create_set_collector}; - -use crate::{bubble::{MapResult, MapHandle}, cache::CACHE, entities::node_info::Entity, nodeinfo::model::NodeInfoOwned}; - -mod bubble; -mod set; -mod cache; -mod nodeinfo; -mod collector; +mod nodeinfo; // TODO this should me PRd into upstream mod entities; +mod crawl; +mod serve; + + #[derive(Debug, Parser)] /// an API crawling akkoma bubble instances network and creating a map struct CliArgs { #[clap(subcommand)] /// action to perform action: CliAction, + + /// database connection uri + #[arg(long)] + db: String, + + /// also show debug level logging + #[arg(long, default_value_t = false)] + debug: bool, + + /// also show sql queries in logs + #[arg(long, default_value_t = false)] + sql: bool, } #[derive(Debug, Clone, Subcommand)] @@ -32,135 +40,52 @@ enum CliAction { /// Serve an API providing routes for all actions Serve { /// start the server listening on this host - host: Option, + addr: Option, }, + Crawl { + #[clap(subcommand)] + mode: CliCrawlMode, + } +} + +#[derive(Debug, Clone, Subcommand)] +enum CliCrawlMode { /// Crawl local bubble domains and construct local bubble map - Bubble { }, - - /// Crawl known peers and build network set and unknowns map - Peers { - /// starting domain + Bubble { + /// starting domain to crawl from domain: String, - - /// maximum recursion depth, leave 0 for unbounded crawling - #[arg(long, short, default_value_t = 10)] - maxdepth: usize, }, } #[tokio::main] async fn main() { - tracing_subscriber::fmt::init(); - let args = CliArgs::parse(); - match args.action { - CliAction::Serve { host } => { - let app = Router::new() - .route("/crawl", get(route_crawl_domain)); + let show_sql = args.sql; - let addr = match host { + tracing_subscriber::registry() + .with(if args.debug { LevelFilter::DEBUG } else { LevelFilter::INFO }) + .with(filter_fn(move |x| show_sql || x.target() != "sqlx::query")) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let db = Database::connect(args.db).await + .expect("could not connect to provided database"); + + match args.action { + CliAction::Serve { addr } => { + let addr = match addr { Some(host) => host.parse().expect("could not parse provided host"), None => SocketAddr::from(([127, 0, 0, 1], 18811)), }; - - tracing::debug!("listening on {}", addr); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .expect("could not serve axum app"); + crate::serve::api_routes(addr, db).await; }, - CliAction::Peers { domain, maxdepth } => { - let (collector, handle) = create_set_collector(); - peers_crawl_instance(&domain, handle, 0, maxdepth).await; - let results = collector.collect().await; - tracing::info!("discovered {} instances", results.whole.len()); - // for instance in &results.instances { - // tracing::info!("instance {} doesn't know {} other instances", instance.domain, instance.relations.len()); - // } - // println!("{}", serde_json::to_string(&results).expect("could not serialize set result")); - } - - _ => {}, - } - -} - -#[derive(Debug, Deserialize)] -struct Params { - domain: String -} - -async fn route_crawl_domain(Query(params): Query) -> Json { - tracing::info!("starting new crawl from {}", params.domain); - let db = Database::connect("sqlite://./test.db").await.expect("could not connect to database"); - let (collector, handle) = create_map_collector(); - bubble_crawl_instance(¶ms.domain, handle, db).await; - axum::Json(collector.collect().await) -} - - -#[async_recursion::async_recursion] -async fn peers_crawl_instance(domain: &str, set: SetHandle, depth: usize, maxdepth: usize) { - if depth >= maxdepth { return }; - if set.already_scanned(domain).await { return }; - - match reqwest::get(format!("https://{}/api/v1/instance/peers", domain)).await { - Err(e) => tracing::error!("could not fetch peer list for {} : {}", domain, e), - Ok(x) => { - let peers : Vec = x.json().await.unwrap_or(vec![]); - set.add_instance(InstanceRelation { - domain: domain.to_string(), - // relations: HashSet::from_iter(peers.iter().cloned()), - relations: HashSet::new(), - }); - tracing::info!("{} found: {} peers", domain, peers.len()); - for peer in peers { - let _set = set.clone(); - tokio::spawn(async move { - peers_crawl_instance(&peer, _set, depth + 1, maxdepth).await; - }); - } - } - } -} - -#[async_recursion::async_recursion] -async fn bubble_crawl_instance(domain: &str, map: MapHandle, db: DatabaseConnection) { - if map.already_scanned(domain).await { return }; - - tracing::debug!("scanning instance {}", domain); - let info : NodeInfoOwned = match Entity::find_by_id(domain).one(&db).await { - Ok(Some(x)) => serde_json::from_str(&x.data).expect("could not deserialize nodeinfo in database"), - Ok(None) => return tracing::warn!("could not find data for instance {}", domain), - Err(e) => return tracing::error!("could not fetch data for instance {}: {}", domain, e), - }; - - let node_name = info.metadata - .get("nodeName") - .map(|x| x.as_str().expect("nodeName is not a string")) - .unwrap_or(domain); - - tracing::info!("adding instance {} ({})", node_name, domain); - - map.add_node(domain.to_string(), node_name.to_string()); - - let local_bubble = match info.metadata.get("localBubbleInstances") { - None => return tracing::info!("instance {} doesn't provide local bubble data", domain), - Some(b) => match b.as_array() { - None => return tracing::warn!("instance {} local bubble is not an array", domain), - Some(v) => v, + CliAction::Crawl { mode: CliCrawlMode::Bubble { domain } } => { + let collector = CollectorHandle::new(db).await; + crate::crawl::bubble(&domain, collector).await.expect("network error wtf"); }, - }; - - let mut tasks = Vec::new(); - - for bubble_instance in local_bubble.iter().filter_map(|x| x.as_str().map(|x| x.to_string())) { - let _map = map.clone(); - let _db = db.clone(); - map.add_vertex(domain.to_string(), bubble_instance.clone()); - tasks.push(tokio::spawn(async move { bubble_crawl_instance(&bubble_instance, _map, _db).await; })); } } + diff --git a/src/nodeinfo/fetcher.rs b/src/nodeinfo/fetcher.rs index 0adfe57..79f5c7f 100644 --- a/src/nodeinfo/fetcher.rs +++ b/src/nodeinfo/fetcher.rs @@ -15,7 +15,7 @@ struct WellKnownNodeInfoRef { href: String, } -pub async fn node_info(domain: &str) -> Result { +pub async fn fetch_node_info(domain: &str) -> Result { let client = reqwest::Client::builder() .timeout(Duration::from_secs(5)) .build()?; diff --git a/src/nodeinfo/mod.rs b/src/nodeinfo/mod.rs index e9fa803..a5f661f 100644 --- a/src/nodeinfo/mod.rs +++ b/src/nodeinfo/mod.rs @@ -1,2 +1,4 @@ pub mod model; pub mod fetcher; + +pub use fetcher::fetch_node_info as fetch; diff --git a/src/nodeinfo/model.rs b/src/nodeinfo/model.rs index 4f084c2..688eae6 100644 --- a/src/nodeinfo/model.rs +++ b/src/nodeinfo/model.rs @@ -4,9 +4,13 @@ use serde_json::{Map, Value}; /// Node metadata for version detection only used for deserialization. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, derive_more::Display)] pub enum NodeVersion { + #[serde(rename = "1.0")] V1_0 = 1000, + #[serde(rename = "1.1")] V1_1 = 1001, + #[serde(rename = "2.0")] V2_0 = 2000, + #[serde(rename = "2.1")] V2_1 = 2001, } diff --git a/src/serve/bubble.rs b/src/serve/bubble.rs new file mode 100644 index 0000000..e919a74 --- /dev/null +++ b/src/serve/bubble.rs @@ -0,0 +1,136 @@ +use std::{collections::{HashMap, HashSet}, sync::Arc}; + +use sea_orm::{DatabaseConnection, EntityTrait}; +use serde::Serialize; + +use crate::{entities::{node_info, domain}, nodeinfo::model::NodeInfoOwned}; + +#[derive(Debug, thiserror::Error, derive_more::Display)] +pub enum ChartLocalBubbleError { + Unknown, + NoInfo, + DbErr(sea_orm::DbErr), +} + +#[derive(Serialize, Clone, Debug)] +pub struct ChartVertex { + pub id: usize, + pub label: String, + pub value: usize, + pub mass: f32, + pub title: String, +} + +#[derive(Serialize, Clone, Debug)] +pub struct ChartArc { + pub from: usize, + pub to: usize, +} + +#[derive(Debug)] +pub struct BubbleChart { + pub vertices: HashMap, + pub arcs: Vec, + charted: HashSet, + infos: HashMap>, + counter: usize, + db: Arc, +} + +impl BubbleChart { + pub fn new(db: Arc) -> Self { + BubbleChart { + db, + vertices: HashMap::new(), + arcs: Vec::new(), + charted: HashSet::new(), + infos: HashMap::new(), + counter: 0, + } + } + + async fn get_node_info(&mut self, domain: &str) -> Result, ChartLocalBubbleError> { + if let Some(info) = self.infos.get(domain) { return Ok(info.clone()) }; + match domain::Entity::find_by_id(domain) + .find_also_related(node_info::Entity) + .one(self.db.as_ref()).await + { + Err(e) => Err(ChartLocalBubbleError::DbErr(e)), + Ok(None) => Err(ChartLocalBubbleError::Unknown), + Ok(Some((_domain, None))) => Err(ChartLocalBubbleError::NoInfo), + Ok(Some((_domain, Some(info)))) => { + let info =serde_json::from_str::(&info.data) + .expect("could not deserialize node info data in db"); + let arc_info = Arc::new(info); + self.infos.insert(domain.to_string(), arc_info.clone()); + Ok(arc_info) + }, + } + } + + async fn get_vertex(&mut self, domain: &str) -> Result { + if let Some(vertex) = self.vertices.get(domain) { return Ok(vertex.clone()) }; + let info = self.get_node_info(domain).await?; + let node_name = info.metadata + .get("nodeName") + .map(|x| x.as_str().expect("nodeName is not a string")) + .unwrap_or(domain) + .to_string(); + + let v = ChartVertex { + id: self.counter, + label: node_name, + value: 1, + mass: 1., + title: domain.to_string(), + }; + + self.vertices.insert(domain.to_string(), v); + self.counter += 1; + Ok( + self.vertices + .get_mut(domain) + .expect("vertices still not present after inserting it manually??") + .clone() + ) + } + + #[async_recursion::async_recursion] + pub async fn chart_local_bubble(&mut self, domain: &str) -> Result<(), ChartLocalBubbleError> { + if self.charted.contains(domain) { return Ok(()) } + + let info = self.get_node_info(domain).await?; + let mut vertex = self.get_vertex(domain).await?; // it's a clone, must insert it back + + let local_bubble = info.metadata + .get("localBubbleInstances") + .map(|x| x.as_array()); + + if let Some(Some(local_bubble)) = local_bubble { + vertex.mass += 0.1 * local_bubble.len() as f32; + vertex.value += 1 * local_bubble.len(); + for instance in local_bubble { + if let Some(other_domain) = instance.as_str() { + if let Ok(mut other_vertex) = self.get_vertex(other_domain).await { + self.arcs.push(ChartArc { from: vertex.id, to: other_vertex.id }); + other_vertex.mass += 1.; + other_vertex.value += 10; + self.vertices.insert(other_domain.to_string(), other_vertex); + } + } + } + } + + self.charted.insert(domain.to_string()); + + if let Some(Some(local_bubble)) = local_bubble { + for instance in local_bubble { + if let Some(other_domain) = instance.as_str() { + let _ = self.chart_local_bubble(other_domain).await; + } + } + } + + Ok(()) + } +} diff --git a/src/serve/mod.rs b/src/serve/mod.rs new file mode 100644 index 0000000..5023131 --- /dev/null +++ b/src/serve/mod.rs @@ -0,0 +1,4 @@ +pub mod bubble; +pub mod router; + +pub use router::serve_api_routes as api_routes; diff --git a/src/serve/router.rs b/src/serve/router.rs new file mode 100644 index 0000000..48fc966 --- /dev/null +++ b/src/serve/router.rs @@ -0,0 +1,38 @@ +use std::{net::SocketAddr, sync::Arc}; + +use axum::{Router, routing::get, extract::{Query, State}, Json}; +use sea_orm::DatabaseConnection; +use serde::Deserialize; + +use crate::serve::bubble::BubbleChart; + +use super::bubble::{ChartVertex, ChartArc}; + +pub async fn serve_api_routes(addr: SocketAddr, db: DatabaseConnection) { + + let app = Router::new() + .route("/crawl", get(route_crawl_domain)) + .with_state(Arc::new(db)); + + tracing::info!("listening on {}", addr); + + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .expect("could not serve axum app"); +} + +#[derive(Debug, Deserialize)] +struct Params { + domain: String +} + +async fn route_crawl_domain( + State(db) : State>, + Query(params): Query +) -> Json<(Vec, Vec)> { + tracing::info!("starting new crawl from {}", params.domain); + let mut chart = BubbleChart::new(db.clone()); + chart.chart_local_bubble(¶ms.domain).await.expect("wtf could not chart this"); + Json((chart.vertices.values().into_iter().cloned().collect(), chart.arcs)) +} diff --git a/src/set.rs b/src/set.rs deleted file mode 100644 index e70f64f..0000000 --- a/src/set.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::{collections::{HashSet, HashMap}, sync::Arc}; - -use serde::Serialize; -use tokio::sync::{mpsc, RwLock}; - -#[derive(Debug)] -pub struct SetCollector { - known_network: HashSet, - instance_rx: mpsc::UnboundedReceiver, -} - -pub fn create_set_collector() -> (SetCollector, SetHandle) { - let (instance_tx, instance_rx) = mpsc::unbounded_channel(); - let known_network = HashSet::new(); - let scanned = Arc::new(RwLock::new(HashSet::new())); - ( - SetCollector { known_network, instance_rx }, - SetHandle { scanned, instance_tx }, - ) -} - -impl SetCollector { - pub async fn collect(mut self) -> SetResult { - let mut in_instances : Vec = Vec::new(); - let mut out_instances = Vec::new(); - while let Some(instance) = self.instance_rx.recv().await { - self.known_network.insert(instance.domain.clone()); - // in_instances.push(instance); - } - for instance in in_instances { - out_instances.push(InstanceRelation { - domain: instance.domain, - relations: self.known_network.difference(&instance.relations).map(|x| x.clone()).collect(), - }); - } - SetResult { whole: self.known_network, instances: out_instances } - } -} - -#[derive(Debug, Clone)] -pub struct SetHandle { - scanned: Arc>>, - instance_tx: mpsc::UnboundedSender, -} - -impl SetHandle { - pub async fn already_scanned(&self, domain: &str) -> bool { - if self.scanned.read().await.contains(domain) { return true; } - self.scanned.write().await.insert(domain.to_string()); - false - } - - pub fn add_instance(&self, instance: InstanceRelation) { - self.instance_tx.send(instance).expect("could not send instance to collector") - } -} - -#[derive(Serialize, Clone, Debug)] -pub struct InstanceRelation { - pub domain: String, - pub relations: HashSet, -} - -#[derive(Serialize, Clone, Debug)] -pub struct SetResult { - pub whole: HashSet, - pub instances: Vec, -}