forked from alemi/upub
fix: dispatcher auto restarts
since akkoma is destroying my postgres, I/O is maxed out and sqlite queries time out. restart the dispatcher worker if it fails! this is kind of a nasty fix, but whatev
This commit is contained in:
parent
99d613a1e8
commit
b062608134
1 changed files with 15 additions and 12 deletions
|
@ -3,7 +3,7 @@ use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter,
|
|||
use tokio::{sync::broadcast, task::JoinHandle};
|
||||
|
||||
use apb::{ActivityMut, Node};
|
||||
use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD, server::{fetcher::Fetcher, Context}};
|
||||
use crate::{model, routes::activitypub::jsonld::LD, server::{fetcher::Fetcher, Context}};
|
||||
|
||||
pub struct Dispatcher {
|
||||
waker: broadcast::Sender<()>,
|
||||
|
@ -18,11 +18,14 @@ impl Default for Dispatcher {
|
|||
|
||||
impl Dispatcher {
|
||||
pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> {
|
||||
let waker = self.waker.subscribe();
|
||||
let mut waker = self.waker.subscribe();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = worker(db, domain, poll_interval, waker).await {
|
||||
loop {
|
||||
if let Err(e) = worker(&db, &domain, poll_interval, &mut waker).await {
|
||||
tracing::error!("delivery worker exited with error: {e}");
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_secs(poll_interval * 10)).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -34,12 +37,12 @@ impl Dispatcher {
|
|||
}
|
||||
}
|
||||
|
||||
async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut waker: broadcast::Receiver<()>) -> Result<(), UpubError> {
|
||||
async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker: &mut broadcast::Receiver<()>) -> crate::Result<()> {
|
||||
loop {
|
||||
let Some(delivery) = model::delivery::Entity::find()
|
||||
.filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))
|
||||
.order_by(model::delivery::Column::NotBefore, Order::Asc)
|
||||
.one(&db)
|
||||
.one(db)
|
||||
.await?
|
||||
else {
|
||||
tokio::select! {
|
||||
|
@ -55,7 +58,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
|||
..Default::default()
|
||||
};
|
||||
let del = model::delivery::Entity::delete(del_row)
|
||||
.exec(&db)
|
||||
.exec(db)
|
||||
.await?;
|
||||
|
||||
if del.rows_affected == 0 {
|
||||
|
@ -71,7 +74,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
|||
|
||||
let payload = match model::activity::Entity::find_by_id(&delivery.activity)
|
||||
.find_also_related(model::object::Entity)
|
||||
.one(&db)
|
||||
.one(db)
|
||||
.await? // TODO probably should not fail here and at least re-insert the delivery
|
||||
{
|
||||
Some((activity, None)) => activity.ap().ld_context(),
|
||||
|
@ -98,7 +101,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
|||
|
||||
let key = if delivery.actor == format!("https://{domain}") {
|
||||
let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find()
|
||||
.one(&db).await?
|
||||
.one(db).await?
|
||||
else {
|
||||
tracing::error!("no private key configured for application");
|
||||
continue;
|
||||
|
@ -106,7 +109,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
|||
key
|
||||
} else {
|
||||
let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor)
|
||||
.one(&db).await?
|
||||
.one(db).await?
|
||||
else {
|
||||
tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor);
|
||||
continue;
|
||||
|
@ -118,7 +121,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
|||
if let Err(e) = Context::request(
|
||||
Method::POST, &delivery.target,
|
||||
Some(&serde_json::to_string(&payload).unwrap()),
|
||||
&delivery.actor, &key, &domain
|
||||
&delivery.actor, &key, domain
|
||||
).await {
|
||||
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
|
||||
let new_delivery = model::delivery::ActiveModel {
|
||||
|
@ -130,7 +133,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
|||
created: sea_orm::ActiveValue::Set(delivery.created),
|
||||
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
|
||||
};
|
||||
model::delivery::Entity::insert(new_delivery).exec(&db).await?;
|
||||
model::delivery::Entity::insert(new_delivery).exec(db).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue