From 3a874ed3e7cd7ea7cbf54ea2f2207630c1adf29a Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 12 Apr 2024 18:40:24 +0200 Subject: [PATCH] fix: insert activities, expand addressing remote activities for Likes and Follows should still be added to our database so we know where to verify those and we can present them in a timeline for notifications --- src/server/context.rs | 41 ++++++++++++++++++++++++++--------------- src/server/inbox.rs | 42 +++++++++++++++++++++++++++++++----------- 2 files changed, 57 insertions(+), 26 deletions(-) diff --git a/src/server/context.rs b/src/server/context.rs index dfe1f461..fba35269 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -130,28 +130,39 @@ impl Context { .to_string() } - pub async fn expand_addressing(&self, uid: &str, mut targets: Vec) -> crate::Result> { - let following_addr = format!("{uid}/followers"); - if let Some(i) = targets.iter().position(|x| x == &following_addr) { - targets.remove(i); - model::relation::Entity::find() - .filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string()))) - .select_only() - .select_column(model::relation::Column::Follower) - .into_tuple::() - .all(self.db()) - .await? - .into_iter() - .for_each(|x| targets.push(x)); + pub fn is_local(&self, id: &str) -> bool { + // TODO consider precalculating once this format! + id.starts_with(&format!("{}{}", self.0.protocol, self.0.domain)) + } + + pub async fn expand_addressing(&self, targets: Vec) -> crate::Result> { + let mut out = Vec::new(); + for target in targets { + if target.ends_with("/followers") { + let target_id = target.replace("/followers", ""); + model::relation::Entity::find() + .filter(Condition::all().add(model::relation::Column::Following.eq(target_id))) + .select_only() + .select_column(model::relation::Column::Follower) + .into_tuple::() + .all(self.db()) + .await? + .into_iter() + .for_each(|x| out.push(x)); + } else { + out.push(target); + } } - Ok(targets) + Ok(out) } pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> crate::Result<()> { + let local_activity = self.is_local(aid); let addressings : Vec = targets .iter() .filter(|to| !to.is_empty()) .filter(|to| !to.ends_with("/followers")) + .filter(|to| local_activity || self.is_local(to)) .map(|to| model::addressing::ActiveModel { id: sea_orm::ActiveValue::NotSet, server: Set(Context::server(to)), @@ -221,7 +232,7 @@ impl Context { } pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { - let addressed = self.expand_addressing(uid, activity_targets).await?; + let addressed = self.expand_addressing(activity_targets).await?; self.address_to(aid, oid, &addressed).await?; self.deliver_to(aid, uid, &addressed).await?; Ok(()) diff --git a/src/server/inbox.rs b/src/server/inbox.rs index f1cf6657..ccdf2d7c 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -13,7 +13,6 @@ impl apb::server::Inbox for Context { async fn create(&self, activity: serde_json::Value) -> crate::Result<()> { let activity_model = model::activity::Model::new(&activity)?; - let activity_targets = activity.addressed(); let Some(object_node) = activity.object().extract() else { // TODO we could process non-embedded activities or arrays but im lazy rn tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); @@ -24,7 +23,8 @@ impl apb::server::Inbox for Context { let oid = object_model.id.clone(); model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; - self.address_to(&aid, Some(&oid), &activity_targets).await?; + let expanded_addressing = self.expand_addressing(activity.addressed()).await?; + self.address_to(&aid, Some(&oid), &expanded_addressing).await?; tracing::info!("{} posted {}", aid, oid); Ok(()) } @@ -46,6 +46,12 @@ impl apb::server::Inbox for Context { Err(UpubError::internal_server_error()) } Ok(_) => { + let activity_model = model::activity::Model::new(&activity)?.into_active_model(); + model::activity::Entity::insert(activity_model) + .exec(self.db()) + .await?; + let expanded_addressing = self.expand_addressing(activity.addressed()).await?; + self.address_to(&aid, None, &expanded_addressing).await?; model::object::Entity::update_many() .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) .filter(model::object::Column::Id.eq(oid.clone())) @@ -58,20 +64,20 @@ impl apb::server::Inbox for Context { } async fn follow(&self, activity: serde_json::Value) -> crate::Result<()> { - let activity_targets = activity.addressed(); let activity_model = model::activity::Model::new(&activity)?; let aid = activity_model.id.clone(); tracing::info!("{} wants to follow {}", activity_model.actor, activity_model.object.as_deref().unwrap_or("")); model::activity::Entity::insert(activity_model.into_active_model()) .exec(self.db()).await?; - self.address_to(&aid, None, &activity_targets).await?; + let expanded_addressing = self.expand_addressing(activity.addressed()).await?; + self.address_to(&aid, None, &expanded_addressing).await?; Ok(()) } async fn accept(&self, activity: serde_json::Value) -> crate::Result<()> { // TODO what about TentativeAccept let activity_model = model::activity::Model::new(&activity)?; - let Some(follow_request_id) = activity_model.object else { + let Some(follow_request_id) = &activity_model.object else { return Err(UpubError::bad_request()); }; let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) @@ -85,6 +91,9 @@ impl apb::server::Inbox for Context { tracing::info!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor); + model::activity::Entity::insert(activity_model.clone().into_active_model()) + .exec(self.db()) + .await?; model::relation::Entity::insert( model::relation::ActiveModel { follower: Set(follow_activity.actor), @@ -93,14 +102,15 @@ impl apb::server::Inbox for Context { } ).exec(self.db()).await?; - self.address_to(&activity_model.id, None, &activity.addressed()).await?; + let expanded_addressing = self.expand_addressing(activity.addressed()).await?; + self.address_to(&activity_model.id, None, &expanded_addressing).await?; Ok(()) } async fn reject(&self, activity: serde_json::Value) -> crate::Result<()> { // TODO what about TentativeReject? let activity_model = model::activity::Model::new(&activity)?; - let Some(follow_request_id) = activity_model.object else { + let Some(follow_request_id) = &activity_model.object else { return Err(UpubError::bad_request()); }; let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) @@ -111,8 +121,15 @@ impl apb::server::Inbox for Context { if follow_activity.object.unwrap_or("".into()) != activity_model.actor { return Err(UpubError::forbidden()); } + tracing::info!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor); - self.address_to(&activity_model.id, None, &activity.addressed()).await?; + + model::activity::Entity::insert(activity_model.clone().into_active_model()) + .exec(self.db()) + .await?; + + let expanded_addressing = self.expand_addressing(activity.addressed()).await?; + self.address_to(&activity_model.id, None, &expanded_addressing).await?; Ok(()) } @@ -128,7 +145,6 @@ impl apb::server::Inbox for Context { async fn update(&self, activity: serde_json::Value) -> crate::Result<()> { let activity_model = model::activity::Model::new(&activity)?; - let activity_targets = activity.addressed(); let Some(object_node) = activity.object().extract() else { // TODO we could process non-embedded activities or arrays but im lazy rn tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); @@ -138,7 +154,6 @@ impl apb::server::Inbox for Context { let Some(oid) = object_node.id().map(|x| x.to_string()) else { return Err(UpubError::bad_request()); }; - model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; match object_node.object_type() { Some(apb::ObjectType::Actor(_)) => { // TODO oof here is an example of the weakness of this model, we have to go all the way @@ -155,8 +170,13 @@ impl apb::server::Inbox for Context { Some(t) => tracing::warn!("no side effects implemented for update type {t:?}"), None => tracing::warn!("empty type on embedded updated object"), } - self.address_to(&aid, Some(&oid), &activity_targets).await?; + tracing::info!("{} updated {}", aid, oid); + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()) + .await?; + let expanded_addressing = self.expand_addressing(activity.addressed()).await?; + self.address_to(&aid, Some(&oid), &expanded_addressing).await?; Ok(()) }