Compare commits
2 commits
52b93ba539
...
174ef4198d
Author | SHA1 | Date | |
---|---|---|---|
174ef4198d | |||
0934cdaad4 |
17 changed files with 70 additions and 16 deletions
23
main.rs
23
main.rs
|
@ -4,8 +4,8 @@ use sea_orm::{ConnectOptions, Database};
|
|||
use signal_hook::consts::signal::*;
|
||||
use signal_hook_tokio::Signals;
|
||||
use futures::stream::StreamExt;
|
||||
use upub::{context, ext::LoggableError};
|
||||
|
||||
use upub::ext::LoggableError;
|
||||
#[cfg(feature = "cli")]
|
||||
use upub_cli as cli;
|
||||
|
||||
|
@ -167,8 +167,10 @@ async fn init(args: Args, config: upub::Config) {
|
|||
return;
|
||||
}
|
||||
|
||||
let (tx_wake, rx_wake) = tokio::sync::watch::channel(false);
|
||||
let wake = CancellationToken(rx_wake);
|
||||
|
||||
let ctx = upub::Context::new(db, domain, config.clone())
|
||||
let ctx = upub::Context::new(db, domain, config.clone(), Some(Box::new(WakerToken(tx_wake))))
|
||||
.await.expect("failed creating server context");
|
||||
|
||||
#[cfg(feature = "cli")]
|
||||
|
@ -195,12 +197,12 @@ async fn init(args: Args, config: upub::Config) {
|
|||
|
||||
#[cfg(feature = "worker")]
|
||||
Mode::Work { filter, tasks, poll } =>
|
||||
worker::spawn(ctx, tasks, poll, filter.into(), stop)
|
||||
worker::spawn(ctx, tasks, poll, filter.into(), stop, wake)
|
||||
.await.expect("failed running worker"),
|
||||
|
||||
#[cfg(all(feature = "serve", feature = "worker"))]
|
||||
Mode::Monolith { bind, tasks, poll } => {
|
||||
worker::spawn(ctx.clone(), tasks, poll, None, stop.clone());
|
||||
worker::spawn(ctx.clone(), tasks, poll, None, stop.clone(), wake);
|
||||
|
||||
routes::serve(ctx, bind, stop)
|
||||
.await.expect("failed serving api routes");
|
||||
|
@ -217,6 +219,13 @@ async fn init(args: Args, config: upub::Config) {
|
|||
signals_task.await.expect("failed joining signal handler task");
|
||||
}
|
||||
|
||||
struct WakerToken(tokio::sync::watch::Sender<bool>);
|
||||
impl context::WakerToken for WakerToken {
|
||||
fn wake(&self) {
|
||||
self.0.send_replace(true);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CancellationToken(tokio::sync::watch::Receiver<bool>);
|
||||
|
||||
|
@ -226,6 +235,12 @@ impl worker::StopToken for CancellationToken {
|
|||
}
|
||||
}
|
||||
|
||||
impl worker::WakeToken for CancellationToken {
|
||||
async fn wait(&mut self) {
|
||||
self.0.changed().await.err_failed("error waiting for waker token");
|
||||
}
|
||||
}
|
||||
|
||||
impl routes::ShutdownToken for CancellationToken {
|
||||
async fn event(mut self) {
|
||||
self.0.changed().await.warn_failed("cancellation token channel closed, stopping...");
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use sea_orm::{EntityTrait, TransactionTrait};
|
||||
use upub::traits::{fetch::{Fetchable, RequestError}, Addresser, Fetcher, Normalizer};
|
||||
use upub::traits::{fetch::RequestError, Addresser, Fetcher, Normalizer};
|
||||
|
||||
pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<String>) -> Result<(), RequestError> {
|
||||
use apb::Base;
|
||||
|
@ -48,11 +48,11 @@ pub async fn fetch(ctx: upub::Context, uri: String, save: bool, actor: Option<St
|
|||
},
|
||||
Ok(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
|
||||
let act = ctx.insert_activity(obj, &tx).await?;
|
||||
ctx.address((Some(&act), None), &tx).await?;
|
||||
ctx.address(Some(&act), None, &tx).await?;
|
||||
},
|
||||
Ok(apb::BaseType::Object(apb::ObjectType::Note)) => {
|
||||
let obj = ctx.insert_object(obj, &tx).await?;
|
||||
ctx.address((None, Some(&obj)), &tx).await?;
|
||||
ctx.address(None, Some(&obj), &tx).await?;
|
||||
},
|
||||
Ok(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t),
|
||||
Ok(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"),
|
||||
|
|
|
@ -17,6 +17,7 @@ struct ContextInner {
|
|||
actor: model::actor::Model,
|
||||
instance: model::instance::Model,
|
||||
pkey: String,
|
||||
waker: Option<Box<dyn WakerToken>>,
|
||||
#[allow(unused)] relay: Relays,
|
||||
}
|
||||
|
||||
|
@ -33,10 +34,14 @@ macro_rules! url {
|
|||
};
|
||||
}
|
||||
|
||||
pub trait WakerToken: Sync + Send {
|
||||
fn wake(&self);
|
||||
}
|
||||
|
||||
impl Context {
|
||||
|
||||
// TODO slim constructor down, maybe make a builder?
|
||||
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result<Self, crate::init::InitError> {
|
||||
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config, waker: Option<Box<dyn WakerToken>>) -> Result<Self, crate::init::InitError> {
|
||||
let protocol = if domain.starts_with("http://")
|
||||
{ "http://" } else { "https://" }.to_string();
|
||||
if domain.ends_with('/') {
|
||||
|
@ -72,7 +77,7 @@ impl Context {
|
|||
};
|
||||
|
||||
Ok(Context(Arc::new(ContextInner {
|
||||
base_url, db, domain, protocol, actor, instance, config, pkey, relay,
|
||||
base_url, db, domain, protocol, actor, instance, config, pkey, relay, waker,
|
||||
})))
|
||||
}
|
||||
|
||||
|
@ -167,6 +172,12 @@ impl Context {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn wake_workers(&self) {
|
||||
if let Some(ref waker) = self.0.waker {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub fn is_relay(&self, id: &str) -> bool {
|
||||
self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use sea_orm::{ConnectionTrait, PaginatorTrait};
|
||||
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait AnyQuery {
|
||||
async fn any(self, db: &impl ConnectionTrait) -> Result<bool, sea_orm::DbErr>;
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ use std::collections::{hash_map::Entry, HashMap};
|
|||
use sea_orm::{ConnectionTrait, DbErr, EntityTrait, FromQueryResult, ModelTrait, QueryFilter};
|
||||
use super::RichActivity;
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait BatchFillable: Sized {
|
||||
async fn with_batched<E>(self, tx: &impl ConnectionTrait) -> Result<Self, DbErr>
|
||||
where
|
||||
|
@ -114,6 +115,7 @@ use crate::selector::rich::{RichHashtag, RichMention};
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait BatchFillableAcceptor<B> {
|
||||
async fn accept(&mut self, batch: B, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ use sea_orm::{ActiveValue::{NotSet, Set}, ColumnTrait, ConnectionTrait, DbErr, E
|
|||
|
||||
use crate::traits::fetch::Fetcher;
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait Addresser {
|
||||
async fn deliver(&self, to: Vec<String>, aid: &str, from: &str, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
||||
async fn address(&self, activity: Option<&crate::model::activity::Model>, object: Option<&crate::model::object::Model>, tx: &impl ConnectionTrait) -> Result<(), DbErr>;
|
||||
|
|
|
@ -2,6 +2,7 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
|
|||
|
||||
use crate::ext::JsonVec;
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait Administrable {
|
||||
async fn register_user(
|
||||
&self,
|
||||
|
|
|
@ -463,6 +463,7 @@ async fn resolve_object_r(ctx: &crate::Context, object: serde_json::Value, depth
|
|||
Ok(object_model)
|
||||
}
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait Fetchable : Sync + Send {
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, RequestError>;
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ pub enum NormalizerError {
|
|||
DbErr(#[from] sea_orm::DbErr),
|
||||
}
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait Normalizer {
|
||||
async fn insert_object(&self, obj: impl apb::Object, tx: &impl ConnectionTrait) -> Result<crate::model::object::Model, NormalizerError>;
|
||||
async fn insert_activity(&self, act: impl apb::Activity, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, NormalizerError>;
|
||||
|
|
|
@ -29,10 +29,12 @@ pub enum ProcessorError {
|
|||
PullError(#[from] crate::traits::fetch::RequestError),
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Processor {
|
||||
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Processor for crate::Context {
|
||||
async fn process(&self, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
|
||||
// TODO we could process Links and bare Objects maybe, but probably out of AP spec?
|
||||
|
|
|
@ -33,3 +33,8 @@ nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "
|
|||
# mastodon
|
||||
mastodon-async-entities = { version = "1.1.0", optional = true }
|
||||
time = { version = "0.3", features = ["serde"], optional = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
mastodon = ["dep:mastodon-async-entities"]
|
||||
web = []
|
||||
|
|
|
@ -62,6 +62,7 @@ impl Identity {
|
|||
|
||||
pub struct AuthIdentity(pub Identity);
|
||||
|
||||
#[axum::async_trait]
|
||||
impl<S> FromRequestParts<S> for AuthIdentity
|
||||
where
|
||||
upub::Context: FromRef<S>,
|
||||
|
|
|
@ -53,12 +53,13 @@ pub async fn serve(ctx: upub::Context, bind: String, shutdown: impl ShutdownToke
|
|||
|
||||
let listener = tokio::net::TcpListener::bind(bind).await?;
|
||||
axum::serve(listener, router)
|
||||
.with_graceful_shutdown(async move { shutdown.event().await })
|
||||
.with_graceful_shutdown(shutdown.event())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub trait ShutdownToken: Sync + Send + 'static {
|
||||
async fn event(self);
|
||||
// TODO this is bs...
|
||||
fn event(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ use reqwest::Method;
|
|||
use apb::{LD, ActivityMut};
|
||||
use upub::{Context, model, traits::Fetcher};
|
||||
|
||||
#[allow(clippy::manual_map)] // TODO can Update code be improved?
|
||||
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
||||
tracing::info!("delivering {} to {:?}", job.activity, job.target);
|
||||
|
||||
|
|
|
@ -29,10 +29,11 @@ pub enum JobError {
|
|||
|
||||
pub type JobResult<T> = Result<T, JobError>;
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait JobDispatcher : Sized {
|
||||
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
|
||||
async fn lock(&self, job_internal: i64) -> JobResult<bool>;
|
||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken);
|
||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, wake: impl crate::WakeToken);
|
||||
}
|
||||
|
||||
impl JobDispatcher for Context {
|
||||
|
@ -69,12 +70,15 @@ impl JobDispatcher for Context {
|
|||
Ok(true)
|
||||
}
|
||||
|
||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken) {
|
||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, mut wake: impl crate::WakeToken) {
|
||||
macro_rules! restart {
|
||||
(now) => { continue };
|
||||
() => {
|
||||
{
|
||||
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
|
||||
_ = wake.wait() => {},
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,14 +11,20 @@ pub fn spawn(
|
|||
poll: u64,
|
||||
filter: Option<upub::model::job::JobType>,
|
||||
stop: impl StopToken,
|
||||
wake: impl WakeToken,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
use dispatcher::JobDispatcher;
|
||||
tokio::spawn(async move {
|
||||
tracing::info!("starting worker task");
|
||||
ctx.run(concurrency, poll, filter, stop).await
|
||||
ctx.run(concurrency, poll, filter, stop, wake).await
|
||||
})
|
||||
}
|
||||
|
||||
pub trait StopToken: Sync + Send + 'static {
|
||||
fn stop(&self) -> bool;
|
||||
}
|
||||
|
||||
pub trait WakeToken: Sync + Send + 'static {
|
||||
// TODO this is bs...
|
||||
fn wait(&mut self) -> impl std::future::Future<Output = ()> + std::marker::Send;
|
||||
}
|
||||
|
|
|
@ -148,5 +148,7 @@ pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<
|
|||
|
||||
tx.commit().await?;
|
||||
|
||||
ctx.wake_workers();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue