forgive yourself

This commit is contained in:
əlemi 2023-10-19 04:56:26 +02:00
parent a4fb5470a6
commit 84467f0141
Signed by: alemi
GPG key ID: A4895B84D311642C
19 changed files with 364 additions and 387 deletions

View file

@ -17,6 +17,7 @@ reqwest = { version = "0.11.20", features = ["json"] }
sea-orm = { version = "0.12.3", features = ["runtime-tokio-native-tls", "sqlx-sqlite", "sqlx-postgres"] } sea-orm = { version = "0.12.3", features = ["runtime-tokio-native-tls", "sqlx-sqlite", "sqlx-postgres"] }
serde = { version = "1.0.188", features = ["derive"] } serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.107" serde_json = "1.0.107"
thiserror = "1.0.49"
tokio = { version = "1.32.0", features = ["full"] } tokio = { version = "1.32.0", features = ["full"] }
tracing = "0.1.37" tracing = "0.1.37"
tracing-subscriber = "0.3.17" tracing-subscriber = "0.3.17"

View file

@ -23,13 +23,38 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(NodeInfo::Updated).date_time().not_null()) .col(ColumnDef::new(NodeInfo::Updated).date_time().not_null())
.to_owned(), .to_owned(),
) )
.await .await?;
manager
.create_table(
Table::create()
.table(Domain::Table)
.if_not_exists()
.col(
ColumnDef::new(Domain::Id)
.string()
.not_null()
.primary_key(),
)
.col(ColumnDef::new(Domain::InfoId).integer().not_null())
.col(ColumnDef::new(Domain::Updated).date_time().not_null())
.to_owned(),
)
.await?;
Ok(())
} }
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager manager
.drop_table(Table::drop().table(NodeInfo::Table).to_owned()) .drop_table(Table::drop().table(NodeInfo::Table).to_owned())
.await .await?;
manager
.drop_table(Table::drop().table(Domain::Table).to_owned())
.await?;
Ok(())
} }
} }
@ -41,3 +66,11 @@ enum NodeInfo {
Data, Data,
Updated, Updated,
} }
#[derive(DeriveIden)]
enum Domain {
Table,
Id,
InfoId,
Updated,
}

View file

@ -1,127 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use serde::Serialize;
use tokio::sync::{mpsc, RwLock};
pub struct BubbleCharter {}
impl BubbleCharter {
pub async fn collect(self) -> MapResult {
let mut nodes_domains = Vec::new();
let mut vertices_domains = Vec::new();
loop {
tokio::select! {
Some(node) = self.nodes_rx.recv() => nodes_domains.push(node),
Some(vertex) = self.vertices_rx.recv() => vertices_domains.push(vertex),
else => break,
}
}
tracing::info!("received all nodes and vertices, processing");
let mut nodes_map : HashMap<String, Node> = HashMap::new();
let mut nodes = Vec::new();
let mut vertices = Vec::new();
for (i, node) in nodes_domains.iter().enumerate() {
nodes_map.insert(
node.domain.clone(),
Node { id: i, label: node.domain.clone(), title: node.name.clone(), value: 1, mass: 1. }
);
}
for vertex in vertices_domains {
let from = {
if let Some(node) = nodes_map.get_mut(&vertex.from) {
node.value += 1;
node.mass += 0.1;
node.id
} else {
tracing::warn!("vertex from nonexisting node {}", vertex.from);
continue;
}
};
let to = {
if let Some(node) = nodes_map.get_mut(&vertex.to) {
node.value += 10;
node.mass += 1.;
node.id
} else {
tracing::warn!("vertex to nonexisting node {}", vertex.to);
continue;
}
};
vertices.push(Vertex { from, to });
}
for (_, node) in nodes_map {
nodes.push(node);
}
MapResult { nodes, vertices }
}
}
#[derive(Clone)]
pub struct MapHandle {
scanned: Arc<RwLock<Vec<String>>>,
nodes_tx: mpsc::UnboundedSender<NodeDomain>,
vertices_tx: mpsc::UnboundedSender<VertexDomain>,
}
impl MapHandle {
pub async fn already_scanned(&self, domain: &str) -> bool {
for scanned_domain in self.scanned.read().await.iter() {
if scanned_domain.ends_with(domain) || domain.ends_with(scanned_domain) {
return true;
}
}
self.scanned.write().await.push(domain.to_string());
false
}
pub fn add_node(&self, domain: String, name: String) {
self.nodes_tx.send(NodeDomain { domain, name })
.expect("could not send node to collector")
}
pub fn add_vertex(&self, from: String, to: String) {
self.vertices_tx.send(VertexDomain { from, to })
.expect("could not send vertex to collector")
}
}
#[derive(Clone, Debug)]
pub struct NodeDomain {
pub domain: String,
pub name: String,
}
#[derive(Clone, Debug)]
pub struct VertexDomain {
pub from: String,
pub to: String,
}
#[derive(Serialize, Clone, Debug)]
pub struct Node {
pub id: usize,
pub label: String,
pub value: usize,
pub mass: f32,
pub title: String,
}
#[derive(Serialize, Clone, Debug)]
pub struct Vertex {
pub from: usize,
pub to: usize,
}
#[derive(Serialize, Clone, Debug)]
pub struct MapResult {
pub nodes: Vec<Node>,
pub vertices: Vec<Vertex>,
}

View file

@ -1,56 +0,0 @@
use std::{sync::Arc, collections::HashMap};
use chrono::Utc;
use crate::nodeinfo::model::{NodeInfoOwned, Software, Services};
use tokio::sync::RwLock;
use crate::nodeinfo::fetcher::node_info;
lazy_static::lazy_static! {
pub static ref CACHE : Arc<InstanceCache> = Arc::new(InstanceCache::default());
}
const MAX_CACHE_AGE : i64 = 86400;
#[derive(Default)]
pub struct InstanceCache {
store: RwLock<HashMap<String, (i64, NodeInfoOwned)>>,
}
impl InstanceCache {
pub async fn instance_metadata(&self, domain: &str) -> reqwest::Result<NodeInfoOwned> {
let now = Utc::now().timestamp();
if let Some((age, value)) = self.store.read().await.get(domain) {
if now - age < MAX_CACHE_AGE {
return Ok(clone_node_info(value));
}
}
let info = node_info(domain).await?;
self.store.write().await.insert(domain.to_string(), (now, clone_node_info(&info)));
Ok(info)
}
}
fn clone_node_info(node: &NodeInfoOwned) -> NodeInfoOwned {
NodeInfoOwned {
version: node.version.clone(),
software: Software {
name: node.software.name.clone(),
version: node.software.version.clone(),
repository: node.software.repository.clone(),
homepage: node.software.homepage.clone(),
},
protocols: node.protocols.clone(),
services: Services {
inbound: node.services.inbound.clone(),
outbound: node.services.outbound.clone(),
},
open_registrations: node.open_registrations,
usage: node.usage.clone(),
metadata: node.metadata.clone(),
}
}

36
src/crawl/bubble.rs Normal file
View file

@ -0,0 +1,36 @@
use super::collector::CollectorHandle;
#[async_recursion::async_recursion]
pub async fn crawl_bubble(domain: &str, handle: CollectorHandle) -> Result<(), reqwest::Error> {
if handle.already_scanned(domain).await { return Ok(()) }
let info = match crate::nodeinfo::fetch(domain).await {
Ok(i) => i,
Err(e) => {
tracing::error!("failed fetching nodeinfo for {}: {}", domain, e);
return Err(e);
}
};
tracing::info!("disovered {} : {}, {} users", domain, info.software.name, info.usage.users.active_month.unwrap_or(-1));
let local_bubble = info.metadata
.get("localBubbleInstances")
.map(|x| x.as_array().cloned());
handle.add_instance(domain, info);
if let Some(Some(local_bubble)) = local_bubble {
let mut tasks = Vec::new();
for instance in local_bubble {
if let Some(d) = instance.as_str() {
let _h = handle.clone();
let _domain = d.to_string();
tasks.push(tokio::spawn(async move { crawl_bubble(&_domain, _h).await }));
}
}
for t in tasks {
let _ = t.await;
}
}
Ok(())
}

View file

@ -6,7 +6,7 @@ use sea_orm::{DatabaseConnection, EntityTrait, ActiveValue::NotSet, QuerySelect}
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};
use crate::nodeinfo::model::NodeInfoOwned; use crate::nodeinfo::model::NodeInfoOwned;
use crate::entities; use crate::entities::{node_info, domain};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CollectorHandle { pub struct CollectorHandle {
@ -17,12 +17,10 @@ pub struct CollectorHandle {
impl CollectorHandle { impl CollectorHandle {
pub async fn new(mut db: DatabaseConnection) -> Self { pub async fn new(mut db: DatabaseConnection) -> Self {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let existing_infos : HashSet<String> = entities::node_info::Entity::find() let existing_infos : HashSet<String> = domain::Entity::find()
.select_only()
.column(entities::node_info::Column::Domain)
.all(&db).await.expect("could not stream known domains") .all(&db).await.expect("could not stream known domains")
.iter() .iter()
.map(|x| x.domain.clone()) .map(|x| x.id.clone())
.collect(); .collect();
let worker = CollectorWorker::new(rx); let worker = CollectorWorker::new(rx);
@ -57,13 +55,24 @@ impl CollectorWorker {
async fn collect(mut self, db: &mut DatabaseConnection) { async fn collect(mut self, db: &mut DatabaseConnection) {
while let Some((domain, info)) = self.rx.recv().await { while let Some((domain, info)) = self.rx.recv().await {
self.known_network.insert(domain.clone()); self.known_network.insert(domain.clone());
match entities::node_info::Entity::insert(entities::node_info::ActiveModel { match node_info::Entity::insert(node_info::ActiveModel {
id: NotSet,
domain: sea_orm::ActiveValue::Set(domain.clone()), domain: sea_orm::ActiveValue::Set(domain.clone()),
data: sea_orm::ActiveValue::Set(serde_json::to_string(&info).expect("could not serialize node info")), data: sea_orm::ActiveValue::Set(serde_json::to_string(&info).expect("could not serialize node info")),
updated: sea_orm::ActiveValue::Set(Utc::now()) updated: sea_orm::ActiveValue::Set(Utc::now())
}).exec(db).await { }).exec(db).await {
Ok(x) => tracing::debug!("inserted domain {} with id {}", domain, x.last_insert_id),
Err(e) => tracing::error!("could not insert info for domain {}: {}", domain, e), Err(e) => tracing::error!("could not insert info for domain {}: {}", domain, e),
Ok(x) => {
tracing::debug!("inserted nodeinfo {} with id {}", domain, x.last_insert_id);
match domain::Entity::insert(domain::ActiveModel {
id: sea_orm::ActiveValue::Set(domain.to_string()),
info_id: sea_orm::ActiveValue::Set(x.last_insert_id),
updated: sea_orm::ActiveValue::Set(Utc::now())
}).exec(db).await {
Ok(_) => tracing::debug!("inserted domain {}", domain),
Err(e) => tracing::error!("could not insert domain {}: {}", domain, e),
}
},
} }
} }
} }

4
src/crawl/mod.rs Normal file
View file

@ -0,0 +1,4 @@
pub mod collector;
pub mod bubble;
pub use bubble::crawl_bubble as bubble;

31
src/entities/domain.rs Normal file
View file

@ -0,0 +1,31 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.3
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "domain")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: String,
pub info_id: i32,
pub updated: ChronoDateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::node_info::Entity",
from = "Column::InfoId",
to = "super::node_info::Column::Id"
)]
NodeInfo
}
impl Related<super::node_info::Entity> for Entity {
fn to() -> RelationDef {
Relation::NodeInfo.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -3,3 +3,4 @@
pub mod prelude; pub mod prelude;
pub mod node_info; pub mod node_info;
pub mod domain;

View file

@ -13,6 +13,9 @@ pub struct Model {
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {} pub enum Relation {
#[sea_orm(has_one = "super::domain::Entity")]
Domain
}
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}

View file

@ -1,3 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.3 //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.3
pub use super::node_info::Entity as NodeInfo; pub use super::node_info::Entity as NodeInfo;
pub use super::domain::Entity as Domain;

View file

@ -1,29 +1,37 @@
use std::{net::SocketAddr, collections::HashSet}; use std::net::SocketAddr;
use sea_orm::{DatabaseConnection, EntityTrait, Database}; use crawl::collector::CollectorHandle;
use serde::Deserialize; use sea_orm::Database;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use tracing_subscriber::{prelude::*, filter::{LevelFilter, filter_fn}};
use axum::{routing::get, extract::Query, Json, Router}; mod nodeinfo; // TODO this should me PRd into upstream
use set::{SetHandle, InstanceRelation, create_set_collector};
use crate::{bubble::{MapResult, MapHandle}, cache::CACHE, entities::node_info::Entity, nodeinfo::model::NodeInfoOwned};
mod bubble;
mod set;
mod cache;
mod nodeinfo;
mod collector;
mod entities; mod entities;
mod crawl;
mod serve;
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
/// an API crawling akkoma bubble instances network and creating a map /// an API crawling akkoma bubble instances network and creating a map
struct CliArgs { struct CliArgs {
#[clap(subcommand)] #[clap(subcommand)]
/// action to perform /// action to perform
action: CliAction, action: CliAction,
/// database connection uri
#[arg(long)]
db: String,
/// also show debug level logging
#[arg(long, default_value_t = false)]
debug: bool,
/// also show sql queries in logs
#[arg(long, default_value_t = false)]
sql: bool,
} }
#[derive(Debug, Clone, Subcommand)] #[derive(Debug, Clone, Subcommand)]
@ -32,135 +40,52 @@ enum CliAction {
/// Serve an API providing routes for all actions /// Serve an API providing routes for all actions
Serve { Serve {
/// start the server listening on this host /// start the server listening on this host
host: Option<String>, addr: Option<String>,
}, },
Crawl {
#[clap(subcommand)]
mode: CliCrawlMode,
}
}
#[derive(Debug, Clone, Subcommand)]
enum CliCrawlMode {
/// Crawl local bubble domains and construct local bubble map /// Crawl local bubble domains and construct local bubble map
Bubble { }, Bubble {
/// starting domain to crawl from
/// Crawl known peers and build network set and unknowns map
Peers {
/// starting domain
domain: String, domain: String,
/// maximum recursion depth, leave 0 for unbounded crawling
#[arg(long, short, default_value_t = 10)]
maxdepth: usize,
}, },
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
tracing_subscriber::fmt::init();
let args = CliArgs::parse(); let args = CliArgs::parse();
match args.action { let show_sql = args.sql;
CliAction::Serve { host } => {
let app = Router::new()
.route("/crawl", get(route_crawl_domain));
let addr = match host { tracing_subscriber::registry()
.with(if args.debug { LevelFilter::DEBUG } else { LevelFilter::INFO })
.with(filter_fn(move |x| show_sql || x.target() != "sqlx::query"))
.with(tracing_subscriber::fmt::layer())
.init();
let db = Database::connect(args.db).await
.expect("could not connect to provided database");
match args.action {
CliAction::Serve { addr } => {
let addr = match addr {
Some(host) => host.parse().expect("could not parse provided host"), Some(host) => host.parse().expect("could not parse provided host"),
None => SocketAddr::from(([127, 0, 0, 1], 18811)), None => SocketAddr::from(([127, 0, 0, 1], 18811)),
}; };
crate::serve::api_routes(addr, db).await;
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.expect("could not serve axum app");
}, },
CliAction::Peers { domain, maxdepth } => { CliAction::Crawl { mode: CliCrawlMode::Bubble { domain } } => {
let (collector, handle) = create_set_collector(); let collector = CollectorHandle::new(db).await;
peers_crawl_instance(&domain, handle, 0, maxdepth).await; crate::crawl::bubble(&domain, collector).await.expect("network error wtf");
let results = collector.collect().await;
tracing::info!("discovered {} instances", results.whole.len());
// for instance in &results.instances {
// tracing::info!("instance {} doesn't know {} other instances", instance.domain, instance.relations.len());
// }
// println!("{}", serde_json::to_string(&results).expect("could not serialize set result"));
}
_ => {},
}
}
#[derive(Debug, Deserialize)]
struct Params {
domain: String
}
async fn route_crawl_domain(Query(params): Query<Params>) -> Json<MapResult> {
tracing::info!("starting new crawl from {}", params.domain);
let db = Database::connect("sqlite://./test.db").await.expect("could not connect to database");
let (collector, handle) = create_map_collector();
bubble_crawl_instance(&params.domain, handle, db).await;
axum::Json(collector.collect().await)
}
#[async_recursion::async_recursion]
async fn peers_crawl_instance(domain: &str, set: SetHandle, depth: usize, maxdepth: usize) {
if depth >= maxdepth { return };
if set.already_scanned(domain).await { return };
match reqwest::get(format!("https://{}/api/v1/instance/peers", domain)).await {
Err(e) => tracing::error!("could not fetch peer list for {} : {}", domain, e),
Ok(x) => {
let peers : Vec<String> = x.json().await.unwrap_or(vec![]);
set.add_instance(InstanceRelation {
domain: domain.to_string(),
// relations: HashSet::from_iter(peers.iter().cloned()),
relations: HashSet::new(),
});
tracing::info!("{} found: {} peers", domain, peers.len());
for peer in peers {
let _set = set.clone();
tokio::spawn(async move {
peers_crawl_instance(&peer, _set, depth + 1, maxdepth).await;
});
}
}
}
}
#[async_recursion::async_recursion]
async fn bubble_crawl_instance(domain: &str, map: MapHandle, db: DatabaseConnection) {
if map.already_scanned(domain).await { return };
tracing::debug!("scanning instance {}", domain);
let info : NodeInfoOwned = match Entity::find_by_id(domain).one(&db).await {
Ok(Some(x)) => serde_json::from_str(&x.data).expect("could not deserialize nodeinfo in database"),
Ok(None) => return tracing::warn!("could not find data for instance {}", domain),
Err(e) => return tracing::error!("could not fetch data for instance {}: {}", domain, e),
};
let node_name = info.metadata
.get("nodeName")
.map(|x| x.as_str().expect("nodeName is not a string"))
.unwrap_or(domain);
tracing::info!("adding instance {} ({})", node_name, domain);
map.add_node(domain.to_string(), node_name.to_string());
let local_bubble = match info.metadata.get("localBubbleInstances") {
None => return tracing::info!("instance {} doesn't provide local bubble data", domain),
Some(b) => match b.as_array() {
None => return tracing::warn!("instance {} local bubble is not an array", domain),
Some(v) => v,
}, },
};
let mut tasks = Vec::new();
for bubble_instance in local_bubble.iter().filter_map(|x| x.as_str().map(|x| x.to_string())) {
let _map = map.clone();
let _db = db.clone();
map.add_vertex(domain.to_string(), bubble_instance.clone());
tasks.push(tokio::spawn(async move { bubble_crawl_instance(&bubble_instance, _map, _db).await; }));
} }
} }

View file

@ -15,7 +15,7 @@ struct WellKnownNodeInfoRef {
href: String, href: String,
} }
pub async fn node_info(domain: &str) -> Result<NodeInfoOwned, reqwest::Error> { pub async fn fetch_node_info(domain: &str) -> Result<NodeInfoOwned, reqwest::Error> {
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(5))
.build()?; .build()?;

View file

@ -1,2 +1,4 @@
pub mod model; pub mod model;
pub mod fetcher; pub mod fetcher;
pub use fetcher::fetch_node_info as fetch;

View file

@ -4,9 +4,13 @@ use serde_json::{Map, Value};
/// Node metadata for version detection only used for deserialization. /// Node metadata for version detection only used for deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, derive_more::Display)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, derive_more::Display)]
pub enum NodeVersion { pub enum NodeVersion {
#[serde(rename = "1.0")]
V1_0 = 1000, V1_0 = 1000,
#[serde(rename = "1.1")]
V1_1 = 1001, V1_1 = 1001,
#[serde(rename = "2.0")]
V2_0 = 2000, V2_0 = 2000,
#[serde(rename = "2.1")]
V2_1 = 2001, V2_1 = 2001,
} }

136
src/serve/bubble.rs Normal file
View file

@ -0,0 +1,136 @@
use std::{collections::{HashMap, HashSet}, sync::Arc};
use sea_orm::{DatabaseConnection, EntityTrait};
use serde::Serialize;
use crate::{entities::{node_info, domain}, nodeinfo::model::NodeInfoOwned};
#[derive(Debug, thiserror::Error, derive_more::Display)]
pub enum ChartLocalBubbleError {
Unknown,
NoInfo,
DbErr(sea_orm::DbErr),
}
#[derive(Serialize, Clone, Debug)]
pub struct ChartVertex {
pub id: usize,
pub label: String,
pub value: usize,
pub mass: f32,
pub title: String,
}
#[derive(Serialize, Clone, Debug)]
pub struct ChartArc {
pub from: usize,
pub to: usize,
}
#[derive(Debug)]
pub struct BubbleChart {
pub vertices: HashMap<String, ChartVertex>,
pub arcs: Vec<ChartArc>,
charted: HashSet<String>,
infos: HashMap<String, Arc<NodeInfoOwned>>,
counter: usize,
db: Arc<DatabaseConnection>,
}
impl BubbleChart {
pub fn new(db: Arc<DatabaseConnection>) -> Self {
BubbleChart {
db,
vertices: HashMap::new(),
arcs: Vec::new(),
charted: HashSet::new(),
infos: HashMap::new(),
counter: 0,
}
}
async fn get_node_info(&mut self, domain: &str) -> Result<Arc<NodeInfoOwned>, ChartLocalBubbleError> {
if let Some(info) = self.infos.get(domain) { return Ok(info.clone()) };
match domain::Entity::find_by_id(domain)
.find_also_related(node_info::Entity)
.one(self.db.as_ref()).await
{
Err(e) => Err(ChartLocalBubbleError::DbErr(e)),
Ok(None) => Err(ChartLocalBubbleError::Unknown),
Ok(Some((_domain, None))) => Err(ChartLocalBubbleError::NoInfo),
Ok(Some((_domain, Some(info)))) => {
let info =serde_json::from_str::<NodeInfoOwned>(&info.data)
.expect("could not deserialize node info data in db");
let arc_info = Arc::new(info);
self.infos.insert(domain.to_string(), arc_info.clone());
Ok(arc_info)
},
}
}
async fn get_vertex(&mut self, domain: &str) -> Result<ChartVertex, ChartLocalBubbleError> {
if let Some(vertex) = self.vertices.get(domain) { return Ok(vertex.clone()) };
let info = self.get_node_info(domain).await?;
let node_name = info.metadata
.get("nodeName")
.map(|x| x.as_str().expect("nodeName is not a string"))
.unwrap_or(domain)
.to_string();
let v = ChartVertex {
id: self.counter,
label: node_name,
value: 1,
mass: 1.,
title: domain.to_string(),
};
self.vertices.insert(domain.to_string(), v);
self.counter += 1;
Ok(
self.vertices
.get_mut(domain)
.expect("vertices still not present after inserting it manually??")
.clone()
)
}
#[async_recursion::async_recursion]
pub async fn chart_local_bubble(&mut self, domain: &str) -> Result<(), ChartLocalBubbleError> {
if self.charted.contains(domain) { return Ok(()) }
let info = self.get_node_info(domain).await?;
let mut vertex = self.get_vertex(domain).await?; // it's a clone, must insert it back
let local_bubble = info.metadata
.get("localBubbleInstances")
.map(|x| x.as_array());
if let Some(Some(local_bubble)) = local_bubble {
vertex.mass += 0.1 * local_bubble.len() as f32;
vertex.value += 1 * local_bubble.len();
for instance in local_bubble {
if let Some(other_domain) = instance.as_str() {
if let Ok(mut other_vertex) = self.get_vertex(other_domain).await {
self.arcs.push(ChartArc { from: vertex.id, to: other_vertex.id });
other_vertex.mass += 1.;
other_vertex.value += 10;
self.vertices.insert(other_domain.to_string(), other_vertex);
}
}
}
}
self.charted.insert(domain.to_string());
if let Some(Some(local_bubble)) = local_bubble {
for instance in local_bubble {
if let Some(other_domain) = instance.as_str() {
let _ = self.chart_local_bubble(other_domain).await;
}
}
}
Ok(())
}
}

4
src/serve/mod.rs Normal file
View file

@ -0,0 +1,4 @@
pub mod bubble;
pub mod router;
pub use router::serve_api_routes as api_routes;

38
src/serve/router.rs Normal file
View file

@ -0,0 +1,38 @@
use std::{net::SocketAddr, sync::Arc};
use axum::{Router, routing::get, extract::{Query, State}, Json};
use sea_orm::DatabaseConnection;
use serde::Deserialize;
use crate::serve::bubble::BubbleChart;
use super::bubble::{ChartVertex, ChartArc};
pub async fn serve_api_routes(addr: SocketAddr, db: DatabaseConnection) {
let app = Router::new()
.route("/crawl", get(route_crawl_domain))
.with_state(Arc::new(db));
tracing::info!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.expect("could not serve axum app");
}
#[derive(Debug, Deserialize)]
struct Params {
domain: String
}
async fn route_crawl_domain(
State(db) : State<Arc<DatabaseConnection>>,
Query(params): Query<Params>
) -> Json<(Vec<ChartVertex>, Vec<ChartArc>)> {
tracing::info!("starting new crawl from {}", params.domain);
let mut chart = BubbleChart::new(db.clone());
chart.chart_local_bubble(&params.domain).await.expect("wtf could not chart this");
Json((chart.vertices.values().into_iter().cloned().collect(), chart.arcs))
}

View file

@ -1,68 +0,0 @@
use std::{collections::{HashSet, HashMap}, sync::Arc};
use serde::Serialize;
use tokio::sync::{mpsc, RwLock};
#[derive(Debug)]
pub struct SetCollector {
known_network: HashSet<String>,
instance_rx: mpsc::UnboundedReceiver<InstanceRelation>,
}
pub fn create_set_collector() -> (SetCollector, SetHandle) {
let (instance_tx, instance_rx) = mpsc::unbounded_channel();
let known_network = HashSet::new();
let scanned = Arc::new(RwLock::new(HashSet::new()));
(
SetCollector { known_network, instance_rx },
SetHandle { scanned, instance_tx },
)
}
impl SetCollector {
pub async fn collect(mut self) -> SetResult {
let mut in_instances : Vec<InstanceRelation> = Vec::new();
let mut out_instances = Vec::new();
while let Some(instance) = self.instance_rx.recv().await {
self.known_network.insert(instance.domain.clone());
// in_instances.push(instance);
}
for instance in in_instances {
out_instances.push(InstanceRelation {
domain: instance.domain,
relations: self.known_network.difference(&instance.relations).map(|x| x.clone()).collect(),
});
}
SetResult { whole: self.known_network, instances: out_instances }
}
}
#[derive(Debug, Clone)]
pub struct SetHandle {
scanned: Arc<RwLock<HashSet<String>>>,
instance_tx: mpsc::UnboundedSender<InstanceRelation>,
}
impl SetHandle {
pub async fn already_scanned(&self, domain: &str) -> bool {
if self.scanned.read().await.contains(domain) { return true; }
self.scanned.write().await.insert(domain.to_string());
false
}
pub fn add_instance(&self, instance: InstanceRelation) {
self.instance_tx.send(instance).expect("could not send instance to collector")
}
}
#[derive(Serialize, Clone, Debug)]
pub struct InstanceRelation {
pub domain: String,
pub relations: HashSet<String>,
}
#[derive(Serialize, Clone, Debug)]
pub struct SetResult {
pub whole: HashSet<String>,
pub instances: Vec<InstanceRelation>,
}