fix: remove repeat, proper parallelism

This commit is contained in:
əlemi 2024-10-20 05:12:15 +02:00
parent c2159989b2
commit a8ac211a41
Signed by: alemi
GPG key ID: A4895B84D311642C

View file

@ -39,10 +39,6 @@ pub enum PostWomanActions {
#[arg(long, default_value_t = false)]
parallel: bool,
/// repeat request N times
#[arg(long, default_value_t = 1)]
repeat: u32,
/// force debug extractor on all routes
#[arg(long, default_value_t = false)]
debug: bool,
@ -74,8 +70,17 @@ fn main() {
let task = async move {
for (name, collection) in collections {
run_postwoman(&args, name, collection).await;
let mut pool = tokio::task::JoinSet::new();
for (collection_name, collection) in collections {
run_postwoman(&args, collection_name, collection, &mut pool).await;
}
while let Some(j) = pool.join_next().await {
match j {
Err(e) => eprintln!("! error joining task: {e}"),
Ok(res) => res.print(),
}
}
};
@ -113,8 +118,9 @@ fn load_collections(store: &mut HashMap<String, PostWomanCollection>, mut path:
}
const DEFAULT_ACTION: PostWomanActions = PostWomanActions::List { verbose: false };
type RunResult = (Result<String, PostWomanError>, String, String, chrono::DateTime<chrono::Local>);
async fn run_postwoman(args: &PostWomanArgs, name: String, collection: PostWomanCollection) {
async fn run_postwoman(args: &PostWomanArgs, namespace: String, collection: PostWomanCollection, pool: &mut tokio::task::JoinSet<RunResult>) {
let action = args.action.as_ref().unwrap_or(&DEFAULT_ACTION);
match action {
@ -150,55 +156,35 @@ async fn run_postwoman(args: &PostWomanArgs, name: String, collection: PostWoman
}
}
},
PostWomanActions::Run { query, parallel, repeat, debug } => {
PostWomanActions::Run { query, parallel, debug } => {
// this is always safe to compile because we tested it beforehand
let pattern = regex::Regex::new(query).expect("tested it before and still failed here???");
let mut joinset = tokio::task::JoinSet::new();
let client = std::sync::Arc::new(collection.client.unwrap_or_default());
let env = std::sync::Arc::new(collection.env.unwrap_or_default());
for (name, mut endpoint) in collection.route {
if pattern.find(&name).is_some() {
if *debug { endpoint.extract = Some(ext::StringOr::T(model::ExtractorConfig::Debug)) };
for i in 0..*repeat {
let suffix = if *repeat > 1 {
format!("#{} ", i+1)
} else {
"".to_string()
};
let _client = client.clone();
let _env = env.clone();
let _endpoint = endpoint.clone();
let _name = name.clone();
let task = async move {
let before = chrono::Local::now();
eprintln!(" : [{}] sending {_name} {suffix}...", before.format(TIMESTAMP_FMT));
let res = _endpoint
.fill(&_env)
.execute(&_client)
.await;
(res, _name, before, suffix)
};
if *parallel {
joinset.spawn(task);
} else {
let (res, name, before, num) = task.await;
match res {
Ok(success) => print_results(true, success, name, before, num),
Err(e) => print_results(false, e.to_string(), name, before, num),
}
}
let _client = client.clone();
let _env = env.clone();
let _endpoint = endpoint.clone();
let _name = name.clone();
let _namespace = namespace.clone();
let task = async move {
let before = chrono::Local::now();
eprintln!(" : [{}] {_namespace}::{_name} \tsending request...", before.format(TIMESTAMP_FMT));
let res = _endpoint
.fill(&_env)
.execute(&_client)
.await;
(res, _namespace, _name, before)
};
if *parallel {
pool.spawn(task);
} else {
task.await.print();
}
}
}
while let Some(j) = joinset.join_next().await {
match j {
Err(e) => eprintln!("! error joining task: {e}"),
Ok((res, name, before, num)) => match res {
Err(e) => print_results(false, e.to_string(), name, before, num),
Ok(success) => print_results(true, success, name, before, num),
},
}
}
},
}
}