diff --git a/src/server/dispatcher.rs b/src/server/dispatcher.rs index 93b6cd9..4a54550 100644 --- a/src/server/dispatcher.rs +++ b/src/server/dispatcher.rs @@ -3,7 +3,7 @@ use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, use tokio::{sync::broadcast, task::JoinHandle}; use apb::{ActivityMut, Node}; -use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD, server::{fetcher::Fetcher, Context}}; +use crate::{model, routes::activitypub::jsonld::LD, server::{fetcher::Fetcher, Context}}; pub struct Dispatcher { waker: broadcast::Sender<()>, @@ -18,10 +18,13 @@ impl Default for Dispatcher { impl Dispatcher { pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> { - let waker = self.waker.subscribe(); + let mut waker = self.waker.subscribe(); tokio::spawn(async move { - if let Err(e) = worker(db, domain, poll_interval, waker).await { - tracing::error!("delivery worker exited with error: {e}"); + loop { + if let Err(e) = worker(&db, &domain, poll_interval, &mut waker).await { + tracing::error!("delivery worker exited with error: {e}"); + } + tokio::time::sleep(std::time::Duration::from_secs(poll_interval * 10)).await; } }) } @@ -34,12 +37,12 @@ impl Dispatcher { } } -async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut waker: broadcast::Receiver<()>) -> Result<(), UpubError> { +async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker: &mut broadcast::Receiver<()>) -> crate::Result<()> { loop { let Some(delivery) = model::delivery::Entity::find() .filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now())) .order_by(model::delivery::Column::NotBefore, Order::Asc) - .one(&db) + .one(db) .await? else { tokio::select! { @@ -55,7 +58,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut ..Default::default() }; let del = model::delivery::Entity::delete(del_row) - .exec(&db) + .exec(db) .await?; if del.rows_affected == 0 { @@ -71,7 +74,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut let payload = match model::activity::Entity::find_by_id(&delivery.activity) .find_also_related(model::object::Entity) - .one(&db) + .one(db) .await? // TODO probably should not fail here and at least re-insert the delivery { Some((activity, None)) => activity.ap().ld_context(), @@ -98,7 +101,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut let key = if delivery.actor == format!("https://{domain}") { let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find() - .one(&db).await? + .one(db).await? else { tracing::error!("no private key configured for application"); continue; @@ -106,7 +109,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut key } else { let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor) - .one(&db).await? + .one(db).await? else { tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor); continue; @@ -118,7 +121,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut if let Err(e) = Context::request( Method::POST, &delivery.target, Some(&serde_json::to_string(&payload).unwrap()), - &delivery.actor, &key, &domain + &delivery.actor, &key, domain ).await { tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target); let new_delivery = model::delivery::ActiveModel { @@ -130,7 +133,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut created: sea_orm::ActiveValue::Set(delivery.created), attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1), }; - model::delivery::Entity::insert(new_delivery).exec(&db).await?; + model::delivery::Entity::insert(new_delivery).exec(db).await?; } } }