diff --git a/src/migrations/m20240429_000001_add_relays_table.rs b/src/migrations/m20240429_000001_add_relays_table.rs new file mode 100644 index 00000000..21b41411 --- /dev/null +++ b/src/migrations/m20240429_000001_add_relays_table.rs @@ -0,0 +1,43 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Relays::Table) + .col( + ColumnDef::new(Relays::Id) + .string() + .not_null() + .primary_key() + ) + .col(ColumnDef::new(Relays::Accepted).boolean().not_null().default(false)) + .col(ColumnDef::new(Relays::Forwarding).boolean().not_null().default(false)) + .to_owned() + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Relays::Table).to_owned()) + .await?; + + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Relays { + Table, + Id, + Accepted, + Forwarding, +} diff --git a/src/migrations/mod.rs b/src/migrations/mod.rs index 9b0f2e87..fdf2be5e 100644 --- a/src/migrations/mod.rs +++ b/src/migrations/mod.rs @@ -12,6 +12,7 @@ mod m20240325_000002_add_system_key; mod m20240418_000001_add_statuses_and_reply_to; mod m20240421_000001_add_attachments; mod m20240424_000001_add_sensitive_field; +mod m20240429_000001_add_relays_table; pub struct Migrator; @@ -31,6 +32,7 @@ impl MigratorTrait for Migrator { Box::new(m20240418_000001_add_statuses_and_reply_to::Migration), Box::new(m20240421_000001_add_attachments::Migration), Box::new(m20240424_000001_add_sensitive_field::Migration), + Box::new(m20240429_000001_add_relays_table::Migration), ] } } diff --git a/src/model/mod.rs b/src/model/mod.rs index 22acc2bf..8ed7aef2 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -3,6 +3,7 @@ pub mod activity; pub mod user; pub mod config; +pub mod relay; pub mod relation; pub mod addressing; pub mod share; diff --git a/src/model/relay.rs b/src/model/relay.rs new file mode 100644 index 00000000..0ea3b596 --- /dev/null +++ b/src/model/relay.rs @@ -0,0 +1,16 @@ +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "relays")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: String, + pub accepted: bool, + pub forwarding: bool, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} +// TODO how to represent this User-to-User relation in sea orm?? + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/server/context.rs b/src/server/context.rs index 148f1575..d7fc9a80 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -1,7 +1,7 @@ -use std::sync::Arc; +use std::{collections::BTreeSet, sync::Arc}; use openssl::rsa::Rsa; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityOrSelect, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; use crate::{model, server::fetcher::Fetcher}; @@ -17,6 +17,7 @@ struct ContextInner { dispatcher: Dispatcher, // TODO keep these pre-parsed app: model::application::Model, + relays: BTreeSet, } #[macro_export] @@ -61,8 +62,17 @@ impl Context { } }; + let relays = model::relay::Entity::find() + .select_only() + .select_column(model::relay::Column::Id) + .filter(model::relay::Column::Accepted.eq(true)) + .into_tuple::() + .all(&db) + .await?; + Ok(Context(Arc::new(ContextInner { db, domain, protocol, app, dispatcher, + relays: BTreeSet::from_iter(relays.into_iter()), }))) } @@ -220,11 +230,14 @@ impl Context { Ok(()) } - pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { let addressed = self.expand_addressing(activity_targets).await?; self.address_to(Some(aid), oid, &addressed).await?; self.deliver_to(aid, uid, &addressed).await?; Ok(()) } + + pub fn is_relay(&self, id: &str) -> bool { + self.0.relays.contains(id) + } } diff --git a/src/server/inbox.rs b/src/server/inbox.rs index e1097cfe..055e2463 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -116,6 +116,20 @@ impl apb::server::Inbox for Context { async fn accept(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { // TODO what about TentativeAccept let activity_model = model::activity::Model::new(&activity)?; + + if let Some(mut r) = model::relay::Entity::find_by_id(&activity_model.actor) + .one(self.db()) + .await? + { + r.accepted = true; + model::relay::Entity::update(r.into_active_model()).exec(self.db()).await?; + model::activity::Entity::insert(activity_model.clone().into_active_model()) + .exec(self.db()) + .await?; + tracing::info!("relay {} is now broadcasting to us", activity_model.actor); + return Ok(()); + } + let Some(follow_request_id) = &activity_model.object else { return Err(UpubError::bad_request()); }; @@ -294,6 +308,14 @@ impl apb::server::Inbox for Context { return Err(FieldError("object").into()); }; self.fetch_object(oid).await?; + + // relays send us activities as Announce, but we don't really want to count those towards the + // total shares count of an object, so just fetch the object and be done with it + if !self.is_relay(&activity_model.actor) { + tracing::info!("relay {} broadcasted {}", activity_model.actor, oid); + return Ok(()) + } + let share = model::share::ActiveModel { id: sea_orm::ActiveValue::NotSet, actor: sea_orm::Set(activity_model.actor.clone()),