Compare commits
2 commits
52b93ba539
...
174ef4198d
Author | SHA1 | Date | |
---|---|---|---|
174ef4198d | |||
0934cdaad4 |
17 changed files with 70 additions and 16 deletions
23
main.rs
23
main.rs
|
@ -4,8 +4,8 @@ use sea_orm::{ConnectOptions, Database};
|
||||||
use signal_hook::consts::signal::*;
|
use signal_hook::consts::signal::*;
|
||||||
use signal_hook_tokio::Signals;
|
use signal_hook_tokio::Signals;
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
|
use upub::{context, ext::LoggableError};
|
||||||
|
|
||||||
use upub::ext::LoggableError;
|
|
||||||
#[cfg(feature = "cli")]
|
#[cfg(feature = "cli")]
|
||||||
use upub_cli as cli;
|
use upub_cli as cli;
|
||||||
|
|
||||||
|
@ -167,8 +167,10 @@ async fn init(args: Args, config: upub::Config) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let (tx_wake, rx_wake) = tokio::sync::watch::channel(false);
|
||||||
|
let wake = CancellationToken(rx_wake);
|
||||||
|
|
||||||
let ctx = upub::Context::new(db, domain, config.clone())
|
let ctx = upub::Context::new(db, domain, config.clone(), Some(Box::new(WakerToken(tx_wake))))
|
||||||
.await.expect("failed creating server context");
|
.await.expect("failed creating server context");
|
||||||
|
|
||||||
#[cfg(feature = "cli")]
|
#[cfg(feature = "cli")]
|
||||||
|
@ -195,12 +197,12 @@ async fn init(args: Args, config: upub::Config) {
|
||||||
|
|
||||||
#[cfg(feature = "worker")]
|
#[cfg(feature = "worker")]
|
||||||
Mode::Work { filter, tasks, poll } =>
|
Mode::Work { filter, tasks, poll } =>
|
||||||
worker::spawn(ctx, tasks, poll, filter.into(), stop)
|
worker::spawn(ctx, tasks, poll, filter.into(), stop, wake)
|
||||||
.await.expect("failed running worker"),
|
.await.expect("failed running worker"),
|
||||||
|
|
||||||
#[cfg(all(feature = "serve", feature = "worker"))]
|
#[cfg(all(feature = "serve", feature = "worker"))]
|
||||||
Mode::Monolith { bind, tasks, poll } => {
|
Mode::Monolith { bind, tasks, poll } => {
|
||||||
worker::spawn(ctx.clone(), tasks, poll, None, stop.clone());
|
worker::spawn(ctx.clone(), tasks, poll, None, stop.clone(), wake);
|
||||||
|
|
||||||
routes::serve(ctx, bind, stop)
|
routes::serve(ctx, bind, stop)
|
||||||
.await.expect("failed serving api routes");
|
.await.expect("failed serving api routes");
|
||||||
|
@ -217,6 +219,13 @@ async fn init(args: Args, config: upub::Config) {
|
||||||
signals_task.await.expect("failed joining signal handler task");
|
signals_task.await.expect("failed joining signal handler task");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct WakerToken(tokio::sync::watch::Sender<bool>);
|
||||||
|
impl context::WakerToken for WakerToken {
|
||||||
|
fn wake(&self) {
|
||||||
|
self.0.send_replace(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct CancellationToken(tokio::sync::watch::Receiver<bool>);
|
struct CancellationToken(tokio::sync::watch::Receiver<bool>);
|
||||||
|
|
||||||
|
@ -226,6 +235,12 @@ impl worker::StopToken for CancellationToken {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl worker::WakeToken for CancellationToken {
|
||||||
|
async fn wait(&mut self) {
|
||||||
|
self.0.changed().await.err_failed("error waiting for waker token");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl routes::ShutdownToken for CancellationToken {
|
impl routes::ShutdownToken for CancellationToken {
|
||||||
async fn event(mut self) {
|
async fn event(mut self) {
|
||||||
self.0.changed().await.warn_failed("cancellation token channel closed, stopping...");
|
self.0.changed().await.warn_failed("cancellation token channel closed, stopping...");
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use sea_orm::{EntityTrait, TransactionTrait};
|
use sea_orm::{EntityTrait, TransactionTrait};
|
||||||
use upub::traits::{fetch::{Fetchable, RequestError}, Addresser, Fetcher, Normalizer};
|
use upub::traits::{fetch::RequestError, Addresser, Fetcher, Normalizer};
|
||||||
|
|
||||||
pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<String>) -> Result<(), RequestError> {
|
pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<String>) -> Result<(), RequestError> {
|
||||||
use apb::Base;
|
use apb::Base;
|
||||||
|
@ -48,11 +48,11 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<St
|
||||||
},
|
},
|
||||||
Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
|
Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
|
||||||
let act = ctx.insert_activity(obj, &tx).await?;
|
let act = ctx.insert_activity(obj, &tx).await?;
|
||||||
ctx.address((Some(&act), None), &tx).await?;
|
ctx.address(Some(&act), None, &tx).await?;
|
||||||
},
|
},
|
||||||
Ok(apb::BaseType::Object(apb::ObjectType::Note)) => {
|
Ok(apb::BaseType::Object(apb::ObjectType::Note)) => {
|
||||||
let obj = ctx.insert_object(obj, &tx).await?;
|
let obj = ctx.insert_object(obj, &tx).await?;
|
||||||
ctx.address((None, Some(&obj)), &tx).await?;
|
ctx.address(None, Some(&obj), &tx).await?;
|
||||||
},
|
},
|
||||||
Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t),
|
Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t),
|
||||||
Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"),
|
Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"),
|
||||||
|
|
|
@ -17,6 +17,7 @@ struct ContextInner {
|
||||||
actor: model::actor::Model,
|
actor: model::actor::Model,
|
||||||
instance: model::instance::Model,
|
instance: model::instance::Model,
|
||||||
pkey: String,
|
pkey: String,
|
||||||
|
waker: Option<Box<dyn WakerToken>>,
|
||||||
#[allow(unused)] relay: Relays,
|
#[allow(unused)] relay: Relays,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,10 +34,14 @@ macro_rules! url {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait WakerToken: Sync + Send {
|
||||||
|
fn wake(&self);
|
||||||
|
}
|
||||||
|
|
||||||
impl Context {
|
impl Context {
|
||||||
|
|
||||||
// TODO slim constructor down, maybe make a builder?
|
// TODO slim constructor down, maybe make a builder?
|
||||||
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result<Self, crate::init::InitError> {
|
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config, waker: Option<Box<dyn WakerToken>>) -> Result<Self, crate::init::InitError> {
|
||||||
let protocol = if domain.starts_with("http://")
|
let protocol = if domain.starts_with("http://")
|
||||||
{ "http://" } else { "https://" }.to_string();
|
{ "http://" } else { "https://" }.to_string();
|
||||||
if domain.ends_with('/') {
|
if domain.ends_with('/') {
|
||||||
|
@ -72,7 +77,7 @@ impl Context {
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Context(Arc::new(ContextInner {
|
Ok(Context(Arc::new(ContextInner {
|
||||||
base_url, db, domain, protocol, actor, instance, config, pkey, relay,
|
base_url, db, domain, protocol, actor, instance, config, pkey, relay, waker,
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,6 +172,12 @@ impl Context {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn wake_workers(&self) {
|
||||||
|
if let Some(ref waker) = self.0.waker {
|
||||||
|
waker.wake();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub fn is_relay(&self, id: &str) -> bool {
|
pub fn is_relay(&self, id: &str) -> bool {
|
||||||
self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id)
|
self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use sea_orm::{ConnectionTrait, PaginatorTrait};
|
use sea_orm::{ConnectionTrait, PaginatorTrait};
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait AnyQuery {
|
pub trait AnyQuery {
|
||||||
async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr>;
|
async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ use std::collections::{hash_map::Entry, HashMap};
|
||||||
use sea_orm::{ConnectionTrait, DbErr, EntityTrait, FromQueryResult, ModelTrait, QueryFilter};
|
use sea_orm::{ConnectionTrait, DbErr, EntityTrait, FromQueryResult, ModelTrait, QueryFilter};
|
||||||
use super::RichActivity;
|
use super::RichActivity;
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait BatchFillable: Sized {
|
pub trait BatchFillable: Sized {
|
||||||
async fn with_batched<E>(self, tx: &impl ConnectionTrait) -> Result<Self, DbErr>
|
async fn with_batched<E>(self, tx: &impl ConnectionTrait) -> Result<Self, DbErr>
|
||||||
where
|
where
|
||||||
|
@ -114,6 +115,7 @@ use crate::selector::rich::{RichHashtag, RichMention};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait BatchFillableAcceptor<B> {
|
pub trait BatchFillableAcceptor<B> {
|
||||||
async fn accept(&mut self, batch: B, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
async fn accept(&mut self, batch: B, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ use sea_orm::{ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, E
|
||||||
|
|
||||||
use crate::traits::fetch::Fetcher;
|
use crate::traits::fetch::Fetcher;
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait Addresser {
|
pub trait Addresser {
|
||||||
async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
||||||
async fn address(&self, activity: Option<&crate::model::activity::Model>, object: Option<&crate::model::object::Model>, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
async fn address(&self, activity: Option<&crate::model::activity::Model>, object: Option<&crate::model::object::Model>, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
||||||
|
|
|
@ -2,6 +2,7 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
|
||||||
|
|
||||||
use crate::ext::JsonVec;
|
use crate::ext::JsonVec;
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait Administrable {
|
pub trait Administrable {
|
||||||
async fn register_user(
|
async fn register_user(
|
||||||
&self,
|
&self,
|
||||||
|
|
|
@ -463,6 +463,7 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
|
||||||
Ok(object_model)
|
Ok(object_model)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait Fetchable : Sync + Send {
|
pub trait Fetchable : Sync + Send {
|
||||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
|
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ pub enum NormalizerError {
|
||||||
DbErr(#[from] sea_orm::DbErr),
|
DbErr(#[from] sea_orm::DbErr),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait Normalizer {
|
pub trait Normalizer {
|
||||||
async fn insert_object(&self, obj: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError>;
|
async fn insert_object(&self, obj: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError>;
|
||||||
async fn insert_activity(&self, act: impl apb::Activity, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, NormalizerError>;
|
async fn insert_activity(&self, act: impl apb::Activity, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, NormalizerError>;
|
||||||
|
|
|
@ -29,10 +29,12 @@ pub enum ProcessorError {
|
||||||
PullError(#[from] crate::traits::fetch::RequestError),
|
PullError(#[from] crate::traits::fetch::RequestError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
pub trait Processor {
|
pub trait Processor {
|
||||||
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError>;
|
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
impl Processor for crate::Context {
|
impl Processor for crate::Context {
|
||||||
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||||
// TODO we could process Links and bare Objects maybe, but probably out of AP spec?
|
// TODO we could process Links and bare Objects maybe, but probably out of AP spec?
|
||||||
|
|
|
@ -33,3 +33,8 @@ nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "
|
||||||
# mastodon
|
# mastodon
|
||||||
mastodon-async-entities = { version = "1.1.0", optional = true }
|
mastodon-async-entities = { version = "1.1.0", optional = true }
|
||||||
time = { version = "0.3", features = ["serde"], optional = true }
|
time = { version = "0.3", features = ["serde"], optional = true }
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
mastodon = ["dep:mastodon-async-entities"]
|
||||||
|
web = []
|
||||||
|
|
|
@ -62,6 +62,7 @@ impl Identity {
|
||||||
|
|
||||||
pub struct AuthIdentity(pub Identity);
|
pub struct AuthIdentity(pub Identity);
|
||||||
|
|
||||||
|
#[axum::async_trait]
|
||||||
impl<S> FromRequestParts<S> for AuthIdentity
|
impl<S> FromRequestParts<S> for AuthIdentity
|
||||||
where
|
where
|
||||||
upub::Context: FromRef<S>,
|
upub::Context: FromRef<S>,
|
||||||
|
|
|
@ -53,12 +53,13 @@ pub async fn serve(ctx: upub::Context, bind: String, shutdown: impl ShutdownToke
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind(bind).await?;
|
let listener = tokio::net::TcpListener::bind(bind).await?;
|
||||||
axum::serve(listener, router)
|
axum::serve(listener, router)
|
||||||
.with_graceful_shutdown(async move { shutdown.event().await })
|
.with_graceful_shutdown(shutdown.event())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ShutdownToken: Sync + Send + 'static {
|
pub trait ShutdownToken: Sync + Send + 'static {
|
||||||
async fn event(self);
|
// TODO this is bs...
|
||||||
|
fn event(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ use reqwest::Method;
|
||||||
use apb::{LD, ActivityMut};
|
use apb::{LD, ActivityMut};
|
||||||
use upub::{Context, model, traits::Fetcher};
|
use upub::{Context, model, traits::Fetcher};
|
||||||
|
|
||||||
|
#[allow(clippy::manual_map)] // TODO can Update code be improved?
|
||||||
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
||||||
tracing::info!("delivering {} to {:?}", job.activity, job.target);
|
tracing::info!("delivering {} to {:?}", job.activity, job.target);
|
||||||
|
|
||||||
|
|
|
@ -29,10 +29,11 @@ pub enum JobError {
|
||||||
|
|
||||||
pub type JobResult<T> = Result<T, JobError>;
|
pub type JobResult<T> = Result<T, JobError>;
|
||||||
|
|
||||||
|
#[allow(async_fn_in_trait)]
|
||||||
pub trait JobDispatcher : Sized {
|
pub trait JobDispatcher : Sized {
|
||||||
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
|
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
|
||||||
async fn lock(&self, job_internal: i64) -> JobResult<bool>;
|
async fn lock(&self, job_internal: i64) -> JobResult<bool>;
|
||||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken);
|
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, wake: impl crate::WakeToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JobDispatcher for Context {
|
impl JobDispatcher for Context {
|
||||||
|
@ -69,12 +70,15 @@ impl JobDispatcher for Context {
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken) {
|
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, mut wake: impl crate::WakeToken) {
|
||||||
macro_rules! restart {
|
macro_rules! restart {
|
||||||
(now) => { continue };
|
(now) => { continue };
|
||||||
() => {
|
() => {
|
||||||
{
|
{
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
|
tokio::select! {
|
||||||
|
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
|
||||||
|
_ = wake.wait() => {},
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,14 +11,20 @@ pub fn spawn(
|
||||||
poll: u64,
|
poll: u64,
|
||||||
filter: Option<upub::model::job::JobType>,
|
filter: Option<upub::model::job::JobType>,
|
||||||
stop: impl StopToken,
|
stop: impl StopToken,
|
||||||
|
wake: impl WakeToken,
|
||||||
) -> tokio::task::JoinHandle<()> {
|
) -> tokio::task::JoinHandle<()> {
|
||||||
use dispatcher::JobDispatcher;
|
use dispatcher::JobDispatcher;
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("starting worker task");
|
tracing::info!("starting worker task");
|
||||||
ctx.run(concurrency, poll, filter, stop).await
|
ctx.run(concurrency, poll, filter, stop, wake).await
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait StopToken: Sync + Send + 'static {
|
pub trait StopToken: Sync + Send + 'static {
|
||||||
fn stop(&self) -> bool;
|
fn stop(&self) -> bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait WakeToken: Sync + Send + 'static {
|
||||||
|
// TODO this is bs...
|
||||||
|
fn wait(&mut self) -> impl std::future::Future<Output = ()> + std::marker::Send;
|
||||||
|
}
|
||||||
|
|
|
@ -148,5 +148,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
|
||||||
|
|
||||||
tx.commit().await?;
|
tx.commit().await?;
|
||||||
|
|
||||||
|
ctx.wake_workers();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue