Compare commits

..

2 commits

Author SHA1 Message Date
b062608134
fix: dispatcher auto restarts
since akkoma is destroying my postgres, I/O is maxed out and sqlite
queries time out. restart the dispatcher worker if it fails!
this is kind of a nasty fix, but whatev
2024-05-11 21:59:02 +02:00
99d613a1e8
fix(web): more consistent object rendering 2024-05-11 21:49:55 +02:00
3 changed files with 65 additions and 84 deletions

View file

@ -3,7 +3,7 @@ use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter,
use tokio::{sync::broadcast, task::JoinHandle}; use tokio::{sync::broadcast, task::JoinHandle};
use apb::{ActivityMut, Node}; use apb::{ActivityMut, Node};
use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD, server::{fetcher::Fetcher, Context}}; use crate::{model, routes::activitypub::jsonld::LD, server::{fetcher::Fetcher, Context}};
pub struct Dispatcher { pub struct Dispatcher {
waker: broadcast::Sender<()>, waker: broadcast::Sender<()>,
@ -18,11 +18,14 @@ impl Default for Dispatcher {
impl Dispatcher { impl Dispatcher {
pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> { pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> {
let waker = self.waker.subscribe(); let mut waker = self.waker.subscribe();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = worker(db, domain, poll_interval, waker).await { loop {
if let Err(e) = worker(&db, &domain, poll_interval, &mut waker).await {
tracing::error!("delivery worker exited with error: {e}"); tracing::error!("delivery worker exited with error: {e}");
} }
tokio::time::sleep(std::time::Duration::from_secs(poll_interval * 10)).await;
}
}) })
} }
@ -34,12 +37,12 @@ impl Dispatcher {
} }
} }
async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut waker: broadcast::Receiver<()>) -> Result<(), UpubError> { async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker: &mut broadcast::Receiver<()>) -> crate::Result<()> {
loop { loop {
let Some(delivery) = model::delivery::Entity::find() let Some(delivery) = model::delivery::Entity::find()
.filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now())) .filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))
.order_by(model::delivery::Column::NotBefore, Order::Asc) .order_by(model::delivery::Column::NotBefore, Order::Asc)
.one(&db) .one(db)
.await? .await?
else { else {
tokio::select! { tokio::select! {
@ -55,7 +58,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
..Default::default() ..Default::default()
}; };
let del = model::delivery::Entity::delete(del_row) let del = model::delivery::Entity::delete(del_row)
.exec(&db) .exec(db)
.await?; .await?;
if del.rows_affected == 0 { if del.rows_affected == 0 {
@ -71,7 +74,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
let payload = match model::activity::Entity::find_by_id(&delivery.activity) let payload = match model::activity::Entity::find_by_id(&delivery.activity)
.find_also_related(model::object::Entity) .find_also_related(model::object::Entity)
.one(&db) .one(db)
.await? // TODO probably should not fail here and at least re-insert the delivery .await? // TODO probably should not fail here and at least re-insert the delivery
{ {
Some((activity, None)) => activity.ap().ld_context(), Some((activity, None)) => activity.ap().ld_context(),
@ -98,7 +101,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
let key = if delivery.actor == format!("https://{domain}") { let key = if delivery.actor == format!("https://{domain}") {
let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find() let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find()
.one(&db).await? .one(db).await?
else { else {
tracing::error!("no private key configured for application"); tracing::error!("no private key configured for application");
continue; continue;
@ -106,7 +109,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
key key
} else { } else {
let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor) let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor)
.one(&db).await? .one(db).await?
else { else {
tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor); tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor);
continue; continue;
@ -118,7 +121,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
if let Err(e) = Context::request( if let Err(e) = Context::request(
Method::POST, &delivery.target, Method::POST, &delivery.target,
Some(&serde_json::to_string(&payload).unwrap()), Some(&serde_json::to_string(&payload).unwrap()),
&delivery.actor, &key, &domain &delivery.actor, &key, domain
).await { ).await {
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target); tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
let new_delivery = model::delivery::ActiveModel { let new_delivery = model::delivery::ActiveModel {
@ -130,7 +133,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
created: sea_orm::ActiveValue::Set(delivery.created), created: sea_orm::ActiveValue::Set(delivery.created),
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1), attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
}; };
model::delivery::Entity::insert(new_delivery).exec(&db).await?; model::delivery::Entity::insert(new_delivery).exec(db).await?;
} }
} }
} }

View file

@ -1,7 +1,7 @@
use leptos::*; use leptos::*;
use crate::prelude::*; use crate::prelude::*;
use apb::{target::Addressed, Activity, Object}; use apb::{target::Addressed, Base, Activity, Object};
#[component] #[component]
@ -31,3 +31,39 @@ pub fn ActivityLine(activity: crate::Object) -> impl IntoView {
</div> </div>
} }
} }
#[component]
pub fn Item(item: crate::Object) -> impl IntoView {
let id = item.id().unwrap_or_default().to_string();
match item.object_type() {
// special case for placeholder activities
Some(apb::ObjectType::Note) | Some(apb::ObjectType::Document(_)) =>
view! { <Object object=item /> }.into_view(),
// everything else
Some(apb::ObjectType::Activity(t)) => {
let object_id = item.object().id().unwrap_or_default();
let object = match t {
apb::ActivityType::Create | apb::ActivityType::Announce =>
CACHE.get(&object_id).map(|obj| {
view! { <Object object=obj /> }
}.into_view()),
apb::ActivityType::Follow =>
CACHE.get(&object_id).map(|obj| {
view! {
<div class="ml-1">
<ActorBanner object=obj />
<FollowRequestButtons activity_id=id actor_id=object_id />
</div>
}
}.into_view()),
_ => None,
};
view! {
<ActivityLine activity=item />
{object}
}.into_view()
},
// should never happen
_ => view! { <p><code>type not implemented</code></p> }.into_view(),
}
}

View file

@ -92,45 +92,16 @@ pub fn TimelineRepliesRecursive(tl: Timeline, root: String) -> impl IntoView {
<For <For
each=root_values each=root_values
key=|k| k.id().unwrap_or_default().to_string() key=|k| k.id().unwrap_or_default().to_string()
children=move |object: crate::Object| { children=move |obj: crate::Object| {
match object.object_type() { let oid = obj.id().unwrap_or_default().to_string();
Some(apb::ObjectType::Activity(apb::ActivityType::Create)) => {
let oid = object.object().id().unwrap_or_default().to_string();
if let Some(note) = CACHE.get(&oid) {
view! { view! {
<div class="context depth-r"> <div class="context depth-r">
<ActivityLine activity=object /> <Item item=obj />
<Object object=note />
<div class="depth-r"> <div class="depth-r">
<TimelineRepliesRecursive tl=tl root=oid /> <TimelineRepliesRecursive tl=tl root=oid />
</div> </div>
</div> </div>
} }
} else {
view! {
<div class="context depth-r">
<ActivityLine activity=object />
</div>
}
}
},
Some(apb::ObjectType::Activity(_)) => view! {
<div class="context depth-r">
<ActivityLine activity=object />
</div>
},
_ => {
let oid = object.id().unwrap_or_default().to_string();
view! {
<div class="context depth-r">
<Object object=object />
<div class="depth-r">
<TimelineRepliesRecursive tl=tl root=oid />
</div>
</div>
}
},
}
} }
/ > / >
} }
@ -168,42 +139,13 @@ pub fn TimelineFeed(tl: Timeline) -> impl IntoView {
key=|k| k.to_string() key=|k| k.to_string()
children=move |id: String| { children=move |id: String| {
match CACHE.get(&id) { match CACHE.get(&id) {
Some(item) => match item.object_type() { Some(i) => view! {
// special case for placeholder activities <Item item=i />
Some(apb::ObjectType::Note) => view! {
<Object object=item.clone() />
<hr /> <hr />
}.into_view(), }.into_view(),
// everything else
Some(apb::ObjectType::Activity(t)) => {
let object_id = item.object().id().unwrap_or_default();
let object = match t {
apb::ActivityType::Create | apb::ActivityType::Announce =>
CACHE.get(&object_id).map(|obj| {
view! { <Object object=obj /> }
}.into_view()),
apb::ActivityType::Follow =>
CACHE.get(&object_id).map(|obj| {
view! {
<div class="ml-1">
<ActorBanner object=obj />
<FollowRequestButtons activity_id=id actor_id=object_id />
</div>
}
}.into_view()),
_ => None,
};
view! {
<ActivityLine activity=item />
{object}
<hr/ >
}.into_view()
},
// should never happen
_ => view! { <p><code>type not implemented</code></p><hr /> }.into_view(),
},
None => view! { None => view! {
<p><code>{id}</code>" "[<a href={uri}>go</a>]</p> <p><code>{id}</code>" "[<a href={uri}>go</a>]</p>
<hr />
}.into_view(), }.into_view(),
} }
} }