chore: helpers for internal ids, fix routes and ctx

basically just need to do inbox/outbox? then there's still some issues
with relays relations and auth extra selects but may actually work again
This commit is contained in:
əlemi 2024-05-25 07:00:03 +02:00
parent b09cfd0526
commit 322b18e9cd
Signed by: alemi
GPG key ID: A4895B84D311642C
22 changed files with 212 additions and 139 deletions

View file

@ -21,7 +21,7 @@ pub enum Actors {
StatusesCount,
PublicKey,
PrivateKey,
Created,
Published,
Updated,
}
@ -159,7 +159,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Actors::StatusesCount).integer().not_null().default(0))
.col(ColumnDef::new(Actors::PublicKey).string().not_null())
.col(ColumnDef::new(Actors::PrivateKey).string().null())
.col(ColumnDef::new(Actors::Created).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Actors::Published).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Actors::Updated).date_time().not_null().default(Expr::current_timestamp()))
.to_owned()
)
@ -174,7 +174,7 @@ impl MigrationTrait for Migration {
.await?;
manager
.create_index(Index::create().name("index-actors-instance").table(Actors::Table).col(Actors::Instance).to_owned())
.create_index(Index::create().name("index-actors-domain").table(Actors::Table).col(Actors::Domain).to_owned())
.await?;
@ -321,7 +321,7 @@ impl MigrationTrait for Migration {
.await?;
manager
.drop_index(Index::drop().name("index-actors-instance").table(Actors::Table).to_owned())
.drop_index(Index::drop().name("index-actors-domain").table(Actors::Table).to_owned())
.await?;

View file

@ -136,7 +136,7 @@ impl MigrationTrait for Migration {
.await?;
manager
.create_index(Index::create().name("index-likes-object").table(Likes::Table).col(Likes::Likes).to_owned())
.create_index(Index::create().name("index-likes-object").table(Likes::Table).col(Likes::Object).to_owned())
.await?;
manager
@ -190,7 +190,7 @@ impl MigrationTrait for Migration {
.await?;
manager
.create_index(Index::create().name("index-announces-object").table(Announces::Table).col(Announces::Announces).to_owned())
.create_index(Index::create().name("index-announces-object").table(Announces::Table).col(Announces::Object).to_owned())
.await?;
Ok(())

View file

@ -20,7 +20,7 @@ pub enum Deliveries {
Actor,
Target,
Activity,
Created,
Published,
NotBefore,
Attempt,
}
@ -141,7 +141,7 @@ impl MigrationTrait for Migration {
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::Cascade)
)
.col(ColumnDef::new(Deliveries::Created).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Deliveries::Published).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Deliveries::NotBefore).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Deliveries::Attempt).integer().not_null().default(0))
.to_owned()

View file

@ -11,7 +11,7 @@ pub enum Attachments {
Object,
Name,
MediaType,
Created,
Published,
}
#[derive(DeriveIden)]
@ -63,7 +63,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Attachments::DocumentType).string().not_null())
.col(ColumnDef::new(Attachments::Name).string().null())
.col(ColumnDef::new(Attachments::MediaType).string().not_null())
.col(ColumnDef::new(Attachments::Created).date_time().not_null().default(Expr::current_timestamp()))
.col(ColumnDef::new(Attachments::Published).date_time().not_null().default(Expr::current_timestamp()))
.to_owned()
)
.await?;

View file

@ -29,7 +29,7 @@ pub struct Model {
pub statuses_count: i32,
pub public_key: String,
pub private_key: Option<String>,
pub created: ChronoDateTimeUtc,
pub published: ChronoDateTimeUtc,
pub updated: ChronoDateTimeUtc,
}
@ -169,7 +169,7 @@ impl ActiveModel {
shared_inbox: sea_orm::ActiveValue::Set(object.endpoints().get().and_then(|x| Some(x.shared_inbox()?.to_string()))),
followers: sea_orm::ActiveValue::Set(object.followers().id()),
following: sea_orm::ActiveValue::Set(object.following().id()),
created: sea_orm::ActiveValue::Set(object.published().unwrap_or(chrono::Utc::now())),
published: sea_orm::ActiveValue::Set(object.published().unwrap_or(chrono::Utc::now())),
updated: sea_orm::ActiveValue::Set(chrono::Utc::now()),
following_count: sea_orm::ActiveValue::Set(object.following_count().unwrap_or(0) as i32),
followers_count: sea_orm::ActiveValue::Set(object.followers_count().unwrap_or(0) as i32),
@ -197,7 +197,7 @@ impl Model {
.set_document_type(Some(apb::DocumentType::Image))
.set_url(apb::Node::link(i.clone()))
)))
.set_published(Some(self.created))
.set_published(Some(self.published))
.set_preferred_username(Some(&self.preferred_username))
.set_statuses_count(Some(self.statuses_count as u64))
.set_followers_count(Some(self.followers_count as u64))

View file

@ -106,6 +106,15 @@ impl Event {
}
}
pub fn internal(&self) -> i64 {
match self {
Event::Tombstone => 0,
Event::Activity(x) => x.internal,
Event::StrayObject { object, liked: _ } => object.internal,
Event::DeepActivity { activity: _, liked: _, object } => object.internal,
}
}
pub fn ap(self, attachment: Option<Vec<crate::model::attachment::Model>>) -> serde_json::Value {
let attachment = match attachment {
None => apb::Node::Empty,

View file

@ -16,7 +16,7 @@ pub struct Model {
pub document_type: DocumentType,
pub name: Option<String>,
pub media_type: String,
pub created: ChronoDateTimeUtc,
pub published: ChronoDateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -46,7 +46,7 @@ impl Model {
.set_document_type(Some(self.document_type))
.set_media_type(Some(&self.media_type))
.set_name(self.name.as_deref())
.set_published(Some(self.created))
.set_published(Some(self.published))
}
}

View file

@ -8,7 +8,7 @@ pub struct Model {
pub actor: String,
pub target: String,
pub activity: String,
pub created: ChronoDateTimeUtc,
pub published: ChronoDateTimeUtc,
pub not_before: ChronoDateTimeUtc,
pub attempt: i32,
}
@ -61,6 +61,6 @@ impl Model {
}
pub fn expired(&self) -> bool {
chrono::Utc::now() - self.created > chrono::Duration::days(7)
chrono::Utc::now() - self.published > chrono::Duration::days(7)
}
}

View file

@ -27,7 +27,7 @@ pub async fn view(
.ok_or_else(UpubError::not_found)?;
let mut attachments = row.load_attachments_batch(ctx.db()).await?;
let attach = attachments.remove(row.id());
let attach = attachments.remove(&row.internal());
Ok(JsonLD(row.ap(attach).ld_context()))
}

View file

@ -2,7 +2,7 @@ use apb::{ActorMut, BaseMut, ObjectMut, PublicKeyMut};
use axum::{extract::{Query, State}, http::HeaderMap, response::{IntoResponse, Redirect, Response}, Form, Json};
use reqwest::Method;
use crate::{errors::UpubError, server::{auth::{AuthIdentity, Identity}, fetcher::Fetcher, Context}, url};
use crate::{errors::UpubError, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url};
use super::{jsonld::LD, JsonLD};
@ -26,7 +26,7 @@ pub async fn view(
.set_summary(Some(&ctx.cfg().instance.description))
.set_inbox(apb::Node::link(url!(ctx, "/inbox")))
.set_outbox(apb::Node::link(url!(ctx, "/outbox")))
.set_published(Some(ctx.app().created))
.set_published(Some(ctx.app().published))
.set_endpoints(apb::Node::Empty)
.set_preferred_username(Some(ctx.domain()))
.set_public_key(apb::Node::object(
@ -50,7 +50,7 @@ pub async fn proxy_get(
AuthIdentity(auth): AuthIdentity,
) -> crate::Result<Json<serde_json::Value>> {
// only local users can request fetches
if !ctx.cfg().security.allow_public_debugger && !matches!(auth, Identity::Local(_)) {
if !ctx.cfg().security.allow_public_debugger && !auth.is_local() {
return Err(UpubError::unauthorized());
}
Ok(Json(
@ -59,7 +59,7 @@ pub async fn proxy_get(
&query.id,
None,
ctx.base(),
&ctx.app().private_key,
ctx.pkey(),
&format!("{}+proxy", ctx.domain()),
)
.await?
@ -74,7 +74,7 @@ pub async fn proxy_form(
Form(query): Form<FetchPath>,
) -> crate::Result<Json<serde_json::Value>> {
// only local users can request fetches
if !ctx.cfg().security.allow_public_debugger && !matches!(auth, Identity::Local(_)) {
if !ctx.cfg().security.allow_public_debugger && auth.is_local() {
return Err(UpubError::unauthorized());
}
Ok(Json(
@ -83,7 +83,7 @@ pub async fn proxy_form(
&query.id,
None,
ctx.base(),
&ctx.app().private_key,
ctx.pkey(),
&format!("{}+proxy", ctx.domain()),
)
.await?

View file

@ -41,7 +41,7 @@ pub async fn post(
AuthIdentity(auth): AuthIdentity,
Json(activity): Json<serde_json::Value>
) -> crate::Result<()> {
let Identity::Remote(server) = auth else {
let Identity::Remote { domain: server, .. } = auth else {
if activity.activity_type() == Some(ActivityType::Delete) {
// this is spammy af, ignore them!
// we basically received a delete for a user we can't fetch and verify, meaning remote
@ -63,8 +63,7 @@ pub async fn post(
return Err(UpubError::bad_request());
};
// TODO add whitelist of relays
if !server.ends_with(&Context::server(&actor)) {
if !(server == Context::server(&actor)) {
return Err(UpubError::unauthorized());
}

View file

@ -10,8 +10,8 @@ pub async fn get(
) -> crate::Result<JsonLD<serde_json::Value>> {
match auth {
Identity::Anonymous => Err(StatusCode::FORBIDDEN.into()),
Identity::Remote(_) => Err(StatusCode::FORBIDDEN.into()),
Identity::Local(user) => if ctx.uid(&id) == user {
Identity::Remote { .. } => Err(StatusCode::FORBIDDEN.into()),
Identity::Local { id: user, .. } => if ctx.uid(&id) == user {
crate::server::builders::collection(&url!(ctx, "/users/{id}/inbox"), None)
} else {
Err(StatusCode::FORBIDDEN.into())
@ -25,7 +25,7 @@ pub async fn page(
AuthIdentity(auth): AuthIdentity,
Query(page): Query<Pagination>,
) -> crate::Result<JsonLD<serde_json::Value>> {
let Identity::Local(uid) = &auth else {
let Identity::Local { id: uid, .. } = &auth else {
// local inbox is only for local users
return Err(UpubError::forbidden());
};

View file

@ -42,8 +42,8 @@ pub async fn post(
) -> Result<CreationResult, UpubError> {
match auth {
Identity::Anonymous => Err(StatusCode::UNAUTHORIZED.into()),
Identity::Remote(_) => Err(StatusCode::NOT_IMPLEMENTED.into()),
Identity::Local(uid) => if ctx.uid(&id) == uid {
Identity::Remote { .. } => Err(StatusCode::NOT_IMPLEMENTED.into()),
Identity::Local { id: uid, .. } => if ctx.uid(&id) == uid {
tracing::debug!("processing new local activity: {}", serde_json::to_string(&activity).unwrap_or_default());
match activity.base_type() {
None => Err(StatusCode::BAD_REQUEST.into()),

View file

@ -20,7 +20,7 @@ pub async fn view(
acct: x.preferred_username.clone(),
avatar: x.icon.as_deref().unwrap_or("").to_string(),
avatar_static: x.icon.unwrap_or_default(),
created_at: time::OffsetDateTime::from_unix_timestamp(x.created.timestamp()).unwrap(),
created_at: time::OffsetDateTime::from_unix_timestamp(x.published.timestamp()).unwrap(),
display_name: x.name.unwrap_or_default(),
// TODO hide these maybe
followers_count: x.followers_count as u64,

View file

@ -28,7 +28,7 @@ impl Administrable for super::Context {
let ap_id = self.uid(&username);
let db = self.db();
let domain = self.domain().to_string();
let user_model = crate::model::actor::Model {
let user_model = crate::model::actor::ActiveModel {
internal: NotSet,
id: Set(ap_id.clone()),
name: Set(display_name),
@ -46,7 +46,7 @@ impl Administrable for super::Context {
shared_inbox: Set(None),
outbox: Set(None),
actor_type: Set(apb::ActorType::Person),
created: Set(chrono::Utc::now()),
published: Set(chrono::Utc::now()),
updated: Set(chrono::Utc::now()),
private_key: Set(Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string())),
public_key: Set(std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string()),

View file

@ -9,8 +9,14 @@ use super::{fetcher::Fetcher, httpsign::HttpSignature};
#[derive(Debug, Clone)]
pub enum Identity {
Anonymous,
Local(String),
Remote(String),
Remote {
domain: String,
internal: i64,
},
Local {
id: String,
internal: i64,
},
}
impl Identity {
@ -18,18 +24,18 @@ impl Identity {
let base_cond = Condition::any().add(model::addressing::Column::Actor.eq(apb::target::PUBLIC));
match self {
Identity::Anonymous => base_cond,
Identity::Remote(server) => base_cond.add(model::addressing::Column::Server.eq(server)),
Identity::Remote { internal, .. } => base_cond.add(model::addressing::Column::Instance.eq(*internal)),
// TODO should we allow all users on same server to see? or just specific user??
Identity::Local(uid) => base_cond
.add(model::addressing::Column::Actor.eq(uid))
.add(model::activity::Column::Actor.eq(uid))
.add(model::object::Column::AttributedTo.eq(uid)),
Identity::Local { id, internal } => base_cond
.add(model::addressing::Column::Actor.eq(*internal))
.add(model::activity::Column::Actor.eq(id))
.add(model::object::Column::AttributedTo.eq(id)),
}
}
pub fn my_id(&self) -> Option<&str> {
pub fn my_id(&self) -> Option<i64> {
match self {
Identity::Local(x) => Some(x.as_str()),
Identity::Local { internal, .. } => Some(*internal),
_ => None,
}
}
@ -37,8 +43,8 @@ impl Identity {
pub fn is(&self, id: &str) -> bool {
match self {
Identity::Anonymous => false,
Identity::Remote(_) => false, // TODO per-actor server auth should check this
Identity::Local(uid) => uid.as_str() == id
Identity::Remote { .. } => false, // TODO per-actor server auth should check this
Identity::Local { id, .. } => id.as_str() == id
}
}
@ -47,23 +53,16 @@ impl Identity {
}
pub fn is_local(&self) -> bool {
matches!(self, Self::Local(_))
matches!(self, Self::Local { .. })
}
pub fn is_remote(&self) -> bool {
matches!(self, Self::Remote(_))
matches!(self, Self::Remote { .. })
}
pub fn is_local_user(&self, uid: &str) -> bool {
match self {
Self::Local(x) => x == uid,
_ => false,
}
}
pub fn is_remote_server(&self, uid: &str) -> bool {
match self {
Self::Remote(x) => x == uid,
Self::Local { id, .. } => id == uid,
_ => false,
}
}
@ -90,13 +89,19 @@ where
.unwrap_or("");
if auth_header.starts_with("Bearer ") {
match model::session::Entity::find_by_id(auth_header.replace("Bearer ", ""))
match model::session::Entity::find()
.filter(model::session::Column::Secret.eq(auth_header.replace("Bearer ", "")))
.filter(model::session::Column::Expires.gt(chrono::Utc::now()))
.one(ctx.db())
.await
{
Ok(Some(x)) => identity = Identity::Local(x.actor),
Ok(None) => return Err(UpubError::unauthorized()),
Ok(Some(x)) => {
// TODO could we store both actor ap id and internal id in session? to avoid this extra
// lookup on *every* local authed request we receive...
let internal = model::actor::Entity::ap_to_internal(&x.actor, ctx.db()).await?;
identity = Identity::Local { id: x.actor, internal };
},
Err(e) => {
tracing::error!("failed querying user session: {e}");
return Err(UpubError::internal_server_error())
@ -122,7 +127,13 @@ where
.build_from_parts(parts)
.verify(&user.public_key)
{
Ok(true) => identity = Identity::Remote(Context::server(&user_id)),
Ok(true) => {
// TODO can we avoid this extra db rountrip made on each server fetch?
let domain = Context::server(&user_id);
// TODO this will fail because we never fetch and insert into instance oops
let internal = model::instance::Entity::domain_to_internal(&domain, ctx.db()).await?;
identity = Identity::Remote { domain, internal };
},
Ok(false) => tracing::warn!("invalid signature: {http_signature:?}"),
Err(e) => tracing::error!("error verifying signature: {e}"),
},

View file

@ -8,7 +8,7 @@ pub async fn paginate(
filter: Condition,
db: &DatabaseConnection,
page: Pagination,
my_id: Option<&str>,
my_id: Option<i64>,
) -> crate::Result<JsonLD<serde_json::Value>> {
let limit = page.batch.unwrap_or(20).min(50);
let offset = page.offset.unwrap_or(0);
@ -27,7 +27,7 @@ pub async fn paginate(
let items : Vec<serde_json::Value> = items
.into_iter()
.map(|item| {
let attach = attachments.remove(item.id());
let attach = attachments.remove(&item.internal());
item.ap(attach)
})
.collect();

View file

@ -1,9 +1,9 @@
use std::{collections::BTreeSet, sync::Arc};
use openssl::rsa::Rsa;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set};
use sea_orm::{ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, RelationTrait, SelectColumns, Set};
use crate::{config::Config, model, server::fetcher::Fetcher};
use crate::{config::Config, errors::UpubError, model, server::fetcher::Fetcher};
use uriproxy::UriClass;
use super::dispatcher::Dispatcher;
@ -19,7 +19,8 @@ struct ContextInner {
base_url: String,
dispatcher: Dispatcher,
// TODO keep these pre-parsed
app: model::application::Model,
app: model::actor::Model,
pkey: String,
relays: BTreeSet<String>,
}
@ -46,44 +47,73 @@ impl Context {
for _ in 0..1 { // TODO customize delivery workers amount
dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!!
}
let app = match model::application::Entity::find().one(&db).await? {
let base_url = format!("{}{}", protocol, domain);
let app = match model::actor::Entity::find_by_ap_id(&base_url).one(&db).await? {
Some(model) => model,
None => {
tracing::info!("generating application keys");
let rsa = Rsa::generate(2048)?;
let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string();
let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string();
let system = model::application::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
private_key: sea_orm::ActiveValue::Set(privk.clone()),
public_key: sea_orm::ActiveValue::Set(pubk.clone()),
created: sea_orm::ActiveValue::Set(chrono::Utc::now()),
let system = model::actor::ActiveModel {
internal: NotSet,
id: NotSet,
domain: Set(domain.clone()),
preferred_username: Set(domain.clone()),
actor_type: Set(apb::ActorType::Application),
private_key: Set(Some(privk)),
public_key: Set(pubk),
following: Set(None),
following_count: Set(0),
followers: Set(None),
followers_count: Set(0),
statuses_count: Set(0),
summary: Set(Some("micro social network, federated".to_string())),
name: Set(Some("μpub".to_string())),
image: Set(None),
icon: Set(Some("https://cdn.alemi.dev/social/circle-square.png".to_string())),
inbox: Set(Some(format!("{base_url}/inbox"))),
shared_inbox: Set(Some(format!("{base_url}/inbox"))),
outbox: Set(Some(format!("{base_url}/outbox"))),
published: Set(chrono::Utc::now()),
updated: Set(chrono::Utc::now()),
};
model::application::Entity::insert(system).exec(&db).await?;
model::actor::Entity::insert(system).exec(&db).await?;
// sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time
model::application::Entity::find().one(&db).await?.expect("could not find app config just inserted")
model::actor::Entity::find().one(&db).await?.expect("could not find app config just inserted")
}
};
let relays = model::relay::Entity::find()
.select_only()
.select_column(model::relay::Column::Id)
.filter(model::relay::Column::Accepted.eq(true))
.into_tuple::<String>()
.all(&db)
.await?;
// TODO maybe we could provide a more descriptive error...
let pkey = app.private_key.as_deref().ok_or_else(UpubError::internal_server_error)?.to_string();
// TODO how to handle relations when there's two to the same model???
// let relays = model::relation::Entity::find()
// .filter(model::relation::Column::Following.eq(app.internal))
// .filter(model::relation::Column::Accept.is_not_null())
// .left_join(model::relation::Relation::ActorsFollower.def())
// .select_only()
// .select_column(model::actor::Column::Id)
// .into_tuple::<String>()
// .all(&db)
// .await?;
let relays = Vec::new();
Ok(Context(Arc::new(ContextInner {
base_url: format!("{}{}", protocol, domain),
db, domain, protocol, app, dispatcher, config,
base_url, db, domain, protocol, app, dispatcher, config, pkey,
relays: BTreeSet::from_iter(relays.into_iter()),
})))
}
pub fn app(&self) -> &model::application::Model {
pub fn app(&self) -> &model::actor::Model {
&self.0.app
}
pub fn pkey(&self) -> &str {
&self.0.pkey
}
pub fn db(&self) -> &DatabaseConnection {
&self.0.db
}
@ -150,6 +180,18 @@ impl Context {
id.starts_with(self.base())
}
pub async fn is_local_internal_object(&self, internal: i64) -> crate::Result<bool> {
Ok(model::object::Entity::find_by_id(internal).select_only().one(self.db()).await?.is_some())
}
pub async fn is_local_internal_activity(&self, internal: i64) -> crate::Result<bool> {
Ok(model::activity::Entity::find_by_id(internal).select_only().one(self.db()).await?.is_some())
}
pub async fn is_local_internal_actor(&self, internal: i64) -> crate::Result<bool> {
Ok(model::actor::Entity::find_by_id(internal).select_only().one(self.db()).await?.is_some())
}
pub async fn expand_addressing(&self, targets: Vec<String>) -> crate::Result<Vec<String>> {
let mut out = Vec::new();
for target in targets {
@ -171,26 +213,38 @@ impl Context {
Ok(out)
}
pub async fn address_to(&self, aid: Option<&str>, oid: Option<&str>, targets: &[String]) -> crate::Result<()> {
let local_activity = aid.map(|x| self.is_local(x)).unwrap_or(false);
let local_object = oid.map(|x| self.is_local(x)).unwrap_or(false);
let addressings : Vec<model::addressing::ActiveModel> = targets
pub async fn address_to(&self, aid: Option<i64>, oid: Option<i64>, targets: &[String]) -> crate::Result<()> {
// TODO address_to became kind of expensive, with these two selects right away and then another
// select for each target we're addressing to... can this be improved??
let local_activity = if let Some(x) = aid { self.is_local_internal_activity(x).await? } else { false };
let local_object = if let Some(x) = oid { self.is_local_internal_object(x).await? } else { false };
let mut addressing = Vec::new();
for target in targets
.iter()
.filter(|to| !to.is_empty())
.filter(|to| !to.ends_with("/followers"))
.filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to))
.map(|to| model::addressing::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
server: Set(Context::server(to)),
actor: Set(to.to_string()),
activity: Set(aid.map(|x| x.to_string())),
object: Set(oid.map(|x| x.to_string())),
published: Set(chrono::Utc::now()),
})
.collect();
{
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
(
Some(model::instance::Entity::domain_to_internal(&Context::server(&target), self.db()).await?),
Some(model::actor::Entity::ap_to_internal(&target, self.db()).await?),
)
};
addressing.push(
model::addressing::ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
instance: Set(server),
actor: Set(actor),
activity: Set(aid),
object: Set(oid),
published: Set(chrono::Utc::now()),
}
);
}
if !addressings.is_empty() {
model::addressing::Entity::insert_many(addressings)
if !addressing.is_empty() {
model::addressing::Entity::insert_many(addressing)
.exec(self.db())
.await?;
}
@ -207,15 +261,15 @@ impl Context {
{
// TODO fetch concurrently
match self.fetch_user(target).await {
Ok(model::user::Model { inbox: Some(inbox), .. }) => deliveries.push(
Ok(model::actor::Model { inbox: Some(inbox), .. }) => deliveries.push(
model::delivery::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
internal: sea_orm::ActiveValue::NotSet,
actor: Set(from.to_string()),
// TODO we should resolve each user by id and check its inbox because we can't assume
// it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now
target: Set(inbox),
activity: Set(aid.to_string()),
created: Set(chrono::Utc::now()),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
attempt: Set(0),
}
@ -238,7 +292,9 @@ impl Context {
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
let addressed = self.expand_addressing(activity_targets).await?;
self.address_to(Some(aid), oid, &addressed).await?;
let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?;
let internal_oid = if let Some(o) = oid { Some(model::object::Entity::ap_to_internal(o, self.db()).await?) } else { None };
self.address_to(Some(internal_aid), internal_oid, &addressed).await?;
self.deliver_to(aid, uid, &addressed).await?;
Ok(())
}

View file

@ -125,7 +125,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker
actor: sea_orm::ActiveValue::Set(delivery.actor),
target: sea_orm::ActiveValue::Set(delivery.target),
activity: sea_orm::ActiveValue::Set(delivery.activity),
created: sea_orm::ActiveValue::Set(delivery.created),
published: sea_orm::ActiveValue::Set(delivery.published),
attempt: sea_orm::ActiveValue::Set(delivery.attempt + 1),
};
model::delivery::Entity::insert(new_delivery).exec(db).await?;

View file

@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Link, Object};
use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Object};
use base64::Engine;
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::EntityTrait;
@ -139,14 +139,14 @@ impl Fetcher for Context {
async fn pull_user(&self, id: &str) -> crate::Result<serde_json::Value> {
let mut user = Self::request(
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await?.json::<serde_json::Value>().await?;
// TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs
if let Some(followers_url) = &user.followers().id() {
let req = Self::request(
Method::GET, followers_url, None,
&format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
&format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await;
if let Ok(res) = req {
if let Ok(user_followers) = res.json::<serde_json::Value>().await {
@ -160,7 +160,7 @@ impl Fetcher for Context {
if let Some(following_url) = &user.following().id() {
let req = Self::request(
Method::GET, following_url, None,
&format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
&format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await;
if let Ok(res) = req {
if let Ok(user_following) = res.json::<serde_json::Value>().await {
@ -191,14 +191,14 @@ impl Fetcher for Context {
let addressed = activity.addressed();
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(Some(&activity.id), None, &expanded_addresses).await?;
self.address_to(Some(activity.internal), None, &expanded_addresses).await?;
Ok(activity)
}
async fn pull_activity(&self, id: &str) -> crate::Result<serde_json::Value> {
let activity = Self::request(
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(),
).await?.json::<serde_json::Value>().await?;
if let Some(activity_actor) = activity.actor().id() {
@ -228,7 +228,7 @@ impl Fetcher for Context {
async fn pull_object(&self, id: &str) -> crate::Result<serde_json::Value> {
Ok(
Context::request(
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
Method::GET, id, None, &format!("https://{}", self.domain()), self.pkey(), self.domain(),
)
.await?
.json::<serde_json::Value>()
@ -244,7 +244,7 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res
}
let object = Context::request(
Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
Method::GET, id, None, &format!("https://{}", ctx.domain()), ctx.pkey(), ctx.domain(),
).await?.json::<serde_json::Value>().await?;
if let Some(oid) = object.id() {
@ -274,7 +274,7 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res
let object_model = ctx.insert_object(object, None).await?;
let expanded_addresses = ctx.expand_addressing(addressed).await?;
ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?;
ctx.address_to(None, Some(object_model.internal), &expanded_addresses).await?;
Ok(object_model)
}
@ -288,9 +288,7 @@ pub trait Fetchable : Sync + Send {
impl Fetchable for apb::Node<serde_json::Value> {
async fn fetch(&mut self, ctx: &crate::server::Context) -> crate::Result<&mut Self> {
if let apb::Node::Link(uri) = self {
let from = format!("{}{}", ctx.protocol(), ctx.domain()); // TODO helper to avoid this?
let pkey = &ctx.app().private_key;
*self = Context::request(Method::GET, uri.href(), None, &from, pkey, ctx.domain())
*self = Context::request(Method::GET, uri.href(), None, ctx.base(), ctx.pkey(), ctx.domain())
.await?
.json::<serde_json::Value>()
.await?

View file

@ -83,7 +83,7 @@ impl Normalizer for super::Context {
document_type: Set(apb::DocumentType::Page),
name: Set(l.link_name().map(|x| x.to_string())),
media_type: Set(l.link_media_type().unwrap_or("link").to_string()),
created: Set(chrono::Utc::now()),
published: Set(chrono::Utc::now()),
},
Node::Object(o) => model::attachment::ActiveModel {
internal: sea_orm::ActiveValue::NotSet,
@ -92,7 +92,7 @@ impl Normalizer for super::Context {
document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))),
name: Set(o.name().map(|x| x.to_string())),
media_type: Set(o.media_type().unwrap_or("link").to_string()),
created: Set(o.published().unwrap_or_else(chrono::Utc::now)),
published: Set(o.published().unwrap_or_else(chrono::Utc::now)),
},
};
model::attachment::Entity::insert(attachment_model)
@ -120,7 +120,7 @@ impl Normalizer for super::Context {
document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))),
name: Set(img.name().map(|x| x.to_string())),
media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()),
created: Set(img.published().unwrap_or_else(chrono::Utc::now)),
published: Set(img.published().unwrap_or_else(chrono::Utc::now)),
};
model::attachment::Entity::insert(attachment_model)
.exec(self.db())

View file

@ -1,6 +1,6 @@
use apb::{target::Addressed, Activity, ActivityMut, ActorMut, BaseMut, Node, Object, ObjectMut, PublicKeyMut};
use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set};
use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns};
use crate::{errors::UpubError, model, routes::activitypub::jsonld::LD};
@ -25,11 +25,11 @@ impl apb::server::Outbox for Context {
if let Some(c) = content {
let mut tmp = mdhtml::safe_markdown(&c);
for (full, [user, domain]) in re.captures_iter(&tmp.clone()).map(|x| x.extract()) {
if let Ok(Some(uid)) = model::user::Entity::find()
.filter(model::user::Column::PreferredUsername.eq(user))
.filter(model::user::Column::Domain.eq(domain))
if let Ok(Some(uid)) = model::actor::Entity::find()
.filter(model::actor::Column::PreferredUsername.eq(user))
.filter(model::actor::Column::Domain.eq(domain))
.select_only()
.select_column(model::user::Column::Id)
.select_column(model::actor::Column::Id)
.into_tuple::<String>()
.one(self.db())
.await
@ -178,12 +178,12 @@ impl apb::server::Outbox for Context {
match accepted_activity.activity_type {
apb::ActivityType::Follow => {
model::user::Entity::update_many()
model::actor::Entity::update_many()
.col_expr(
model::user::Column::FollowersCount,
Expr::col(model::user::Column::FollowersCount).add(1)
model::actor::Column::FollowersCount,
Expr::col(model::actor::Column::FollowersCount).add(1)
)
.filter(model::user::Column::Id.eq(&uid))
.filter(model::actor::Column::Id.eq(&uid))
.exec(self.db())
.await?;
model::relation::Entity::insert(
@ -303,7 +303,7 @@ impl apb::server::Outbox for Context {
match object_node.object_type() {
Some(apb::ObjectType::Actor(_)) => {
let mut actor_model = model::user::Model::new(
let mut actor_model = model::actor::Model::new(
&object_node
// TODO must set these, but we will ignore them
.set_actor_type(Some(apb::ActorType::Person))
@ -311,7 +311,7 @@ impl apb::server::Outbox for Context {
serde_json::Value::new_object().set_public_key_pem("")
))
)?;
let old_actor_model = model::user::Entity::find_by_id(&actor_model.id)
let old_actor_model = model::actor::Entity::find_by_id(&actor_model.id)
.one(self.db())
.await?
.ok_or_else(UpubError::not_found)?;
@ -328,12 +328,12 @@ impl apb::server::Outbox for Context {
let mut update_model = actor_model.into_active_model();
update_model.updated = sea_orm::Set(chrono::Utc::now());
update_model.reset(model::user::Column::Name);
update_model.reset(model::user::Column::Summary);
update_model.reset(model::user::Column::Image);
update_model.reset(model::user::Column::Icon);
update_model.reset(model::actor::Column::Name);
update_model.reset(model::actor::Column::Summary);
update_model.reset(model::actor::Column::Image);
update_model.reset(model::actor::Column::Icon);
model::user::Entity::update(update_model)
model::actor::Entity::update(update_model)
.exec(self.db()).await?;
},
Some(apb::ObjectType::Note) => {
@ -398,17 +398,17 @@ impl apb::server::Outbox for Context {
.set_actor(Node::link(uid.clone()))
)?;
let share_model = model::share::ActiveModel {
let share_model = model::announce::ActiveModel {
internal: NotSet,
actor: Set(uid.clone()),
shares: Set(oid.clone()),
date: Set(chrono::Utc::now()),
..Default::default()
object: Set(oid.clone()),
published: Set(chrono::Utc::now()),
};
model::share::Entity::insert(share_model).exec(self.db()).await?;
model::announce::Entity::insert(share_model).exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?;
model::object::Entity::update_many()
.col_expr(model::object::Column::Shares, Expr::col(model::object::Column::Shares).add(1))
.col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1))
.filter(model::object::Column::Id.eq(oid))
.exec(self.db())
.await?;