fix: don't create mock activities

basically remade addressing table so that activity too can be null
This commit is contained in:
əlemi 2024-04-21 15:41:29 +02:00
parent fa74c3dd0a
commit 379b4daa84
Signed by: alemi
GPG key ID: A4895B84D311642C
7 changed files with 107 additions and 38 deletions

View file

@ -0,0 +1,48 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Addressing::Table)
.modify_column(
ColumnDef::new(Addressing::Activity)
.string()
.null()
)
.to_owned()
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Addressing::Table)
.modify_column(
ColumnDef::new(Addressing::Activity)
.string()
.not_null()
.default("")
)
.to_owned()
)
.await?;
Ok(())
}
}
#[derive(DeriveIden)]
enum Addressing {
Table,
Activity,
}

View file

@ -10,6 +10,7 @@ mod m20240324_000001_add_addressing;
mod m20240325_000001_add_deliveries; mod m20240325_000001_add_deliveries;
mod m20240325_000002_add_system_key; mod m20240325_000002_add_system_key;
mod m20240418_000001_add_statuses_and_reply_to; mod m20240418_000001_add_statuses_and_reply_to;
mod m20240421_000001_make_addressed_activity_nullable;
pub struct Migrator; pub struct Migrator;
@ -27,6 +28,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240325_000001_add_deliveries::Migration), Box::new(m20240325_000001_add_deliveries::Migration),
Box::new(m20240325_000002_add_system_key::Migration), Box::new(m20240325_000002_add_system_key::Migration),
Box::new(m20240418_000001_add_statuses_and_reply_to::Migration), Box::new(m20240418_000001_add_statuses_and_reply_to::Migration),
Box::new(m20240421_000001_make_addressed_activity_nullable::Migration),
] ]
} }
} }

View file

@ -8,7 +8,7 @@ pub struct Model {
pub id: i64, pub id: i64,
pub actor: String, pub actor: String,
pub server: String, pub server: String,
pub activity: String, pub activity: Option<String>,
pub object: Option<String>, pub object: Option<String>,
pub published: ChronoDateTimeUtc, pub published: ChronoDateTimeUtc,
} }
@ -57,6 +57,10 @@ impl Related<super::object::Entity> for Entity {
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}
#[derive(Debug)] #[derive(Debug)]
pub struct EmbeddedActivity { pub struct EmbeddedActivity {
pub activity: crate::model::activity::Model, pub activity: crate::model::activity::Model,
@ -81,6 +85,35 @@ impl FromQueryResult for EmbeddedActivity {
} }
} }
#[derive(Debug)]
pub struct WrappedObject {
pub activity: Option<crate::model::activity::Model>,
pub object: crate::model::object::Model,
}
impl From<WrappedObject> for serde_json::Value {
fn from(value: WrappedObject) -> Self {
match value.activity {
None => value.object.ap(),
Some(a) => a.ap().set_object(
Node::object(value.object.ap())
),
}
}
}
impl FromQueryResult for WrappedObject {
fn from_query_result(res: &sea_orm::QueryResult, _pre: &str) -> Result<Self, sea_orm::DbErr> {
let activity = crate::model::activity::Model::from_query_result(res, crate::model::activity::Entity.table_name()).ok();
let object = crate::model::object::Model::from_query_result(res, crate::model::object::Entity.table_name())?;
Ok(Self { activity, object })
}
}
impl Entity { impl Entity {
pub fn find_activities() -> Select<Entity> { pub fn find_activities() -> Select<Entity> {
let mut select = Entity::find() let mut select = Entity::find()
@ -106,12 +139,17 @@ impl Entity {
let mut select = Entity::find() let mut select = Entity::find()
.distinct() .distinct()
.select_only() .select_only()
.join(sea_orm::JoinType::InnerJoin, Relation::Object.def()); .join(sea_orm::JoinType::InnerJoin, Relation::Object.def())
// INNERJOIN: filter out addressings for which we don't have an object anymore // INNERJOIN: filter out addressings for which we don't have an object anymore
// TODO we could in theory return just the link or fetch them again, just ignoring them is mehh // TODO we could in theory return just the link or fetch them again, just ignoring them is mehh
.join(sea_orm::JoinType::LeftJoin, crate::model::object::Relation::Activity.def().rev());
for col in crate::model::object::Column::iter() { for col in crate::model::object::Column::iter() {
select = select.select_column(col); select = select.select_column_as(col, format!("{}{}", crate::model::object::Entity.table_name(), col.to_string()));
}
for col in crate::model::activity::Column::iter() {
select = select.select_column_as(col, format!("{}{}", crate::model::activity::Entity.table_name(), col.to_string()));
} }
select select

View file

@ -57,7 +57,7 @@ pub async fn faker(db: &sea_orm::DatabaseConnection, domain: String, count: u64)
addressing::Entity::insert(addressing::ActiveModel { addressing::Entity::insert(addressing::ActiveModel {
actor: Set(apb::target::PUBLIC.to_string()), actor: Set(apb::target::PUBLIC.to_string()),
server: Set("www.w3.org".to_string()), server: Set("www.w3.org".to_string()),
activity: Set(format!("{domain}/activities/{aid}")), activity: Set(Some(format!("{domain}/activities/{aid}"))),
object: Set(Some(format!("{domain}/objects/{oid}"))), object: Set(Some(format!("{domain}/objects/{oid}"))),
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
..Default::default() ..Default::default()

View file

@ -156,18 +156,19 @@ impl Context {
Ok(out) Ok(out)
} }
pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> crate::Result<()> { pub async fn address_to(&self, aid: Option<&str>, oid: Option<&str>, targets: &[String]) -> crate::Result<()> {
let local_activity = self.is_local(aid); let local_activity = aid.map(|x| self.is_local(x)).unwrap_or(false);
let local_object = oid.map(|x| self.is_local(x)).unwrap_or(false);
let addressings : Vec<model::addressing::ActiveModel> = targets let addressings : Vec<model::addressing::ActiveModel> = targets
.iter() .iter()
.filter(|to| !to.is_empty()) .filter(|to| !to.is_empty())
.filter(|to| !to.ends_with("/followers")) .filter(|to| !to.ends_with("/followers"))
.filter(|to| local_activity || to.as_str() == apb::target::PUBLIC || self.is_local(to)) .filter(|to| local_activity || local_object || to.as_str() == apb::target::PUBLIC || self.is_local(to))
.map(|to| model::addressing::ActiveModel { .map(|to| model::addressing::ActiveModel {
id: sea_orm::ActiveValue::NotSet, id: sea_orm::ActiveValue::NotSet,
server: Set(Context::server(to)), server: Set(Context::server(to)),
actor: Set(to.to_string()), actor: Set(to.to_string()),
activity: Set(aid.to_string()), activity: Set(aid.map(|x| x.to_string())),
object: Set(oid.map(|x| x.to_string())), object: Set(oid.map(|x| x.to_string())),
published: Set(chrono::Utc::now()), published: Set(chrono::Utc::now()),
}) })
@ -241,7 +242,7 @@ impl Context {
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> { pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
let addressed = self.expand_addressing(activity_targets).await?; let addressed = self.expand_addressing(activity_targets).await?;
self.address_to(aid, oid, &addressed).await?; self.address_to(Some(aid), oid, &addressed).await?;
self.deliver_to(aid, uid, &addressed).await?; self.deliver_to(aid, uid, &addressed).await?;
Ok(()) Ok(())
} }

View file

@ -114,7 +114,7 @@ impl Fetcher for Context {
.exec(self.db()).await?; .exec(self.db()).await?;
let expanded_addresses = self.expand_addressing(addressed).await?; let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(&activity_model.id, activity_model.object.as_deref(), &expanded_addresses).await?; self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?;
Ok(activity_model) Ok(activity_model)
} }
@ -131,29 +131,9 @@ impl Fetcher for Context {
let addressed = object.addressed(); let addressed = object.addressed();
let object_model = model::object::Model::new(&object)?; let object_model = model::object::Model::new(&object)?;
// since bare objects make no sense in our representation, we create a mock activity attributed to
// our server actor which creates this object. we respect all addressing we were made aware of
// and claim no ownership of this object, pointing to the original author if it's given.
// TODO it may be cool to make a system that, when the "true" activity is discovered, deletes
// this and replaces the addressing entries? idk kinda lot of work
let wrapper_activity_model = model::activity::Model {
id: self.aid(uuid::Uuid::new_v4().to_string()),
activity_type: apb::ActivityType::Create,
actor: self.base(),
object: Some(object_model.id.clone()),
target: object_model.attributed_to.clone(),
cc: object_model.cc.clone(),
bcc: object_model.bcc.clone(),
to: object_model.to.clone(),
bto: object_model.bto.clone(),
published: chrono::Utc::now(),
};
let expanded_addresses = self.expand_addressing(addressed).await?; let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(&wrapper_activity_model.id, Some(&object_model.id), &expanded_addresses).await?; self.address_to(None, Some(&object_model.id), &expanded_addresses).await?;
model::activity::Entity::insert(wrapper_activity_model.into_active_model())
.exec(self.db()).await?;
model::object::Entity::insert(object_model.clone().into_active_model()) model::object::Entity::insert(object_model.clone().into_active_model())
.exec(self.db()).await?; .exec(self.db()).await?;

View file

@ -34,7 +34,7 @@ impl apb::server::Inbox for Context {
model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?; model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&aid, Some(&oid), &expanded_addressing).await?; self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?;
tracing::info!("{} posted {}", aid, oid); tracing::info!("{} posted {}", aid, oid);
Ok(()) Ok(())
} }
@ -65,7 +65,7 @@ impl apb::server::Inbox for Context {
.exec(self.db()) .exec(self.db())
.await?; .await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(aid, None, &expanded_addressing).await?; self.address_to(Some(aid), None, &expanded_addressing).await?;
model::object::Entity::update_many() model::object::Entity::update_many()
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1))
.filter(model::object::Column::Id.eq(oid.clone())) .filter(model::object::Column::Id.eq(oid.clone()))
@ -84,7 +84,7 @@ impl apb::server::Inbox for Context {
model::activity::Entity::insert(activity_model.into_active_model()) model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?; .exec(self.db()).await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&aid, None, &expanded_addressing).await?; self.address_to(Some(&aid), None, &expanded_addressing).await?;
Ok(()) Ok(())
} }
@ -125,7 +125,7 @@ impl apb::server::Inbox for Context {
).exec(self.db()).await?; ).exec(self.db()).await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&activity_model.id, None, &expanded_addressing).await?; self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?;
Ok(()) Ok(())
} }
@ -151,7 +151,7 @@ impl apb::server::Inbox for Context {
.await?; .await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&activity_model.id, None, &expanded_addressing).await?; self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?;
Ok(()) Ok(())
} }
@ -199,7 +199,7 @@ impl apb::server::Inbox for Context {
.exec(self.db()) .exec(self.db())
.await?; .await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&aid, Some(&oid), &expanded_addressing).await?; self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?;
Ok(()) Ok(())
} }
@ -263,7 +263,7 @@ impl apb::server::Inbox for Context {
}; };
let expanded_addressing = self.expand_addressing(activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&activity_model.id, None, &expanded_addressing).await?; self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?;
model::share::Entity::insert(share) model::share::Entity::insert(share)
.exec(self.db()).await?; .exec(self.db()).await?;
model::activity::Entity::insert(activity_model.clone().into_active_model()) model::activity::Entity::insert(activity_model.clone().into_active_model())