From ae4950b2e7f4bbe6441cc27f0bb5073ca61d5f7b Mon Sep 17 00:00:00 2001 From: alemi Date: Tue, 25 Jun 2024 05:25:57 +0200 Subject: [PATCH] feat: implemented relay unfollow/remove, fixes --- upub/cli/src/relay.rs | 101 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 14 deletions(-) diff --git a/upub/cli/src/relay.rs b/upub/cli/src/relay.rs index 6ab72bfa..dd7d10f2 100644 --- a/upub/cli/src/relay.rs +++ b/upub/cli/src/relay.rs @@ -30,14 +30,26 @@ pub enum RelayCommand { } pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullError> { + let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?; + + let their_internal = match &action { + RelayCommand::Status => 0, + RelayCommand::Follow { actor } + | RelayCommand::Accept { actor } + | RelayCommand::Unfollow { actor } + | RelayCommand::Remove { actor } + => + upub::model::actor::Entity::ap_to_internal(actor, ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(actor.clone()))?, + }; + match action { RelayCommand::Status => { - let internal_actor = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db()) - .await? - .ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?; - tracing::info!("active sinks:"); - for sink in upub::Query::related(None, Some(internal_actor), false) + for sink in upub::Query::related(None, Some(my_internal), false) .into_model::() .all(ctx.db()) .await? @@ -46,7 +58,7 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE } tracing::info!("active sources:"); - for source in upub::Query::related(Some(internal_actor), None, false) + for source in upub::Query::related(Some(my_internal), None, false) .into_model::() .all(ctx.db()) .await? @@ -81,12 +93,6 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE }, RelayCommand::Accept { actor } => { - let their_internal = upub::model::actor::Entity::ap_to_internal(&actor, ctx.db()) - .await? - .ok_or_else(|| DbErr::RecordNotFound(actor.clone()))?; - let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db()) - .await? - .ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?; let relation = upub::model::relation::Entity::find() .filter(upub::model::relation::Column::Follower.eq(their_internal)) .filter(upub::model::relation::Column::Following.eq(my_internal)) @@ -121,9 +127,76 @@ pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullE upub::model::job::Entity::insert(job).exec(ctx.db()).await?; }, - RelayCommand::Remove { .. } => todo!(), + RelayCommand::Remove { actor } => { + let relation = upub::model::relation::Entity::find() + .filter(upub::model::relation::Column::Follower.eq(their_internal)) + .filter(upub::model::relation::Column::Following.eq(my_internal)) + .one(ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(format!("relation-{their_internal}-{my_internal}")))?; + let accept_activity_id = relation.accept.ok_or(DbErr::RecordNotFound(format!("accept-{their_internal}-{my_internal}")))?; + let activity = upub::model::activity::Entity::find_by_id(accept_activity_id) + .one(ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", accept_activity_id)))?; + let aid = ctx.aid(&upub::Context::new_id()); + let payload = apb::new() + .set_id(Some(&aid)) + .set_activity_type(Some(apb::ActivityType::Undo)) + .set_actor(apb::Node::link(ctx.base().to_string())) + .set_object(apb::Node::object(activity.ap())) + .set_to(apb::Node::links(vec![actor.clone()])) + .set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()])) + .set_published(Some(chrono::Utc::now())); + let job = upub::model::job::ActiveModel { + internal: NotSet, + activity: Set(aid.clone()), + job_type: Set(upub::model::job::JobType::Outbound), + actor: Set(ctx.base().to_string()), + target: Set(None), + payload: Set(Some(payload)), + attempt: Set(0), + published: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + }; + tracing::info!("unfollowing relay {actor}"); + upub::model::job::Entity::insert(job).exec(ctx.db()).await?; + }, - RelayCommand::Unfollow { .. } => todo!(), + RelayCommand::Unfollow { actor } => { + let relation = upub::model::relation::Entity::find() + .filter(upub::model::relation::Column::Follower.eq(my_internal)) + .filter(upub::model::relation::Column::Following.eq(their_internal)) + .one(ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(format!("relation-{my_internal}-{their_internal}")))?; + let activity = upub::model::activity::Entity::find_by_id(relation.activity) + .one(ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", relation.activity)))?; + let aid = ctx.aid(&upub::Context::new_id()); + let payload = apb::new() + .set_id(Some(&aid)) + .set_activity_type(Some(apb::ActivityType::Undo)) + .set_actor(apb::Node::link(ctx.base().to_string())) + .set_object(apb::Node::object(activity.ap())) + .set_to(apb::Node::links(vec![actor.clone()])) + .set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()])) + .set_published(Some(chrono::Utc::now())); + let job = upub::model::job::ActiveModel { + internal: NotSet, + activity: Set(aid.clone()), + job_type: Set(upub::model::job::JobType::Outbound), + actor: Set(ctx.base().to_string()), + target: Set(None), + payload: Set(Some(payload)), + attempt: Set(0), + published: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + }; + tracing::info!("unfollowing relay {actor}"); + upub::model::job::Entity::insert(job).exec(ctx.db()).await?; + }, } Ok(())