1
0
Fork 0
forked from alemi/upub

feat: improved dispatcher sleep logic

This commit is contained in:
əlemi 2024-03-26 23:53:44 +01:00
parent 6097493932
commit 603724ebcc
Signed by: alemi
GPG key ID: A4895B84D311642C

View file

@ -22,17 +22,16 @@ impl Dispatcher {
} }
async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64) -> Result<(), UpubError> { async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64) -> Result<(), UpubError> {
let mut nosleep = true;
loop { loop {
if nosleep { nosleep = false } else {
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
}
let Some(delivery) = model::delivery::Entity::find() let Some(delivery) = model::delivery::Entity::find()
.filter(Condition::all().add(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))) .filter(Condition::all().add(model::delivery::Column::NotBefore.lte(chrono::Utc::now())))
.order_by(model::delivery::Column::NotBefore, Order::Asc) .order_by(model::delivery::Column::NotBefore, Order::Asc)
.one(&db) .one(&db)
.await? .await?
else { continue }; else {
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
continue
};
let del_row = model::delivery::ActiveModel { let del_row = model::delivery::ActiveModel {
id: sea_orm::ActiveValue::Set(delivery.id), id: sea_orm::ActiveValue::Set(delivery.id),
@ -44,12 +43,10 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64) -> R
if del.rows_affected == 0 { if del.rows_affected == 0 {
// another worker claimed this delivery // another worker claimed this delivery
nosleep = true;
continue; // go back to the top continue; // go back to the top
} }
if delivery.expired() { if delivery.expired() {
// try polling for another one // try polling for another one
nosleep = true;
continue; // go back to top continue; // go back to top
} }