Compare commits
2 commits
a60b5774a1
...
b062608134
Author | SHA1 | Date | |
---|---|---|---|
b062608134 | |||
99d613a1e8 |
3 changed files with 65 additions and 84 deletions
|
@ -3,7 +3,7 @@ use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter,
|
||||||
use tokio::{sync::broadcast, task::JoinHandle};
|
use tokio::{sync::broadcast, task::JoinHandle};
|
||||||
|
|
||||||
use apb::{ActivityMut, Node};
|
use apb::{ActivityMut, Node};
|
||||||
use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD, server::{fetcher::Fetcher, Context}};
|
use crate::{model, routes::activitypub::jsonld::LD, server::{fetcher::Fetcher, Context}};
|
||||||
|
|
||||||
pub struct Dispatcher {
|
pub struct Dispatcher {
|
||||||
waker: broadcast::Sender<()>,
|
waker: broadcast::Sender<()>,
|
||||||
|
@ -18,11 +18,14 @@ impl Default for Dispatcher {
|
||||||
|
|
||||||
impl Dispatcher {
|
impl Dispatcher {
|
||||||
pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> {
|
pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> {
|
||||||
let waker = self.waker.subscribe();
|
let mut waker = self.waker.subscribe();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = worker(db, domain, poll_interval, waker).await {
|
loop {
|
||||||
|
if let Err(e) = worker(&db, &domain, poll_interval, &mut waker).await {
|
||||||
tracing::error!("delivery worker exited with error: {e}");
|
tracing::error!("delivery worker exited with error: {e}");
|
||||||
}
|
}
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(poll_interval * 10)).await;
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,12 +37,12 @@ impl Dispatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut waker: broadcast::Receiver<()>) -> Result<(), UpubError> {
|
async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker: &mut broadcast::Receiver<()>) -> crate::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let Some(delivery) = model::delivery::Entity::find()
|
let Some(delivery) = model::delivery::Entity::find()
|
||||||
.filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))
|
.filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))
|
||||||
.order_by(model::delivery::Column::NotBefore, Order::Asc)
|
.order_by(model::delivery::Column::NotBefore, Order::Asc)
|
||||||
.one(&db)
|
.one(db)
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
@ -55,7 +58,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let del = model::delivery::Entity::delete(del_row)
|
let del = model::delivery::Entity::delete(del_row)
|
||||||
.exec(&db)
|
.exec(db)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if del.rows_affected == 0 {
|
if del.rows_affected == 0 {
|
||||||
|
@ -71,7 +74,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
||||||
|
|
||||||
let payload = match model::activity::Entity::find_by_id(&delivery.activity)
|
let payload = match model::activity::Entity::find_by_id(&delivery.activity)
|
||||||
.find_also_related(model::object::Entity)
|
.find_also_related(model::object::Entity)
|
||||||
.one(&db)
|
.one(db)
|
||||||
.await? // TODO probably should not fail here and at least re-insert the delivery
|
.await? // TODO probably should not fail here and at least re-insert the delivery
|
||||||
{
|
{
|
||||||
Some((activity, None)) => activity.ap().ld_context(),
|
Some((activity, None)) => activity.ap().ld_context(),
|
||||||
|
@ -98,7 +101,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
||||||
|
|
||||||
let key = if delivery.actor == format!("https://{domain}") {
|
let key = if delivery.actor == format!("https://{domain}") {
|
||||||
let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find()
|
let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find()
|
||||||
.one(&db).await?
|
.one(db).await?
|
||||||
else {
|
else {
|
||||||
tracing::error!("no private key configured for application");
|
tracing::error!("no private key configured for application");
|
||||||
continue;
|
continue;
|
||||||
|
@ -106,7 +109,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
||||||
key
|
key
|
||||||
} else {
|
} else {
|
||||||
let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor)
|
let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor)
|
||||||
.one(&db).await?
|
.one(db).await?
|
||||||
else {
|
else {
|
||||||
tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor);
|
tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor);
|
||||||
continue;
|
continue;
|
||||||
|
@ -118,7 +121,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
||||||
if let Err(e) = Context::request(
|
if let Err(e) = Context::request(
|
||||||
Method::POST, &delivery.target,
|
Method::POST, &delivery.target,
|
||||||
Some(&serde_json::to_string(&payload).unwrap()),
|
Some(&serde_json::to_string(&payload).unwrap()),
|
||||||
&delivery.actor, &key, &domain
|
&delivery.actor, &key, domain
|
||||||
).await {
|
).await {
|
||||||
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
|
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
|
||||||
let new_delivery = model::delivery::ActiveModel {
|
let new_delivery = model::delivery::ActiveModel {
|
||||||
|
@ -130,7 +133,7 @@ async fn worker(db: DatabaseConnection, domain: String, poll_interval: u64, mut
|
||||||
created: sea_orm::ActiveValue::Set(delivery.created),
|
created: sea_orm::ActiveValue::Set(delivery.created),
|
||||||
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
|
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
|
||||||
};
|
};
|
||||||
model::delivery::Entity::insert(new_delivery).exec(&db).await?;
|
model::delivery::Entity::insert(new_delivery).exec(db).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use leptos::*;
|
use leptos::*;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
|
|
||||||
use apb::{target::Addressed, Activity, Object};
|
use apb::{target::Addressed, Base, Activity, Object};
|
||||||
|
|
||||||
|
|
||||||
#[component]
|
#[component]
|
||||||
|
@ -31,3 +31,39 @@ pub fn ActivityLine(activity: crate::Object) -> impl IntoView {
|
||||||
</div>
|
</div>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[component]
|
||||||
|
pub fn Item(item: crate::Object) -> impl IntoView {
|
||||||
|
let id = item.id().unwrap_or_default().to_string();
|
||||||
|
match item.object_type() {
|
||||||
|
// special case for placeholder activities
|
||||||
|
Some(apb::ObjectType::Note) | Some(apb::ObjectType::Document(_)) =>
|
||||||
|
view! { <Object object=item /> }.into_view(),
|
||||||
|
// everything else
|
||||||
|
Some(apb::ObjectType::Activity(t)) => {
|
||||||
|
let object_id = item.object().id().unwrap_or_default();
|
||||||
|
let object = match t {
|
||||||
|
apb::ActivityType::Create | apb::ActivityType::Announce =>
|
||||||
|
CACHE.get(&object_id).map(|obj| {
|
||||||
|
view! { <Object object=obj /> }
|
||||||
|
}.into_view()),
|
||||||
|
apb::ActivityType::Follow =>
|
||||||
|
CACHE.get(&object_id).map(|obj| {
|
||||||
|
view! {
|
||||||
|
<div class="ml-1">
|
||||||
|
<ActorBanner object=obj />
|
||||||
|
<FollowRequestButtons activity_id=id actor_id=object_id />
|
||||||
|
</div>
|
||||||
|
}
|
||||||
|
}.into_view()),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
view! {
|
||||||
|
<ActivityLine activity=item />
|
||||||
|
{object}
|
||||||
|
}.into_view()
|
||||||
|
},
|
||||||
|
// should never happen
|
||||||
|
_ => view! { <p><code>type not implemented</code></p> }.into_view(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -92,45 +92,16 @@ pub fn TimelineRepliesRecursive(tl: Timeline, root: String) -> impl IntoView {
|
||||||
<For
|
<For
|
||||||
each=root_values
|
each=root_values
|
||||||
key=|k| k.id().unwrap_or_default().to_string()
|
key=|k| k.id().unwrap_or_default().to_string()
|
||||||
children=move |object: crate::Object| {
|
children=move |obj: crate::Object| {
|
||||||
match object.object_type() {
|
let oid = obj.id().unwrap_or_default().to_string();
|
||||||
Some(apb::ObjectType::Activity(apb::ActivityType::Create)) => {
|
|
||||||
let oid = object.object().id().unwrap_or_default().to_string();
|
|
||||||
if let Some(note) = CACHE.get(&oid) {
|
|
||||||
view! {
|
view! {
|
||||||
<div class="context depth-r">
|
<div class="context depth-r">
|
||||||
<ActivityLine activity=object />
|
<Item item=obj />
|
||||||
<Object object=note />
|
|
||||||
<div class="depth-r">
|
<div class="depth-r">
|
||||||
<TimelineRepliesRecursive tl=tl root=oid />
|
<TimelineRepliesRecursive tl=tl root=oid />
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
view! {
|
|
||||||
<div class="context depth-r">
|
|
||||||
<ActivityLine activity=object />
|
|
||||||
</div>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Some(apb::ObjectType::Activity(_)) => view! {
|
|
||||||
<div class="context depth-r">
|
|
||||||
<ActivityLine activity=object />
|
|
||||||
</div>
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
let oid = object.id().unwrap_or_default().to_string();
|
|
||||||
view! {
|
|
||||||
<div class="context depth-r">
|
|
||||||
<Object object=object />
|
|
||||||
<div class="depth-r">
|
|
||||||
<TimelineRepliesRecursive tl=tl root=oid />
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/ >
|
/ >
|
||||||
}
|
}
|
||||||
|
@ -168,42 +139,13 @@ pub fn TimelineFeed(tl: Timeline) -> impl IntoView {
|
||||||
key=|k| k.to_string()
|
key=|k| k.to_string()
|
||||||
children=move |id: String| {
|
children=move |id: String| {
|
||||||
match CACHE.get(&id) {
|
match CACHE.get(&id) {
|
||||||
Some(item) => match item.object_type() {
|
Some(i) => view! {
|
||||||
// special case for placeholder activities
|
<Item item=i />
|
||||||
Some(apb::ObjectType::Note) => view! {
|
<hr />
|
||||||
<Object object=item.clone() />
|
|
||||||
<hr/ >
|
|
||||||
}.into_view(),
|
}.into_view(),
|
||||||
// everything else
|
|
||||||
Some(apb::ObjectType::Activity(t)) => {
|
|
||||||
let object_id = item.object().id().unwrap_or_default();
|
|
||||||
let object = match t {
|
|
||||||
apb::ActivityType::Create | apb::ActivityType::Announce =>
|
|
||||||
CACHE.get(&object_id).map(|obj| {
|
|
||||||
view! { <Object object=obj /> }
|
|
||||||
}.into_view()),
|
|
||||||
apb::ActivityType::Follow =>
|
|
||||||
CACHE.get(&object_id).map(|obj| {
|
|
||||||
view! {
|
|
||||||
<div class="ml-1">
|
|
||||||
<ActorBanner object=obj />
|
|
||||||
<FollowRequestButtons activity_id=id actor_id=object_id />
|
|
||||||
</div>
|
|
||||||
}
|
|
||||||
}.into_view()),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
view! {
|
|
||||||
<ActivityLine activity=item />
|
|
||||||
{object}
|
|
||||||
<hr/ >
|
|
||||||
}.into_view()
|
|
||||||
},
|
|
||||||
// should never happen
|
|
||||||
_ => view! { <p><code>type not implemented</code></p><hr /> }.into_view(),
|
|
||||||
},
|
|
||||||
None => view! {
|
None => view! {
|
||||||
<p><code>{id}</code>" "[<a href={uri}>go</a>]</p>
|
<p><code>{id}</code>" "[<a href={uri}>go</a>]</p>
|
||||||
|
<hr />
|
||||||
}.into_view(),
|
}.into_view(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue