chore: updated models and some server components

This commit is contained in:
əlemi 2024-05-25 05:31:10 +02:00
parent 94ec7d0d37
commit b09cfd0526
Signed by: alemi
GPG key ID: A4895B84D311642C
15 changed files with 338 additions and 278 deletions

View file

@ -1,7 +1,7 @@
use apb::{ActivityMut, ActivityType, BaseMut, ObjectMut};
use sea_orm::entity::prelude::*;
use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns};
use crate::routes::activitypub::jsonld::LD;
use crate::{model::Audience, errors::UpubError, routes::activitypub::jsonld::LD};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "activities")]
@ -14,10 +14,10 @@ pub struct Model {
pub actor: String,
pub object: Option<String>,
pub target: Option<String>,
pub to: Option<Json>,
pub bto: Option<Json>,
pub cc: Option<Json>,
pub bcc: Option<Json>,
pub to: Audience,
pub bto: Audience,
pub cc: Audience,
pub bcc: Audience,
pub published: ChronoDateTimeUtc,
}
@ -71,11 +71,28 @@ impl Related<super::object::Entity> for Entity {
impl ActiveModelBehavior for ActiveModel {}
impl Entity {
pub fn find_by_ap_id(id: &str) -> Select<Entity> {
Entity::find().filter(Column::Id.eq(id))
}
pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result<i64> {
Entity::find()
.filter(Column::Id.eq(id))
.select_only()
.select_column(Column::Internal)
.into_tuple::<i64>()
.one(db)
.await?
.ok_or_else(UpubError::not_found)
}
}
impl ActiveModel {
pub fn new(activity: &impl apb::Activity) -> Result<Self, super::FieldError> {
Ok(ActiveModel {
id: sea_orm::ActiveValue::NotSet,
ap_id: sea_orm::ActiveValue::Set(activity.id().ok_or(super::FieldError("id"))?.to_string()),
internal: sea_orm::ActiveValue::NotSet,
id: sea_orm::ActiveValue::Set(activity.id().ok_or(super::FieldError("id"))?.to_string()),
activity_type: sea_orm::ActiveValue::Set(activity.activity_type().ok_or(super::FieldError("type"))?),
actor: sea_orm::ActiveValue::Set(activity.actor().id().ok_or(super::FieldError("actor"))?),
object: sea_orm::ActiveValue::Set(activity.object().id()),

View file

@ -1,8 +1,8 @@
use sea_orm::entity::prelude::*;
use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns};
use apb::{Actor, ActorMut, ActorType, BaseMut, DocumentMut, Endpoints, EndpointsMut, Object, ObjectMut, PublicKey, PublicKeyMut};
use crate::routes::activitypub::jsonld::LD;
use crate::{errors::UpubError, routes::activitypub::jsonld::LD};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "actors")]
@ -133,14 +133,31 @@ impl Related<super::session::Entity> for Entity {
impl ActiveModelBehavior for ActiveModel {}
impl Entity {
pub fn find_by_ap_id(id: &str) -> Select<Entity> {
Entity::find().filter(Column::Id.eq(id))
}
pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result<i64> {
Entity::find()
.filter(Column::Id.eq(id))
.select_only()
.select_column(Column::Internal)
.into_tuple::<i64>()
.one(db)
.await?
.ok_or_else(UpubError::not_found)
}
}
impl ActiveModel {
pub fn new(object: &impl Actor, instance: i32) -> Result<Self, super::FieldError> {
pub fn new(object: &impl Actor) -> Result<Self, super::FieldError> {
let ap_id = object.id().ok_or(super::FieldError("id"))?.to_string();
let (_domain, fallback_preferred_username) = split_user_id(&ap_id);
let (domain, fallback_preferred_username) = split_user_id(&ap_id);
Ok(ActiveModel {
instance: sea_orm::ActiveValue::Set(instance), // TODO receiving it from outside is cheap
id: sea_orm::ActiveValue::NotSet,
ap_id: sea_orm::ActiveValue::Set(ap_id),
internal: sea_orm::ActiveValue::NotSet,
domain: sea_orm::ActiveValue::Set(domain),
id: sea_orm::ActiveValue::Set(ap_id),
preferred_username: sea_orm::ActiveValue::Set(object.preferred_username().unwrap_or(&fallback_preferred_username).to_string()),
actor_type: sea_orm::ActiveValue::Set(object.actor_type().ok_or(super::FieldError("type"))?),
name: sea_orm::ActiveValue::Set(object.name().map(|x| x.to_string())),
@ -154,9 +171,9 @@ impl ActiveModel {
following: sea_orm::ActiveValue::Set(object.following().id()),
created: sea_orm::ActiveValue::Set(object.published().unwrap_or(chrono::Utc::now())),
updated: sea_orm::ActiveValue::Set(chrono::Utc::now()),
following_count: sea_orm::ActiveValue::Set(object.following_count().unwrap_or(0) as i64),
followers_count: sea_orm::ActiveValue::Set(object.followers_count().unwrap_or(0) as i64),
statuses_count: sea_orm::ActiveValue::Set(object.statuses_count().unwrap_or(0) as i64),
following_count: sea_orm::ActiveValue::Set(object.following_count().unwrap_or(0) as i32),
followers_count: sea_orm::ActiveValue::Set(object.followers_count().unwrap_or(0) as i32),
statuses_count: sea_orm::ActiveValue::Set(object.statuses_count().unwrap_or(0) as i32),
public_key: sea_orm::ActiveValue::Set(object.public_key().get().ok_or(super::FieldError("publicKey"))?.public_key_pem().to_string()),
private_key: sea_orm::ActiveValue::Set(None), // there's no way to transport privkey over AP json, must come from DB
})

View file

@ -154,12 +154,12 @@ impl FromQueryResult for Event {
impl Entity {
pub fn find_addressed(uid: Option<&str>) -> Select<Entity> {
pub fn find_addressed(uid: Option<i64>) -> Select<Entity> {
let mut select = Entity::find()
.distinct()
.select_only()
.join(sea_orm::JoinType::LeftJoin, Relation::Object.def())
.join(sea_orm::JoinType::LeftJoin, Relation::Activity.def())
.join(sea_orm::JoinType::LeftJoin, Relation::Objects.def())
.join(sea_orm::JoinType::LeftJoin, Relation::Activities.def())
.filter(
// TODO ghetto double inner join because i want to filter out tombstones
Condition::any()
@ -169,12 +169,11 @@ impl Entity {
.order_by(Column::Published, Order::Desc);
if let Some(uid) = uid {
let uid = uid.to_string();
select = select
.join(
sea_orm::JoinType::LeftJoin,
crate::model::object::Relation::Like.def()
.on_condition(move |_l, _r| crate::model::like::Column::Actor.eq(uid.clone()).into_condition()),
crate::model::object::Relation::Likes.def()
.on_condition(move |_l, _r| crate::model::like::Column::Actor.eq(uid).into_condition()),
)
.select_column_as(crate::model::like::Column::Actor, format!("{}{}", crate::model::like::Entity.table_name(), crate::model::like::Column::Actor.to_string()));
}

View file

@ -1,4 +1,4 @@
use apb::{DocumentMut, ObjectMut};
use apb::{DocumentMut, DocumentType, ObjectMut};
use sea_orm::entity::prelude::*;
use crate::routes::activitypub::jsonld::LD;
@ -13,7 +13,7 @@ pub struct Model {
#[sea_orm(unique)]
pub url: String,
pub object: i64,
pub document_type: String,
pub document_type: DocumentType,
pub name: Option<String>,
pub media_type: String,
pub created: ChronoDateTimeUtc,
@ -52,12 +52,12 @@ impl Model {
#[axum::async_trait]
pub trait BatchFillable {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<String, Vec<Model>>, DbErr>;
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr>;
}
#[axum::async_trait]
impl BatchFillable for &[Event] {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<String, Vec<Model>>, DbErr> {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
let objects : Vec<crate::model::object::Model> = self
.iter()
.filter_map(|x| match x {
@ -70,12 +70,12 @@ impl BatchFillable for &[Event] {
let attachments = objects.load_many(Entity, db).await?;
let mut out : std::collections::BTreeMap<String, Vec<Model>> = std::collections::BTreeMap::new();
let mut out : std::collections::BTreeMap<i64, Vec<Model>> = std::collections::BTreeMap::new();
for attach in attachments.into_iter().flatten() {
if out.contains_key(&attach.object) {
out.get_mut(&attach.object).expect("contains but get failed?").push(attach);
} else {
out.insert(attach.object.clone(), vec![attach]);
out.insert(attach.object, vec![attach]);
}
}
@ -85,14 +85,14 @@ impl BatchFillable for &[Event] {
#[axum::async_trait]
impl BatchFillable for Vec<Event> {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<String, Vec<Model>>, DbErr> {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
self.as_slice().load_attachments_batch(db).await
}
}
#[axum::async_trait]
impl BatchFillable for Event {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<String, Vec<Model>>, DbErr> {
async fn load_attachments_batch(&self, db: &DatabaseConnection) -> Result<std::collections::BTreeMap<i64, Vec<Model>>, DbErr> {
let x = vec![self.clone()]; // TODO wasteful clone and vec![] but ehhh convenient
x.load_attachments_batch(db).await
}

View file

@ -1,4 +1,6 @@
use sea_orm::entity::prelude::*;
use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns};
use crate::errors::UpubError;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "instances")]
@ -39,3 +41,20 @@ impl Related<super::addressing::Entity> for Entity {
}
impl ActiveModelBehavior for ActiveModel {}
impl Entity {
pub fn find_by_domain(domain: &str) -> Select<Entity> {
Entity::find().filter(Column::Domain.eq(domain))
}
pub async fn domain_to_internal(domain: &str, db: &DatabaseConnection) -> crate::Result<i64> {
Entity::find()
.filter(Column::Domain.eq(domain))
.select_only()
.select_column(Column::Internal)
.into_tuple::<i64>()
.one(db)
.await?
.ok_or_else(UpubError::not_found)
}
}

View file

@ -1,7 +1,7 @@
use apb::{BaseMut, Collection, CollectionMut, ObjectMut};
use sea_orm::entity::prelude::*;
use apb::{BaseMut, Collection, CollectionMut, ObjectMut, ObjectType};
use sea_orm::{entity::prelude::*, QuerySelect, SelectColumns};
use crate::routes::activitypub::jsonld::LD;
use crate::{errors::UpubError, routes::activitypub::jsonld::LD};
use super::Audience;
@ -12,7 +12,7 @@ pub struct Model {
pub internal: i64,
#[sea_orm(unique)]
pub id: String,
pub object_type: String,
pub object_type: ObjectType,
pub attributed_to: Option<String>,
pub name: Option<String>,
pub summary: Option<String>,
@ -24,10 +24,10 @@ pub struct Model {
pub announces: i32,
pub replies: i32,
pub context: Option<String>,
pub to: Option<Json>,
pub bto: Option<Json>,
pub cc: Option<Json>,
pub bcc: Option<Json>,
pub to: Audience,
pub bto: Audience,
pub cc: Audience,
pub bcc: Audience,
pub published: ChronoDateTimeUtc,
pub updated: ChronoDateTimeUtc,
}
@ -122,11 +122,28 @@ impl Related<Entity> for Entity {
impl ActiveModelBehavior for ActiveModel {}
impl Entity {
pub fn find_by_ap_id(id: &str) -> Select<Entity> {
Entity::find().filter(Column::Id.eq(id))
}
pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result<i64> {
Entity::find()
.filter(Column::Id.eq(id))
.select_only()
.select_column(Column::Internal)
.into_tuple::<i64>()
.one(db)
.await?
.ok_or_else(UpubError::not_found)
}
}
impl ActiveModel {
pub fn new(object: &impl apb::Object) -> Result<Self, super::FieldError> {
Ok(ActiveModel {
id: sea_orm::ActiveValue::NotSet,
ap_id: sea_orm::ActiveValue::Set(object.id().ok_or(super::FieldError("id"))?.to_string()),
internal: sea_orm::ActiveValue::NotSet,
id: sea_orm::ActiveValue::Set(object.id().ok_or(super::FieldError("id"))?.to_string()),
object_type: sea_orm::ActiveValue::Set(object.object_type().ok_or(super::FieldError("type"))?),
attributed_to: sea_orm::ActiveValue::Set(object.attributed_to().id()),
name: sea_orm::ActiveValue::Set(object.name().map(|x| x.to_string())),
@ -135,14 +152,14 @@ impl ActiveModel {
context: sea_orm::ActiveValue::Set(object.context().id()),
in_reply_to: sea_orm::ActiveValue::Set(object.in_reply_to().id()),
published: sea_orm::ActiveValue::Set(object.published().ok_or(super::FieldError("published"))?),
updated: sea_orm::ActiveValue::Set(object.updated()),
updated: sea_orm::ActiveValue::Set(object.updated().unwrap_or_else(chrono::Utc::now)),
url: sea_orm::ActiveValue::Set(object.url().id()),
replies: sea_orm::ActiveValue::Set(object.replies().get()
.map_or(0, |x| x.total_items().unwrap_or(0)) as i64),
.map_or(0, |x| x.total_items().unwrap_or(0)) as i32),
likes: sea_orm::ActiveValue::Set(object.likes().get()
.map_or(0, |x| x.total_items().unwrap_or(0)) as i64),
.map_or(0, |x| x.total_items().unwrap_or(0)) as i32),
announces: sea_orm::ActiveValue::Set(object.shares().get()
.map_or(0, |x| x.total_items().unwrap_or(0)) as i64),
.map_or(0, |x| x.total_items().unwrap_or(0)) as i32),
to: sea_orm::ActiveValue::Set(object.to().into()),
bto: sea_orm::ActiveValue::Set(object.bto().into()),
cc: sea_orm::ActiveValue::Set(object.cc().into()),
@ -176,7 +193,7 @@ impl Model {
.set_shares(apb::Node::object(
serde_json::Value::new_object()
.set_collection_type(Some(apb::CollectionType::OrderedCollection))
.set_total_items(Some(self.shares as u64))
.set_total_items(Some(self.announces as u64))
))
.set_likes(apb::Node::object(
serde_json::Value::new_object()
@ -186,7 +203,7 @@ impl Model {
.set_replies(apb::Node::object(
serde_json::Value::new_object()
.set_collection_type(Some(apb::CollectionType::OrderedCollection))
.set_total_items(Some(self.comments as u64))
.set_total_items(Some(self.replies as u64))
))
}
}

View file

@ -25,7 +25,7 @@ pub async fn login(
// TODO salt the pwd
match model::credential::Entity::find()
.filter(Condition::all()
.add(model::credential::Column::Email.eq(login.email))
.add(model::credential::Column::Login.eq(login.email))
.add(model::credential::Column::Password.eq(sha256::digest(login.password)))
)
.one(ctx.db())
@ -41,8 +41,9 @@ pub async fn login(
let expires = chrono::Utc::now() + std::time::Duration::from_secs(3600 * 6);
model::session::Entity::insert(
model::session::ActiveModel {
id: sea_orm::ActiveValue::Set(token.clone()),
actor: sea_orm::ActiveValue::Set(x.id.clone()),
internal: sea_orm::ActiveValue::NotSet,
secret: sea_orm::ActiveValue::Set(token.clone()),
actor: sea_orm::ActiveValue::Set(x.actor.clone()),
expires: sea_orm::ActiveValue::Set(expires),
}
)
@ -50,7 +51,7 @@ pub async fn login(
.await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(AuthSuccess {
token, expires,
user: x.id
user: x.actor
}))
},
None => Err(UpubError::unauthorized()),

View file

@ -2,7 +2,7 @@ pub mod replies;
use apb::{CollectionMut, ObjectMut};
use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, EntityTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns};
use sea_orm::{ColumnTrait, ModelTrait, QueryFilter, QuerySelect, SelectColumns};
use crate::{errors::UpubError, model::{self, addressing::Event}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}};
@ -62,7 +62,7 @@ pub async fn view(
// .set_id(Some(&crate::url!(ctx, "/objects/{id}/replies")))
// .set_first(apb::Node::link(crate::url!(ctx, "/objects/{id}/replies/page")))
.set_collection_type(Some(apb::CollectionType::Collection))
.set_total_items(Some(object.comments as u64))
.set_total_items(Some(object.replies as u64))
.set_items(apb::Node::links(replies_ids))
);
}

View file

@ -8,7 +8,7 @@ use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
use apb::{ActorMut, EndpointsMut, Node};
use crate::{errors::UpubError, model::{self, user}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url};
use crate::{errors::UpubError, model, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url};
use super::{jsonld::LD, JsonLD, TryFetch};
@ -61,7 +61,7 @@ pub async fn view(
},
};
match user::Entity::find_by_id(&uid)
match model::actor::Entity::find_by_ap_id(&uid)
.find_also_related(model::config::Entity)
.one(ctx.db()).await?
{

View file

@ -35,7 +35,7 @@ pub async fn nodeinfo_discovery(State(ctx): State<Context>) -> Json<NodeInfoDisc
pub async fn nodeinfo(State(ctx): State<Context>, Path(version): Path<String>) -> Result<Json<nodeinfo::NodeInfoOwned>, StatusCode> {
// TODO it's unsustainable to count these every time, especially comments since it's a complex
// filter! keep these numbers caches somewhere, maybe db, so that we can just look them up
let total_users = model::user::Entity::find().count(ctx.db()).await.ok();
let total_users = model::actor::Entity::find().count(ctx.db()).await.ok();
let total_posts = None;
let total_comments = None;
let (software, version) = match version.as_str() {
@ -103,28 +103,9 @@ pub async fn webfinger(State(ctx): State<Context>, Query(query): Query<Webfinger
.split_once('@')
{
if domain == ctx.domain() {
if user == ctx.domain() {
// we fetch with our domain as user, they are checking us back, this is a special edge case
Ok(JsonRD(JsonResourceDescriptor {
subject: format!("acct:{user}@{domain}"),
aliases: vec![ctx.base().to_string()],
links: vec![
JsonResourceDescriptorLink {
rel: "self".to_string(),
link_type: Some("application/ld+json".to_string()),
href: Some(ctx.base().to_string()),
properties: jrd::Map::default(),
titles: jrd::Map::default(),
},
],
expires: None,
properties: jrd::Map::default(),
}))
} else {
// local user
let uid = ctx.uid(user);
let usr = model::user::Entity::find_by_id(uid)
let usr = model::actor::Entity::find_by_ap_id(&uid)
.one(ctx.db())
.await?
.ok_or_else(UpubError::not_found)?;
@ -144,13 +125,11 @@ pub async fn webfinger(State(ctx): State<Context>, Query(query): Query<Webfinger
expires: None,
properties: jrd::Map::default(),
}))
}
} else {
// remote user
let usr = model::user::Entity::find()
.filter(model::user::Column::PreferredUsername.eq(user))
.filter(model::user::Column::Domain.eq(domain))
let usr = model::actor::Entity::find()
.filter(model::actor::Column::PreferredUsername.eq(user))
.filter(model::actor::Column::Domain.eq(domain))
.one(ctx.db())
.await?
.ok_or_else(UpubError::not_found)?;

View file

@ -1,6 +1,5 @@
use axum::{extract::{Path, State}, http::StatusCode, Json};
use mastodon_async_entities::account::{Account, AccountId};
use sea_orm::EntityTrait;
use crate::{model, server::{auth::AuthIdentity, Context}};
@ -9,7 +8,7 @@ pub async fn view(
AuthIdentity(_auth): AuthIdentity,
Path(id): Path<String>
) -> Result<Json<Account>, StatusCode> {
match model::user::Entity::find_by_id(ctx.uid(&id))
match model::actor::Entity::find_by_ap_id(&ctx.uid(&id))
.find_also_related(model::config::Entity)
.one(ctx.db())
.await

View file

@ -1,4 +1,4 @@
use sea_orm::{EntityTrait, IntoActiveModel};
use sea_orm::{ActiveValue::{Set, NotSet}, EntityTrait};
#[axum::async_trait]
pub trait Administrable {
@ -28,52 +28,56 @@ impl Administrable for super::Context {
let ap_id = self.uid(&username);
let db = self.db();
let domain = self.domain().to_string();
let user_model = crate::model::user::Model {
id: ap_id.clone(),
name: display_name,
domain, summary,
preferred_username: username.clone(),
following: None,
following_count: 0,
followers: None,
followers_count: 0,
statuses_count: 0,
icon: avatar_url,
image: banner_url,
inbox: None,
shared_inbox: None,
outbox: None,
actor_type: apb::ActorType::Person,
created: chrono::Utc::now(),
updated: chrono::Utc::now(),
private_key: Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string()),
public_key: std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string(),
let user_model = crate::model::actor::Model {
internal: NotSet,
id: Set(ap_id.clone()),
name: Set(display_name),
domain: Set(domain),
summary: Set(summary),
preferred_username: Set(username.clone()),
following: Set(None),
following_count: Set(0),
followers: Set(None),
followers_count: Set(0),
statuses_count: Set(0),
icon: Set(avatar_url),
image: Set(banner_url),
inbox: Set(None),
shared_inbox: Set(None),
outbox: Set(None),
actor_type: Set(apb::ActorType::Person),
created: Set(chrono::Utc::now()),
updated: Set(chrono::Utc::now()),
private_key: Set(Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string())),
public_key: Set(std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string()),
};
crate::model::user::Entity::insert(user_model.into_active_model())
crate::model::actor::Entity::insert(user_model)
.exec(db)
.await?;
let config_model = crate::model::config::Model {
id: ap_id.clone(),
accept_follow_requests: true,
show_followers_count: true,
show_following_count: true,
show_followers: false,
show_following: false,
let config_model = crate::model::config::ActiveModel {
internal: NotSet,
actor: Set(ap_id.clone()),
accept_follow_requests: Set(true),
show_followers_count: Set(true),
show_following_count: Set(true),
show_followers: Set(false),
show_following: Set(false),
};
crate::model::config::Entity::insert(config_model.into_active_model())
crate::model::config::Entity::insert(config_model)
.exec(db)
.await?;
let credentials_model = crate::model::credential::Model {
id: ap_id,
email: username,
password,
let credentials_model = crate::model::credential::ActiveModel {
internal: NotSet,
actor: Set(ap_id),
login: Set(username),
password: Set(password),
};
crate::model::credential::Entity::insert(credentials_model.into_active_model())
crate::model::credential::Entity::insert(credentials_model)
.exec(db)
.await?;

View file

@ -54,7 +54,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker
};
let del_row = model::delivery::ActiveModel {
id: sea_orm::ActiveValue::Set(delivery.id),
internal: sea_orm::ActiveValue::Set(delivery.internal),
..Default::default()
};
let del = model::delivery::Entity::delete(del_row)
@ -72,7 +72,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker
tracing::info!("delivering {} to {}", delivery.activity, delivery.target);
let payload = match model::activity::Entity::find_by_id(&delivery.activity)
let payload = match model::activity::Entity::find_by_ap_id(&delivery.activity)
.find_also_related(model::object::Entity)
.one(db)
.await? // TODO probably should not fail here and at least re-insert the delivery
@ -99,24 +99,19 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker
},
};
let key = if delivery.actor == format!("https://{domain}") {
let Some(model::application::Model { private_key: key, .. }) = model::application::Entity::find()
.one(db).await?
let Some(actor) = model::actor::Entity::find_by_ap_id(&delivery.actor)
.one(db)
.await?
else {
tracing::error!("no private key configured for application");
tracing::error!("abandoning delivery of {} from non existant actor: {}", delivery.activity, delivery.actor);
continue;
};
key
} else {
let Some(model::user::Model{ private_key: Some(key), .. }) = model::user::Entity::find_by_id(&delivery.actor)
.one(db).await?
else {
tracing::error!("can not dispatch activity for user without private key: {}", delivery.actor);
continue;
};
key
};
let Some(key) = actor.private_key
else {
tracing::error!("abandoning delivery of {} from actor without private key: {}", delivery.activity, delivery.actor);
continue;
};
if let Err(e) = Context::request(
Method::POST, &delivery.target,
@ -125,7 +120,7 @@ async fn worker(db: &DatabaseConnection, domain: &str, poll_interval: u64, waker
).await {
tracing::warn!("failed delivery of {} to {} : {e}", delivery.activity, delivery.target);
let new_delivery = model::delivery::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
internal: sea_orm::ActiveValue::NotSet,
not_before: sea_orm::ActiveValue::Set(delivery.next_delivery()),
actor: sea_orm::ActiveValue::Set(delivery.actor),
target: sea_orm::ActiveValue::Set(delivery.target),

View file

@ -1,9 +1,9 @@
use std::collections::BTreeMap;
use apb::{target::Addressed, Activity, Base, Collection, CollectionPage, Link, Object};
use apb::{target::Addressed, Activity, Actor, ActorMut, Base, Collection, Link, Object};
use base64::Engine;
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
use sea_orm::EntityTrait;
use crate::{errors::UpubError, model, VERSION};
@ -13,14 +13,14 @@ use super::{httpsign::HttpSignature, normalizer::Normalizer, Context};
pub trait Fetcher {
async fn webfinger(&self, user: &str, host: &str) -> crate::Result<String>;
async fn fetch_user(&self, id: &str) -> crate::Result<model::user::Model>;
async fn pull_user(&self, id: &str) -> crate::Result<model::user::Model>;
async fn fetch_user(&self, id: &str) -> crate::Result<model::actor::Model>;
async fn pull_user(&self, id: &str) -> crate::Result<serde_json::Value>;
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model>;
async fn pull_object(&self, id: &str) -> crate::Result<model::object::Model>;
async fn pull_object(&self, id: &str) -> crate::Result<serde_json::Value>;
async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
async fn pull_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
async fn pull_activity(&self, id: &str) -> crate::Result<serde_json::Value>;
async fn fetch_thread(&self, id: &str) -> crate::Result<()>;
@ -115,30 +115,35 @@ impl Fetcher for Context {
}
async fn fetch_user(&self, id: &str) -> crate::Result<model::user::Model> {
if let Some(x) = model::user::Entity::find_by_id(id).one(self.db()).await? {
async fn fetch_user(&self, id: &str) -> crate::Result<model::actor::Model> {
if let Some(x) = model::actor::Entity::find_by_ap_id(id).one(self.db()).await? {
return Ok(x); // already in db, easy
}
let user_model = self.pull_user(id).await?;
let user_document = self.pull_user(id).await?;
let user_model = model::actor::ActiveModel::new(&user_document)?;
// TODO this may fail: while fetching, remote server may fetch our service actor.
// if it does so with http signature, we will fetch that actor in background
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
model::user::Entity::insert(user_model.clone().into_active_model())
.exec(self.db()).await?;
model::actor::Entity::insert(user_model).exec(self.db()).await?;
Ok(user_model)
// TODO fetch it back to get the internal id
Ok(
model::actor::Entity::find_by_ap_id(id)
.one(self.db())
.await?
.ok_or_else(UpubError::internal_server_error)?
)
}
async fn pull_user(&self, id: &str) -> crate::Result<model::user::Model> {
let user = Self::request(
async fn pull_user(&self, id: &str) -> crate::Result<serde_json::Value> {
let mut user = Self::request(
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
).await?.json::<serde_json::Value>().await?;
let mut user_model = model::user::Model::new(&user)?;
// TODO try fetching these numbers from audience/generator fields to avoid making 2 more GETs
if let Some(followers_url) = &user_model.followers {
if let Some(followers_url) = &user.followers().id() {
let req = Self::request(
Method::GET, followers_url, None,
&format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
@ -146,13 +151,13 @@ impl Fetcher for Context {
if let Ok(res) = req {
if let Ok(user_followers) = res.json::<serde_json::Value>().await {
if let Some(total) = user_followers.total_items() {
user_model.followers_count = total as i64;
user = user.set_followers_count(Some(total));
}
}
}
}
if let Some(following_url) = &user_model.following {
if let Some(following_url) = &user.following().id() {
let req = Self::request(
Method::GET, following_url, None,
&format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
@ -160,33 +165,38 @@ impl Fetcher for Context {
if let Ok(res) = req {
if let Ok(user_following) = res.json::<serde_json::Value>().await {
if let Some(total) = user_following.total_items() {
user_model.following_count = total as i64;
user = user.set_following_count(Some(total));
}
}
}
}
Ok(user_model)
Ok(user)
}
async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model> {
if let Some(x) = model::activity::Entity::find_by_id(id).one(self.db()).await? {
if let Some(x) = model::activity::Entity::find_by_ap_id(id).one(self.db()).await? {
return Ok(x); // already in db, easy
}
let activity_model = self.pull_activity(id).await?;
let activity_document = self.pull_activity(id).await?;
let activity_model = model::activity::ActiveModel::new(&activity_document)?;
model::activity::Entity::insert(activity_model.clone().into_active_model())
model::activity::Entity::insert(activity_model)
.exec(self.db()).await?;
let addressed = activity_model.addressed();
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?;
// TODO fetch it back to get the internal id
let activity = model::activity::Entity::find_by_ap_id(id)
.one(self.db()).await?.ok_or_else(UpubError::internal_server_error)?;
Ok(activity_model)
let addressed = activity.addressed();
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(Some(&activity.id), None, &expanded_addresses).await?;
Ok(activity)
}
async fn pull_activity(&self, id: &str) -> crate::Result<model::activity::Model> {
async fn pull_activity(&self, id: &str) -> crate::Result<serde_json::Value> {
let activity = Self::request(
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
).await?.json::<serde_json::Value>().await?;
@ -203,84 +213,33 @@ impl Fetcher for Context {
}
}
let activity_model = model::activity::Model::new(&activity)?;
Ok(activity_model)
Ok(activity)
}
async fn fetch_thread(&self, id: &str) -> crate::Result<()> {
crawl_replies(self, id, 0).await
// crawl_replies(self, id, 0).await
todo!()
}
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> {
fetch_object_inner(self, id, 0).await
}
async fn pull_object(&self, id: &str) -> crate::Result<model::object::Model> {
let object = Context::request(
async fn pull_object(&self, id: &str) -> crate::Result<serde_json::Value> {
Ok(
Context::request(
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
).await?.json::<serde_json::Value>().await?;
Ok(model::object::Model::new(&object)?)
)
.await?
.json::<serde_json::Value>()
.await?
)
}
}
#[async_recursion::async_recursion]
async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> {
tracing::info!("crawling replies of '{id}'");
let object = Context::request(
Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
).await?.json::<serde_json::Value>().await?;
let object_model = model::object::Model::new(&object)?;
match model::object::Entity::insert(object_model.into_active_model())
.exec(ctx.db()).await
{
Ok(_) => {},
Err(sea_orm::DbErr::RecordNotInserted) => {},
Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite
Err(e) => return Err(e.into()),
}
if depth > 16 {
tracing::warn!("stopping thread crawling: too deep!");
return Ok(());
}
let mut page_url = match object.replies().get() {
Some(serde_json::Value::String(x)) => {
let replies = Context::request(
Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
).await?.json::<serde_json::Value>().await?;
replies.first().id()
},
Some(serde_json::Value::Object(x)) => {
let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
obj.first().id()
},
_ => return Ok(()),
};
while let Some(ref url) = page_url {
let replies = Context::request(
Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
).await?.json::<serde_json::Value>().await?;
for reply in replies.items() {
// TODO right now it crawls one by one, could be made in parallel but would be quite more
// abusive, so i'll keep it like this while i try it out
crawl_replies(ctx, reply.href(), depth + 1).await?;
}
page_url = replies.next().id();
}
Ok(())
}
#[async_recursion::async_recursion]
async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result<model::object::Model> {
if let Some(x) = model::object::Entity::find_by_id(id).one(ctx.db()).await? {
if let Some(x) = model::object::Entity::find_by_ap_id(id).one(ctx.db()).await? {
return Ok(x); // already in db, easy
}
@ -290,7 +249,7 @@ async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Res
if let Some(oid) = object.id() {
if oid != id {
if let Some(x) = model::object::Entity::find_by_id(oid).one(ctx.db()).await? {
if let Some(x) = model::object::Entity::find_by_ap_id(oid).one(ctx.db()).await? {
return Ok(x); // already in db, but with id different that given url
}
}
@ -341,3 +300,56 @@ impl Fetchable for apb::Node<serde_json::Value> {
Ok(self)
}
}
// #[async_recursion::async_recursion]
// async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> {
// tracing::info!("crawling replies of '{id}'");
// let object = Context::request(
// Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
//
// let object_model = model::object::Model::new(&object)?;
// match model::object::Entity::insert(object_model.into_active_model())
// .exec(ctx.db()).await
// {
// Ok(_) => {},
// Err(sea_orm::DbErr::RecordNotInserted) => {},
// Err(sea_orm::DbErr::Exec(_)) => {}, // ughhh bad fix for sqlite
// Err(e) => return Err(e.into()),
// }
//
// if depth > 16 {
// tracing::warn!("stopping thread crawling: too deep!");
// return Ok(());
// }
//
// let mut page_url = match object.replies().get() {
// Some(serde_json::Value::String(x)) => {
// let replies = Context::request(
// Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
// replies.first().id()
// },
// Some(serde_json::Value::Object(x)) => {
// let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
// obj.first().id()
// },
// _ => return Ok(()),
// };
//
// while let Some(ref url) = page_url {
// let replies = Context::request(
// Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
// ).await?.json::<serde_json::Value>().await?;
//
// for reply in replies.items() {
// // TODO right now it crawls one by one, could be made in parallel but would be quite more
// // abusive, so i'll keep it like this while i try it out
// crawl_replies(ctx, reply.href(), depth + 1).await?;
// }
//
// page_url = replies.next().id();
// }
//
// Ok(())
// }

View file

@ -1,5 +1,5 @@
use apb::{Node, Base, Object, Document};
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, Set};
use sea_orm::{sea_query::Expr, ActiveValue::Set, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
use crate::{errors::UpubError, model, server::Context};
use super::fetcher::Fetcher;
@ -12,23 +12,23 @@ pub trait Normalizer {
#[axum::async_trait]
impl Normalizer for super::Context {
async fn insert_object(&self, object_node: impl apb::Object, server: Option<String>) -> crate::Result<model::object::Model> {
let mut object_model = model::object::Model::new(&object_node)?;
let oid = object_model.id.clone();
let uid = object_model.attributed_to.clone();
let oid = object_node.id().ok_or_else(UpubError::bad_request)?.to_string();
let uid = object_node.attributed_to().id();
let mut object_model = model::object::ActiveModel::new(&object_node)?;
if let Some(server) = server {
// make sure we're allowed to create this object
if let Some(object_author) = &object_model.attributed_to {
if let Set(Some(object_author)) = &object_model.attributed_to {
if server != Context::server(object_author) {
return Err(UpubError::forbidden());
}
} else if server != Context::server(&object_model.id) {
} else if server != Context::server(&oid) {
return Err(UpubError::forbidden());
};
}
// make sure content only contains a safe subset of html
if let Some(content) = object_model.content {
object_model.content = Some(mdhtml::safe_html(&content));
if let Set(Some(content)) = object_model.content {
object_model.content = Set(Some(mdhtml::safe_html(&content)));
}
// fix context for remote posts
@ -37,33 +37,34 @@ impl Normalizer for super::Context {
// > btw! also if any link is broken or we get rate limited, the whole insertion fails which is
// > kind of dumb. there should be a job system so this can be done in waves. or maybe there's
// > some whole other way to do this?? im thinking but misskey aaaa!! TODO
if let Some(ref reply) = object_model.in_reply_to {
if let Some(o) = model::object::Entity::find_by_id(reply).one(self.db()).await? {
object_model.context = o.context;
if let Set(Some(ref reply)) = object_model.in_reply_to {
if let Some(o) = model::object::Entity::find_by_ap_id(reply).one(self.db()).await? {
object_model.context = Set(o.context);
} else {
object_model.context = None; // TODO to be filled by some other task
object_model.context = Set(None); // TODO to be filled by some other task
}
} else {
object_model.context = Some(object_model.id.clone());
object_model.context = Set(Some(oid.clone()));
}
model::object::Entity::insert(object_model.clone().into_active_model()).exec(self.db()).await?;
let object = model::object::Entity::find_by_ap_id(&oid).one(self.db()).await?.ok_or_else(UpubError::internal_server_error)?;
// update replies counter
if let Some(ref in_reply_to) = object_model.in_reply_to {
if let Set(Some(ref in_reply_to)) = object_model.in_reply_to {
if self.fetch_object(in_reply_to).await.is_ok() {
model::object::Entity::update_many()
.filter(model::object::Column::Id.eq(in_reply_to))
.col_expr(model::object::Column::Comments, Expr::col(model::object::Column::Comments).add(1))
.col_expr(model::object::Column::Replies, Expr::col(model::object::Column::Replies).add(1))
.exec(self.db())
.await?;
}
}
// update statuses counter
if let Some(object_author) = uid {
model::user::Entity::update_many()
.col_expr(model::user::Column::StatusesCount, Expr::col(model::user::Column::StatusesCount).add(1))
.filter(model::user::Column::Id.eq(&object_author))
model::actor::Entity::update_many()
.col_expr(model::actor::Column::StatusesCount, Expr::col(model::actor::Column::StatusesCount).add(1))
.filter(model::actor::Column::Id.eq(&object_author))
.exec(self.db())
.await?;
}
@ -76,18 +77,18 @@ impl Normalizer for super::Context {
continue
},
Node::Link(l) => model::attachment::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
internal: sea_orm::ActiveValue::NotSet,
url: Set(l.href().to_string()),
object: Set(oid.clone()),
object: Set(object.internal),
document_type: Set(apb::DocumentType::Page),
name: Set(l.link_name().map(|x| x.to_string())),
media_type: Set(l.link_media_type().unwrap_or("link").to_string()),
created: Set(chrono::Utc::now()),
},
Node::Object(o) => model::attachment::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
internal: sea_orm::ActiveValue::NotSet,
url: Set(o.url().id().unwrap_or_else(|| o.id().map(|x| x.to_string()).unwrap_or_default())),
object: Set(oid.clone()),
object: Set(object.internal),
document_type: Set(o.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))),
name: Set(o.name().map(|x| x.to_string())),
media_type: Set(o.media_type().unwrap_or("link").to_string()),
@ -113,9 +114,9 @@ impl Normalizer for super::Context {
};
let attachment_model = model::attachment::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
internal: sea_orm::ActiveValue::NotSet,
url: Set(img.url().id().unwrap_or_else(|| img.id().map(|x| x.to_string()).unwrap_or_default())),
object: Set(oid.clone()),
object: Set(object.internal),
document_type: Set(img.as_document().map_or(apb::DocumentType::Document, |x| x.document_type().unwrap_or(apb::DocumentType::Page))),
name: Set(img.name().map(|x| x.to_string())),
media_type: Set(img.media_type().unwrap_or(media_type.as_deref().unwrap_or("link")).to_string()),
@ -126,6 +127,6 @@ impl Normalizer for super::Context {
.await?;
}
Ok(object_model)
Ok(object)
}
}