forked from alemi/upub
feat: kinda botched way to ignore relays
basically relays send us a lot of announce activities to share posts but we don't care about those activities: it's db bloat and it increases the shares count. keep a table keeping track of followed relays and lazily skip activities by them, just fetch. IMPORTANT relays are only loaded at startup, so if you subscribe to a new relay restart your server once it accepts!!!
This commit is contained in:
parent
a6e12468b7
commit
7f996aa2c1
6 changed files with 100 additions and 3 deletions
43
src/migrations/m20240429_000001_add_relays_table.rs
Normal file
43
src/migrations/m20240429_000001_add_relays_table.rs
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.create_table(
|
||||||
|
Table::create()
|
||||||
|
.table(Relays::Table)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(Relays::Id)
|
||||||
|
.string()
|
||||||
|
.not_null()
|
||||||
|
.primary_key()
|
||||||
|
)
|
||||||
|
.col(ColumnDef::new(Relays::Accepted).boolean().not_null().default(false))
|
||||||
|
.col(ColumnDef::new(Relays::Forwarding).boolean().not_null().default(false))
|
||||||
|
.to_owned()
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.drop_table(Table::drop().table(Relays::Table).to_owned())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DeriveIden)]
|
||||||
|
enum Relays {
|
||||||
|
Table,
|
||||||
|
Id,
|
||||||
|
Accepted,
|
||||||
|
Forwarding,
|
||||||
|
}
|
|
@ -12,6 +12,7 @@ mod m20240325_000002_add_system_key;
|
||||||
mod m20240418_000001_add_statuses_and_reply_to;
|
mod m20240418_000001_add_statuses_and_reply_to;
|
||||||
mod m20240421_000001_add_attachments;
|
mod m20240421_000001_add_attachments;
|
||||||
mod m20240424_000001_add_sensitive_field;
|
mod m20240424_000001_add_sensitive_field;
|
||||||
|
mod m20240429_000001_add_relays_table;
|
||||||
|
|
||||||
pub struct Migrator;
|
pub struct Migrator;
|
||||||
|
|
||||||
|
@ -31,6 +32,7 @@ impl MigratorTrait for Migrator {
|
||||||
Box::new(m20240418_000001_add_statuses_and_reply_to::Migration),
|
Box::new(m20240418_000001_add_statuses_and_reply_to::Migration),
|
||||||
Box::new(m20240421_000001_add_attachments::Migration),
|
Box::new(m20240421_000001_add_attachments::Migration),
|
||||||
Box::new(m20240424_000001_add_sensitive_field::Migration),
|
Box::new(m20240424_000001_add_sensitive_field::Migration),
|
||||||
|
Box::new(m20240429_000001_add_relays_table::Migration),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ pub mod activity;
|
||||||
pub mod user;
|
pub mod user;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
|
|
||||||
|
pub mod relay;
|
||||||
pub mod relation;
|
pub mod relation;
|
||||||
pub mod addressing;
|
pub mod addressing;
|
||||||
pub mod share;
|
pub mod share;
|
||||||
|
|
16
src/model/relay.rs
Normal file
16
src/model/relay.rs
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
use sea_orm::entity::prelude::*;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
|
||||||
|
#[sea_orm(table_name = "relays")]
|
||||||
|
pub struct Model {
|
||||||
|
#[sea_orm(primary_key)]
|
||||||
|
pub id: String,
|
||||||
|
pub accepted: bool,
|
||||||
|
pub forwarding: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
pub enum Relation {}
|
||||||
|
// TODO how to represent this User-to-User relation in sea orm??
|
||||||
|
|
||||||
|
impl ActiveModelBehavior for ActiveModel {}
|
|
@ -1,7 +1,7 @@
|
||||||
use std::sync::Arc;
|
use std::{collections::BTreeSet, sync::Arc};
|
||||||
|
|
||||||
use openssl::rsa::Rsa;
|
use openssl::rsa::Rsa;
|
||||||
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set};
|
use sea_orm::{ColumnTrait, DatabaseConnection, EntityOrSelect, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set};
|
||||||
|
|
||||||
use crate::{model, server::fetcher::Fetcher};
|
use crate::{model, server::fetcher::Fetcher};
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ struct ContextInner {
|
||||||
dispatcher: Dispatcher,
|
dispatcher: Dispatcher,
|
||||||
// TODO keep these pre-parsed
|
// TODO keep these pre-parsed
|
||||||
app: model::application::Model,
|
app: model::application::Model,
|
||||||
|
relays: BTreeSet<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
|
@ -61,8 +62,17 @@ impl Context {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let relays = model::relay::Entity::find()
|
||||||
|
.select_only()
|
||||||
|
.select_column(model::relay::Column::Id)
|
||||||
|
.filter(model::relay::Column::Accepted.eq(true))
|
||||||
|
.into_tuple::<String>()
|
||||||
|
.all(&db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Context(Arc::new(ContextInner {
|
Ok(Context(Arc::new(ContextInner {
|
||||||
db, domain, protocol, app, dispatcher,
|
db, domain, protocol, app, dispatcher,
|
||||||
|
relays: BTreeSet::from_iter(relays.into_iter()),
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,11 +230,14 @@ impl Context {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
|
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
|
||||||
let addressed = self.expand_addressing(activity_targets).await?;
|
let addressed = self.expand_addressing(activity_targets).await?;
|
||||||
self.address_to(Some(aid), oid, &addressed).await?;
|
self.address_to(Some(aid), oid, &addressed).await?;
|
||||||
self.deliver_to(aid, uid, &addressed).await?;
|
self.deliver_to(aid, uid, &addressed).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_relay(&self, id: &str) -> bool {
|
||||||
|
self.0.relays.contains(id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,6 +116,20 @@ impl apb::server::Inbox for Context {
|
||||||
async fn accept(&self, _: String, activity: serde_json::Value) -> crate::Result<()> {
|
async fn accept(&self, _: String, activity: serde_json::Value) -> crate::Result<()> {
|
||||||
// TODO what about TentativeAccept
|
// TODO what about TentativeAccept
|
||||||
let activity_model = model::activity::Model::new(&activity)?;
|
let activity_model = model::activity::Model::new(&activity)?;
|
||||||
|
|
||||||
|
if let Some(mut r) = model::relay::Entity::find_by_id(&activity_model.actor)
|
||||||
|
.one(self.db())
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
r.accepted = true;
|
||||||
|
model::relay::Entity::update(r.into_active_model()).exec(self.db()).await?;
|
||||||
|
model::activity::Entity::insert(activity_model.clone().into_active_model())
|
||||||
|
.exec(self.db())
|
||||||
|
.await?;
|
||||||
|
tracing::info!("relay {} is now broadcasting to us", activity_model.actor);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let Some(follow_request_id) = &activity_model.object else {
|
let Some(follow_request_id) = &activity_model.object else {
|
||||||
return Err(UpubError::bad_request());
|
return Err(UpubError::bad_request());
|
||||||
};
|
};
|
||||||
|
@ -294,6 +308,14 @@ impl apb::server::Inbox for Context {
|
||||||
return Err(FieldError("object").into());
|
return Err(FieldError("object").into());
|
||||||
};
|
};
|
||||||
self.fetch_object(oid).await?;
|
self.fetch_object(oid).await?;
|
||||||
|
|
||||||
|
// relays send us activities as Announce, but we don't really want to count those towards the
|
||||||
|
// total shares count of an object, so just fetch the object and be done with it
|
||||||
|
if !self.is_relay(&activity_model.actor) {
|
||||||
|
tracing::info!("relay {} broadcasted {}", activity_model.actor, oid);
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
let share = model::share::ActiveModel {
|
let share = model::share::ActiveModel {
|
||||||
id: sea_orm::ActiveValue::NotSet,
|
id: sea_orm::ActiveValue::NotSet,
|
||||||
actor: sea_orm::Set(activity_model.actor.clone()),
|
actor: sea_orm::Set(activity_model.actor.clone()),
|
||||||
|
|
Loading…
Reference in a new issue