Mirror of https://git.alemi.dev/fedicharter.git (synced 2024-11-23 00:44:48 +01:00)
feat: initial work on proper restructure
basically, serve should only use db data, and crawlers can be started periodically when necessary to update db contents
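
In other words: the HTTP layer becomes a read-only view over the database, and a crawler refreshes the database on its own schedule. A minimal sketch of that split, assuming a tokio runtime and the sqlite connection introduced below; `crawl_all` is a hypothetical stand-in for whatever the new `collector` module will expose:

use std::time::Duration;

use sea_orm::DatabaseConnection;

// hypothetical entry point: re-crawl known instances and upsert their nodeinfo rows
async fn crawl_all(_db: &DatabaseConnection) -> Result<(), reqwest::Error> {
    Ok(()) // stub
}

fn spawn_periodic_crawler(db: DatabaseConnection) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
        loop {
            interval.tick().await; // first tick completes immediately
            if let Err(e) = crawl_all(&db).await {
                tracing::error!("periodic crawl failed: {}", e);
            }
        }
    });
}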
parent 6d22dfb9fa · commit a4fb5470a6
2 changed files with 17 additions and 25 deletions
@@ -3,23 +3,10 @@ use std::{collections::HashMap, sync::Arc};
 use serde::Serialize;
 use tokio::sync::{mpsc, RwLock};
 
-pub struct MapCollector {
-    nodes_rx: mpsc::UnboundedReceiver<NodeDomain>,
-    vertices_rx: mpsc::UnboundedReceiver<VertexDomain>,
-}
+pub struct BubbleCharter {}
 
-pub fn create_map_collector() -> (MapCollector, MapHandle) {
-    let (nodes_tx, nodes_rx) = mpsc::unbounded_channel();
-    let (vertices_tx, vertices_rx) = mpsc::unbounded_channel();
-    let scanned = Arc::new(RwLock::new(Vec::new()));
-    (
-        MapCollector { nodes_rx, vertices_rx },
-        MapHandle { nodes_tx, vertices_tx, scanned },
-    )
-}
-
-impl MapCollector {
-    pub async fn collect(mut self) -> MapResult {
+impl BubbleCharter {
+    pub async fn collect(self) -> MapResult {
         let mut nodes_domains = Vec::new();
         let mut vertices_domains = Vec::new();
         loop {
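
For context on the removal above: `create_map_collector` paired a receiving `MapCollector` with a cloneable `MapHandle`, the usual mpsc fan-in arrangement. A reduced sketch of that pattern, with illustrative types rather than the deleted code verbatim:

use tokio::sync::mpsc;

struct Collector { rx: mpsc::UnboundedReceiver<String> }

#[derive(Clone)]
struct Handle { tx: mpsc::UnboundedSender<String> }

fn create() -> (Collector, Handle) {
    let (tx, rx) = mpsc::unbounded_channel();
    (Collector { rx }, Handle { tx })
}

impl Collector {
    async fn collect(mut self) -> Vec<String> {
        let mut out = Vec::new();
        // recv() yields None once every cloned Handle has been dropped
        while let Some(item) = self.rx.recv().await {
            out.push(item);
        }
        out
    }
}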
src/main.rs (23 changes)
@@ -1,5 +1,6 @@
 use std::{net::SocketAddr, collections::HashSet};
 
+use sea_orm::{DatabaseConnection, EntityTrait, Database};
 use serde::Deserialize;
 
 use clap::{Parser, Subcommand};
@@ -7,12 +8,13 @@ use clap::{Parser, Subcommand};
 use axum::{routing::get, extract::Query, Json, Router};
 use set::{SetHandle, InstanceRelation, create_set_collector};
 
-use crate::{model::{MapResult, MapHandle, create_map_collector}, cache::CACHE};
+use crate::{bubble::{MapResult, MapHandle}, cache::CACHE, entities::node_info::Entity, nodeinfo::model::NodeInfoOwned};
 
-mod model;
+mod bubble;
 mod set;
 mod cache;
 mod nodeinfo;
+mod collector;
 
 mod entities;
 
@@ -93,8 +95,9 @@ struct Params {
 
 async fn route_crawl_domain(Query(params): Query<Params>) -> Json<MapResult> {
     tracing::info!("starting new crawl from {}", params.domain);
+    let db = Database::connect("sqlite://./test.db").await.expect("could not connect to database");
     let (collector, handle) = create_map_collector();
-    bubble_crawl_instance(&params.domain, handle).await;
+    bubble_crawl_instance(&params.domain, handle, db).await;
     axum::Json(collector.collect().await)
 }
 
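
Opening the sqlite connection inside the handler keeps this commit small; a common alternative, sketched here under the same connection string, is to connect once at startup and share the cheaply cloneable `DatabaseConnection` through axum state:

use axum::{extract::State, routing::get, Router};
use sea_orm::{Database, DatabaseConnection};

async fn make_app() -> Router {
    let db: DatabaseConnection = Database::connect("sqlite://./test.db")
        .await
        .expect("could not connect to database");
    // handlers receive a clone of the connection via State
    Router::new().route("/crawl", get(route_crawl)).with_state(db)
}

async fn route_crawl(State(db): State<DatabaseConnection>) -> &'static str {
    let _ = db; // query through the shared connection here
    "ok"
}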
@@ -105,7 +108,7 @@ async fn peers_crawl_instance(domain: &str, set: SetHandle, depth: usize, maxdep
     if set.already_scanned(domain).await { return };
 
     match reqwest::get(format!("https://{}/api/v1/instance/peers", domain)).await {
-        Err(e) => return tracing::error!("could not fetch peer list for {} : {}", domain, e),
+        Err(e) => tracing::error!("could not fetch peer list for {} : {}", domain, e),
         Ok(x) => {
             let peers : Vec<String> = x.json().await.unwrap_or(vec![]);
             set.add_instance(InstanceRelation {
@@ -125,13 +128,14 @@ async fn peers_crawl_instance(domain: &str, set: SetHandle, depth: usize, maxdep
 }
 
 #[async_recursion::async_recursion]
-async fn bubble_crawl_instance(domain: &str, map: MapHandle) {
+async fn bubble_crawl_instance(domain: &str, map: MapHandle, db: DatabaseConnection) {
     if map.already_scanned(domain).await { return };
 
     tracing::debug!("scanning instance {}", domain);
-    let info = match CACHE.instance_metadata(domain).await {
-        Ok(r) => r,
-        Err(e) => return tracing::warn!("could not fetch metadata for {}: {}", domain, e),
+    let info : NodeInfoOwned = match Entity::find_by_id(domain).one(&db).await {
+        Ok(Some(x)) => serde_json::from_str(&x.data).expect("could not deserialize nodeinfo in database"),
+        Ok(None) => return tracing::warn!("could not find data for instance {}", domain),
+        Err(e) => return tracing::error!("could not fetch data for instance {}: {}", domain, e),
     };
 
     let node_name = info.metadata
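
Isolated from the handler, the new lookup reads the cached nodeinfo row and deserializes its JSON `data` column; the entity path and field come from the imports above, while the `Option` return here is purely illustrative:

use sea_orm::{DatabaseConnection, EntityTrait};

async fn load_nodeinfo(db: &DatabaseConnection, domain: &str) -> Option<serde_json::Value> {
    match crate::entities::node_info::Entity::find_by_id(domain).one(db).await {
        Ok(Some(row)) => serde_json::from_str(&row.data).ok(), // `data` holds raw nodeinfo JSON
        Ok(None) => None, // instance not crawled yet
        Err(e) => {
            tracing::error!("could not fetch data for instance {}: {}", domain, e);
            None
        }
    }
}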
@@ -155,7 +159,8 @@ async fn bubble_crawl_instance(domain: &str, map: MapHandle) {
 
     for bubble_instance in local_bubble.iter().filter_map(|x| x.as_str().map(|x| x.to_string())) {
         let _map = map.clone();
+        let _db = db.clone();
         map.add_vertex(domain.to_string(), bubble_instance.clone());
-        tasks.push(tokio::spawn(async move { bubble_crawl_instance(&bubble_instance, _map).await; }));
+        tasks.push(tokio::spawn(async move { bubble_crawl_instance(&bubble_instance, _map, _db).await; }));
     }
 }
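
The `_map`/`_db` clones above are what let each spawned crawl own its captures: `tokio::spawn` wants a `'static` future, so a fresh clone is moved into every `async move` block. The same shape, stripped to its essentials:

use sea_orm::DatabaseConnection;

async fn fan_out(domains: Vec<String>, db: DatabaseConnection) {
    let mut tasks = Vec::new();
    for domain in domains {
        let db = db.clone(); // cheap handle clone, moved into the task below
        tasks.push(tokio::spawn(async move {
            // ... crawl `domain` using `db` ...
            let _ = (&domain, &db);
        }));
    }
    for task in tasks {
        let _ = task.await; // join all spawned crawls
    }
}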