From 8c9c6a90d87bfd608d2a175b9694df8a77dc010e Mon Sep 17 00:00:00 2001
From: alemi
Date: Wed, 18 Oct 2023 00:38:26 +0200
Subject: [PATCH] feat: basic peers collector functionality

---
 src/main.rs | 141 ++++++++++++++++++++++++++++++++++++++---------------
 src/set.rs  |  68 +++++++++++++++++++++++++
 2 files changed, 168 insertions(+), 41 deletions(-)
 create mode 100644 src/set.rs

diff --git a/src/main.rs b/src/main.rs
index ded175e..13d0a94 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,21 +1,50 @@
-use std::net::SocketAddr;
+use std::{net::SocketAddr, collections::HashSet};
 
 use serde::Deserialize;
-use clap::Parser;
+use clap::{Parser, Subcommand};
 use axum::{routing::get, extract::Query, Json, Router};
+use set::{SetHandle, InstanceRelation, create_set_collector};
 
 use crate::{model::{MapResult, MapHandle, create_map_collector}, cache::CACHE};
 
 mod model;
+mod set;
 mod cache;
+mod nodeinfo;
+
+mod entities;
 
 #[derive(Debug, Parser)]
 /// an API crawling akkoma bubble instances network and creating a map
 struct CliArgs {
-	/// start the server listening on this host
-	host: Option<String>,
+	#[clap(subcommand)]
+	/// action to perform
+	action: CliAction,
+}
+
+#[derive(Debug, Clone, Subcommand)]
+/// available actions which the process can do
+enum CliAction {
+	/// Serve an API providing routes for all actions
+	Serve {
+		/// start the server listening on this host
+		host: Option<String>,
+	},
+
+	/// Crawl local bubble domains and construct local bubble map
+	Bubble { },
+
+	/// Crawl known peers and build network set and unknowns map
+	Peers {
+		/// starting domain
+		domain: String,
+
+		/// maximum recursion depth, leave 0 for unbounded crawling
+		#[arg(long, short, default_value_t = 10)]
+		maxdepth: usize,
+	},
 }
 
 #[tokio::main]
@@ -24,19 +53,37 @@ async fn main() {
 
 	let args = CliArgs::parse();
 
-	let app = Router::new()
-		.route("/crawl", get(route_crawl_domain));
+	match args.action {
+		CliAction::Serve { host } => {
+			let app = Router::new()
+				.route("/crawl", get(route_crawl_domain));
 
-	let addr = match args.host {
-		Some(host) => host.parse().expect("could not parse provided host"),
-		None => SocketAddr::from(([127, 0, 0, 1], 18811)),
-	};
+			let addr = match host {
+				Some(host) => host.parse().expect("could not parse provided host"),
+				None => SocketAddr::from(([127, 0, 0, 1], 18811)),
+			};
+
+			tracing::debug!("listening on {}", addr);
+			axum::Server::bind(&addr)
+				.serve(app.into_make_service())
+				.await
+				.expect("could not serve axum app");
+		},
+
+		CliAction::Peers { domain, maxdepth } => {
+			let (collector, handle) = create_set_collector();
+			peers_crawl_instance(&domain, handle, 0, maxdepth).await;
+			let results = collector.collect().await;
+			tracing::info!("discovered {} instances", results.whole.len());
+			// for instance in &results.instances {
+			// 	tracing::info!("instance {} doesn't know {} other instances", instance.domain, instance.relations.len());
+			// }
+			// println!("{}", serde_json::to_string(&results).expect("could not serialize set result"));
+		}
+
+		_ => {},
+	}
 
-	tracing::debug!("listening on {}", addr);
-	axum::Server::bind(&addr)
-		.serve(app.into_make_service())
-		.await
-		.expect("could not serve axum app");
 }
 
 #[derive(Debug, Deserialize)]
@@ -47,44 +94,56 @@ struct Params {
 
 async fn route_crawl_domain(Query(params): Query<Params>) -> Json<MapResult> {
	tracing::info!("starting new crawl from {}", params.domain);
 	let (collector, handle) = create_map_collector();
-	scan_instance(&params.domain, handle).await;
+	bubble_crawl_instance(&params.domain, handle).await;
 	axum::Json(collector.collect().await)
 }
 
+
 #[async_recursion::async_recursion]
-async fn scan_instance(domain: &str, map: MapHandle) {
+async fn peers_crawl_instance(domain: &str, set: SetHandle, depth: usize, maxdepth: usize) {
+	if depth >= maxdepth { return };
+	if set.already_scanned(domain).await { return };
+
+	match reqwest::get(format!("https://{}/api/v1/instance/peers", domain)).await {
+		Err(e) => return tracing::error!("could not fetch peer list for {} : {}", domain, e),
+		Ok(x) => {
+			let peers : Vec<String> = x.json().await.unwrap_or(vec![]);
+			set.add_instance(InstanceRelation {
+				domain: domain.to_string(),
+				// relations: HashSet::from_iter(peers.iter().cloned()),
+				relations: HashSet::new(),
+			});
+			tracing::info!("{} found: {} peers", domain, peers.len());
+			for peer in peers {
+				let _set = set.clone();
+				tokio::spawn(async move {
+					peers_crawl_instance(&peer, _set, depth + 1, maxdepth).await;
+				});
+			}
+		}
+	}
+}
+
+#[async_recursion::async_recursion]
+async fn bubble_crawl_instance(domain: &str, map: MapHandle) {
 	if map.already_scanned(domain).await { return };
 
 	tracing::debug!("scanning instance {}", domain);
 
-	let response = match CACHE.instance_metadata(domain).await {
-		Ok(Some(r)) => r,
-		Ok(None) => {
-			tracing::info!("instance {} doesn't provide nodeinfo api", domain);
-			return map.add_node(domain.to_string(), domain.to_string());
-		},
-		Err(e) => {
-			return tracing::warn!("could not fetch metadata for {}: {}", domain, e);
-		}
+	let info = match CACHE.instance_metadata(domain).await {
+		Ok(r) => r,
+		Err(e) => return tracing::warn!("could not fetch metadata for {}: {}", domain, e),
 	};
 
-	let metadata = match response.get("metadata") {
-		Some(m) => m,
-		None => {
-			tracing::info!("instance {} doesn't provide metadata", domain);
-			return map.add_node(domain.to_string(), domain.to_string());
-		}
-	};
-
-	let node_name = match metadata.get("nodeName") {
-		Some(v) => v.as_str().unwrap_or("").to_string(),
-		None => domain.to_string(),
-	};
+	let node_name = info.metadata
+		.get("nodeName")
+		.map(|x| x.as_str().expect("nodeName is not a string"))
+		.unwrap_or(domain);
 
 	tracing::info!("adding instance {} ({})", node_name, domain);
-	map.add_node(domain.to_string(), node_name);
+	map.add_node(domain.to_string(), node_name.to_string());
 
-	let local_bubble = match metadata.get("localBubbleInstances") {
+	let local_bubble = match info.metadata.get("localBubbleInstances") {
 		None => return tracing::info!("instance {} doesn't provide local bubble data", domain),
 		Some(b) => match b.as_array() {
 			None => return tracing::warn!("instance {} local bubble is not an array", domain),
@@ -97,6 +156,6 @@
 	for bubble_instance in local_bubble.iter().filter_map(|x| x.as_str().map(|x| x.to_string())) {
 		let _map = map.clone();
 		map.add_vertex(domain.to_string(), bubble_instance.clone());
-		tasks.push(tokio::spawn(async move { scan_instance(&bubble_instance, _map).await; }));
+		tasks.push(tokio::spawn(async move { bubble_crawl_instance(&bubble_instance, _map).await; }));
 	}
 }
diff --git a/src/set.rs b/src/set.rs
new file mode 100644
index 0000000..e70f64f
--- /dev/null
+++ b/src/set.rs
@@ -0,0 +1,68 @@
+use std::{collections::{HashSet, HashMap}, sync::Arc};
+
+use serde::Serialize;
+use tokio::sync::{mpsc, RwLock};
+
+#[derive(Debug)]
+pub struct SetCollector {
+	known_network: HashSet<String>,
+	instance_rx: mpsc::UnboundedReceiver<InstanceRelation>,
+}
+
+pub fn create_set_collector() -> (SetCollector, SetHandle) {
+	let (instance_tx, instance_rx) = mpsc::unbounded_channel();
+	let known_network = HashSet::new();
+	let scanned = Arc::new(RwLock::new(HashSet::new()));
+	(
+		SetCollector { known_network, instance_rx },
+		SetHandle { scanned, instance_tx },
+	)
+}
+
+impl SetCollector {
+	pub async fn collect(mut self) -> SetResult {
+		let mut in_instances : Vec<InstanceRelation> = Vec::new();
+		let mut out_instances = Vec::new();
+		while let Some(instance) = self.instance_rx.recv().await {
+			self.known_network.insert(instance.domain.clone());
+			// in_instances.push(instance);
+		}
+		for instance in in_instances {
+			out_instances.push(InstanceRelation {
+				domain: instance.domain,
+				relations: self.known_network.difference(&instance.relations).map(|x| x.clone()).collect(),
+			});
+		}
+		SetResult { whole: self.known_network, instances: out_instances }
+	}
+}
+
+#[derive(Debug, Clone)]
+pub struct SetHandle {
+	scanned: Arc<RwLock<HashSet<String>>>,
+	instance_tx: mpsc::UnboundedSender<InstanceRelation>,
+}
+
+impl SetHandle {
+	pub async fn already_scanned(&self, domain: &str) -> bool {
+		if self.scanned.read().await.contains(domain) { return true; }
+		self.scanned.write().await.insert(domain.to_string());
+		false
+	}
+
+	pub fn add_instance(&self, instance: InstanceRelation) {
+		self.instance_tx.send(instance).expect("could not send instance to collector")
+	}
+}
+
+#[derive(Serialize, Clone, Debug)]
+pub struct InstanceRelation {
+	pub domain: String,
+	pub relations: HashSet<String>,
+}
+
+#[derive(Serialize, Clone, Debug)]
+pub struct SetResult {
+	pub whole: HashSet<String>,
+	pub instances: Vec<InstanceRelation>,
+}
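
Note on the new collector: SetCollector::collect() loops on instance_rx.recv().await and only returns once the channel yields None, which tokio's unbounded mpsc channel does after all buffered messages are drained and every sender has been dropped. In this patch that means collect() finishes when the SetHandle moved into peers_crawl_instance and every clone held by a spawned crawl task have gone out of scope. A minimal, self-contained sketch of that shutdown behaviour (names below are illustrative and not part of the patch; it assumes only the tokio runtime and macro features this crate already uses):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // the sender plays the role of SetHandle, the receiver of SetCollector
        let (tx, mut rx) = mpsc::unbounded_channel::<String>();

        for i in 0..3 {
            // every crawler task owns its own sender clone
            let tx = tx.clone();
            tokio::spawn(async move {
                tx.send(format!("instance-{i}")).expect("receiver was dropped");
            });
        }
        // drop the original sender, otherwise recv() below would wait forever
        drop(tx);

        // recv() drains buffered messages, then returns None once every
        // sender clone has been dropped, which is what lets collect() end
        let mut seen = Vec::new();
        while let Some(domain) = rx.recv().await {
            seen.push(domain);
        }
        println!("collected {} instances", seen.len());
    }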