feat: basic peers collector functionality

This commit is contained in:
əlemi 2023-10-18 00:38:26 +02:00
parent 341a0a77aa
commit 8c9c6a90d8
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 168 additions and 41 deletions

View file

@ -1,21 +1,50 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, collections::HashSet};
use serde::Deserialize;
use clap::Parser;
use clap::{Parser, Subcommand};
use axum::{routing::get, extract::Query, Json, Router};
use set::{SetHandle, InstanceRelation, create_set_collector};
use crate::{model::{MapResult, MapHandle, create_map_collector}, cache::CACHE};
mod model;
mod set;
mod cache;
mod nodeinfo;
mod entities;
#[derive(Debug, Parser)]
/// an API crawling akkoma bubble instances network and creating a map
struct CliArgs {
/// start the server listening on this host
host: Option<String>,
#[clap(subcommand)]
/// action to perform
action: CliAction,
}
#[derive(Debug, Clone, Subcommand)]
/// available actions which the process can do
enum CliAction {
/// Serve an API providing routes for all actions
Serve {
/// start the server listening on this host
host: Option<String>,
},
/// Crawl local bubble domains and construct local bubble map
Bubble { },
/// Crawl known peers and build network set and unknowns map
Peers {
/// starting domain
domain: String,
/// maximum recursion depth, leave 0 for unbounded crawling
#[arg(long, short, default_value_t = 10)]
maxdepth: usize,
},
}
#[tokio::main]
@ -24,19 +53,37 @@ async fn main() {
let args = CliArgs::parse();
let app = Router::new()
.route("/crawl", get(route_crawl_domain));
match args.action {
CliAction::Serve { host } => {
let app = Router::new()
.route("/crawl", get(route_crawl_domain));
let addr = match args.host {
Some(host) => host.parse().expect("could not parse provided host"),
None => SocketAddr::from(([127, 0, 0, 1], 18811)),
};
let addr = match host {
Some(host) => host.parse().expect("could not parse provided host"),
None => SocketAddr::from(([127, 0, 0, 1], 18811)),
};
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.expect("could not serve axum app");
},
CliAction::Peers { domain, maxdepth } => {
let (collector, handle) = create_set_collector();
peers_crawl_instance(&domain, handle, 0, maxdepth).await;
let results = collector.collect().await;
tracing::info!("discovered {} instances", results.whole.len());
// for instance in &results.instances {
// tracing::info!("instance {} doesn't know {} other instances", instance.domain, instance.relations.len());
// }
// println!("{}", serde_json::to_string(&results).expect("could not serialize set result"));
}
_ => {},
}
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.expect("could not serve axum app");
}
#[derive(Debug, Deserialize)]
@ -47,44 +94,56 @@ struct Params {
async fn route_crawl_domain(Query(params): Query<Params>) -> Json<MapResult> {
tracing::info!("starting new crawl from {}", params.domain);
let (collector, handle) = create_map_collector();
scan_instance(&params.domain, handle).await;
bubble_crawl_instance(&params.domain, handle).await;
axum::Json(collector.collect().await)
}
#[async_recursion::async_recursion]
async fn scan_instance(domain: &str, map: MapHandle) {
async fn peers_crawl_instance(domain: &str, set: SetHandle, depth: usize, maxdepth: usize) {
if depth >= maxdepth { return };
if set.already_scanned(domain).await { return };
match reqwest::get(format!("https://{}/api/v1/instance/peers", domain)).await {
Err(e) => return tracing::error!("could not fetch peer list for {} : {}", domain, e),
Ok(x) => {
let peers : Vec<String> = x.json().await.unwrap_or(vec![]);
set.add_instance(InstanceRelation {
domain: domain.to_string(),
// relations: HashSet::from_iter(peers.iter().cloned()),
relations: HashSet::new(),
});
tracing::info!("{} found: {} peers", domain, peers.len());
for peer in peers {
let _set = set.clone();
tokio::spawn(async move {
peers_crawl_instance(&peer, _set, depth + 1, maxdepth).await;
});
}
}
}
}
#[async_recursion::async_recursion]
async fn bubble_crawl_instance(domain: &str, map: MapHandle) {
if map.already_scanned(domain).await { return };
tracing::debug!("scanning instance {}", domain);
let response = match CACHE.instance_metadata(domain).await {
Ok(Some(r)) => r,
Ok(None) => {
tracing::info!("instance {} doesn't provide nodeinfo api", domain);
return map.add_node(domain.to_string(), domain.to_string());
},
Err(e) => {
return tracing::warn!("could not fetch metadata for {}: {}", domain, e);
}
let info = match CACHE.instance_metadata(domain).await {
Ok(r) => r,
Err(e) => return tracing::warn!("could not fetch metadata for {}: {}", domain, e),
};
let metadata = match response.get("metadata") {
Some(m) => m,
None => {
tracing::info!("instance {} doesn't provide metadata", domain);
return map.add_node(domain.to_string(), domain.to_string());
}
};
let node_name = match metadata.get("nodeName") {
Some(v) => v.as_str().unwrap_or("").to_string(),
None => domain.to_string(),
};
let node_name = info.metadata
.get("nodeName")
.map(|x| x.as_str().expect("nodeName is not a string"))
.unwrap_or(domain);
tracing::info!("adding instance {} ({})", node_name, domain);
map.add_node(domain.to_string(), node_name);
map.add_node(domain.to_string(), node_name.to_string());
let local_bubble = match metadata.get("localBubbleInstances") {
let local_bubble = match info.metadata.get("localBubbleInstances") {
None => return tracing::info!("instance {} doesn't provide local bubble data", domain),
Some(b) => match b.as_array() {
None => return tracing::warn!("instance {} local bubble is not an array", domain),
@ -97,6 +156,6 @@ async fn scan_instance(domain: &str, map: MapHandle) {
for bubble_instance in local_bubble.iter().filter_map(|x| x.as_str().map(|x| x.to_string())) {
let _map = map.clone();
map.add_vertex(domain.to_string(), bubble_instance.clone());
tasks.push(tokio::spawn(async move { scan_instance(&bubble_instance, _map).await; }));
tasks.push(tokio::spawn(async move { bubble_crawl_instance(&bubble_instance, _map).await; }));
}
}

68
src/set.rs Normal file
View file

@ -0,0 +1,68 @@
use std::{collections::{HashSet, HashMap}, sync::Arc};
use serde::Serialize;
use tokio::sync::{mpsc, RwLock};
#[derive(Debug)]
pub struct SetCollector {
known_network: HashSet<String>,
instance_rx: mpsc::UnboundedReceiver<InstanceRelation>,
}
pub fn create_set_collector() -> (SetCollector, SetHandle) {
let (instance_tx, instance_rx) = mpsc::unbounded_channel();
let known_network = HashSet::new();
let scanned = Arc::new(RwLock::new(HashSet::new()));
(
SetCollector { known_network, instance_rx },
SetHandle { scanned, instance_tx },
)
}
impl SetCollector {
pub async fn collect(mut self) -> SetResult {
let mut in_instances : Vec<InstanceRelation> = Vec::new();
let mut out_instances = Vec::new();
while let Some(instance) = self.instance_rx.recv().await {
self.known_network.insert(instance.domain.clone());
// in_instances.push(instance);
}
for instance in in_instances {
out_instances.push(InstanceRelation {
domain: instance.domain,
relations: self.known_network.difference(&instance.relations).map(|x| x.clone()).collect(),
});
}
SetResult { whole: self.known_network, instances: out_instances }
}
}
#[derive(Debug, Clone)]
pub struct SetHandle {
scanned: Arc<RwLock<HashSet<String>>>,
instance_tx: mpsc::UnboundedSender<InstanceRelation>,
}
impl SetHandle {
pub async fn already_scanned(&self, domain: &str) -> bool {
if self.scanned.read().await.contains(domain) { return true; }
self.scanned.write().await.insert(domain.to_string());
false
}
pub fn add_instance(&self, instance: InstanceRelation) {
self.instance_tx.send(instance).expect("could not send instance to collector")
}
}
#[derive(Serialize, Clone, Debug)]
pub struct InstanceRelation {
pub domain: String,
pub relations: HashSet<String>,
}
#[derive(Serialize, Clone, Debug)]
pub struct SetResult {
pub whole: HashSet<String>,
pub instances: Vec<InstanceRelation>,
}