diff --git a/src/migrations/m20240421_000001_make_addressed_activity_nullable.rs b/src/migrations/m20240421_000001_make_addressed_activity_nullable.rs new file mode 100644 index 00000000..74c4265f --- /dev/null +++ b/src/migrations/m20240421_000001_make_addressed_activity_nullable.rs @@ -0,0 +1,48 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Addressing::Table) + .modify_column( + ColumnDef::new(Addressing::Activity) + .string() + .null() + ) + .to_owned() + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Addressing::Table) + .modify_column( + ColumnDef::new(Addressing::Activity) + .string() + .not_null() + .default("") + ) + .to_owned() + ) + .await?; + + Ok(()) + } +} + +#[derive(DeriveIden)] +enum Addressing { + Table, + Activity, +} diff --git a/src/migrations/mod.rs b/src/migrations/mod.rs index 19d63012..c80871d3 100644 --- a/src/migrations/mod.rs +++ b/src/migrations/mod.rs @@ -10,6 +10,7 @@ mod m20240324_000001_add_addressing; mod m20240325_000001_add_deliveries; mod m20240325_000002_add_system_key; mod m20240418_000001_add_statuses_and_reply_to; +mod m20240421_000001_make_addressed_activity_nullable; pub struct Migrator; @@ -27,6 +28,7 @@ impl MigratorTrait for Migrator { Box::new(m20240325_000001_add_deliveries::Migration), Box::new(m20240325_000002_add_system_key::Migration), Box::new(m20240418_000001_add_statuses_and_reply_to::Migration), + Box::new(m20240421_000001_make_addressed_activity_nullable::Migration), ] } } diff --git a/src/model/addressing.rs b/src/model/addressing.rs index 7fe4a788..63f14e58 100644 --- a/src/model/addressing.rs +++ b/src/model/addressing.rs @@ -8,7 +8,7 @@ pub struct Model { pub id: i64, pub actor: String, pub server: String, - pub activity: String, + pub activity: Option, pub object: Option, pub published: ChronoDateTimeUtc, } @@ -57,6 +57,10 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} + + + + #[derive(Debug)] pub struct EmbeddedActivity { pub activity: crate::model::activity::Model, @@ -81,6 +85,35 @@ impl FromQueryResult for EmbeddedActivity { } } +#[derive(Debug)] +pub struct WrappedObject { + pub activity: Option, + pub object: crate::model::object::Model, +} + +impl From for serde_json::Value { + fn from(value: WrappedObject) -> Self { + match value.activity { + None => value.object.ap(), + Some(a) => a.ap().set_object( + Node::object(value.object.ap()) + ), + } + } +} + +impl FromQueryResult for WrappedObject { + fn from_query_result(res: &sea_orm::QueryResult, _pre: &str) -> Result { + let activity = crate::model::activity::Model::from_query_result(res, crate::model::activity::Entity.table_name()).ok(); + let object = crate::model::object::Model::from_query_result(res, crate::model::object::Entity.table_name())?; + Ok(Self { activity, object }) + } +} + + + + + impl Entity { pub fn find_activities() -> Select { let mut select = Entity::find() @@ -106,12 +139,17 @@ impl Entity { let mut select = Entity::find() .distinct() .select_only() - .join(sea_orm::JoinType::InnerJoin, Relation::Object.def()); + .join(sea_orm::JoinType::InnerJoin, Relation::Object.def()) // INNERJOIN: filter out addressings for which we don't have an object anymore // TODO we could in theory return just the link or fetch them again, just ignoring them is mehh + .join(sea_orm::JoinType::LeftJoin, crate::model::object::Relation::Activity.def().rev()); for col in crate::model::object::Column::iter() { - select = select.select_column(col); + select = select.select_column_as(col, format!("{}{}", crate::model::object::Entity.table_name(), col.to_string())); + } + + for col in crate::model::activity::Column::iter() { + select = select.select_column_as(col, format!("{}{}", crate::model::activity::Entity.table_name(), col.to_string())); } select diff --git a/src/model/faker.rs b/src/model/faker.rs index 0bb57c7a..1fbb36b1 100644 --- a/src/model/faker.rs +++ b/src/model/faker.rs @@ -57,7 +57,7 @@ pub async fn faker(db: &sea_orm::DatabaseConnection, domain: String, count: u64) addressing::Entity::insert(addressing::ActiveModel { actor: Set(apb::target::PUBLIC.to_string()), server: Set("www.w3.org".to_string()), - activity: Set(format!("{domain}/activities/{aid}")), + activity: Set(Some(format!("{domain}/activities/{aid}"))), object: Set(Some(format!("{domain}/objects/{oid}"))), published: Set(chrono::Utc::now()), ..Default::default() diff --git a/src/server/context.rs b/src/server/context.rs index 3e170312..112f59b4 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -156,18 +156,19 @@ impl Context { Ok(out) } - pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> crate::Result<()> { - let local_activity = self.is_local(aid); + pub async fn address_to(&self, aid: Option<&str>, oid: Option<&str>, targets: &[String]) -> crate::Result<()> { + let local_activity = aid.map(|x| self.is_local(x)).unwrap_or(false); + let local_object = oid.map(|x| self.is_local(x)).unwrap_or(false); let addressings : Vec = targets .iter() .filter(|to| !to.is_empty()) .filter(|to| !to.ends_with("/followers")) - .filter(|to| local_activity || to.as_str() == apb::target::PUBLIC || self.is_local(to)) + .filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to)) .map(|to| model::addressing::ActiveModel { id: sea_orm::ActiveValue::NotSet, server: Set(Context::server(to)), actor: Set(to.to_string()), - activity: Set(aid.to_string()), + activity: Set(aid.map(|x| x.to_string())), object: Set(oid.map(|x| x.to_string())), published: Set(chrono::Utc::now()), }) @@ -241,7 +242,7 @@ impl Context { pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { let addressed = self.expand_addressing(activity_targets).await?; - self.address_to(aid, oid, &addressed).await?; + self.address_to(Some(aid), oid, &addressed).await?; self.deliver_to(aid, uid, &addressed).await?; Ok(()) } diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 8d98fdd9..c6ae905f 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -114,7 +114,7 @@ impl Fetcher for Context { .exec(self.db()).await?; let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(&activity_model.id, activity_model.object.as_deref(), &expanded_addresses).await?; + self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?; Ok(activity_model) } @@ -131,29 +131,9 @@ impl Fetcher for Context { let addressed = object.addressed(); let object_model = model::object::Model::new(&object)?; - // since bare objects make no sense in our representation, we create a mock activity attributed to - // our server actor which creates this object. we respect all addressing we were made aware of - // and claim no ownership of this object, pointing to the original author if it's given. - // TODO it may be cool to make a system that, when the "true" activity is discovered, deletes - // this and replaces the addressing entries? idk kinda lot of work - let wrapper_activity_model = model::activity::Model { - id: self.aid(uuid::Uuid::new_v4().to_string()), - activity_type: apb::ActivityType::Create, - actor: self.base(), - object: Some(object_model.id.clone()), - target: object_model.attributed_to.clone(), - cc: object_model.cc.clone(), - bcc: object_model.bcc.clone(), - to: object_model.to.clone(), - bto: object_model.bto.clone(), - published: chrono::Utc::now(), - }; - let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(&wrapper_activity_model.id, Some(&object_model.id), &expanded_addresses).await?; + self.address_to(None, Some(&object_model.id), &expanded_addresses).await?; - model::activity::Entity::insert(wrapper_activity_model.into_active_model()) - .exec(self.db()).await?; model::object::Entity::insert(object_model.clone().into_active_model()) .exec(self.db()).await?; diff --git a/src/server/inbox.rs b/src/server/inbox.rs index bd3747b7..678f4a5f 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -34,7 +34,7 @@ impl apb::server::Inbox for Context { model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(&aid, Some(&oid), &expanded_addressing).await?; + self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?; tracing::info!("{} posted {}", aid, oid); Ok(()) } @@ -65,7 +65,7 @@ impl apb::server::Inbox for Context { .exec(self.db()) .await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(aid, None, &expanded_addressing).await?; + self.address_to(Some(aid), None, &expanded_addressing).await?; model::object::Entity::update_many() .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) .filter(model::object::Column::Id.eq(oid.clone())) @@ -84,7 +84,7 @@ impl apb::server::Inbox for Context { model::activity::Entity::insert(activity_model.into_active_model()) .exec(self.db()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(&aid, None, &expanded_addressing).await?; + self.address_to(Some(&aid), None, &expanded_addressing).await?; Ok(()) } @@ -125,7 +125,7 @@ impl apb::server::Inbox for Context { ).exec(self.db()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(&activity_model.id, None, &expanded_addressing).await?; + self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?; Ok(()) } @@ -151,7 +151,7 @@ impl apb::server::Inbox for Context { .await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(&activity_model.id, None, &expanded_addressing).await?; + self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?; Ok(()) } @@ -199,7 +199,7 @@ impl apb::server::Inbox for Context { .exec(self.db()) .await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(&aid, Some(&oid), &expanded_addressing).await?; + self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?; Ok(()) } @@ -263,7 +263,7 @@ impl apb::server::Inbox for Context { }; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(&activity_model.id, None, &expanded_addressing).await?; + self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?; model::share::Entity::insert(share) .exec(self.db()).await?; model::activity::Entity::insert(activity_model.clone().into_active_model())