diff --git a/upub/cli/src/lib.rs b/upub/cli/src/lib.rs index 0787a4b..d07d0ba 100644 --- a/upub/cli/src/lib.rs +++ b/upub/cli/src/lib.rs @@ -40,14 +40,11 @@ pub enum CliCommand { save: bool, }, - /// follow a remote relay + /// act on remote relay actors at instance level Relay { - /// actor url, same as with pleroma - actor: String, - - #[arg(long, default_value_t = false)] - /// instead of sending a follow request, send an accept - accept: bool + #[clap(subcommand)] + /// action to take against this relay + action: RelayCommand, }, /// run db maintenance tasks @@ -122,8 +119,8 @@ pub async fn run(ctx: upub::Context, command: CliCommand) -> Result<(), Box Ok(fetch(ctx, uri, save).await?), - CliCommand::Relay { actor, accept } => - Ok(relay(ctx, actor, accept).await?), + CliCommand::Relay { action } => + Ok(relay(ctx, action).await?), CliCommand::Fix { likes, shares, replies } => Ok(fix(ctx, likes, shares, replies).await?), CliCommand::Update { days } => diff --git a/upub/cli/src/relay.rs b/upub/cli/src/relay.rs index ad51092..b8c3ac9 100644 --- a/upub/cli/src/relay.rs +++ b/upub/cli/src/relay.rs @@ -1,40 +1,129 @@ -use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder}; +use apb::{ActivityMut, BaseMut, ObjectMut}; +use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait}; +use upub::traits::fetch::PullError; -pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> Result<(), sea_orm::DbErr> { - let aid = ctx.aid(&uuid::Uuid::new_v4().to_string()); +#[derive(Debug, Clone, clap::Subcommand)] +/// available actions to take on relays +pub enum RelayCommand { + /// get all current pending and accepted relays + Status, + /// request to follow a specific relay + Follow { + /// relay actor to follow (must be full AP id, like for pleroma) + actor: String, + }, + /// accept a pending relay request + Accept { + /// relay actor to accept (must be full AP id, like for pleroma) + actor: String, + }, + /// retract a follow relation to a relay, stopping receiving content + Unfollow { + /// relay actor to unfollow (must be full AP id, like for pleroma) + actor: String, + }, + /// remove a follow relation from a relay, stopping sending content + Remove { + /// relay actor to unfollow (must be full AP id, like for pleroma) + actor: String, + }, +} - let mut activity_model = upub::model::activity::ActiveModel { - internal: NotSet, - id: Set(aid.clone()), - activity_type: Set(apb::ActivityType::Follow), - actor: Set(ctx.base().to_string()), - object: Set(Some(actor.clone())), - target: Set(None), - published: Set(chrono::Utc::now()), - to: Set(upub::model::Audience(vec![actor.clone()])), - bto: Set(upub::model::Audience::default()), - cc: Set(upub::model::Audience(vec![apb::target::PUBLIC.to_string()])), - bcc: Set(upub::model::Audience::default()), - }; +pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullError> { + match action { + RelayCommand::Status => { + let internal_actor = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?; - if accept { - let follow_req = upub::model::activity::Entity::find() - .filter(upub::model::activity::Column::ActivityType.eq("Follow")) - .filter(upub::model::activity::Column::Actor.eq(&actor)) - .filter(upub::model::activity::Column::Object.eq(ctx.base())) - .order_by_desc(upub::model::activity::Column::Published) - .one(ctx.db()) - .await? - .expect("no follow request to accept"); - activity_model.activity_type = Set(apb::ActivityType::Accept(apb::AcceptType::Accept)); - activity_model.object = Set(Some(follow_req.id)); - }; + tracing::info!("active sinks:"); + for sink in upub::Query::related(None, Some(internal_actor), false) + .into_model::() + .all(ctx.db()) + .await? + { + tracing::info!("[>>] {} {}", sink.name.unwrap_or_default(), sink.id); + } - upub::model::activity::Entity::insert(activity_model) - .exec(ctx.db()).await?; + tracing::info!("active sources:"); + for source in upub::Query::related(Some(internal_actor), None, false) + .into_model::() + .all(ctx.db()) + .await? + { + tracing::info!("[<<] {} {}", source.name.unwrap_or_default(), source.id); + } + }, - // TODO!!! - // ctx.dispatch(ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await?; + RelayCommand::Follow { actor } => { + let aid = ctx.aid(&upub::Context::new_id()); + let payload = apb::new() + .set_id(Some(&aid)) + .set_activity_type(Some(apb::ActivityType::Follow)) + .set_actor(apb::Node::link(ctx.base().to_string())) + .set_object(apb::Node::link(actor.clone())) + .set_to(apb::Node::links(vec![actor.clone()])) + .set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()])) + .set_published(Some(chrono::Utc::now())); + let job = upub::model::job::ActiveModel { + internal: NotSet, + activity: Set(aid.clone()), + job_type: Set(upub::model::job::JobType::Outbound), + actor: Set(ctx.base().to_string()), + target: Set(None), + payload: Set(Some(payload)), + attempt: Set(0), + published: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + }; + tracing::info!("following relay {actor}"); + upub::model::job::Entity::insert(job).exec(ctx.db()).await?; + }, + + RelayCommand::Accept { actor } => { + let their_internal = upub::model::actor::Entity::ap_to_internal(&actor, ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(actor.clone()))?; + let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?; + let relation = upub::Query::related(Some(their_internal), Some(my_internal), true) + .into_model::() + .one(ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(format!("relation-{their_internal}-{my_internal}")))?; + let activity = upub::model::activity::Entity::find_by_id(relation.activity) + .one(ctx.db()) + .await? + .ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", relation.activity)))?; + let aid = ctx.aid(&upub::Context::new_id()); + let payload = apb::new() + .set_id(Some(&aid)) + .set_activity_type(Some(apb::ActivityType::Accept(apb::AcceptType::Accept))) + .set_actor(apb::Node::link(ctx.base().to_string())) + .set_object(apb::Node::link(activity.id)) + .set_to(apb::Node::links(vec![actor.clone()])) + .set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()])) + .set_published(Some(chrono::Utc::now())); + let job = upub::model::job::ActiveModel { + internal: NotSet, + activity: Set(aid.clone()), + job_type: Set(upub::model::job::JobType::Outbound), + actor: Set(ctx.base().to_string()), + target: Set(None), + payload: Set(Some(payload)), + attempt: Set(0), + published: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + }; + tracing::info!("accepting relay {actor}"); + upub::model::job::Entity::insert(job).exec(ctx.db()).await?; + }, + + RelayCommand::Remove { .. } => todo!(), + + RelayCommand::Unfollow { .. } => todo!(), + } Ok(()) }