forked from alemi/upub
chore: traits are back in core
worker is just a worker, everything else is upub
This commit is contained in:
parent
0c1160b42f
commit
52f1238052
38 changed files with 684 additions and 676 deletions
|
@ -12,6 +12,7 @@ readme = "README.md"
|
|||
|
||||
[dependencies]
|
||||
thiserror = "1"
|
||||
async-trait = "0.1"
|
||||
sha256 = "1.5"
|
||||
openssl = "0.10" # TODO handle pubkeys with a smaller crate
|
||||
base64 = "0.22"
|
||||
|
@ -25,12 +26,12 @@ serde-inline-default = "0.2"
|
|||
toml = "0.8"
|
||||
mdhtml = { path = "../../utils/mdhtml", features = ["markdown"] }
|
||||
uriproxy = { path = "../../utils/uriproxy" }
|
||||
httpsign = { path = "../../utils/httpsign/" }
|
||||
jrd = "0.1"
|
||||
tracing = "0.1"
|
||||
tokio = { version = "1.35", features = ["full"] } # TODO slim this down
|
||||
sea-orm = { version = "0.12", features = ["macros", "sqlx-sqlite", "runtime-tokio-rustls"] }
|
||||
reqwest = { version = "0.12", features = ["json"] }
|
||||
axum = "0.7"
|
||||
apb = { path = "../../apb", features = ["unstructured", "orm", "activitypub-fe", "activitypub-counters", "litepub", "ostatus", "toot"] }
|
||||
# nodeinfo = "0.0.2" # the version on crates.io doesn't re-export necessary types to build the struct!!!
|
||||
nodeinfo = { git = "https://codeberg.org/thefederationinfo/nodeinfo-rs", rev = "e865094804" }
|
||||
|
|
|
@ -94,4 +94,8 @@ impl Config {
|
|||
}
|
||||
Config::default()
|
||||
}
|
||||
|
||||
pub fn frontend_url(&self, url: &str) -> Option<String> {
|
||||
Some(format!("{}{}", self.instance.frontend.as_deref()?, url))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::{collections::BTreeSet, sync::Arc};
|
|||
|
||||
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
|
||||
|
||||
use crate::{config::Config, errors::UpubError, model, ext::AnyQuery};
|
||||
use crate::{config::Config, ext::AnyQuery, model};
|
||||
use uriproxy::UriClass;
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -36,7 +36,7 @@ macro_rules! url {
|
|||
impl Context {
|
||||
|
||||
// TODO slim constructor down, maybe make a builder?
|
||||
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> crate::Result<Self> {
|
||||
pub async fn new(db: DatabaseConnection, mut domain: String, config: Config) -> Result<Self, crate::init::InitError> {
|
||||
let protocol = if domain.starts_with("http://")
|
||||
{ "http://" } else { "https://" }.to_string();
|
||||
if domain.ends_with('/') {
|
||||
|
@ -50,10 +50,10 @@ impl Context {
|
|||
let (actor, instance) = super::init::application(domain.clone(), base_url.clone(), &db).await?;
|
||||
|
||||
// TODO maybe we could provide a more descriptive error...
|
||||
let pkey = actor.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string();
|
||||
let pkey = actor.private_key.as_deref().ok_or_else(|| DbErr::RecordNotFound("application private key".into()))?.to_string();
|
||||
|
||||
let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?.ok_or_else(UpubError::internal_server_error)?;
|
||||
let relay_sources = model::relation::Entity::following(&actor.id, &db).await?.ok_or_else(UpubError::internal_server_error)?;
|
||||
let relay_sinks = model::relation::Entity::followers(&actor.id, &db).await?.ok_or_else(|| DbErr::RecordNotFound(actor.id.clone()))?;
|
||||
let relay_sources = model::relation::Entity::following(&actor.id, &db).await?.ok_or_else(|| DbErr::RecordNotFound(actor.id.clone()))?;
|
||||
|
||||
let relay = Relays {
|
||||
sources: BTreeSet::from_iter(relay_sources),
|
||||
|
@ -98,6 +98,10 @@ impl Context {
|
|||
&self.0.base_url
|
||||
}
|
||||
|
||||
pub fn new_id() -> String {
|
||||
uuid::Uuid::new_v4().to_string()
|
||||
}
|
||||
|
||||
/// get full user id uri
|
||||
pub fn uid(&self, id: &str) -> String {
|
||||
uriproxy::uri(self.base(), UriClass::Actor, id)
|
|
@ -1,160 +0,0 @@
|
|||
use axum::{http::StatusCode, response::Redirect};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum UpubError {
|
||||
#[error("database error: {0:?}")]
|
||||
Database(#[from] sea_orm::DbErr),
|
||||
|
||||
#[error("{0}")]
|
||||
Status(axum::http::StatusCode),
|
||||
|
||||
#[error("{0}")]
|
||||
Field(#[from] apb::FieldErr),
|
||||
|
||||
#[error("openssl error: {0:?}")]
|
||||
OpenSSL(#[from] openssl::error::ErrorStack),
|
||||
|
||||
#[error("invalid UTF8 in key: {0:?}")]
|
||||
OpenSSLParse(#[from] std::str::Utf8Error),
|
||||
|
||||
#[error("fetch error: {0:?}")]
|
||||
Reqwest(#[from] reqwest::Error),
|
||||
|
||||
// TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut
|
||||
// helps with debugging!
|
||||
#[error("fetch error: {0:?} -- server responded with {1}")]
|
||||
FetchError(reqwest::Error, String),
|
||||
|
||||
#[error("invalid base64 string: {0:?}")]
|
||||
Base64(#[from] base64::DecodeError),
|
||||
|
||||
#[error("type mismatch on object: expected {0:?}, found {1:?}")]
|
||||
Mismatch(apb::ObjectType, apb::ObjectType),
|
||||
|
||||
#[error("os I/O error: {0}")]
|
||||
IO(#[from] std::io::Error),
|
||||
|
||||
// TODO this isn't really an error but i need to redirect from some routes so this allows me to
|
||||
// keep the type hints on the return type, still what the hell!!!!
|
||||
#[error("redirecting to {0}")]
|
||||
Redirect(String),
|
||||
}
|
||||
|
||||
impl UpubError {
|
||||
pub fn bad_request() -> Self {
|
||||
Self::Status(axum::http::StatusCode::BAD_REQUEST)
|
||||
}
|
||||
|
||||
pub fn unprocessable() -> Self {
|
||||
Self::Status(axum::http::StatusCode::UNPROCESSABLE_ENTITY)
|
||||
}
|
||||
|
||||
pub fn not_found() -> Self {
|
||||
Self::Status(axum::http::StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
pub fn forbidden() -> Self {
|
||||
Self::Status(axum::http::StatusCode::FORBIDDEN)
|
||||
}
|
||||
|
||||
pub fn unauthorized() -> Self {
|
||||
Self::Status(axum::http::StatusCode::UNAUTHORIZED)
|
||||
}
|
||||
|
||||
pub fn not_modified() -> Self {
|
||||
Self::Status(axum::http::StatusCode::NOT_MODIFIED)
|
||||
}
|
||||
|
||||
pub fn internal_server_error() -> Self {
|
||||
Self::Status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
|
||||
pub type UpubResult<T> = Result<T, UpubError>;
|
||||
|
||||
impl From<axum::http::StatusCode> for UpubError {
|
||||
fn from(value: axum::http::StatusCode) -> Self {
|
||||
UpubError::Status(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl axum::response::IntoResponse for UpubError {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
// TODO it's kind of jank to hide this print down here, i should probably learn how spans work
|
||||
// in tracing and use the library's features but ehhhh
|
||||
tracing::debug!("emitting error response: {self:?}");
|
||||
let descr = self.to_string();
|
||||
match self {
|
||||
UpubError::Redirect(to) => Redirect::to(&to).into_response(),
|
||||
UpubError::Status(status) => status.into_response(),
|
||||
UpubError::Database(e) => (
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "database",
|
||||
"inner": format!("{e:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
UpubError::Reqwest(x) | UpubError::FetchError(x, _) => (
|
||||
x.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "request",
|
||||
"status": x.status().map(|s| s.to_string()).unwrap_or_default(),
|
||||
"url": x.url().map(|x| x.to_string()).unwrap_or_default(),
|
||||
"description": descr,
|
||||
"inner": format!("{x:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
UpubError::Field(x) => (
|
||||
axum::http::StatusCode::BAD_REQUEST,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "field",
|
||||
"field": x.0.to_string(),
|
||||
"description": descr,
|
||||
}))
|
||||
).into_response(),
|
||||
UpubError::Mismatch(expected, found) => (
|
||||
axum::http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "type",
|
||||
"expected": expected.as_ref().to_string(),
|
||||
"found": found.as_ref().to_string(),
|
||||
"description": descr,
|
||||
}))
|
||||
).into_response(),
|
||||
x => (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "unknown",
|
||||
"description": descr,
|
||||
"inner": format!("{x:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait LoggableError {
|
||||
fn info_failed(self, msg: &str);
|
||||
fn warn_failed(self, msg: &str);
|
||||
fn err_failed(self, msg: &str);
|
||||
}
|
||||
|
||||
impl<T, E: std::error::Error> LoggableError for Result<T, E> {
|
||||
fn info_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::info!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
fn warn_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::warn!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
fn err_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::error!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,19 +1,45 @@
|
|||
|
||||
#[axum::async_trait]
|
||||
#[async_trait::async_trait]
|
||||
pub trait AnyQuery {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr>;
|
||||
}
|
||||
|
||||
#[axum::async_trait]
|
||||
#[async_trait::async_trait]
|
||||
impl<T : sea_orm::EntityTrait> AnyQuery for sea_orm::Select<T> {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr> {
|
||||
Ok(self.one(db).await?.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
#[axum::async_trait]
|
||||
#[async_trait::async_trait]
|
||||
impl<T : sea_orm::SelectorTrait + Send> AnyQuery for sea_orm::Selector<T> {
|
||||
async fn any(self, db: &sea_orm::DatabaseConnection) -> Result<bool, sea_orm::DbErr> {
|
||||
Ok(self.one(db).await?.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait LoggableError {
|
||||
fn info_failed(self, msg: &str);
|
||||
fn warn_failed(self, msg: &str);
|
||||
fn err_failed(self, msg: &str);
|
||||
}
|
||||
|
||||
impl<T, E: std::error::Error> LoggableError for Result<T, E> {
|
||||
fn info_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::info!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
fn warn_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::warn!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
fn err_failed(self, msg: &str) {
|
||||
if let Err(e) = self {
|
||||
tracing::error!("{} : {}", msg, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,11 +3,23 @@ use sea_orm::{ActiveValue::{NotSet, Set}, DatabaseConnection, EntityTrait};
|
|||
|
||||
use crate::model;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum InitError {
|
||||
#[error("database error: {0:?}")]
|
||||
Database(#[from] sea_orm::DbErr),
|
||||
|
||||
#[error("openssl error: {0:?}")]
|
||||
OpenSSL(#[from] openssl::error::ErrorStack),
|
||||
|
||||
#[error("pem format error: {0:?}")]
|
||||
KeyError(#[from] std::str::Utf8Error),
|
||||
}
|
||||
|
||||
pub async fn application(
|
||||
domain: String,
|
||||
base_url: String,
|
||||
db: &DatabaseConnection
|
||||
) -> crate::Result<(model::actor::Model, model::instance::Model)> {
|
||||
) -> Result<(model::actor::Model, model::instance::Model), InitError> {
|
||||
Ok((
|
||||
match model::actor::Entity::find_by_ap_id(&base_url).one(db).await? {
|
||||
Some(model) => model,
|
|
@ -1,12 +1,15 @@
|
|||
pub mod config;
|
||||
pub mod errors;
|
||||
pub mod server;
|
||||
pub mod model;
|
||||
pub mod traits;
|
||||
|
||||
pub mod context;
|
||||
pub use context::Context;
|
||||
|
||||
pub mod config;
|
||||
pub use config::Config;
|
||||
|
||||
pub mod init;
|
||||
pub mod ext;
|
||||
|
||||
pub use server::Context;
|
||||
pub use config::Config;
|
||||
pub use errors::UpubResult as Result;
|
||||
pub use errors::UpubError as Error;
|
||||
pub use traits::normalize::AP;
|
||||
|
||||
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
|
|
@ -48,12 +48,12 @@ impl Model {
|
|||
}
|
||||
}
|
||||
|
||||
#[axum::async_trait]
|
||||
#[async_trait::async_trait]
|
||||
pub trait BatchFillable {
|
||||
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr>;
|
||||
}
|
||||
|
||||
#[axum::async_trait]
|
||||
#[async_trait::async_trait]
|
||||
impl BatchFillable for &[Event] {
|
||||
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
|
||||
let objects : Vec<crate::model::object::Model> = self
|
||||
|
@ -80,14 +80,14 @@ impl BatchFillable for &[Event] {
|
|||
}
|
||||
}
|
||||
|
||||
#[axum::async_trait]
|
||||
#[async_trait::async_trait]
|
||||
impl BatchFillable for Vec<Event> {
|
||||
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
|
||||
self.as_slice().load_attachments_batch(db).await
|
||||
}
|
||||
}
|
||||
|
||||
#[axum::async_trait]
|
||||
#[async_trait::async_trait]
|
||||
impl BatchFillable for Event {
|
||||
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
|
||||
let x = vec![self.clone()]; // TODO wasteful clone and vec![] but ehhh convenient
|
||||
|
|
|
@ -9,7 +9,7 @@ pub mod session;
|
|||
pub mod addressing;
|
||||
pub mod instance;
|
||||
pub mod delivery;
|
||||
pub mod processing;
|
||||
pub mod job;
|
||||
|
||||
pub mod relation;
|
||||
pub mod announce;
|
||||
|
|
|
@ -1,5 +0,0 @@
|
|||
pub mod admin;
|
||||
pub mod context;
|
||||
pub mod init;
|
||||
|
||||
pub use context::Context;
|
|
@ -1,6 +1,6 @@
|
|||
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
|
||||
|
||||
use crate::fetch::Fetcher;
|
||||
use crate::traits::fetch::Fetcher;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Addresser {
|
||||
|
@ -12,13 +12,13 @@ pub trait Addresser {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Addresser for upub::Context {
|
||||
impl Addresser for crate::Context {
|
||||
async fn expand_addressing(&self, targets: Vec<String>) -> Result<Vec<String>, DbErr> {
|
||||
let mut out = Vec::new();
|
||||
for target in targets {
|
||||
if target.ends_with("/followers") {
|
||||
let target_id = target.replace("/followers", "");
|
||||
let mut followers = upub::model::relation::Entity::followers(&target_id, self.db())
|
||||
let mut followers = crate::model::relation::Entity::followers(&target_id, self.db())
|
||||
.await?
|
||||
.unwrap_or_else(Vec::new);
|
||||
if followers.is_empty() { // stuff with zero addressing will never be seen again!!! TODO
|
||||
|
@ -48,8 +48,8 @@ impl Addresser for upub::Context {
|
|||
{
|
||||
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
|
||||
match (
|
||||
upub::model::instance::Entity::domain_to_internal(&upub::Context::server(target), self.db()).await?,
|
||||
upub::model::actor::Entity::ap_to_internal(target, self.db()).await?,
|
||||
crate::model::instance::Entity::domain_to_internal(&crate::Context::server(target), self.db()).await?,
|
||||
crate::model::actor::Entity::ap_to_internal(target, self.db()).await?,
|
||||
) {
|
||||
(Some(server), Some(actor)) => (Some(server), Some(actor)),
|
||||
(None, _) => { tracing::error!("failed resolving domain"); continue; },
|
||||
|
@ -57,7 +57,7 @@ impl Addresser for upub::Context {
|
|||
}
|
||||
};
|
||||
addressing.push(
|
||||
upub::model::addressing::ActiveModel {
|
||||
crate::model::addressing::ActiveModel {
|
||||
internal: NotSet,
|
||||
instance: Set(server),
|
||||
actor: Set(actor),
|
||||
|
@ -69,7 +69,7 @@ impl Addresser for upub::Context {
|
|||
}
|
||||
|
||||
if !addressing.is_empty() {
|
||||
upub::model::addressing::Entity::insert_many(addressing)
|
||||
crate::model::addressing::Entity::insert_many(addressing)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -81,13 +81,13 @@ impl Addresser for upub::Context {
|
|||
let mut deliveries = Vec::new();
|
||||
for target in targets.iter()
|
||||
.filter(|to| !to.is_empty())
|
||||
.filter(|to| upub::Context::server(to) != self.domain())
|
||||
.filter(|to| crate::Context::server(to) != self.domain())
|
||||
.filter(|to| to != &apb::target::PUBLIC)
|
||||
{
|
||||
// TODO fetch concurrently
|
||||
match self.fetch_user(target).await {
|
||||
Ok(upub::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
|
||||
upub::model::delivery::ActiveModel {
|
||||
Ok(crate::model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
|
||||
crate::model::delivery::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::NotSet,
|
||||
actor: Set(from.to_string()),
|
||||
// TODO we should resolve each user by id and check its inbox because we can't assume
|
||||
|
@ -105,7 +105,7 @@ impl Addresser for upub::Context {
|
|||
}
|
||||
|
||||
if !deliveries.is_empty() {
|
||||
upub::model::delivery::Entity::insert_many(deliveries)
|
||||
crate::model::delivery::Entity::insert_many(deliveries)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -119,12 +119,12 @@ impl Addresser for upub::Context {
|
|||
//#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"]
|
||||
async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> Result<(), DbErr> {
|
||||
let addressed = self.expand_addressing(activity_targets).await?;
|
||||
let internal_aid = upub::model::activity::Entity::ap_to_internal(aid, self.db())
|
||||
let internal_aid = crate::model::activity::Entity::ap_to_internal(aid, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(aid.to_string()))?;
|
||||
let internal_oid = if let Some(o) = oid {
|
||||
Some(
|
||||
upub::model::object::Entity::ap_to_internal(o, self.db())
|
||||
crate::model::object::Entity::ap_to_internal(o, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(o.to_string()))?
|
||||
)
|
|
@ -1,6 +1,6 @@
|
|||
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
|
||||
|
||||
#[axum::async_trait]
|
||||
#[async_trait::async_trait]
|
||||
pub trait Administrable {
|
||||
async fn register_user(
|
||||
&self,
|
||||
|
@ -13,8 +13,8 @@ pub trait Administrable {
|
|||
) -> Result<(), DbErr>;
|
||||
}
|
||||
|
||||
#[axum::async_trait]
|
||||
impl Administrable for super::Context {
|
||||
#[async_trait::async_trait]
|
||||
impl Administrable for crate::Context {
|
||||
async fn register_user(
|
||||
&self,
|
||||
username: String,
|
|
@ -4,7 +4,9 @@ use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object
|
|||
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
|
||||
use sea_orm::{DbErr, EntityTrait, IntoActiveModel, NotSet};
|
||||
|
||||
use super::{address::Addresser, normalize::Normalizer};
|
||||
use crate::traits::normalize::AP;
|
||||
|
||||
use super::{Addresser, Normalizer};
|
||||
use httpsign::HttpSignature;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -32,7 +34,7 @@ pub enum PullError {
|
|||
Malformed(#[from] apb::FieldErr),
|
||||
|
||||
#[error("error normalizing resource: {0:?}")]
|
||||
Normalization(#[from] crate::normalize::NormalizerError),
|
||||
Normalization(#[from] crate::traits::normalize::NormalizerError),
|
||||
|
||||
#[error("too many redirects while resolving resource id, aborting")]
|
||||
TooManyRedirects,
|
||||
|
@ -84,19 +86,19 @@ pub trait Fetcher {
|
|||
|
||||
async fn webfinger(&self, user: &str, host: &str) -> Result<Option<String>, PullError>;
|
||||
|
||||
async fn fetch_domain(&self, domain: &str) -> Result<upub::model::instance::Model, PullError>;
|
||||
async fn fetch_domain(&self, domain: &str) -> Result<crate::model::instance::Model, PullError>;
|
||||
|
||||
async fn fetch_user(&self, id: &str) -> Result<upub::model::actor::Model, PullError>;
|
||||
async fn resolve_user(&self, actor: serde_json::Value) -> Result<upub::model::actor::Model, PullError>;
|
||||
async fn fetch_user(&self, id: &str) -> Result<crate::model::actor::Model, PullError>;
|
||||
async fn resolve_user(&self, actor: serde_json::Value) -> Result<crate::model::actor::Model, PullError>;
|
||||
|
||||
async fn fetch_activity(&self, id: &str) -> Result<upub::model::activity::Model, PullError>;
|
||||
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<upub::model::activity::Model, PullError>;
|
||||
async fn fetch_activity(&self, id: &str) -> Result<crate::model::activity::Model, PullError>;
|
||||
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<crate::model::activity::Model, PullError>;
|
||||
|
||||
async fn fetch_object(&self, id: &str) -> Result<upub::model::object::Model, PullError> { self.fetch_object_r(id, 0).await }
|
||||
#[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result<upub::model::object::Model, PullError> { self.resolve_object_r(object, 0).await }
|
||||
async fn fetch_object(&self, id: &str) -> Result<crate::model::object::Model, PullError> { self.fetch_object_r(id, 0).await }
|
||||
#[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result<crate::model::object::Model, PullError> { self.resolve_object_r(object, 0).await }
|
||||
|
||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<upub::model::object::Model, PullError>;
|
||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<upub::model::object::Model, PullError>;
|
||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<crate::model::object::Model, PullError>;
|
||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<crate::model::object::Model, PullError>;
|
||||
|
||||
|
||||
async fn fetch_thread(&self, id: &str) -> Result<(), PullError>;
|
||||
|
@ -109,7 +111,7 @@ pub trait Fetcher {
|
|||
key: &str,
|
||||
domain: &str,
|
||||
) -> Result<Response, PullError> {
|
||||
let host = upub::Context::server(url);
|
||||
let host = crate::Context::server(url);
|
||||
let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(); // lmao @ "GMT"
|
||||
let path = url.replace("https://", "").replace("http://", "").replace(&host, "");
|
||||
let digest = httpsign::digest(payload.unwrap_or_default());
|
||||
|
@ -136,7 +138,7 @@ pub trait Fetcher {
|
|||
.request(method.clone(), url)
|
||||
.header(ACCEPT, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"")
|
||||
.header(CONTENT_TYPE, "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"")
|
||||
.header(USER_AGENT, format!("upub+{} ({domain})", upub::VERSION))
|
||||
.header(USER_AGENT, format!("upub+{} ({domain})", crate::VERSION))
|
||||
.header("Host", host.clone())
|
||||
.header("Date", date.clone())
|
||||
.header("Digest", digest)
|
||||
|
@ -159,9 +161,9 @@ pub trait Fetcher {
|
|||
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Fetcher for upub::Context {
|
||||
impl Fetcher for crate::Context {
|
||||
async fn pull_r(&self, id: &str, depth: u32) -> Result<Pull<serde_json::Value>, PullError> {
|
||||
let _domain = self.fetch_domain(&upub::Context::server(id)).await?;
|
||||
let _domain = self.fetch_domain(&crate::Context::server(id)).await?;
|
||||
|
||||
let document = Self::request(
|
||||
Method::GET, id, None,
|
||||
|
@ -195,7 +197,7 @@ impl Fetcher for upub::Context {
|
|||
let resource = reqwest::Client::new()
|
||||
.get(webfinger_uri)
|
||||
.header(ACCEPT, "application/jrd+json")
|
||||
.header(USER_AGENT, format!("upub+{} ({})", upub::VERSION, self.domain()))
|
||||
.header(USER_AGENT, format!("upub+{} ({})", crate::VERSION, self.domain()))
|
||||
.send()
|
||||
.await?
|
||||
.json::<jrd::JsonResourceDescriptor>()
|
||||
|
@ -221,12 +223,12 @@ impl Fetcher for upub::Context {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
async fn fetch_domain(&self, domain: &str) -> Result<upub::model::instance::Model, PullError> {
|
||||
if let Some(x) = upub::model::instance::Entity::find_by_domain(domain).one(self.db()).await? {
|
||||
async fn fetch_domain(&self, domain: &str) -> Result<crate::model::instance::Model, PullError> {
|
||||
if let Some(x) = crate::model::instance::Entity::find_by_domain(domain).one(self.db()).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
||||
let mut instance_model = upub::model::instance::Model {
|
||||
let mut instance_model = crate::model::instance::Model {
|
||||
internal: 0,
|
||||
domain: domain.to_string(),
|
||||
name: None,
|
||||
|
@ -254,7 +256,7 @@ impl Fetcher for upub::Context {
|
|||
}
|
||||
}
|
||||
|
||||
if let Ok(nodeinfo) = upub::model::instance::Entity::nodeinfo(domain).await {
|
||||
if let Ok(nodeinfo) = crate::model::instance::Entity::nodeinfo(domain).await {
|
||||
instance_model.software = Some(nodeinfo.software.name);
|
||||
instance_model.version = nodeinfo.software.version;
|
||||
instance_model.users = nodeinfo.usage.users.and_then(|x| x.total);
|
||||
|
@ -263,8 +265,8 @@ impl Fetcher for upub::Context {
|
|||
|
||||
let mut active_model = instance_model.clone().into_active_model();
|
||||
active_model.internal = NotSet;
|
||||
upub::model::instance::Entity::insert(active_model).exec(self.db()).await?;
|
||||
let internal = upub::model::instance::Entity::domain_to_internal(domain, self.db())
|
||||
crate::model::instance::Entity::insert(active_model).exec(self.db()).await?;
|
||||
let internal = crate::model::instance::Entity::domain_to_internal(domain, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(domain.to_string()))?;
|
||||
instance_model.internal = internal;
|
||||
|
@ -272,7 +274,7 @@ impl Fetcher for upub::Context {
|
|||
Ok(instance_model)
|
||||
}
|
||||
|
||||
async fn resolve_user(&self, mut document: serde_json::Value) -> Result<upub::model::actor::Model, PullError> {
|
||||
async fn resolve_user(&self, mut document: serde_json::Value) -> Result<crate::model::actor::Model, PullError> {
|
||||
let id = document.id()?.to_string();
|
||||
|
||||
// TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs every time
|
||||
|
@ -304,24 +306,24 @@ impl Fetcher for upub::Context {
|
|||
}
|
||||
}
|
||||
|
||||
let user_model = upub::model::actor::ActiveModel::new(&document)?;
|
||||
let user_model = AP::actor_q(&document)?;
|
||||
|
||||
// TODO this may fail: while fetching, remote server may fetch our service actor.
|
||||
// if it does so with http signature, we will fetch that actor in background
|
||||
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
|
||||
upub::model::actor::Entity::insert(user_model).exec(self.db()).await?;
|
||||
crate::model::actor::Entity::insert(user_model).exec(self.db()).await?;
|
||||
|
||||
// TODO fetch it back to get the internal id
|
||||
Ok(
|
||||
upub::model::actor::Entity::find_by_ap_id(&id)
|
||||
crate::model::actor::Entity::find_by_ap_id(&id)
|
||||
.one(self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(id.to_string()))?
|
||||
)
|
||||
}
|
||||
|
||||
async fn fetch_user(&self, id: &str) -> Result<upub::model::actor::Model, PullError> {
|
||||
if let Some(x) = upub::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
async fn fetch_user(&self, id: &str) -> Result<crate::model::actor::Model, PullError> {
|
||||
if let Some(x) = crate::model::actor::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
||||
|
@ -330,8 +332,8 @@ impl Fetcher for upub::Context {
|
|||
self.resolve_user(document).await
|
||||
}
|
||||
|
||||
async fn fetch_activity(&self, id: &str) -> Result<upub::model::activity::Model, PullError> {
|
||||
if let Some(x) = upub::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
async fn fetch_activity(&self, id: &str) -> Result<crate::model::activity::Model, PullError> {
|
||||
if let Some(x) = crate::model::activity::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
||||
|
@ -340,15 +342,15 @@ impl Fetcher for upub::Context {
|
|||
self.resolve_activity(activity).await
|
||||
}
|
||||
|
||||
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<upub::model::activity::Model, PullError> {
|
||||
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<crate::model::activity::Model, PullError> {
|
||||
if let Ok(activity_actor) = activity.actor().id() {
|
||||
if let Err(e) = self.fetch_user(&activity_actor).await {
|
||||
if let Err(e) = self.fetch_user(activity_actor).await {
|
||||
tracing::warn!("could not get actor of fetched activity: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(activity_object) = activity.object().id() {
|
||||
if let Err(e) = self.fetch_object(&activity_object).await {
|
||||
if let Err(e) = self.fetch_object(activity_object).await {
|
||||
tracing::warn!("could not get object of fetched activity: {e}");
|
||||
}
|
||||
}
|
||||
|
@ -367,8 +369,8 @@ impl Fetcher for upub::Context {
|
|||
todo!()
|
||||
}
|
||||
|
||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<upub::model::object::Model, PullError> {
|
||||
if let Some(x) = upub::model::object::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<crate::model::object::Model, PullError> {
|
||||
if let Some(x) = crate::model::object::Entity::find_by_ap_id(id).one(self.db()).await? {
|
||||
return Ok(x); // already in db, easy
|
||||
}
|
||||
|
||||
|
@ -377,12 +379,12 @@ impl Fetcher for upub::Context {
|
|||
self.resolve_object_r(object, depth).await
|
||||
}
|
||||
|
||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<upub::model::object::Model, PullError> {
|
||||
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<crate::model::object::Model, PullError> {
|
||||
let id = object.id()?.to_string();
|
||||
|
||||
if let Ok(oid) = object.id() {
|
||||
if oid != id {
|
||||
if let Some(x) = upub::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? {
|
||||
if let Some(x) = crate::model::object::Entity::find_by_ap_id(oid).one(self.db()).await? {
|
||||
return Ok(x); // already in db, but with id different that given url
|
||||
}
|
||||
}
|
||||
|
@ -415,14 +417,14 @@ impl Fetcher for upub::Context {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Fetchable : Sync + Send {
|
||||
async fn fetch(&mut self, ctx: &upub::Context) -> Result<&mut Self, PullError>;
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Fetchable for apb::Node<serde_json::Value> {
|
||||
async fn fetch(&mut self, ctx: &upub::Context) -> Result<&mut Self, PullError> {
|
||||
async fn fetch(&mut self, ctx: &crate::Context) -> Result<&mut Self, PullError> {
|
||||
if let apb::Node::Link(uri) = self {
|
||||
*self = upub::Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain())
|
||||
*self = crate::Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain())
|
||||
.await?
|
||||
.json::<serde_json::Value>()
|
||||
.await?
|
||||
|
@ -434,14 +436,14 @@ impl Fetchable for apb::Node<serde_json::Value> {
|
|||
}
|
||||
|
||||
// #[async_recursion::async_recursion]
|
||||
// async fn crawl_replies(ctx: &upub::Context, id: &str, depth: usize) -> Result<(), PullError> {
|
||||
// async fn crawl_replies(ctx: &crate::Context, id: &str, depth: usize) -> Result<(), PullError> {
|
||||
// tracing::info!("crawling replies of '{id}'");
|
||||
// let object = upub::Context::request(
|
||||
// let object = crate::Context::request(
|
||||
// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
//
|
||||
// let object_model = upub::model::object::Model::new(&object)?;
|
||||
// match upub::model::object::Entity::insert(object_model.into_active_model())
|
||||
// let object_model = crate::model::object::Model::new(&object)?;
|
||||
// match crate::model::object::Entity::insert(object_model.into_active_model())
|
||||
// .exec(ctx.db()).await
|
||||
// {
|
||||
// Ok(_) => {},
|
||||
|
@ -457,7 +459,7 @@ impl Fetchable for apb::Node<serde_json::Value> {
|
|||
//
|
||||
// let mut page_url = match object.replies().get() {
|
||||
// Some(serde_json::Value::String(x)) => {
|
||||
// let replies = upub::Context::request(
|
||||
// let replies = crate::Context::request(
|
||||
// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
// replies.first().id()
|
||||
|
@ -470,7 +472,7 @@ impl Fetchable for apb::Node<serde_json::Value> {
|
|||
// };
|
||||
//
|
||||
// while let Some(ref url) = page_url {
|
||||
// let replies = upub::Context::request(
|
||||
// let replies = crate::Context::request(
|
||||
// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
|
||||
// ).await?.json::<serde_json::Value>().await?;
|
||||
//
|
11
upub/core/src/traits/mod.rs
Normal file
11
upub/core/src/traits/mod.rs
Normal file
|
@ -0,0 +1,11 @@
|
|||
pub mod address;
|
||||
pub mod fetch;
|
||||
pub mod normalize;
|
||||
pub mod process;
|
||||
pub mod admin;
|
||||
|
||||
pub use admin::Administrable;
|
||||
pub use address::Addresser;
|
||||
pub use normalize::Normalizer;
|
||||
pub use process::Processor;
|
||||
pub use fetch::Fetcher;
|
|
@ -12,14 +12,14 @@ pub enum NormalizerError {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Normalizer {
|
||||
async fn insert_object(&self, obj: impl apb::Object) -> Result<upub::model::object::Model, NormalizerError>;
|
||||
async fn insert_activity(&self, act: impl apb::Activity) -> Result<upub::model::activity::Model, NormalizerError>;
|
||||
async fn insert_object(&self, obj: impl apb::Object) -> Result<crate::model::object::Model, NormalizerError>;
|
||||
async fn insert_activity(&self, act: impl apb::Activity) -> Result<crate::model::activity::Model, NormalizerError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Normalizer for upub::Context {
|
||||
impl Normalizer for crate::Context {
|
||||
|
||||
async fn insert_object(&self, object: impl apb::Object) -> Result<upub::model::object::Model, NormalizerError> {
|
||||
async fn insert_object(&self, object: impl apb::Object) -> Result<crate::model::object::Model, NormalizerError> {
|
||||
let oid = object.id()?.to_string();
|
||||
let uid = object.attributed_to().id().str();
|
||||
let t = object.object_type()?;
|
||||
|
@ -45,7 +45,7 @@ impl Normalizer for upub::Context {
|
|||
// > kind of dumb. there should be a job system so this can be done in waves. or maybe there's
|
||||
// > some whole other way to do this?? im thinking but misskey aaaa!! TODO
|
||||
if let Set(Some(ref reply)) = object_active_model.in_reply_to {
|
||||
if let Some(o) = upub::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? {
|
||||
if let Some(o) = crate::model::object::Entity::find_by_ap_id(reply).one(self.db()).await? {
|
||||
object_active_model.context = Set(o.context);
|
||||
} else {
|
||||
object_active_model.context = Set(None); // TODO to be filled by some other task
|
||||
|
@ -54,25 +54,25 @@ impl Normalizer for upub::Context {
|
|||
object_active_model.context = Set(Some(oid.clone()));
|
||||
}
|
||||
|
||||
upub::model::object::Entity::insert(object_active_model).exec(self.db()).await?;
|
||||
let object_model = upub::model::object::Entity::find_by_ap_id(&oid)
|
||||
crate::model::object::Entity::insert(object_active_model).exec(self.db()).await?;
|
||||
let object_model = crate::model::object::Entity::find_by_ap_id(&oid)
|
||||
.one(self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(oid.to_string()))?;
|
||||
|
||||
// update replies counter
|
||||
if let Some(ref in_reply_to) = object_model.in_reply_to {
|
||||
upub::model::object::Entity::update_many()
|
||||
.filter(upub::model::object::Column::Id.eq(in_reply_to))
|
||||
.col_expr(upub::model::object::Column::Replies, Expr::col(upub::model::object::Column::Replies).add(1))
|
||||
crate::model::object::Entity::update_many()
|
||||
.filter(crate::model::object::Column::Id.eq(in_reply_to))
|
||||
.col_expr(crate::model::object::Column::Replies, Expr::col(crate::model::object::Column::Replies).add(1))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
// update statuses counter
|
||||
if let Some(object_author) = uid {
|
||||
upub::model::actor::Entity::update_many()
|
||||
.col_expr(upub::model::actor::Column::StatusesCount, Expr::col(upub::model::actor::Column::StatusesCount).add(1))
|
||||
.filter(upub::model::actor::Column::Id.eq(&object_author))
|
||||
crate::model::actor::Entity::update_many()
|
||||
.col_expr(crate::model::actor::Column::StatusesCount, Expr::col(crate::model::actor::Column::StatusesCount).add(1))
|
||||
.filter(crate::model::actor::Column::Id.eq(&object_author))
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ impl Normalizer for upub::Context {
|
|||
tracing::warn!("ignoring array-in-array while processing attachments");
|
||||
continue
|
||||
},
|
||||
Node::Link(l) => upub::model::attachment::ActiveModel {
|
||||
Node::Link(l) => crate::model::attachment::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::NotSet,
|
||||
url: Set(l.href().to_string()),
|
||||
object: Set(object_model.internal),
|
||||
|
@ -96,7 +96,7 @@ impl Normalizer for upub::Context {
|
|||
Node::Object(o) =>
|
||||
AP::attachment_q(o.as_document()?, object_model.internal)?,
|
||||
};
|
||||
upub::model::attachment::Entity::insert(attachment_model)
|
||||
crate::model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ impl Normalizer for upub::Context {
|
|||
}
|
||||
}
|
||||
|
||||
upub::model::attachment::Entity::insert(attachment_model)
|
||||
crate::model::attachment::Entity::insert(attachment_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
}
|
||||
|
@ -131,16 +131,16 @@ impl Normalizer for upub::Context {
|
|||
Ok(object_model)
|
||||
}
|
||||
|
||||
async fn insert_activity(&self, activity: impl apb::Activity) -> Result<upub::model::activity::Model, NormalizerError> {
|
||||
async fn insert_activity(&self, activity: impl apb::Activity) -> Result<crate::model::activity::Model, NormalizerError> {
|
||||
let mut activity_model = AP::activity(&activity)?;
|
||||
|
||||
let mut active_model = activity_model.clone().into_active_model();
|
||||
active_model.internal = NotSet;
|
||||
upub::model::activity::Entity::insert(active_model)
|
||||
crate::model::activity::Entity::insert(active_model)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
|
||||
let internal = upub::model::activity::Entity::ap_to_internal(&activity_model.id, self.db())
|
||||
let internal = crate::model::activity::Entity::ap_to_internal(&activity_model.id, self.db())
|
||||
.await?
|
||||
.ok_or_else(|| DbErr::RecordNotFound(activity_model.id.clone()))?;
|
||||
activity_model.internal = internal;
|
||||
|
@ -152,8 +152,8 @@ impl Normalizer for upub::Context {
|
|||
pub struct AP;
|
||||
|
||||
impl AP {
|
||||
pub fn activity(activity: &impl apb::Activity) -> Result<upub::model::activity::Model, apb::FieldErr> {
|
||||
Ok(upub::model::activity::Model {
|
||||
pub fn activity(activity: &impl apb::Activity) -> Result<crate::model::activity::Model, apb::FieldErr> {
|
||||
Ok(crate::model::activity::Model {
|
||||
internal: 0,
|
||||
id: activity.id()?.to_string(),
|
||||
activity_type: activity.activity_type()?,
|
||||
|
@ -168,7 +168,7 @@ impl AP {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn activity_q(activity: &impl apb::Activity) -> Result<upub::model::activity::ActiveModel, apb::FieldErr> {
|
||||
pub fn activity_q(activity: &impl apb::Activity) -> Result<crate::model::activity::ActiveModel, apb::FieldErr> {
|
||||
let mut m = AP::activity(activity)?.into_active_model();
|
||||
m.internal = NotSet;
|
||||
Ok(m)
|
||||
|
@ -177,8 +177,8 @@ impl AP {
|
|||
|
||||
|
||||
|
||||
pub fn attachment(document: &impl apb::Document, parent: i64) -> Result<upub::model::attachment::Model, apb::FieldErr> {
|
||||
Ok(upub::model::attachment::Model {
|
||||
pub fn attachment(document: &impl apb::Document, parent: i64) -> Result<crate::model::attachment::Model, apb::FieldErr> {
|
||||
Ok(crate::model::attachment::Model {
|
||||
internal: 0,
|
||||
url: document.url().id().str().unwrap_or_default(),
|
||||
object: parent,
|
||||
|
@ -189,7 +189,7 @@ impl AP {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn attachment_q(document: &impl apb::Document, parent: i64) -> Result<upub::model::attachment::ActiveModel, apb::FieldErr> {
|
||||
pub fn attachment_q(document: &impl apb::Document, parent: i64) -> Result<crate::model::attachment::ActiveModel, apb::FieldErr> {
|
||||
let mut m = AP::attachment(document, parent)?.into_active_model();
|
||||
m.internal = NotSet;
|
||||
Ok(m)
|
||||
|
@ -197,7 +197,7 @@ impl AP {
|
|||
|
||||
|
||||
|
||||
pub fn object(object: &impl apb::Object) -> Result<upub::model::object::Model, apb::FieldErr> {
|
||||
pub fn object(object: &impl apb::Object) -> Result<crate::model::object::Model, apb::FieldErr> {
|
||||
let t = object.object_type()?;
|
||||
if matches!(t,
|
||||
apb::ObjectType::Activity(_)
|
||||
|
@ -207,7 +207,7 @@ impl AP {
|
|||
) {
|
||||
return Err(apb::FieldErr("type"));
|
||||
}
|
||||
Ok(upub::model::object::Model {
|
||||
Ok(crate::model::object::Model {
|
||||
internal: 0,
|
||||
id: object.id()?.to_string(),
|
||||
object_type: t,
|
||||
|
@ -235,7 +235,7 @@ impl AP {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn object_q(object: &impl apb::Object) -> Result<upub::model::object::ActiveModel, apb::FieldErr> {
|
||||
pub fn object_q(object: &impl apb::Object) -> Result<crate::model::object::ActiveModel, apb::FieldErr> {
|
||||
let mut m = AP::object(object)?.into_active_model();
|
||||
m.internal = NotSet;
|
||||
Ok(m)
|
||||
|
@ -243,7 +243,7 @@ impl AP {
|
|||
|
||||
|
||||
|
||||
pub fn actor(actor: &impl apb::Actor) -> Result<upub::model::actor::Model, apb::FieldErr> {
|
||||
pub fn actor(actor: &impl apb::Actor) -> Result<crate::model::actor::Model, apb::FieldErr> {
|
||||
let ap_id = actor.id()?.to_string();
|
||||
let (domain, fallback_preferred_username) = {
|
||||
let clean = ap_id
|
||||
|
@ -254,7 +254,7 @@ impl AP {
|
|||
let last = splits.last().unwrap_or(first);
|
||||
(first.to_string(), last.to_string())
|
||||
};
|
||||
Ok(upub::model::actor::Model {
|
||||
Ok(crate::model::actor::Model {
|
||||
internal: 0,
|
||||
domain,
|
||||
id: ap_id,
|
||||
|
@ -279,7 +279,7 @@ impl AP {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn actor_q(actor: &impl apb::Actor) -> Result<upub::model::actor::ActiveModel, apb::FieldErr> {
|
||||
pub fn actor_q(actor: &impl apb::Actor) -> Result<crate::model::actor::ActiveModel, apb::FieldErr> {
|
||||
let mut m = AP::actor(actor)?.into_active_model();
|
||||
m.internal = NotSet;
|
||||
Ok(m)
|
|
@ -1,7 +1,6 @@
|
|||
use apb::{target::Addressed, Activity, Base, Object};
|
||||
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
|
||||
use upub::{errors::LoggableError, ext::AnyQuery};
|
||||
use crate::{address::Addresser, fetch::{Fetcher, Pull}, normalize::Normalizer};
|
||||
use crate::{ext::{AnyQuery, LoggableError}, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProcessorError {
|
||||
|
@ -24,10 +23,10 @@ pub enum ProcessorError {
|
|||
Unprocessable,
|
||||
|
||||
#[error("failed normalizing and inserting entity: {0:?}")]
|
||||
NormalizerError(#[from] crate::normalize::NormalizerError),
|
||||
NormalizerError(#[from] crate::traits::normalize::NormalizerError),
|
||||
|
||||
#[error("failed fetching resource: {0:?}")]
|
||||
PullError(#[from] crate::fetch::PullError),
|
||||
PullError(#[from] crate::traits::fetch::PullError),
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
@ -36,7 +35,7 @@ pub trait Processor {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Processor for upub::Context {
|
||||
impl Processor for crate::Context {
|
||||
async fn process(&self, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
// TODO we could process Links and bare Objects maybe, but probably out of AP spec?
|
||||
match activity.activity_type()? {
|
||||
|
@ -55,7 +54,7 @@ impl Processor for upub::Context {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn create(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn create(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let Some(object_node) = activity.object().extract() else {
|
||||
// TODO we could process non-embedded activities or arrays but im lazy rn
|
||||
tracing::error!("refusing to process activity without embedded object");
|
||||
|
@ -74,15 +73,15 @@ pub async fn create(ctx: &upub::Context, activity: impl apb::Activity) -> Result
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn like(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn like(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let object_uri = activity.object().id()?.to_string();
|
||||
let published = activity.published().unwrap_or_else(|_|chrono::Utc::now());
|
||||
let obj = ctx.fetch_object(&object_uri).await?;
|
||||
if upub::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal)
|
||||
if crate::model::like::Entity::find_by_uid_oid(internal_uid, obj.internal)
|
||||
.any(ctx.db())
|
||||
.await?
|
||||
{
|
||||
|
@ -91,26 +90,26 @@ pub async fn like(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(
|
|||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
|
||||
let like = upub::model::like::ActiveModel {
|
||||
let like = crate::model::like::ActiveModel {
|
||||
internal: NotSet,
|
||||
actor: Set(internal_uid),
|
||||
object: Set(obj.internal),
|
||||
activity: Set(activity_model.internal),
|
||||
published: Set(published),
|
||||
};
|
||||
upub::model::like::Entity::insert(like).exec(ctx.db()).await?;
|
||||
upub::model::object::Entity::update_many()
|
||||
.col_expr(upub::model::object::Column::Likes, Expr::col(upub::model::object::Column::Likes).add(1))
|
||||
.filter(upub::model::object::Column::Internal.eq(obj.internal))
|
||||
crate::model::like::Entity::insert(like).exec(ctx.db()).await?;
|
||||
crate::model::object::Entity::update_many()
|
||||
.col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).add(1))
|
||||
.filter(crate::model::object::Column::Internal.eq(obj.internal))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
|
||||
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?;
|
||||
if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!!
|
||||
expanded_addressing.push(
|
||||
upub::model::object::Entity::find_by_id(obj.internal)
|
||||
crate::model::object::Entity::find_by_id(obj.internal)
|
||||
.select_only()
|
||||
.select_column(upub::model::object::Column::AttributedTo)
|
||||
.select_column(crate::model::object::Column::AttributedTo)
|
||||
.into_tuple::<String>()
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
|
@ -122,22 +121,22 @@ pub async fn like(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn follow(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let source_actor = activity.actor().id()?.to_string();
|
||||
let source_actor_internal = upub::model::actor::Entity::ap_to_internal(&source_actor, ctx.db())
|
||||
let source_actor_internal = crate::model::actor::Entity::ap_to_internal(&source_actor, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let target_actor = activity.object().id()?.to_string();
|
||||
let usr = ctx.fetch_user(&target_actor).await?;
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
let relation_model = upub::model::relation::ActiveModel {
|
||||
let relation_model = crate::model::relation::ActiveModel {
|
||||
internal: NotSet,
|
||||
accept: Set(None),
|
||||
activity: Set(activity_model.internal),
|
||||
follower: Set(source_actor_internal),
|
||||
following: Set(usr.internal),
|
||||
};
|
||||
upub::model::relation::Entity::insert(relation_model)
|
||||
crate::model::relation::Entity::insert(relation_model)
|
||||
.exec(ctx.db()).await?;
|
||||
let mut expanded_addressing = ctx.expand_addressing(activity_model.addressed()).await?;
|
||||
if !expanded_addressing.contains(&target_actor) {
|
||||
|
@ -148,11 +147,11 @@ pub async fn follow(ctx: &upub::Context, activity: impl apb::Activity) -> Result
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn accept(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
// TODO what about TentativeAccept
|
||||
let target_actor = activity.actor().id()?.to_string();
|
||||
let follow_request_id = activity.object().id()?.to_string();
|
||||
let follow_activity = upub::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
@ -163,26 +162,26 @@ pub async fn accept(ctx: &upub::Context, activity: impl apb::Activity) -> Result
|
|||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
|
||||
upub::model::actor::Entity::update_many()
|
||||
crate::model::actor::Entity::update_many()
|
||||
.col_expr(
|
||||
upub::model::actor::Column::FollowingCount,
|
||||
Expr::col(upub::model::actor::Column::FollowingCount).add(1)
|
||||
crate::model::actor::Column::FollowingCount,
|
||||
Expr::col(crate::model::actor::Column::FollowingCount).add(1)
|
||||
)
|
||||
.filter(upub::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.filter(crate::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
upub::model::actor::Entity::update_many()
|
||||
crate::model::actor::Entity::update_many()
|
||||
.col_expr(
|
||||
upub::model::actor::Column::FollowersCount,
|
||||
Expr::col(upub::model::actor::Column::FollowersCount).add(1)
|
||||
crate::model::actor::Column::FollowersCount,
|
||||
Expr::col(crate::model::actor::Column::FollowersCount).add(1)
|
||||
)
|
||||
.filter(upub::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.filter(crate::model::actor::Column::Id.eq(&follow_activity.actor))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
|
||||
upub::model::relation::Entity::update_many()
|
||||
.col_expr(upub::model::relation::Column::Accept, Expr::value(Some(activity_model.internal)))
|
||||
.filter(upub::model::relation::Column::Activity.eq(follow_activity.internal))
|
||||
crate::model::relation::Entity::update_many()
|
||||
.col_expr(crate::model::relation::Column::Accept, Expr::value(Some(activity_model.internal)))
|
||||
.filter(crate::model::relation::Column::Activity.eq(follow_activity.internal))
|
||||
.exec(ctx.db()).await?;
|
||||
|
||||
tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor);
|
||||
|
@ -195,11 +194,11 @@ pub async fn accept(ctx: &upub::Context, activity: impl apb::Activity) -> Result
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn reject(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
// TODO what about TentativeReject?
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let follow_request_id = activity.object().id()?.to_string();
|
||||
let follow_activity = upub::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
let follow_activity = crate::model::activity::Entity::find_by_ap_id(&follow_request_id)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
@ -210,8 +209,8 @@ pub async fn reject(ctx: &upub::Context, activity: impl apb::Activity) -> Result
|
|||
|
||||
let activity_model = ctx.insert_activity(activity).await?;
|
||||
|
||||
upub::model::relation::Entity::delete_many()
|
||||
.filter(upub::model::relation::Column::Activity.eq(activity_model.internal))
|
||||
crate::model::relation::Entity::delete_many()
|
||||
.filter(crate::model::relation::Column::Activity.eq(activity_model.internal))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
|
||||
|
@ -226,17 +225,16 @@ pub async fn reject(ctx: &upub::Context, activity: impl apb::Activity) -> Result
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let oid = activity.object().id()?.to_string();
|
||||
upub::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users");
|
||||
upub::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects");
|
||||
crate::model::actor::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from users");
|
||||
crate::model::object::Entity::delete_by_ap_id(&oid).exec(ctx.db()).await.info_failed("failed deleting from objects");
|
||||
tracing::debug!("deleted '{oid}'");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn update(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let aid = activity.id()?.to_string();
|
||||
let Some(object_node) = activity.object().extract() else {
|
||||
tracing::error!("refusing to process activity without embedded object");
|
||||
return Err(ProcessorError::Unprocessable);
|
||||
|
@ -247,24 +245,24 @@ pub async fn update(ctx: &upub::Context, activity: impl apb::Activity) -> Result
|
|||
|
||||
match object_node.object_type()? {
|
||||
apb::ObjectType::Actor(_) => {
|
||||
let internal_uid = upub::model::actor::Entity::ap_to_internal(&oid, ctx.db())
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&oid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let mut actor_model = upub::model::actor::ActiveModel::new(object_node.as_actor()?)?;
|
||||
let mut actor_model = crate::AP::actor_q(object_node.as_actor()?)?;
|
||||
actor_model.internal = Set(internal_uid);
|
||||
actor_model.updated = Set(chrono::Utc::now());
|
||||
upub::model::actor::Entity::update(actor_model)
|
||||
crate::model::actor::Entity::update(actor_model)
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
},
|
||||
apb::ObjectType::Note => {
|
||||
let internal_oid = upub::model::object::Entity::ap_to_internal(&oid, ctx.db())
|
||||
let internal_oid = crate::model::object::Entity::ap_to_internal(&oid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let mut object_model = upub::model::object::ActiveModel::new(&object_node)?;
|
||||
let mut object_model = crate::AP::object_q(&object_node)?;
|
||||
object_model.internal = Set(internal_oid);
|
||||
object_model.updated = Set(chrono::Utc::now());
|
||||
upub::model::object::Entity::update(object_model)
|
||||
crate::model::object::Entity::update(object_model)
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
},
|
||||
|
@ -277,11 +275,10 @@ pub async fn update(ctx: &upub::Context, activity: impl apb::Activity) -> Result
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
// TODO in theory we could work with just object_id but right now only accept embedded
|
||||
let undone_activity = activity.object().extract().ok_or(apb::FieldErr("object"))?;
|
||||
let undone_activity_id = undone_activity.id()?;
|
||||
let undone_activity_author = undone_activity.as_activity()?.actor().id()?.to_string();
|
||||
|
||||
if uid != undone_activity_author {
|
||||
|
@ -290,7 +287,7 @@ pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(
|
|||
|
||||
let undone_activity_target = undone_activity.as_activity()?.object().id()?.to_string();
|
||||
|
||||
let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
|
||||
|
@ -301,40 +298,40 @@ pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(
|
|||
|
||||
match activity_type {
|
||||
apb::ActivityType::Like => {
|
||||
let internal_oid = upub::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
let internal_oid = crate::model::object::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
upub::model::like::Entity::delete_many()
|
||||
crate::model::like::Entity::delete_many()
|
||||
.filter(
|
||||
Condition::all()
|
||||
.add(upub::model::like::Column::Actor.eq(internal_uid))
|
||||
.add(upub::model::like::Column::Object.eq(internal_oid))
|
||||
.add(crate::model::like::Column::Actor.eq(internal_uid))
|
||||
.add(crate::model::like::Column::Object.eq(internal_oid))
|
||||
)
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
upub::model::object::Entity::update_many()
|
||||
.filter(upub::model::object::Column::Internal.eq(internal_oid))
|
||||
.col_expr(upub::model::object::Column::Likes, Expr::col(upub::model::object::Column::Likes).sub(1))
|
||||
crate::model::object::Entity::update_many()
|
||||
.filter(crate::model::object::Column::Internal.eq(internal_oid))
|
||||
.col_expr(crate::model::object::Column::Likes, Expr::col(crate::model::object::Column::Likes).sub(1))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
},
|
||||
apb::ActivityType::Follow => {
|
||||
let internal_uid_following = upub::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
let internal_uid_following = crate::model::actor::Entity::ap_to_internal(&undone_activity_target, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
upub::model::relation::Entity::delete_many()
|
||||
.filter(upub::model::relation::Column::Follower.eq(internal_uid))
|
||||
.filter(upub::model::relation::Column::Following.eq(internal_uid_following))
|
||||
crate::model::relation::Entity::delete_many()
|
||||
.filter(crate::model::relation::Column::Follower.eq(internal_uid))
|
||||
.filter(crate::model::relation::Column::Following.eq(internal_uid_following))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
upub::model::actor::Entity::update_many()
|
||||
.filter(upub::model::actor::Column::Internal.eq(internal_uid))
|
||||
.col_expr(upub::model::actor::Column::FollowingCount, Expr::col(upub::model::actor::Column::FollowingCount).sub(1))
|
||||
crate::model::actor::Entity::update_many()
|
||||
.filter(crate::model::actor::Column::Internal.eq(internal_uid))
|
||||
.col_expr(crate::model::actor::Column::FollowingCount, Expr::col(crate::model::actor::Column::FollowingCount).sub(1))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
upub::model::actor::Entity::update_many()
|
||||
.filter(upub::model::actor::Column::Internal.eq(internal_uid_following))
|
||||
.col_expr(upub::model::actor::Column::FollowersCount, Expr::col(upub::model::actor::Column::FollowersCount).sub(1))
|
||||
crate::model::actor::Entity::update_many()
|
||||
.filter(crate::model::actor::Column::Internal.eq(internal_uid_following))
|
||||
.col_expr(crate::model::actor::Column::FollowersCount, Expr::col(crate::model::actor::Column::FollowersCount).sub(1))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
},
|
||||
|
@ -347,10 +344,10 @@ pub async fn undo(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn announce(ctx: &upub::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Result<(), ProcessorError> {
|
||||
let uid = activity.actor().id()?.to_string();
|
||||
let actor = ctx.fetch_user(&uid).await?;
|
||||
let internal_uid = upub::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
let internal_uid = crate::model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
.await?
|
||||
.ok_or(ProcessorError::Incomplete)?;
|
||||
let announced_id = activity.object().id()?.to_string();
|
||||
|
@ -371,7 +368,7 @@ pub async fn announce(ctx: &upub::Context, activity: impl apb::Activity) -> Resu
|
|||
return Ok(())
|
||||
}
|
||||
|
||||
let share = upub::model::announce::ActiveModel {
|
||||
let share = crate::model::announce::ActiveModel {
|
||||
internal: NotSet,
|
||||
actor: Set(internal_uid),
|
||||
object: Set(object_model.internal),
|
||||
|
@ -380,11 +377,11 @@ pub async fn announce(ctx: &upub::Context, activity: impl apb::Activity) -> Resu
|
|||
|
||||
let expanded_addressing = ctx.expand_addressing(addressed).await?;
|
||||
ctx.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
|
||||
upub::model::announce::Entity::insert(share)
|
||||
crate::model::announce::Entity::insert(share)
|
||||
.exec(ctx.db()).await?;
|
||||
upub::model::object::Entity::update_many()
|
||||
.col_expr(upub::model::object::Column::Announces, Expr::col(upub::model::object::Column::Announces).add(1))
|
||||
.filter(upub::model::object::Column::Internal.eq(object_model.internal))
|
||||
crate::model::object::Entity::update_many()
|
||||
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
|
||||
.filter(crate::model::object::Column::Internal.eq(object_model.internal))
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
|
|
@ -1,134 +0,0 @@
|
|||
use reqwest::Method;
|
||||
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder};
|
||||
use tokio::{sync::broadcast, task::JoinHandle};
|
||||
|
||||
use apb::{ActivityMut, Node};
|
||||
use crate::{model, Context, server::{fetcher::Fetcher, jsonld::LD}};
|
||||
|
||||
pub struct Dispatcher {
|
||||
waker: broadcast::Sender<()>,
|
||||
}
|
||||
|
||||
impl Default for Dispatcher {
|
||||
fn default() -> Self {
|
||||
let (waker, _) = broadcast::channel(1);
|
||||
Dispatcher { waker }
|
||||
}
|
||||
}
|
||||
|
||||
impl Dispatcher {
|
||||
pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> {
|
||||
let mut waker = self.waker.subscribe();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Err(e) = worker(&db, &domain, poll_interval, &mut waker).await {
|
||||
tracing::error!("delivery worker exited with error: {e}");
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_secs(poll_interval * 10)).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn wakeup(&self) {
|
||||
match self.waker.send(()) {
|
||||
Err(_) => tracing::error!("no worker to wakeup"),
|
||||
Ok(n) => tracing::debug!("woken {n} workers"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker: &mut broadcast::Receiver<()>) -> crate::Result<()> {
|
||||
loop {
|
||||
let Some(delivery) = model::delivery::Entity::find()
|
||||
.filter(model::delivery::Column::NotBefore.lte(chrono::Utc::now()))
|
||||
.order_by(model::delivery::Column::NotBefore, Order::Asc)
|
||||
.one(db)
|
||||
.await?
|
||||
else {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = waker.recv() => {},
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
|
||||
}
|
||||
continue
|
||||
};
|
||||
|
||||
let del_row = model::delivery::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::Set(delivery.internal),
|
||||
..Default::default()
|
||||
};
|
||||
let del = model::delivery::Entity::delete(del_row)
|
||||
.exec(db)
|
||||
.await?;
|
||||
|
||||
if del.rows_affected == 0 {
|
||||
// another worker claimed this delivery
|
||||
continue; // go back to the top
|
||||
}
|
||||
if delivery.expired() {
|
||||
// try polling for another one
|
||||
continue; // go back to top
|
||||
}
|
||||
|
||||
tracing::info!("delivering {} to {}", delivery.activity, delivery.target);
|
||||
|
||||
let payload = match model::activity::Entity::find_by_ap_id(&delivery.activity)
|
||||
.find_also_related(model::object::Entity)
|
||||
.one(db)
|
||||
.await? // TODO probably should not fail here and at least re-insert the delivery
|
||||
{
|
||||
Some((activity, None)) => activity.ap().ld_context(),
|
||||
Some((activity, Some(object))) => {
|
||||
let always_embed = matches!(
|
||||
activity.activity_type,
|
||||
apb::ActivityType::Create
|
||||
| apb::ActivityType::Undo
|
||||
| apb::ActivityType::Update
|
||||
| apb::ActivityType::Accept(_)
|
||||
| apb::ActivityType::Reject(_)
|
||||
);
|
||||
if always_embed {
|
||||
activity.ap().set_object(Node::object(object.ap())).ld_context()
|
||||
} else {
|
||||
activity.ap().ld_context()
|
||||
}
|
||||
},
|
||||
None => {
|
||||
tracing::warn!("skipping dispatch for deleted object {}", delivery.activity);
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
||||
let Some(actor) = model::actor::Entity::find_by_ap_id(&delivery.actor)
|
||||
.one(db)
|
||||
.await?
|
||||
else {
|
||||
tracing::error!("abandoning delivery of {} from non existant actor: {}", delivery.activity, delivery.actor);
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(key) = actor.private_key
|
||||
else {
|
||||
tracing::error!("abandoning delivery of {} from actor without private key: {}", delivery.activity, delivery.actor);
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Err(e) = Context::request(
|
||||
Method::POST, &delivery.target,
|
||||
Some(&serde_json::to_string(&payload).unwrap()),
|
||||
&delivery.actor, &key, domain
|
||||
).await {
|
||||
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
|
||||
let new_delivery = model::delivery::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::NotSet,
|
||||
not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()),
|
||||
actor: sea_orm::ActiveValue::Set(delivery.actor),
|
||||
target: sea_orm::ActiveValue::Set(delivery.target),
|
||||
activity: sea_orm::ActiveValue::Set(delivery.activity),
|
||||
published: sea_orm::ActiveValue::Set(delivery.published),
|
||||
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
|
||||
};
|
||||
model::delivery::Entity::insert(new_delivery).exec(db).await?;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +0,0 @@
|
|||
pub mod address;
|
||||
pub mod normalize;
|
||||
pub mod process;
|
||||
pub mod fetch;
|
||||
|
||||
// pub mod dispatcher;
|
|
@ -1,6 +1,6 @@
|
|||
use axum::extract::{Path, Query, State};
|
||||
use sea_orm::{ColumnTrait, QueryFilter};
|
||||
use upub::{model::{self, addressing::Event, attachment::BatchFillable}, Context};
|
||||
use upub::{model::{self, addressing::Event, attachment::BatchFillable}, traits::Fetcher, Context};
|
||||
use apb::LD;
|
||||
|
||||
use crate::{builders::JsonLD, AuthIdentity};
|
||||
|
@ -14,12 +14,12 @@ pub async fn view(
|
|||
Query(query): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
let aid = ctx.aid(&id);
|
||||
// if auth.is_local() && query.fetch && !ctx.is_local(&aid) {
|
||||
// let obj = ctx.fetch_activity(&aid).await?;
|
||||
// if obj.id != aid {
|
||||
// return Err(crate::ApiError::Redirect(obj.id));
|
||||
// }
|
||||
// }
|
||||
if auth.is_local() && query.fetch && !ctx.is_local(&aid) {
|
||||
let obj = ctx.fetch_activity(&aid).await?;
|
||||
if obj.id != aid {
|
||||
return Err(crate::ApiError::Redirect(obj.id));
|
||||
}
|
||||
}
|
||||
|
||||
let row = model::addressing::Entity::find_addressed(auth.my_id())
|
||||
.filter(model::activity::Column::Id.eq(&aid))
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use apb::{LD, ActorMut, BaseMut, ObjectMut, PublicKeyMut};
|
||||
use axum::{extract::{Query, State}, http::HeaderMap, response::{IntoResponse, Redirect, Response}, Form, Json};
|
||||
use reqwest::Method;
|
||||
use upub::Context;
|
||||
use upub::{traits::Fetcher, Context};
|
||||
|
||||
use crate::{builders::JsonLD, AuthIdentity};
|
||||
|
||||
|
@ -52,20 +52,19 @@ pub async fn proxy_get(
|
|||
if !ctx.cfg().security.allow_public_debugger && !auth.is_local() {
|
||||
return Err(crate::ApiError::unauthorized());
|
||||
}
|
||||
todo!()
|
||||
// Ok(Json(
|
||||
// Context::request(
|
||||
// Method::GET,
|
||||
// &query.id,
|
||||
// None,
|
||||
// ctx.base(),
|
||||
// ctx.pkey(),
|
||||
// &format!("{}+proxy", ctx.domain()),
|
||||
// )
|
||||
// .await?
|
||||
// .json::<serde_json::Value>()
|
||||
// .await?
|
||||
// ))
|
||||
Ok(Json(
|
||||
Context::request(
|
||||
Method::GET,
|
||||
&query.id,
|
||||
None,
|
||||
ctx.base(),
|
||||
ctx.pkey(),
|
||||
&format!("{}+proxy", ctx.domain()),
|
||||
)
|
||||
.await?
|
||||
.json::<serde_json::Value>()
|
||||
.await?
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn proxy_form(
|
||||
|
@ -77,18 +76,17 @@ pub async fn proxy_form(
|
|||
if !ctx.cfg().security.allow_public_debugger && auth.is_local() {
|
||||
return Err(crate::ApiError::unauthorized());
|
||||
}
|
||||
todo!()
|
||||
// Ok(Json(
|
||||
// Context::request(
|
||||
// Method::GET,
|
||||
// &query.id,
|
||||
// None,
|
||||
// ctx.base(),
|
||||
// ctx.pkey(),
|
||||
// &format!("{}+proxy", ctx.domain()),
|
||||
// )
|
||||
// .await?
|
||||
// .json::<serde_json::Value>()
|
||||
// .await?
|
||||
// ))
|
||||
Ok(Json(
|
||||
Context::request(
|
||||
Method::GET,
|
||||
&query.id,
|
||||
None,
|
||||
ctx.base(),
|
||||
ctx.pkey(),
|
||||
&format!("{}+proxy", ctx.domain()),
|
||||
)
|
||||
.await?
|
||||
.json::<serde_json::Value>()
|
||||
.await?
|
||||
))
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use axum::{http::StatusCode, extract::State, Json};
|
||||
use rand::Rng;
|
||||
use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, Condition, EntityTrait, QueryFilter};
|
||||
use upub::{server::admin::Administrable, Context};
|
||||
use upub::{traits::Administrable, Context};
|
||||
|
||||
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use apb::{Activity, ActivityType, Base};
|
||||
use axum::{extract::{Query, State}, http::StatusCode, Json};
|
||||
use sea_orm::{sea_query::IntoCondition, ColumnTrait};
|
||||
use upub::Context;
|
||||
use sea_orm::{sea_query::IntoCondition, ActiveValue::{NotSet, Set}, ColumnTrait, EntityTrait};
|
||||
use upub::{model::job::JobType, Context};
|
||||
|
||||
use crate::{AuthIdentity, Identity, builders::JsonLD};
|
||||
|
||||
|
@ -42,8 +42,8 @@ pub async fn post(
|
|||
State(ctx): State<Context>,
|
||||
AuthIdentity(auth): AuthIdentity,
|
||||
Json(activity): Json<serde_json::Value>
|
||||
) -> crate::ApiResult<()> {
|
||||
let Identity::Remote { domain: server, user: uid, .. } = auth else {
|
||||
) -> crate::ApiResult<StatusCode> {
|
||||
let Identity::Remote { domain: _server, user: uid, .. } = auth else {
|
||||
if matches!(activity.activity_type(), Ok(ActivityType::Delete)) {
|
||||
// this is spammy af, ignore them!
|
||||
// we basically received a delete for a user we can't fetch and verify, meaning remote
|
||||
|
@ -51,48 +51,35 @@ pub async fn post(
|
|||
// but mastodon keeps hammering us trying to delete this user, so just make mastodon happy
|
||||
// and return 200 without even bothering checking this stuff
|
||||
// would be cool if mastodon played nicer with the network...
|
||||
return Ok(());
|
||||
return Ok(StatusCode::OK);
|
||||
}
|
||||
tracing::warn!("refusing unauthorized activity: {}", pretty_json!(activity));
|
||||
if matches!(auth, Identity::Anonymous) {
|
||||
return Err(crate::ApiError::unauthorized());
|
||||
return Ok(StatusCode::UNAUTHORIZED);
|
||||
} else {
|
||||
return Err(crate::ApiError::forbidden());
|
||||
return Ok(StatusCode::FORBIDDEN);
|
||||
}
|
||||
};
|
||||
|
||||
todo!()
|
||||
let aid = activity.id()?.to_string();
|
||||
|
||||
// let aid = activity.id().ok_or_else(|| crate::ApiError::field("id"))?.to_string();
|
||||
// let actor = activity.actor().id().ok_or_else(|| crate::ApiError::field("actor"))?;
|
||||
if let Some(_internal) = upub::model::activity::Entity::ap_to_internal(&aid, ctx.db()).await? {
|
||||
return Ok(StatusCode::OK); // already processed
|
||||
}
|
||||
|
||||
// if uid != actor {
|
||||
// return Err(crate::ApiError::unauthorized());
|
||||
// }
|
||||
let job = upub::model::job::ActiveModel {
|
||||
internal: NotSet,
|
||||
job_type: Set(JobType::Inbound),
|
||||
actor: Set(uid),
|
||||
target: Set(None),
|
||||
activity: Set(aid),
|
||||
payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing json payload"))),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0)
|
||||
};
|
||||
|
||||
// tracing::debug!("processing federated activity: '{:#}'", activity);
|
||||
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
|
||||
// // TODO we could process Links and bare Objects maybe, but probably out of AP spec?
|
||||
// match activity.activity_type().ok_or_else(crate::ApiError::bad_request)? {
|
||||
// ActivityType::Activity => {
|
||||
// tracing::warn!("skipping unprocessable base activity: {}", pretty_json!(activity));
|
||||
// Err(StatusCode::UNPROCESSABLE_ENTITY.into()) // won't ingest useless stuff
|
||||
// },
|
||||
|
||||
// // TODO emojireacts are NOT likes, but let's process them like ones for now maybe?
|
||||
// ActivityType::Like | ActivityType::EmojiReact => Ok(ctx.like(server, activity).await?),
|
||||
// ActivityType::Create => Ok(ctx.create(server, activity).await?),
|
||||
// ActivityType::Follow => Ok(ctx.follow(server, activity).await?),
|
||||
// ActivityType::Announce => Ok(ctx.announce(server, activity).await?),
|
||||
// ActivityType::Accept(_) => Ok(ctx.accept(server, activity).await?),
|
||||
// ActivityType::Reject(_) => Ok(ctx.reject(server, activity).await?),
|
||||
// ActivityType::Undo => Ok(ctx.undo(server, activity).await?),
|
||||
// ActivityType::Delete => Ok(ctx.delete(server, activity).await?),
|
||||
// ActivityType::Update => Ok(ctx.update(server, activity).await?),
|
||||
|
||||
// _x => {
|
||||
// tracing::info!("received unimplemented activity on inbox: {}", pretty_json!(activity));
|
||||
// Err(StatusCode::NOT_IMPLEMENTED.into())
|
||||
// },
|
||||
// }
|
||||
Ok(StatusCode::ACCEPTED)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ pub mod replies;
|
|||
use apb::{CollectionMut, ObjectMut, LD};
|
||||
use axum::extract::{Path, Query, State};
|
||||
use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns};
|
||||
use upub::{model::{self, addressing::Event}, Context};
|
||||
use upub::{model::{self, addressing::Event}, traits::Fetcher, Context};
|
||||
|
||||
use crate::{builders::JsonLD, AuthIdentity};
|
||||
|
||||
|
@ -16,13 +16,13 @@ pub async fn view(
|
|||
Query(query): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
let oid = ctx.oid(&id);
|
||||
// if auth.is_local() && query.fetch && !ctx.is_local(&oid) {
|
||||
// let obj = ctx.fetch_object(&oid).await?;
|
||||
// // some implementations serve statuses on different urls than their AP id
|
||||
// if obj.id != oid {
|
||||
// return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id))));
|
||||
// }
|
||||
// }
|
||||
if auth.is_local() && query.fetch && !ctx.is_local(&oid) {
|
||||
let obj = ctx.fetch_object(&oid).await?;
|
||||
// some implementations serve statuses on different urls than their AP id
|
||||
if obj.id != oid {
|
||||
return Err(crate::ApiError::Redirect(upub::url!(ctx, "/objects/{}", ctx.id(&obj.id))));
|
||||
}
|
||||
}
|
||||
|
||||
let item = model::addressing::Entity::find_addressed(auth.my_id())
|
||||
.filter(model::object::Column::Id.eq(&oid))
|
||||
|
|
|
@ -8,7 +8,7 @@ pub async fn get(
|
|||
State(ctx): State<Context>,
|
||||
Path(id): Path<String>,
|
||||
AuthIdentity(auth): AuthIdentity,
|
||||
Query(q): Query<TryFetch>,
|
||||
Query(_q): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
let replies_id = upub::url!(ctx, "/objects/{id}/replies");
|
||||
let oid = ctx.oid(&id);
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use axum::{extract::{Path, Query, State}, http::StatusCode, Json};
|
||||
use axum::{http::StatusCode, extract::{Path, Query, State}, Json};
|
||||
|
||||
use sea_orm::{ColumnTrait, Condition};
|
||||
use upub::{model, Context};
|
||||
|
@ -54,7 +54,7 @@ pub async fn post(
|
|||
Path(_id): Path<String>,
|
||||
AuthIdentity(_auth): AuthIdentity,
|
||||
Json(activity): Json<serde_json::Value>,
|
||||
) -> crate::ApiResult<()> {
|
||||
) -> crate::ApiResult<StatusCode> {
|
||||
// POSTing to user inboxes is effectively the same as POSTing to the main inbox
|
||||
super::super::inbox::post(State(ctx), AuthIdentity(_auth), Json(activity)).await
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ pub mod following;
|
|||
use axum::extract::{Path, Query, State};
|
||||
|
||||
use apb::{LD, ActorMut, EndpointsMut, Node, ObjectMut};
|
||||
use upub::{ext::AnyQuery, model, Context};
|
||||
use upub::{ext::AnyQuery, model, traits::Fetcher, Context};
|
||||
|
||||
use crate::{builders::JsonLD, ApiError, AuthIdentity};
|
||||
|
||||
|
@ -21,16 +21,18 @@ pub async fn view(
|
|||
Query(query): Query<TryFetch>,
|
||||
) -> crate::ApiResult<JsonLD<serde_json::Value>> {
|
||||
let mut uid = ctx.uid(&id);
|
||||
// if auth.is_local() {
|
||||
// if id.starts_with('@') {
|
||||
// if let Some((user, host)) = id.replacen('@', "", 1).split_once('@') {
|
||||
// uid = ctx.webfinger(user, host).await?;
|
||||
// }
|
||||
// }
|
||||
// if query.fetch && !ctx.is_local(&uid) {
|
||||
// ctx.fetch_user(&uid).await?;
|
||||
// }
|
||||
// }
|
||||
if auth.is_local() {
|
||||
if id.starts_with('@') {
|
||||
if let Some((user, host)) = id.replacen('@', "", 1).split_once('@') {
|
||||
if let Some(webfinger) = ctx.webfinger(user, host).await? {
|
||||
uid = webfinger;
|
||||
}
|
||||
}
|
||||
}
|
||||
if query.fetch && !ctx.is_local(&uid) {
|
||||
ctx.fetch_user(&uid).await?;
|
||||
}
|
||||
}
|
||||
let internal_uid = model::actor::Entity::ap_to_internal(&uid, ctx.db())
|
||||
.await?
|
||||
.ok_or_else(ApiError::not_found)?;
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use axum::{extract::{Path, Query, State}, http::StatusCode, Json};
|
||||
use sea_orm::{ColumnTrait, Condition};
|
||||
use sea_orm::{ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait};
|
||||
|
||||
use apb::{AcceptType, ActivityType, Base, BaseType, ObjectType, RejectType};
|
||||
use upub::{model, Context};
|
||||
|
||||
use crate::{activitypub::{CreationResult, Pagination}, builders::JsonLD, AuthIdentity, Identity};
|
||||
|
@ -46,46 +45,29 @@ pub async fn post(
|
|||
match auth {
|
||||
Identity::Anonymous => Err(StatusCode::UNAUTHORIZED.into()),
|
||||
Identity::Remote { .. } => Err(StatusCode::NOT_IMPLEMENTED.into()),
|
||||
Identity::Local { id: uid, .. } => if ctx.uid(&id) == uid {
|
||||
tracing::debug!("processing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default());
|
||||
todo!()
|
||||
// match activity.base_type()? {
|
||||
// BaseType::Link(_) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()),
|
||||
Identity::Local { id: uid, .. } => {
|
||||
if ctx.uid(&id) != uid {
|
||||
return Err(crate::ApiError::forbidden());
|
||||
}
|
||||
|
||||
// BaseType::Object(ObjectType::Note) =>
|
||||
// Ok(CreationResult(ctx.create_note(uid, activity).await?)),
|
||||
tracing::debug!("enqueuing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default());
|
||||
let aid = ctx.aid(&Context::new_id());
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Create)) =>
|
||||
// Ok(CreationResult(ctx.create(uid, activity).await?)),
|
||||
let job = model::job::ActiveModel {
|
||||
internal: NotSet,
|
||||
activity: Set(aid.clone()),
|
||||
job_type: Set(model::job::JobType::Local),
|
||||
actor: Set(uid.clone()),
|
||||
target: Set(None),
|
||||
published: Set(chrono::Utc::now()),
|
||||
not_before: Set(chrono::Utc::now()),
|
||||
attempt: Set(0),
|
||||
payload: Set(Some(serde_json::to_string(&activity).expect("failed serializing back json object"))),
|
||||
};
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Like)) =>
|
||||
// Ok(CreationResult(ctx.like(uid, activity).await?)),
|
||||
model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Follow)) =>
|
||||
// Ok(CreationResult(ctx.follow(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Announce)) =>
|
||||
// Ok(CreationResult(ctx.announce(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept))) =>
|
||||
// Ok(CreationResult(ctx.accept(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject))) =>
|
||||
// Ok(CreationResult(ctx.reject(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Undo)) =>
|
||||
// Ok(CreationResult(ctx.undo(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Delete)) =>
|
||||
// Ok(CreationResult(ctx.delete(uid, activity).await?)),
|
||||
|
||||
// BaseType::Object(ObjectType::Activity(ActivityType::Update)) =>
|
||||
// Ok(CreationResult(ctx.update(uid, activity).await?)),
|
||||
|
||||
// _ => Err(StatusCode::NOT_IMPLEMENTED.into()),
|
||||
// }
|
||||
} else {
|
||||
Err(StatusCode::FORBIDDEN.into())
|
||||
Ok(CreationResult(aid))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,7 +95,10 @@ impl<T: serde::Serialize> IntoResponse for JsonRD<T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn webfinger(State(ctx): State<Context>, Query(query): Query<WebfingerQuery>) -> upub::Result<JsonRD<JsonResourceDescriptor>> {
|
||||
pub async fn webfinger(
|
||||
State(ctx): State<Context>,
|
||||
Query(query): Query<WebfingerQuery>
|
||||
) -> crate::ApiResult<JsonRD<JsonResourceDescriptor>> {
|
||||
if let Some((user, domain)) = query
|
||||
.resource
|
||||
.replace("acct:", "")
|
||||
|
@ -106,7 +109,7 @@ pub async fn webfinger(State(ctx): State<Context>, Query(query): Query<Webfinger
|
|||
.filter(model::actor::Column::Domain.eq(domain))
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
.ok_or_else(upub::Error::not_found)?;
|
||||
.ok_or_else(crate::ApiError::not_found)?;
|
||||
|
||||
let expires = if domain == ctx.domain() {
|
||||
// TODO configurable webfinger TTL, also 30 days may be too much???
|
||||
|
@ -162,7 +165,7 @@ pub struct OauthAuthorizationServerResponse {
|
|||
authorization_response_iss_parameter_supported: bool,
|
||||
}
|
||||
|
||||
pub async fn oauth_authorization_server(State(ctx): State<Context>) -> upub::Result<Json<OauthAuthorizationServerResponse>> {
|
||||
pub async fn oauth_authorization_server(State(ctx): State<Context>) -> crate::ApiResult<Json<OauthAuthorizationServerResponse>> {
|
||||
Ok(Json(OauthAuthorizationServerResponse {
|
||||
issuer: upub::url!(ctx, ""),
|
||||
authorization_endpoint: upub::url!(ctx, "/auth"),
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use axum::{extract::{FromRef, FromRequestParts}, http::{header, request::Parts}};
|
||||
use reqwest::StatusCode;
|
||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
|
||||
use httpsign::HttpSignature;
|
||||
use upub::traits::Fetcher;
|
||||
|
||||
use crate::ApiError;
|
||||
|
||||
|
@ -120,11 +120,9 @@ where
|
|||
.next().ok_or(ApiError::bad_request())?
|
||||
.to_string();
|
||||
|
||||
match upub::model::actor::Entity::find_by_ap_id(&user_id)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
{
|
||||
Some(user) => match http_signature
|
||||
match ctx.fetch_user(&user_id).await {
|
||||
Err(e) => tracing::warn!("failed resolving http signature actor: {e}"),
|
||||
Ok(user) => match http_signature
|
||||
.build_from_parts(parts)
|
||||
.verify(&user.public_key)
|
||||
{
|
||||
|
@ -137,9 +135,6 @@ where
|
|||
Ok(false) => tracing::warn!("invalid signature: {http_signature:?}"),
|
||||
Err(e) => tracing::error!("error verifying signature: {e}"),
|
||||
},
|
||||
None => {
|
||||
// TODO enqueue fetching who tried signing this
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,13 +11,13 @@ pub enum ApiError {
|
|||
#[error("http signature error: {0:?}")]
|
||||
HttpSignature(#[from] httpsign::HttpSignatureError),
|
||||
|
||||
#[error("fetch error: {0:?}")]
|
||||
#[error("outgoing request error: {0:?}")]
|
||||
Reqwest(#[from] reqwest::Error),
|
||||
|
||||
// TODO this is quite ugly because its basically a reqwest::Error but with extra string... buuut
|
||||
// helps with debugging!
|
||||
#[error("fetch error: {0:?} -- server responded with {1}")]
|
||||
FetchError(reqwest::Error, String),
|
||||
#[error("fetch error: {0:?}")]
|
||||
FetchError(#[from] upub::traits::fetch::PullError),
|
||||
|
||||
// wrapper error to return arbitraty status codes
|
||||
#[error("{0}")]
|
||||
|
@ -83,7 +83,7 @@ impl axum::response::IntoResponse for ApiError {
|
|||
"inner": format!("{e:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
ApiError::Reqwest(x) | ApiError::FetchError(x, _) => (
|
||||
ApiError::Reqwest(x) => (
|
||||
x.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "request",
|
||||
|
@ -93,6 +93,14 @@ impl axum::response::IntoResponse for ApiError {
|
|||
"inner": format!("{x:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
ApiError::FetchError(pull) => (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
axum::Json(serde_json::json!({
|
||||
"error": "fetch",
|
||||
"description": descr,
|
||||
"inner": format!("{pull:#?}"),
|
||||
}))
|
||||
).into_response(),
|
||||
ApiError::Field(x) => (
|
||||
axum::http::StatusCode::BAD_REQUEST,
|
||||
axum::Json(serde_json::json!({
|
||||
|
|
|
@ -25,7 +25,7 @@ pub mod mastodon {
|
|||
impl MastodonRouter for axum::Router<upub::Context> {}
|
||||
}
|
||||
|
||||
pub async fn serve(ctx: upub::Context, bind: String) -> upub::Result<()> {
|
||||
pub async fn serve(ctx: upub::Context, bind: String) -> Result<(), std::io::Error> {
|
||||
use activitypub::ActivityPubRouter;
|
||||
use mastodon::MastodonRouter;
|
||||
use tower_http::{cors::CorsLayer, trace::TraceLayer};
|
||||
|
@ -50,8 +50,7 @@ pub async fn serve(ctx: upub::Context, bind: String) -> upub::Result<()> {
|
|||
.with_state(ctx);
|
||||
|
||||
// run our app with hyper, listening locally on port 3000
|
||||
let listener = tokio::net::TcpListener::bind(bind)
|
||||
.await.expect("could not bind tcp socket");
|
||||
let listener = tokio::net::TcpListener::bind(bind).await?;
|
||||
|
||||
axum::serve(listener, router).await?;
|
||||
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
[package]
|
||||
name = "upub-processor"
|
||||
name = "upub-worker"
|
||||
version = "0.2.0"
|
||||
edition = "2021"
|
||||
authors = [ "alemi <me@alemi.dev>" ]
|
||||
description = "upub background activity processing worker"
|
||||
description = "upub background activity processing and dispatching workers"
|
||||
license = "AGPL-3.0"
|
||||
repository = "https://git.alemi.dev/upub.git"
|
||||
readme = "README.md"
|
||||
|
@ -17,6 +17,7 @@ async-trait = "0.1"
|
|||
serde_json = "1"
|
||||
sea-orm = "0.12"
|
||||
jrd = "0.1"
|
||||
regex = "1.10"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
tokio = { version = "1.35", features = ["full"] } # TODO slim this down
|
||||
reqwest = { version = "0.12", features = ["json"] }
|
134
upub/worker/src/dispatcher.rs
Normal file
134
upub/worker/src/dispatcher.rs
Normal file
|
@ -0,0 +1,134 @@
|
|||
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
|
||||
|
||||
use upub::{model, Context};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum JobError {
|
||||
#[error("database error: {0:?}")]
|
||||
Database(#[from] sea_orm::DbErr),
|
||||
|
||||
#[error("invalid payload json: {0:?}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
|
||||
#[error("malformed payload: {0}")]
|
||||
Malformed(#[from] apb::FieldErr),
|
||||
|
||||
#[error("malformed job: missing payload")]
|
||||
MissingPayload,
|
||||
|
||||
#[error("no available implementation to process job")]
|
||||
Unprocessable,
|
||||
|
||||
#[error("error processing activity: {0:?}")]
|
||||
ProcessorError(#[from] upub::traits::process::ProcessorError),
|
||||
}
|
||||
|
||||
pub type JobResult<T> = Result<T, JobError>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait JobDispatcher : Sized {
|
||||
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
|
||||
async fn lock(&self, job_internal: i64) -> JobResult<bool>;
|
||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>);
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl JobDispatcher for Context {
|
||||
async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>> {
|
||||
let mut s = model::job::Entity::find()
|
||||
.filter(model::job::Column::NotBefore.lte(chrono::Utc::now()));
|
||||
|
||||
if let Some(t) = filter {
|
||||
s = s.filter(model::job::Column::JobType.eq(t));
|
||||
}
|
||||
|
||||
Ok(
|
||||
s
|
||||
.order_by(model::job::Column::NotBefore, Order::Asc)
|
||||
.one(self.db())
|
||||
.await?
|
||||
)
|
||||
}
|
||||
|
||||
async fn lock(&self, job_internal: i64) -> JobResult<bool> {
|
||||
let res = model::job::Entity::delete(
|
||||
model::job::ActiveModel {
|
||||
internal: sea_orm::ActiveValue::Set(job_internal),
|
||||
..Default::default()
|
||||
}
|
||||
)
|
||||
.exec(self.db())
|
||||
.await?;
|
||||
|
||||
if res.rows_affected < 1 {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>) {
|
||||
macro_rules! restart {
|
||||
(now) => { continue };
|
||||
() => {
|
||||
{
|
||||
tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut pool = tokio::task::JoinSet::new();
|
||||
|
||||
loop {
|
||||
let job = match self.poll(job_filter).await {
|
||||
Ok(Some(j)) => j,
|
||||
Ok(None) => restart!(),
|
||||
Err(e) => {
|
||||
tracing::error!("error polling for jobs: {e}");
|
||||
restart!()
|
||||
},
|
||||
};
|
||||
|
||||
match self.lock(job.internal).await {
|
||||
Ok(true) => {},
|
||||
Ok(false) => restart!(now),
|
||||
Err(e) => {
|
||||
tracing::error!("error locking job: {e}");
|
||||
restart!()
|
||||
},
|
||||
}
|
||||
|
||||
if job.expired() {
|
||||
tracing::info!("dropping expired job {job:?}");
|
||||
restart!(now);
|
||||
}
|
||||
|
||||
let _ctx = self.clone();
|
||||
pool.spawn(async move {
|
||||
let res = match job.job_type {
|
||||
model::job::JobType::Inbound => crate::inbound::process(_ctx.clone(), &job).await,
|
||||
model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await,
|
||||
model::job::JobType::Local => crate::local::process(_ctx.clone(), &job).await,
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::error!("failed processing job #{}: {e}", job.internal);
|
||||
let active = job.clone().repeat();
|
||||
if let Err(e) = model::job::Entity::insert(active)
|
||||
.exec(_ctx.db())
|
||||
.await
|
||||
{
|
||||
tracing::error!("could not insert back job ({e}), dropping:\n{job:#?}")
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while pool.len() >= concurrency {
|
||||
if let Some(Err(e)) = pool.join_next().await {
|
||||
tracing::error!("failed joining processing task: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
20
upub/worker/src/inbound.rs
Normal file
20
upub/worker/src/inbound.rs
Normal file
|
@ -0,0 +1,20 @@
|
|||
use upub::traits::Processor;
|
||||
|
||||
|
||||
pub async fn process(ctx: upub::Context, job: &upub::model::job::Model) -> crate::JobResult<()> {
|
||||
let Some(ref payload) = job.payload else {
|
||||
tracing::error!("abandoning inbound job without payload: {job:#?}");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Ok(activity) = serde_json::from_str::<serde_json::Value>(payload) else {
|
||||
tracing::error!("abandoning inbound job with invalid payload: {job:#?}");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if let Err(e) = ctx.process(activity).await {
|
||||
tracing::error!("failed processing job #{}: {e}", job.internal);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
18
upub/worker/src/lib.rs
Normal file
18
upub/worker/src/lib.rs
Normal file
|
@ -0,0 +1,18 @@
|
|||
pub mod dispatcher;
|
||||
pub mod inbound;
|
||||
pub mod outbound;
|
||||
pub mod local;
|
||||
|
||||
pub use dispatcher::{JobError, JobResult};
|
||||
|
||||
pub fn spawn(
|
||||
ctx: upub::Context,
|
||||
concurrency: usize,
|
||||
poll: u64,
|
||||
filter: Option<upub::model::job::JobType>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
use dispatcher::JobDispatcher;
|
||||
tokio::spawn(async move {
|
||||
ctx.run(concurrency, poll, filter).await
|
||||
})
|
||||
}
|
|
@ -1,11 +1,73 @@
|
|||
use apb::{target::Addressed, Activity, ActivityMut, Base, BaseMut, Node, Object, ObjectMut};
|
||||
use reqwest::StatusCode;
|
||||
use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet, Unchanged}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns};
|
||||
use apb::{target::Addressed, Activity, ActivityMut, BaseMut, Object, ObjectMut};
|
||||
use sea_orm::{EntityTrait, QueryFilter, QuerySelect, SelectColumns, ColumnTrait};
|
||||
use upub::{model, traits::{Addresser, Processor}, Context};
|
||||
|
||||
use crate::{errors::UpubError, model, ext::AnyQuery};
|
||||
|
||||
use super::{addresser::Addresser, fetcher::Fetcher, normalizer::Normalizer, side_effects::SideEffects, Context};
|
||||
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
||||
let payload = job.payload.as_ref().ok_or(crate::JobError::MissingPayload)?;
|
||||
let mut activity : serde_json::Value = serde_json::from_str(payload)?;
|
||||
let mut t = activity.object_type()?;
|
||||
|
||||
if matches!(t, apb::ObjectType::Note) {
|
||||
activity = apb::new()
|
||||
.set_activity_type(Some(apb::ActivityType::Create))
|
||||
.set_object(apb::Node::object(activity));
|
||||
t = apb::ObjectType::Activity(apb::ActivityType::Create);
|
||||
}
|
||||
|
||||
activity = activity
|
||||
.set_id(Some(&job.activity))
|
||||
.set_actor(apb::Node::link(job.actor.clone()))
|
||||
.set_published(Some(chrono::Utc::now()));
|
||||
|
||||
if matches!(t, apb::ObjectType::Activity(apb::ActivityType::Create)) {
|
||||
let raw_oid = Context::new_id();
|
||||
let oid = ctx.oid(&raw_oid);
|
||||
// object must be embedded, wont dereference here
|
||||
let object = activity.object().extract().ok_or(apb::FieldErr("object"))?;
|
||||
// TODO regex hell here i come...
|
||||
let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern");
|
||||
let mut content = object.content().map(|x| x.to_string()).ok();
|
||||
if let Some(c) = content {
|
||||
let mut tmp = mdhtml::safe_markdown(&c);
|
||||
for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) {
|
||||
if let Ok(Some(uid)) = model::actor::Entity::find()
|
||||
.filter(model::actor::Column::PreferredUsername.eq(user))
|
||||
.filter(model::actor::Column::Domain.eq(domain))
|
||||
.select_only()
|
||||
.select_column(model::actor::Column::Id)
|
||||
.into_tuple::<String>()
|
||||
.one(ctx.db())
|
||||
.await
|
||||
{
|
||||
tmp = tmp.replacen(full, &format!("<a href=\"{uid}\" class=\"u-url mention\">@{user}</a>"), 1);
|
||||
}
|
||||
}
|
||||
content = Some(tmp);
|
||||
}
|
||||
|
||||
activity = activity
|
||||
.set_object(apb::Node::object(
|
||||
object
|
||||
.set_id(Some(&oid))
|
||||
.set_content(content.as_deref())
|
||||
.set_attributed_to(apb::Node::link(job.actor.clone()))
|
||||
.set_published(Some(chrono::Utc::now()))
|
||||
.set_url(apb::Node::maybe_link(ctx.cfg().frontend_url(&format!("/objects/{raw_oid}")))),
|
||||
));
|
||||
}
|
||||
|
||||
// TODO we expand addressing twice, ugghhhhh
|
||||
let targets = ctx.expand_addressing(activity.addressed()).await?;
|
||||
|
||||
ctx.process(activity).await?;
|
||||
|
||||
ctx.deliver_to(&job.activity, &job.actor, &targets).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
#[axum::async_trait]
|
||||
impl apb::server::Outbox for Context {
|
||||
|
@ -40,27 +102,6 @@ impl apb::server::Outbox for Context {
|
|||
self.fetch_object(&reply).await?;
|
||||
}
|
||||
|
||||
// TODO regex hell here i come...
|
||||
let re = regex::Regex::new(r"@(.+)@([^ ]+)").expect("failed compiling regex pattern");
|
||||
let mut content = object.content().map(|x| x.to_string());
|
||||
if let Some(c) = content {
|
||||
let mut tmp = mdhtml::safe_markdown(&c);
|
||||
for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) {
|
||||
if let Ok(Some(uid)) = model::actor::Entity::find()
|
||||
.filter(model::actor::Column::PreferredUsername.eq(user))
|
||||
.filter(model::actor::Column::Domain.eq(domain))
|
||||
.select_only()
|
||||
.select_column(model::actor::Column::Id)
|
||||
.into_tuple::<String>()
|
||||
.one(self.db())
|
||||
.await
|
||||
{
|
||||
tmp = tmp.replacen(full, &format!("<a href=\"{uid}\" class=\"u-url mention\">@{user}</a>"), 1);
|
||||
}
|
||||
}
|
||||
content = Some(tmp);
|
||||
}
|
||||
|
||||
self.insert_object(
|
||||
object
|
||||
.set_id(Some(&oid))
|
||||
|
@ -421,3 +462,5 @@ impl apb::server::Outbox for Context {
|
|||
Ok(aid)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
63
upub/worker/src/outbound.rs
Normal file
63
upub/worker/src/outbound.rs
Normal file
|
@ -0,0 +1,63 @@
|
|||
use sea_orm::EntityTrait;
|
||||
use reqwest::Method;
|
||||
|
||||
use apb::{LD, Node, ActivityMut};
|
||||
use upub::{Context, model, traits::Fetcher};
|
||||
|
||||
pub async fn process(ctx: Context, job: &model::job::Model) -> crate::JobResult<()> {
|
||||
tracing::info!("delivering {} to {:?}", job.activity, job.target);
|
||||
|
||||
let payload = match model::activity::Entity::find_by_ap_id(&job.activity)
|
||||
.find_also_related(model::object::Entity)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
{
|
||||
Some((activity, None)) => activity.ap().ld_context(),
|
||||
Some((activity, Some(object))) => {
|
||||
let always_embed = matches!(
|
||||
activity.activity_type,
|
||||
apb::ActivityType::Create
|
||||
| apb::ActivityType::Undo
|
||||
| apb::ActivityType::Update
|
||||
| apb::ActivityType::Accept(_)
|
||||
| apb::ActivityType::Reject(_)
|
||||
);
|
||||
if always_embed {
|
||||
activity.ap().set_object(Node::object(object.ap())).ld_context()
|
||||
} else {
|
||||
activity.ap().ld_context()
|
||||
}
|
||||
},
|
||||
None => {
|
||||
tracing::info!("skipping dispatch for deleted object {}", job.activity);
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
|
||||
let Some(actor) = model::actor::Entity::find_by_ap_id(&job.actor)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
else {
|
||||
tracing::error!("abandoning delivery from non existant actor {}: {job:#?}", job.actor);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some(key) = actor.private_key
|
||||
else {
|
||||
tracing::error!("abandoning delivery from actor without private key {}: {job:#?}", job.actor);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if let Err(e) = Context::request(
|
||||
Method::POST, job.target.as_deref().unwrap_or(""),
|
||||
Some(&serde_json::to_string(&payload).unwrap()),
|
||||
&job.actor, &key, ctx.domain()
|
||||
).await {
|
||||
tracing::warn!("failed delivery of {} to {:?} : {e}", job.activity, job.target);
|
||||
model::job::Entity::insert(job.clone().repeat())
|
||||
.exec(ctx.db())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in a new issue