2024-04-13 00:44:53 +02:00
|
|
|
use reqwest::Method;
|
2024-04-19 03:28:39 +02:00
|
|
|
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder};
|
2024-03-27 05:09:20 +01:00
|
|
|
use tokio::{sync::broadcast, task::JoinHandle};
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-04-06 16:56:13 +02:00
|
|
|
use apb::{ActivityMut, Node};
|
2024-04-19 03:28:39 +02:00
|
|
|
use crate::{errors::UpubError, model, server::{fetcher::Fetcher, Context}};
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-03-27 05:09:20 +01:00
|
|
|
pub struct Dispatcher {
|
|
|
|
waker: broadcast::Sender<()>,
|
|
|
|
}
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-04-09 01:14:48 +02:00
|
|
|
impl Default for Dispatcher {
|
|
|
|
fn default() -> Self {
|
2024-03-27 05:09:20 +01:00
|
|
|
let (waker, _) = broadcast::channel(1);
|
|
|
|
Dispatcher { waker }
|
|
|
|
}
|
2024-04-09 01:14:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Dispatcher {
|
|
|
|
pub fn new() -> Self { Dispatcher::default() }
|
2024-03-27 05:09:20 +01:00
|
|
|
|
|
|
|
pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> {
|
|
|
|
let waker = self.waker.subscribe();
|
2024-03-25 05:07:58 +01:00
|
|
|
tokio::spawn(async move {
|
2024-03-27 05:09:20 +01:00
|
|
|
if let Err(e) = worker(db, domain, poll_interval, waker).await {
|
2024-03-26 03:11:59 +01:00
|
|
|
tracing::error!("delivery worker exited with error: {e}");
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
2024-03-27 05:09:20 +01:00
|
|
|
|
|
|
|
pub fn wakeup(&self) {
|
|
|
|
match self.waker.send(()) {
|
|
|
|
Err(_) => tracing::error!("no worker to wakeup"),
|
|
|
|
Ok(n) => tracing::debug!("woken {n} workers"),
|
|
|
|
}
|
|
|
|
}
|
2024-03-26 03:11:59 +01:00
|
|
|
}
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-03-27 05:09:20 +01:00
|
|
|
async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut waker: broadcast::Receiver<()>) -> Result<(), UpubError> {
|
2024-03-26 03:11:59 +01:00
|
|
|
loop {
|
|
|
|
let Some(delivery) = model::delivery::Entity::find()
|
2024-04-19 03:28:39 +02:00
|
|
|
.filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))
|
2024-03-26 03:11:59 +01:00
|
|
|
.order_by(model::delivery::Column::NotBefore, Order::Asc)
|
|
|
|
.one(&db)
|
|
|
|
.await?
|
2024-03-26 23:53:44 +01:00
|
|
|
else {
|
2024-03-27 05:09:20 +01:00
|
|
|
tokio::select! {
|
|
|
|
biased;
|
|
|
|
_ = waker.recv() => {},
|
|
|
|
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
|
|
|
|
}
|
2024-03-26 23:53:44 +01:00
|
|
|
continue
|
|
|
|
};
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-03-26 03:11:59 +01:00
|
|
|
let del_row = model::delivery::ActiveModel {
|
|
|
|
id: sea_orm::ActiveValue::Set(delivery.id),
|
|
|
|
..Default::default()
|
|
|
|
};
|
|
|
|
let del = model::delivery::Entity::delete(del_row)
|
|
|
|
.exec(&db)
|
|
|
|
.await?;
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-03-26 03:11:59 +01:00
|
|
|
if del.rows_affected == 0 {
|
|
|
|
// another worker claimed this delivery
|
|
|
|
continue; // go back to the top
|
|
|
|
}
|
|
|
|
if delivery.expired() {
|
|
|
|
// try polling for another one
|
|
|
|
continue; // go back to top
|
|
|
|
}
|
2024-03-26 01:14:43 +01:00
|
|
|
|
2024-03-26 03:11:59 +01:00
|
|
|
tracing::info!("delivering {} to {}", delivery.activity, delivery.target);
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-03-26 19:27:35 +01:00
|
|
|
let payload = match model::activity::Entity::find_by_id(&delivery.activity)
|
2024-03-26 03:11:59 +01:00
|
|
|
.find_also_related(model::object::Entity)
|
|
|
|
.one(&db)
|
|
|
|
.await? // TODO probably should not fail here and at least re-insert the delivery
|
|
|
|
{
|
2024-04-19 03:28:39 +02:00
|
|
|
Some((activity, None)) => activity.ap(),
|
2024-04-22 22:42:00 +02:00
|
|
|
Some((activity, Some(object))) => {
|
|
|
|
// embed local object when dispatching
|
|
|
|
// TODO this .contains() is jank, could trick us into embedding remote activities
|
|
|
|
if object.id.contains(&domain) {
|
|
|
|
activity.ap().set_object(Node::object(object.ap()))
|
|
|
|
} else {
|
|
|
|
activity.ap()
|
|
|
|
}
|
|
|
|
},
|
2024-03-26 03:11:59 +01:00
|
|
|
None => {
|
|
|
|
tracing::warn!("skipping dispatch for deleted object {}", delivery.activity);
|
|
|
|
continue;
|
|
|
|
},
|
|
|
|
};
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-04-23 05:30:10 +02:00
|
|
|
let key = if delivery.actor == format!("https://{domain}") {
|
2024-04-22 04:23:11 +02:00
|
|
|
let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find()
|
|
|
|
.one(&db).await?
|
|
|
|
else {
|
|
|
|
tracing::error!("no private key configured for application");
|
|
|
|
continue;
|
|
|
|
};
|
|
|
|
key
|
|
|
|
} else {
|
|
|
|
let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor)
|
|
|
|
.one(&db).await?
|
|
|
|
else {
|
|
|
|
tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor);
|
|
|
|
continue;
|
|
|
|
};
|
|
|
|
key
|
2024-03-26 03:11:59 +01:00
|
|
|
};
|
2024-03-25 05:07:58 +01:00
|
|
|
|
2024-04-22 04:23:11 +02:00
|
|
|
|
2024-04-18 05:25:56 +02:00
|
|
|
if let Err(e) = Context::request(
|
2024-04-13 00:44:53 +02:00
|
|
|
Method::POST, &delivery.target,
|
|
|
|
Some(&serde_json::to_string(&payload).unwrap()),
|
|
|
|
&delivery.actor, &key, &domain
|
|
|
|
).await {
|
2024-03-26 03:11:59 +01:00
|
|
|
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
|
|
|
|
let new_delivery = model::delivery::ActiveModel {
|
|
|
|
id: sea_orm::ActiveValue::NotSet,
|
|
|
|
not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()),
|
|
|
|
actor: sea_orm::ActiveValue::Set(delivery.actor),
|
|
|
|
target: sea_orm::ActiveValue::Set(delivery.target),
|
|
|
|
activity: sea_orm::ActiveValue::Set(delivery.activity),
|
|
|
|
created: sea_orm::ActiveValue::Set(delivery.created),
|
|
|
|
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
|
|
|
|
};
|
|
|
|
model::delivery::Entity::insert(new_delivery).exec(&db).await?;
|
|
|
|
}
|
2024-03-25 05:07:58 +01:00
|
|
|
}
|
|
|
|
}
|