Compare commits

...

2 commits

Author SHA1 Message Date
174ef4198d
chore: clippy warns, async_trait fixes 2024-09-19 16:51:20 +02:00
0934cdaad4
feat: waker implementation
basically posting should now be instant? very ugly implementation tho: i
wanted to keep tokio out of core but this is awful and also im
realistically never swapping out tokio, so might as well move these
tokens down in core...
2024-09-19 16:50:13 +02:00
17 changed files with 70 additions and 16 deletions

23
main.rs
View file

@ -4,8 +4,8 @@ use sea_orm::{ConnectOptions, Database};
use signal_hook::consts::signal::*; use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals; use signal_hook_tokio::Signals;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use upub::{context, ext::LoggableError};
use upub::ext::LoggableError;
#[cfg(feature = "cli")] #[cfg(feature = "cli")]
use upub_cli as cli; use upub_cli as cli;
@ -167,8 +167,10 @@ async fn init(args: Args, config: upub::Config) {
return; return;
} }
let (tx_wake, rx_wake) = tokio::sync::watch::channel(false);
let wake = CancellationToken(rx_wake);
let ctx = upub::Context::new(db, domain, config.clone()) let ctx = upub::Context::new(db, domain, config.clone(), Some(Box::new(WakerToken(tx_wake))))
.await.expect("failed creating server context"); .await.expect("failed creating server context");
#[cfg(feature = "cli")] #[cfg(feature = "cli")]
@ -195,12 +197,12 @@ async fn init(args: Args, config: upub::Config) {
#[cfg(feature = "worker")] #[cfg(feature = "worker")]
Mode::Work { filter, tasks, poll } => Mode::Work { filter, tasks, poll } =>
worker::spawn(ctx, tasks, poll, filter.into(), stop) worker::spawn(ctx, tasks, poll, filter.into(), stop, wake)
.await.expect("failed running worker"), .await.expect("failed running worker"),
#[cfg(all(feature = "serve", feature = "worker"))] #[cfg(all(feature = "serve", feature = "worker"))]
Mode::Monolith { bind, tasks, poll } => { Mode::Monolith { bind, tasks, poll } => {
worker::spawn(ctx.clone(), tasks, poll, None, stop.clone()); worker::spawn(ctx.clone(), tasks, poll, None, stop.clone(), wake);
routes::serve(ctx, bind, stop) routes::serve(ctx, bind, stop)
.await.expect("failed serving api routes"); .await.expect("failed serving api routes");
@ -217,6 +219,13 @@ async fn init(args: Args, config: upub::Config) {
signals_task.await.expect("failed joining signal handler task"); signals_task.await.expect("failed joining signal handler task");
} }
struct WakerToken(tokio::sync::watch::Sender<bool>);
impl context::WakerToken for WakerToken {
fn wake(&self) {
self.0.send_replace(true);
}
}
#[derive(Clone)] #[derive(Clone)]
struct CancellationToken(tokio::sync::watch::Receiver<bool>); struct CancellationToken(tokio::sync::watch::Receiver<bool>);
@ -226,6 +235,12 @@ impl worker::StopToken for CancellationToken {
} }
} }
impl worker::WakeToken for CancellationToken {
async fn wait(&mut self) {
self.0.changed().await.err_failed("error waiting for waker token");
}
}
impl routes::ShutdownToken for CancellationToken { impl routes::ShutdownToken for CancellationToken {
async fn event(mut self) { async fn event(mut self) {
self.0.changed().await.warn_failed("cancellation token channel closed, stopping..."); self.0.changed().await.warn_failed("cancellation token channel closed, stopping...");

View file

@ -1,5 +1,5 @@
use sea_orm::{EntityTrait, TransactionTrait}; use sea_orm::{EntityTrait, TransactionTrait};
use upub::traits::{fetch::{Fetchable, RequestError}, Addresser, Fetcher, Normalizer}; use upub::traits::{fetch::RequestError, Addresser, Fetcher, Normalizer};
pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<String>) -> Result<(), RequestError> { pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<String>) -> Result<(), RequestError> {
use apb::Base; use apb::Base;
@ -48,11 +48,11 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<St
}, },
Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
let act = ctx.insert_activity(obj, &tx).await?; let act = ctx.insert_activity(obj, &tx).await?;
ctx.address((Some(&act), None), &tx).await?; ctx.address(Some(&act), None, &tx).await?;
}, },
Ok(apb::BaseType::Object(apb::ObjectType::Note)) => { Ok(apb::BaseType::Object(apb::ObjectType::Note)) => {
let obj = ctx.insert_object(obj, &tx).await?; let obj = ctx.insert_object(obj, &tx).await?;
ctx.address((None, Some(&obj)), &tx).await?; ctx.address(None, Some(&obj), &tx).await?;
}, },
Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t),
Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"),

View file

@ -17,6 +17,7 @@ struct ContextInner {
actor: model::actor::Model, actor: model::actor::Model,
instance: model::instance::Model, instance: model::instance::Model,
pkey: String, pkey: String,
waker: Option<Box<dyn WakerToken>>,
#[allow(unused)] relay: Relays, #[allow(unused)] relay: Relays,
} }
@ -33,10 +34,14 @@ macro_rules! url {
}; };
} }
pub trait WakerToken: Sync + Send {
fn wake(&self);
}
impl Context { impl Context {
// TODO slim constructor down, maybe make a builder? // TODO slim constructor down, maybe make a builder?
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result<Self, crate::init::InitError> { pub async fn new(db: DatabaseConnection, mut domain: String, config: Config, waker: Option<Box<dyn WakerToken>>) -> Result<Self, crate::init::InitError> {
let protocol = if domain.starts_with("http://") let protocol = if domain.starts_with("http://")
{ "http://" } else { "https://" }.to_string(); { "http://" } else { "https://" }.to_string();
if domain.ends_with('/') { if domain.ends_with('/') {
@ -72,7 +77,7 @@ impl Context {
}; };
Ok(Context(Arc::new(ContextInner { Ok(Context(Arc::new(ContextInner {
base_url, db, domain, protocol, actor, instance, config, pkey, relay, base_url, db, domain, protocol, actor, instance, config, pkey, relay, waker,
}))) })))
} }
@ -167,6 +172,12 @@ impl Context {
Ok(None) Ok(None)
} }
pub fn wake_workers(&self) {
if let Some(ref waker) = self.0.waker {
waker.wake();
}
}
#[allow(unused)] #[allow(unused)]
pub fn is_relay(&self, id: &str) -> bool { pub fn is_relay(&self, id: &str) -> bool {
self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id) self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id)

View file

@ -1,6 +1,6 @@
use sea_orm::{ConnectionTrait, PaginatorTrait}; use sea_orm::{ConnectionTrait, PaginatorTrait};
#[allow(async_fn_in_trait)]
pub trait AnyQuery { pub trait AnyQuery {
async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr>; async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr>;
} }

View file

@ -3,6 +3,7 @@ use std::collections::{hash_map::Entry, HashMap};
use sea_orm::{ConnectionTrait, DbErr, EntityTrait, FromQueryResult, ModelTrait, QueryFilter}; use sea_orm::{ConnectionTrait, DbErr, EntityTrait, FromQueryResult, ModelTrait, QueryFilter};
use super::RichActivity; use super::RichActivity;
#[allow(async_fn_in_trait)]
pub trait BatchFillable: Sized { pub trait BatchFillable: Sized {
async fn with_batched<E>(self, tx: &impl ConnectionTrait) -> Result<Self, DbErr> async fn with_batched<E>(self, tx: &impl ConnectionTrait) -> Result<Self, DbErr>
where where
@ -114,6 +115,7 @@ use crate::selector::rich::{RichHashtag, RichMention};
} }
} }
#[allow(async_fn_in_trait)]
pub trait BatchFillableAcceptor<B> { pub trait BatchFillableAcceptor<B> {
async fn accept(&mut self, batch: B, tx: &impl ConnectionTrait) -> Result<(), DbErr>; async fn accept(&mut self, batch: B, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
} }

View file

@ -5,6 +5,7 @@ use sea_orm::{ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, E
use crate::traits::fetch::Fetcher; use crate::traits::fetch::Fetcher;
#[allow(async_fn_in_trait)]
pub trait Addresser { pub trait Addresser {
async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>; async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
async fn address(&self, activity: Option<&crate::model::activity::Model>, object: Option<&crate::model::object::Model>, tx: &impl ConnectionTrait) -> Result<(), DbErr>; async fn address(&self, activity: Option<&crate::model::activity::Model>, object: Option<&crate::model::object::Model>, tx: &impl ConnectionTrait) -> Result<(), DbErr>;

View file

@ -2,6 +2,7 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
use crate::ext::JsonVec; use crate::ext::JsonVec;
#[allow(async_fn_in_trait)]
pub trait Administrable { pub trait Administrable {
async fn register_user( async fn register_user(
&self, &self,

View file

@ -463,6 +463,7 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
Ok(object_model) Ok(object_model)
} }
#[allow(async_fn_in_trait)]
pub trait Fetchable : Sync + Send { pub trait Fetchable : Sync + Send {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>; async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
} }

View file

@ -15,6 +15,7 @@ pub enum NormalizerError {
DbErr(#[from] sea_orm::DbErr), DbErr(#[from] sea_orm::DbErr),
} }
#[allow(async_fn_in_trait)]
pub trait Normalizer { pub trait Normalizer {
async fn insert_object(&self, obj: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError>; async fn insert_object(&self, obj: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError>;
async fn insert_activity(&self, act: impl apb::Activity, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, NormalizerError>; async fn insert_activity(&self, act: impl apb::Activity, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, NormalizerError>;

View file

@ -29,10 +29,12 @@ pub enum ProcessorError {
PullError(#[from] crate::traits::fetch::RequestError), PullError(#[from] crate::traits::fetch::RequestError),
} }
#[async_trait::async_trait]
pub trait Processor { pub trait Processor {
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError>; async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError>;
} }
#[async_trait::async_trait]
impl Processor for crate::Context { impl Processor for crate::Context {
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> { async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
// TODO we could process Links and bare Objects maybe, but probably out of AP spec? // TODO we could process Links and bare Objects maybe, but probably out of AP spec?

View file

@ -33,3 +33,8 @@ nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "
# mastodon # mastodon
mastodon-async-entities = { version = "1.1.0", optional = true } mastodon-async-entities = { version = "1.1.0", optional = true }
time = { version = "0.3", features = ["serde"], optional = true } time = { version = "0.3", features = ["serde"], optional = true }
[features]
default = []
mastodon = ["dep:mastodon-async-entities"]
web = []

View file

@ -62,6 +62,7 @@ impl Identity {
pub struct AuthIdentity(pub Identity); pub struct AuthIdentity(pub Identity);
#[axum::async_trait]
impl<S> FromRequestParts<S> for AuthIdentity impl<S> FromRequestParts<S> for AuthIdentity
where where
upub::Context: FromRef<S>, upub::Context: FromRef<S>,

View file

@ -53,12 +53,13 @@ pub async fn serve(ctx: upub::Context, bind: String, shutdown: impl ShutdownToke
let listener = tokio::net::TcpListener::bind(bind).await?; let listener = tokio::net::TcpListener::bind(bind).await?;
axum::serve(listener, router) axum::serve(listener, router)
.with_graceful_shutdown(async move { shutdown.event().await }) .with_graceful_shutdown(shutdown.event())
.await?; .await?;
Ok(()) Ok(())
} }
pub trait ShutdownToken: Sync + Send + 'static { pub trait ShutdownToken: Sync + Send + 'static {
async fn event(self); // TODO this is bs...
fn event(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
} }

View file

@ -3,6 +3,7 @@ use reqwest::Method;
use apb::{LD, ActivityMut}; use apb::{LD, ActivityMut};
use upub::{Context, model, traits::Fetcher}; use upub::{Context, model, traits::Fetcher};
#[allow(clippy::manual_map)] // TODO can Update code be improved?
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> { pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
tracing::info!("delivering {} to {:?}", job.activity, job.target); tracing::info!("delivering {} to {:?}", job.activity, job.target);

View file

@ -29,10 +29,11 @@ pub enum JobError {
pub type JobResult<T> = Result<T, JobError>; pub type JobResult<T> = Result<T, JobError>;
#[allow(async_fn_in_trait)]
pub trait JobDispatcher : Sized { pub trait JobDispatcher : Sized {
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>; async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
async fn lock(&self, job_internal: i64) -> JobResult<bool>; async fn lock(&self, job_internal: i64) -> JobResult<bool>;
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken); async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, wake: impl crate::WakeToken);
} }
impl JobDispatcher for Context { impl JobDispatcher for Context {
@ -69,12 +70,15 @@ impl JobDispatcher for Context {
Ok(true) Ok(true)
} }
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken) { async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, mut wake: impl crate::WakeToken) {
macro_rules! restart { macro_rules! restart {
(now) => { continue }; (now) => { continue };
() => { () => {
{ {
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await; tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
_ = wake.wait() => {},
}
continue; continue;
} }
} }

View file

@ -11,14 +11,20 @@ pub fn spawn(
poll: u64, poll: u64,
filter: Option<upub::model::job::JobType>, filter: Option<upub::model::job::JobType>,
stop: impl StopToken, stop: impl StopToken,
wake: impl WakeToken,
) -> tokio::task::JoinHandle<()> { ) -> tokio::task::JoinHandle<()> {
use dispatcher::JobDispatcher; use dispatcher::JobDispatcher;
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("starting worker task"); tracing::info!("starting worker task");
ctx.run(concurrency, poll, filter, stop).await ctx.run(concurrency, poll, filter, stop, wake).await
}) })
} }
pub trait StopToken: Sync + Send + 'static { pub trait StopToken: Sync + Send + 'static {
fn stop(&self) -> bool; fn stop(&self) -> bool;
} }
pub trait WakeToken: Sync + Send + 'static {
// TODO this is bs...
fn wait(&mut self) -> impl std::future::Future<Output = ()> + std::marker::Send;
}

View file

@ -148,5 +148,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
tx.commit().await?; tx.commit().await?;
ctx.wake_workers();
Ok(()) Ok(())
} }