1
0
Fork 0
forked from alemi/upub

chore: initial work converting outbox logic

This commit is contained in:
əlemi 2024-05-26 18:42:22 +02:00
parent c94bfdcbe8
commit 3c3e98a4f4
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 146 additions and 102 deletions

View file

@ -127,6 +127,10 @@ impl Entity {
Entity::find().filter(Column::Id.eq(id)) Entity::find().filter(Column::Id.eq(id))
} }
pub fn delete_by_ap_id(id: &str) -> sea_orm::DeleteMany<Entity> {
Entity::delete_many().filter(Column::Id.eq(id))
}
pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result<i64> { pub async fn ap_to_internal(id: &str, db: &DatabaseConnection) -> crate::Result<i64> {
Entity::find() Entity::find()
.filter(Column::Id.eq(id)) .filter(Column::Id.eq(id))

View file

@ -1,4 +1,4 @@
use apb::{target::Addressed, Activity, ActivityMut, ActorMut, BaseMut, Node, Object, ObjectMut, PublicKeyMut}; use apb::{target::Addressed, Activity, ActivityMut, ActorMut, Base, BaseMut, Node, Object, ObjectMut, PublicKeyMut};
use reqwest::StatusCode; use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{sea_query::Expr, ActiveModelTrait, ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns};
@ -44,27 +44,26 @@ impl apb::server::Outbox for Context {
object object
.set_id(Some(&oid)) .set_id(Some(&oid))
.set_attributed_to(Node::link(uid.clone())) .set_attributed_to(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
.set_content(content.as_deref()) .set_content(content.as_deref())
.set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))), .set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))),
Some(self.domain().to_string()), Some(self.domain().to_string()),
).await?; ).await?;
let activity_model = model::activity::Model { let activity_model = model::activity::ActiveModel {
id: aid.clone(), internal: NotSet,
activity_type: apb::ActivityType::Create, id: Set(aid.clone()),
actor: uid.clone(), activity_type: Set(apb::ActivityType::Create),
object: Some(oid.clone()), actor: Set(uid.clone()),
target: None, object: Set(Some(oid.clone())),
cc: object_model.cc.clone(), target: Set(None),
bcc: object_model.bcc.clone(), cc: Set(object_model.cc.clone()),
to: object_model.to.clone(), bcc: Set(object_model.bcc.clone()),
bto: object_model.bto.clone(), to: Set(object_model.to.clone()),
published: object_model.published, bto: Set(object_model.bto.clone()),
published: Set(object_model.published),
}; };
model::activity::Entity::insert(activity_model.into_active_model()) model::activity::Entity::insert(activity_model).exec(self.db()).await?;
.exec(self.db()).await?;
self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?;
@ -85,19 +84,18 @@ impl apb::server::Outbox for Context {
object object
.set_id(Some(&oid)) .set_id(Some(&oid))
.set_attributed_to(Node::link(uid.clone())) .set_attributed_to(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
.set_to(activity.to()) .set_to(activity.to())
.set_bto(activity.bto()) .set_bto(activity.bto())
.set_cc(activity.cc()) .set_cc(activity.cc())
.set_bcc(activity.bcc()), .set_bcc(activity.bcc())
.set_url(Node::maybe_link(self.cfg().instance.frontend.as_ref().map(|x| format!("{x}/objects/{raw_oid}")))),
Some(self.domain().to_string()), Some(self.domain().to_string()),
).await?; ).await?;
let activity_model = model::activity::Model::new( let activity_model = model::activity::ActiveModel::new(
&activity &activity
.set_id(Some(&aid)) .set_id(Some(&aid))
.set_actor(Node::link(uid.clone())) .set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
.set_object(Node::link(oid.clone())) .set_object(Node::link(oid.clone()))
)?; )?;
@ -112,26 +110,28 @@ impl apb::server::Outbox for Context {
let aid = self.aid(&uuid::Uuid::new_v4().to_string()); let aid = self.aid(&uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed(); let activity_targets = activity.addressed();
let oid = activity.object().id().ok_or_else(UpubError::bad_request)?; let oid = activity.object().id().ok_or_else(UpubError::bad_request)?;
self.fetch_object(&oid).await?; let obj_model = self.fetch_object(&oid).await?;
let activity_model = model::activity::Model::new( let activity_model = model::activity::ActiveModel::new(
&activity &activity
.set_id(Some(&aid)) .set_id(Some(&aid))
.set_published(Some(chrono::Utc::now()))
.set_actor(Node::link(uid.clone())) .set_actor(Node::link(uid.clone()))
)?; )?;
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let like_model = model::like::ActiveModel { let like_model = model::like::ActiveModel {
actor: Set(uid.clone()), internal: NotSet,
likes: Set(oid.clone()), actor: Set(internal_uid),
date: Set(chrono::Utc::now()), object: Set(obj_model.internal),
..Default::default() published: Set(chrono::Utc::now()),
}; };
model::like::Entity::insert(like_model).exec(self.db()).await?; model::like::Entity::insert(like_model).exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model()) model::activity::Entity::insert(activity_model)
.exec(self.db()).await?; .exec(self.db()).await?;
model::object::Entity::update_many() model::object::Entity::update_many()
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1))
.filter(model::object::Column::Id.eq(oid)) .filter(model::object::Column::Internal.eq(obj_model.internal))
.exec(self.db()) .exec(self.db())
.await?; .await?;
@ -143,17 +143,31 @@ impl apb::server::Outbox for Context {
async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> { async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(&uuid::Uuid::new_v4().to_string()); let aid = self.aid(&uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed(); let activity_targets = activity.addressed();
if activity.object().id().is_none() { let target = activity.object().id().ok_or_else(UpubError::bad_request)?;
return Err(UpubError::bad_request());
}
let activity_model = model::activity::Model::new( let activity_model = model::activity::ActiveModel::new(
&activity &activity
.set_id(Some(&aid)) .set_id(Some(&aid))
.set_actor(Node::link(uid.clone())) .set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?; )?;
model::activity::Entity::insert(activity_model.into_active_model())
let follower_internal = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let following_internal = model::actor::Entity::ap_to_internal(&target, self.db()).await?;
model::activity::Entity::insert(activity_model)
.exec(self.db()).await?;
let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
let relation_model = model::relation::ActiveModel {
internal: NotSet,
follower: Set(follower_internal),
following: Set(following_internal),
activity: Set(internal_aid),
accept: Set(None),
};
model::relation::Entity::insert(relation_model)
.exec(self.db()).await?; .exec(self.db()).await?;
self.dispatch(&uid, activity_targets, &aid, None).await?; self.dispatch(&uid, activity_targets, &aid, None).await?;
@ -164,17 +178,27 @@ impl apb::server::Outbox for Context {
async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> { async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(&uuid::Uuid::new_v4().to_string()); let aid = self.aid(&uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed(); let activity_targets = activity.addressed();
if activity.object().id().is_none() { let accepted_id = activity.object().id().ok_or_else(UpubError::bad_request)?;
let accepted_activity = model::activity::Entity::find_by_ap_id(&accepted_id)
.one(self.db()).await?
.ok_or_else(UpubError::not_found)?;
if accepted_activity.activity_type != apb::ActivityType::Follow {
return Err(UpubError::bad_request()); return Err(UpubError::bad_request());
} }
let Some(accepted_id) = activity.object().id() else { if uid != accepted_activity.object.ok_or_else(UpubError::bad_request)? {
return Err(UpubError::bad_request()); return Err(UpubError::forbidden());
}; }
let Some(accepted_activity) = model::activity::Entity::find_by_id(accepted_id)
.one(self.db()).await? let activity_model = model::activity::ActiveModel::new(
else { &activity
return Err(UpubError::not_found()); .set_id(Some(&aid))
}; .set_actor(Node::link(uid.clone()))
)?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?;
let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
match accepted_activity.activity_type { match accepted_activity.activity_type {
apb::ActivityType::Follow => { apb::ActivityType::Follow => {
@ -186,72 +210,97 @@ impl apb::server::Outbox for Context {
.filter(model::actor::Column::Id.eq(&uid)) .filter(model::actor::Column::Id.eq(&uid))
.exec(self.db()) .exec(self.db())
.await?; .await?;
model::relation::Entity::insert( model::relation::Entity::update_many()
model::relation::ActiveModel { .filter(model::relation::Column::Activity.eq(accepted_activity.internal))
follower: Set(accepted_activity.actor), following: Set(uid.clone()), .col_expr(model::relation::Column::Accept, Expr::value(Some(internal_aid)))
..Default::default()
}
).exec(self.db()).await?;
},
t => tracing::warn!("no side effects implemented for accepting {t:?}"),
}
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?; .exec(self.db()).await?;
},
t => tracing::error!("no side effects implemented for accepting {t:?}"),
}
self.dispatch(&uid, activity_targets, &aid, None).await?; self.dispatch(&uid, activity_targets, &aid, None).await?;
Ok(aid) Ok(aid)
} }
async fn reject(&self, _uid: String, _activity: serde_json::Value) -> crate::Result<String> { async fn reject(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
todo!() let aid = self.aid(&uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
let rejected_id = activity.object().id().ok_or_else(UpubError::bad_request)?;
let rejected_activity = model::activity::Entity::find_by_ap_id(&rejected_id)
.one(self.db()).await?
.ok_or_else(UpubError::not_found)?;
if rejected_activity.activity_type != apb::ActivityType::Follow {
return Err(UpubError::bad_request());
}
if uid != rejected_activity.object.ok_or_else(UpubError::bad_request)? {
return Err(UpubError::forbidden());
}
let activity_model = model::activity::ActiveModel::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
)?;
model::activity::Entity::insert(activity_model)
.exec(self.db()).await?;
let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
model::relation::Entity::delete_many()
.filter(model::relation::Column::Activity.eq(internal_aid))
.exec(self.db())
.await?;
self.dispatch(&uid, activity_targets, &aid, None).await?;
Ok(aid)
} }
async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> { async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(&uuid::Uuid::new_v4().to_string()); let aid = self.aid(&uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed(); let activity_targets = activity.addressed();
let old_aid = activity.object().id().ok_or_else(UpubError::bad_request)?; let old_aid = activity.object().id().ok_or_else(UpubError::bad_request)?;
let old_activity = model::activity::Entity::find_by_id(old_aid) let old_activity = model::activity::Entity::find_by_ap_id(&old_aid)
.one(self.db()) .one(self.db())
.await? .await?
.ok_or_else(UpubError::not_found)?; .ok_or_else(UpubError::not_found)?;
if old_activity.actor != uid { if old_activity.actor != uid {
return Err(UpubError::forbidden()); return Err(UpubError::forbidden());
} }
match old_activity.activity_type { let activity_object = old_activity.object.ok_or_else(UpubError::bad_request)?;
apb::ActivityType::Like => { let actor_internal = model::actor::Entity::ap_to_internal(&old_activity.actor, self.db()).await?;
model::like::Entity::delete_many()
.filter(model::like::Column::Actor.eq(old_activity.actor)) let activity_model = model::activity::ActiveModel::new(
.filter(model::like::Column::Likes.eq(old_activity.object.unwrap_or("".into())))
.exec(self.db())
.await?;
},
apb::ActivityType::Follow => {
model::relation::Entity::delete_many()
.filter(model::relation::Column::Follower.eq(old_activity.actor))
.filter(model::relation::Column::Following.eq(old_activity.object.unwrap_or("".into())))
.exec(self.db())
.await?;
},
t => tracing::warn!("extra side effects for activity {t:?} not implemented"),
}
let activity_model = model::activity::Model::new(
&activity &activity
.set_id(Some(&aid)) .set_id(Some(&aid))
.set_actor(Node::link(uid.clone())) .set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?; )?;
model::activity::Entity::insert(activity_model.into_active_model()) model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()) .exec(self.db())
.await?; .await?;
match old_activity.activity_type {
apb::ActivityType::Like => {
let object_internal = model::object::Entity::ap_to_internal(&activity_object, self.db()).await?;
model::like::Entity::delete_many()
.filter(model::like::Column::Actor.eq(actor_internal))
.filter(model::like::Column::Object.eq(object_internal))
.exec(self.db())
.await?;
},
apb::ActivityType::Follow => {
let target_internal = model::actor::Entity::ap_to_internal(&activity_object, self.db()).await?;
model::relation::Entity::delete_many()
.filter(model::relation::Column::Follower.eq(actor_internal))
.filter(model::relation::Column::Following.eq(target_internal))
.exec(self.db())
.await?;
},
t => tracing::error!("extra side effects for activity {t:?} not implemented"),
}
self.dispatch(&uid, activity_targets, &aid, None).await?; self.dispatch(&uid, activity_targets, &aid, None).await?;
Ok(aid) Ok(aid)
@ -261,34 +310,28 @@ impl apb::server::Outbox for Context {
let aid = self.aid(&uuid::Uuid::new_v4().to_string()); let aid = self.aid(&uuid::Uuid::new_v4().to_string());
let oid = activity.object().id().ok_or_else(UpubError::bad_request)?; let oid = activity.object().id().ok_or_else(UpubError::bad_request)?;
let object = model::object::Entity::find_by_id(&oid) let object = model::object::Entity::find_by_ap_id(&oid)
.one(self.db()) .one(self.db())
.await? .await?
.ok_or_else(UpubError::not_found)?; .ok_or_else(UpubError::not_found)?;
let Some(author_id) = object.attributed_to else { if uid != object.attributed_to.ok_or_else(UpubError::forbidden)? {
// can't change local objects attributed to nobody // can't change objects of others, and objects from noone count as others
return Err(UpubError::forbidden())
};
if author_id != uid {
// can't change objects of others
return Err(UpubError::forbidden()); return Err(UpubError::forbidden());
} }
let addressed = activity.addressed(); let addressed = activity.addressed();
let activity_model = model::activity::Model::new( let activity_model = model::activity::ActiveModel::new(
&activity &activity
.set_id(Some(&aid)) .set_id(Some(&aid))
.set_actor(Node::link(uid.clone())) .set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?; )?;
model::object::Entity::delete_by_id(&oid) model::activity::Entity::insert(activity_model)
.exec(self.db()) .exec(self.db())
.await?; .await?;
model::activity::Entity::insert(activity_model.into_active_model()) model::object::Entity::delete_by_ap_id(&oid)
.exec(self.db()) .exec(self.db())
.await?; .await?;
@ -301,17 +344,11 @@ impl apb::server::Outbox for Context {
let aid = self.aid(&uuid::Uuid::new_v4().to_string()); let aid = self.aid(&uuid::Uuid::new_v4().to_string());
let object_node = activity.object().extract().ok_or_else(UpubError::bad_request)?; let object_node = activity.object().extract().ok_or_else(UpubError::bad_request)?;
let target = object_node.id().ok_or_else(UpubError::bad_request)?.to_string();
match object_node.object_type() { match object_node.object_type() {
Some(apb::ObjectType::Actor(_)) => { Some(apb::ObjectType::Actor(_)) => {
let mut actor_model = model::actor::Model::new( let old_actor_model = model::actor::Entity::find_by_ap_id(&target)
&object_node
// TODO must set these, but we will ignore them
.set_actor_type(Some(apb::ActorType::Person))
.set_public_key(apb::Node::object(
serde_json::Value::new_object().set_public_key_pem("")
))
)?;
let old_actor_model = model::actor::Entity::find_by_id(&actor_model.id)
.one(self.db()) .one(self.db())
.await? .await?
.ok_or_else(UpubError::not_found)?; .ok_or_else(UpubError::not_found)?;
@ -321,6 +358,9 @@ impl apb::server::Outbox for Context {
return Err(UpubError::forbidden()); return Err(UpubError::forbidden());
} }
let mut new_actor_model = model::actor::ActiveModel::default();
new_actor_model.internal = Set(old_actor_model.internal);
if actor_model.name.is_none() { actor_model.name = old_actor_model.name } if actor_model.name.is_none() { actor_model.name = old_actor_model.name }
if actor_model.summary.is_none() { actor_model.summary = old_actor_model.summary } if actor_model.summary.is_none() { actor_model.summary = old_actor_model.summary }
if actor_model.image.is_none() { actor_model.image = old_actor_model.image } if actor_model.image.is_none() { actor_model.image = old_actor_model.image }