Compare commits

...

2 commits

Author SHA1 Message Date
174ef4198d
chore: clippy warns, async_trait fixes 2024-09-19 16:51:20 +02:00
0934cdaad4
feat: waker implementation
basically posting should now be instant? very ugly implementation tho: i
wanted to keep tokio out of core but this is awful and also im
realistically never swapping out tokio, so might as well move these
tokens down in core...
2024-09-19 16:50:13 +02:00
17 changed files with 70 additions and 16 deletions

23
main.rs
View file

@ -4,8 +4,8 @@ use sea_orm::{ConnectOptions, Database};
use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;
use futures::stream::StreamExt;
use upub::{context, ext::LoggableError};
use upub::ext::LoggableError;
#[cfg(feature = "cli")]
use upub_cli as cli;
@ -167,8 +167,10 @@ async fn init(args: Args, config: upub::Config) {
return;
}
let (tx_wake, rx_wake) = tokio::sync::watch::channel(false);
let wake = CancellationToken(rx_wake);
let ctx = upub::Context::new(db, domain, config.clone())
let ctx = upub::Context::new(db, domain, config.clone(), Some(Box::new(WakerToken(tx_wake))))
.await.expect("failed creating server context");
#[cfg(feature = "cli")]
@ -195,12 +197,12 @@ async fn init(args: Args, config: upub::Config) {
#[cfg(feature = "worker")]
Mode::Work { filter, tasks, poll } =>
worker::spawn(ctx, tasks, poll, filter.into(), stop)
worker::spawn(ctx, tasks, poll, filter.into(), stop, wake)
.await.expect("failed running worker"),
#[cfg(all(feature = "serve", feature = "worker"))]
Mode::Monolith { bind, tasks, poll } => {
worker::spawn(ctx.clone(), tasks, poll, None, stop.clone());
worker::spawn(ctx.clone(), tasks, poll, None, stop.clone(), wake);
routes::serve(ctx, bind, stop)
.await.expect("failed serving api routes");
@ -217,6 +219,13 @@ async fn init(args: Args, config: upub::Config) {
signals_task.await.expect("failed joining signal handler task");
}
struct WakerToken(tokio::sync::watch::Sender<bool>);
impl context::WakerToken for WakerToken {
fn wake(&self) {
self.0.send_replace(true);
}
}
#[derive(Clone)]
struct CancellationToken(tokio::sync::watch::Receiver<bool>);
@ -226,6 +235,12 @@ impl worker::StopToken for CancellationToken {
}
}
impl worker::WakeToken for CancellationToken {
async fn wait(&mut self) {
self.0.changed().await.err_failed("error waiting for waker token");
}
}
impl routes::ShutdownToken for CancellationToken {
async fn event(mut self) {
self.0.changed().await.warn_failed("cancellation token channel closed, stopping...");

View file

@ -1,5 +1,5 @@
use sea_orm::{EntityTrait, TransactionTrait};
use upub::traits::{fetch::{Fetchable, RequestError}, Addresser, Fetcher, Normalizer};
use upub::traits::{fetch::RequestError, Addresser, Fetcher, Normalizer};
pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<String>) -> Result<(), RequestError> {
use apb::Base;
@ -48,11 +48,11 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<St
},
Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
let act = ctx.insert_activity(obj, &tx).await?;
ctx.address((Some(&act), None), &tx).await?;
ctx.address(Some(&act), None, &tx).await?;
},
Ok(apb::BaseType::Object(apb::ObjectType::Note)) => {
let obj = ctx.insert_object(obj, &tx).await?;
ctx.address((None, Some(&obj)), &tx).await?;
ctx.address(None, Some(&obj), &tx).await?;
},
Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t),
Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"),

View file

@ -17,6 +17,7 @@ struct ContextInner {
actor: model::actor::Model,
instance: model::instance::Model,
pkey: String,
waker: Option<Box<dyn WakerToken>>,
#[allow(unused)] relay: Relays,
}
@ -33,10 +34,14 @@ macro_rules! url {
};
}
pub trait WakerToken: Sync + Send {
fn wake(&self);
}
impl Context {
// TODO slim constructor down, maybe make a builder?
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result<Self, crate::init::InitError> {
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config, waker: Option<Box<dyn WakerToken>>) -> Result<Self, crate::init::InitError> {
let protocol = if domain.starts_with("http://")
{ "http://" } else { "https://" }.to_string();
if domain.ends_with('/') {
@ -72,7 +77,7 @@ impl Context {
};
Ok(Context(Arc::new(ContextInner {
base_url, db, domain, protocol, actor, instance, config, pkey, relay,
base_url, db, domain, protocol, actor, instance, config, pkey, relay, waker,
})))
}
@ -167,6 +172,12 @@ impl Context {
Ok(None)
}
pub fn wake_workers(&self) {
if let Some(ref waker) = self.0.waker {
waker.wake();
}
}
#[allow(unused)]
pub fn is_relay(&self, id: &str) -> bool {
self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id)

View file

@ -1,6 +1,6 @@
use sea_orm::{ConnectionTrait, PaginatorTrait};
#[allow(async_fn_in_trait)]
pub trait AnyQuery {
async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr>;
}

View file

@ -3,6 +3,7 @@ use std::collections::{hash_map::Entry, HashMap};
use sea_orm::{ConnectionTrait, DbErr, EntityTrait, FromQueryResult, ModelTrait, QueryFilter};
use super::RichActivity;
#[allow(async_fn_in_trait)]
pub trait BatchFillable: Sized {
async fn with_batched<E>(self, tx: &impl ConnectionTrait) -> Result<Self, DbErr>
where
@ -114,6 +115,7 @@ use crate::selector::rich::{RichHashtag, RichMention};
}
}
#[allow(async_fn_in_trait)]
pub trait BatchFillableAcceptor<B> {
async fn accept(&mut self, batch: B, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
}

View file

@ -5,6 +5,7 @@ use sea_orm::{ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, E
use crate::traits::fetch::Fetcher;
#[allow(async_fn_in_trait)]
pub trait Addresser {
async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
async fn address(&self, activity: Option<&crate::model::activity::Model>, object: Option<&crate::model::object::Model>, tx: &impl ConnectionTrait) -> Result<(), DbErr>;

View file

@ -2,6 +2,7 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
use crate::ext::JsonVec;
#[allow(async_fn_in_trait)]
pub trait Administrable {
async fn register_user(
&self,

View file

@ -463,6 +463,7 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
Ok(object_model)
}
#[allow(async_fn_in_trait)]
pub trait Fetchable : Sync + Send {
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
}

View file

@ -15,6 +15,7 @@ pub enum NormalizerError {
DbErr(#[from] sea_orm::DbErr),
}
#[allow(async_fn_in_trait)]
pub trait Normalizer {
async fn insert_object(&self, obj: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError>;
async fn insert_activity(&self, act: impl apb::Activity, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, NormalizerError>;

View file

@ -29,10 +29,12 @@ pub enum ProcessorError {
PullError(#[from] crate::traits::fetch::RequestError),
}
#[async_trait::async_trait]
pub trait Processor {
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError>;
}
#[async_trait::async_trait]
impl Processor for crate::Context {
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
// TODO we could process Links and bare Objects maybe, but probably out of AP spec?

View file

@ -33,3 +33,8 @@ nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "
# mastodon
mastodon-async-entities = { version = "1.1.0", optional = true }
time = { version = "0.3", features = ["serde"], optional = true }
[features]
default = []
mastodon = ["dep:mastodon-async-entities"]
web = []

View file

@ -62,6 +62,7 @@ impl Identity {
pub struct AuthIdentity(pub Identity);
#[axum::async_trait]
impl<S> FromRequestParts<S> for AuthIdentity
where
upub::Context: FromRef<S>,

View file

@ -53,12 +53,13 @@ pub async fn serve(ctx: upub::Context, bind: String, shutdown: impl ShutdownToke
let listener = tokio::net::TcpListener::bind(bind).await?;
axum::serve(listener, router)
.with_graceful_shutdown(async move { shutdown.event().await })
.with_graceful_shutdown(shutdown.event())
.await?;
Ok(())
}
pub trait ShutdownToken: Sync + Send + 'static {
async fn event(self);
// TODO this is bs...
fn event(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
}

View file

@ -3,6 +3,7 @@ use reqwest::Method;
use apb::{LD, ActivityMut};
use upub::{Context, model, traits::Fetcher};
#[allow(clippy::manual_map)] // TODO can Update code be improved?
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
tracing::info!("delivering {} to {:?}", job.activity, job.target);

View file

@ -29,10 +29,11 @@ pub enum JobError {
pub type JobResult<T> = Result<T, JobError>;
#[allow(async_fn_in_trait)]
pub trait JobDispatcher : Sized {
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
async fn lock(&self, job_internal: i64) -> JobResult<bool>;
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken);
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, wake: impl crate::WakeToken);
}
impl JobDispatcher for Context {
@ -69,12 +70,15 @@ impl JobDispatcher for Context {
Ok(true)
}
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken) {
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, mut wake: impl crate::WakeToken) {
macro_rules! restart {
(now) => { continue };
() => {
{
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
_ = wake.wait() => {},
}
continue;
}
}

View file

@ -11,14 +11,20 @@ pub fn spawn(
poll: u64,
filter: Option<upub::model::job::JobType>,
stop: impl StopToken,
wake: impl WakeToken,
) -> tokio::task::JoinHandle<()> {
use dispatcher::JobDispatcher;
tokio::spawn(async move {
tracing::info!("starting worker task");
ctx.run(concurrency, poll, filter, stop).await
ctx.run(concurrency, poll, filter, stop, wake).await
})
}
pub trait StopToken: Sync + Send + 'static {
fn stop(&self) -> bool;
}
pub trait WakeToken: Sync + Send + 'static {
// TODO this is bs...
fn wait(&mut self) -> impl std::future::Future<Output = ()> + std::marker::Send;
}

View file

@ -148,5 +148,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
tx.commit().await?;
ctx.wake_workers();
Ok(())
}