Compare commits

..

No commits in common. "9fce61ea7823b584fa4ef51612d0e6169a4dae81" and "fde3372bcc49c980afed173f41f04d5b3ec7976b" have entirely different histories.

3 changed files with 41 additions and 131 deletions

View file

@ -40,11 +40,14 @@ pub enum CliCommand {
save: bool, save: bool,
}, },
/// act on remote relay actors at instance level /// follow a remote relay
Relay { Relay {
#[clap(subcommand)] /// actor url, same as with pleroma
/// action to take against this relay actor: String,
action: RelayCommand,
#[arg(long, default_value_t = false)]
/// instead of sending a follow request, send an accept
accept: bool
}, },
/// run db maintenance tasks /// run db maintenance tasks
@ -119,8 +122,8 @@ pub async fn run(ctx: upub::Context, command: CliCommand) -> Result<(), Box<dyn
Ok(faker(ctx, count as i64).await?), Ok(faker(ctx, count as i64).await?),
CliCommand::Fetch { uri, save } => CliCommand::Fetch { uri, save } =>
Ok(fetch(ctx, uri, save).await?), Ok(fetch(ctx, uri, save).await?),
CliCommand::Relay { action } => CliCommand::Relay { actor, accept } =>
Ok(relay(ctx, action).await?), Ok(relay(ctx, actor, accept).await?),
CliCommand::Fix { likes, shares, replies } => CliCommand::Fix { likes, shares, replies } =>
Ok(fix(ctx, likes, shares, replies).await?), Ok(fix(ctx, likes, shares, replies).await?),
CliCommand::Update { days } => CliCommand::Update { days } =>

View file

@ -1,129 +1,40 @@
use apb::{ActivityMut, BaseMut, ObjectMut}; use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder};
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
use upub::traits::fetch::PullError;
#[derive(Debug, Clone, clap::Subcommand)] pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> Result<(), sea_orm::DbErr> {
/// available actions to take on relays let aid = ctx.aid(&uuid::Uuid::new_v4().to_string());
pub enum RelayCommand {
/// get all current pending and accepted relays
Status,
/// request to follow a specific relay
Follow {
/// relay actor to follow (must be full AP id, like for pleroma)
actor: String,
},
/// accept a pending relay request
Accept {
/// relay actor to accept (must be full AP id, like for pleroma)
actor: String,
},
/// retract a follow relation to a relay, stopping receiving content
Unfollow {
/// relay actor to unfollow (must be full AP id, like for pleroma)
actor: String,
},
/// remove a follow relation from a relay, stopping sending content
Remove {
/// relay actor to unfollow (must be full AP id, like for pleroma)
actor: String,
},
}
pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullError> { let mut activity_model = upub::model::activity::ActiveModel {
match action { internal: NotSet,
RelayCommand::Status => { id: Set(aid.clone()),
let internal_actor = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db()) activity_type: Set(apb::ActivityType::Follow),
.await? actor: Set(ctx.base().to_string()),
.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?; object: Set(Some(actor.clone())),
target: Set(None),
published: Set(chrono::Utc::now()),
to: Set(upub::model::Audience(vec![actor.clone()])),
bto: Set(upub::model::Audience::default()),
cc: Set(upub::model::Audience(vec![apb::target::PUBLIC.to_string()])),
bcc: Set(upub::model::Audience::default()),
};
tracing::info!("active sinks:"); if accept {
for sink in upub::Query::related(None, Some(internal_actor), false) let follow_req = upub::model::activity::Entity::find()
.into_model::<upub::model::actor::Model>() .filter(upub::model::activity::Column::ActivityType.eq("Follow"))
.all(ctx.db()) .filter(upub::model::activity::Column::Actor.eq(&actor))
.await? .filter(upub::model::activity::Column::Object.eq(ctx.base()))
{ .order_by_desc(upub::model::activity::Column::Published)
tracing::info!("[>>] {} {}", sink.name.unwrap_or_default(), sink.id); .one(ctx.db())
} .await?
.expect("no follow request to accept");
activity_model.activity_type = Set(apb::ActivityType::Accept(apb::AcceptType::Accept));
activity_model.object = Set(Some(follow_req.id));
};
tracing::info!("active sources:"); upub::model::activity::Entity::insert(activity_model)
for source in upub::Query::related(Some(internal_actor), None, false) .exec(ctx.db()).await?;
.into_model::<upub::model::actor::Model>()
.all(ctx.db())
.await?
{
tracing::info!("[<<] {} {}", source.name.unwrap_or_default(), source.id);
}
},
RelayCommand::Follow { actor } => { // TODO!!!
let aid = ctx.aid(&upub::Context::new_id()); // ctx.dispatch(ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await?;
let payload = apb::new()
.set_id(Some(&aid))
.set_activity_type(Some(apb::ActivityType::Follow))
.set_actor(apb::Node::link(ctx.base().to_string()))
.set_object(apb::Node::link(actor.clone()))
.set_to(apb::Node::links(vec![actor.clone()]))
.set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
.set_published(Some(chrono::Utc::now()));
let job = upub::model::job::ActiveModel {
internal: NotSet,
activity: Set(aid.clone()),
job_type: Set(upub::model::job::JobType::Outbound),
actor: Set(ctx.base().to_string()),
target: Set(None),
payload: Set(Some(payload)),
attempt: Set(0),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
};
tracing::info!("following relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
},
RelayCommand::Accept { actor } => {
let their_internal = upub::model::actor::Entity::ap_to_internal(&actor, ctx.db())
.await?
.ok_or_else(|| DbErr::RecordNotFound(actor.clone()))?;
let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
.await?
.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
let relation = upub::Query::related(Some(their_internal), Some(my_internal), true)
.into_model::<upub::model::relation::Model>()
.one(ctx.db())
.await?
.ok_or_else(|| DbErr::RecordNotFound(format!("relation-{their_internal}-{my_internal}")))?;
let activity = upub::model::activity::Entity::find_by_id(relation.activity)
.one(ctx.db())
.await?
.ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", relation.activity)))?;
let aid = ctx.aid(&upub::Context::new_id());
let payload = apb::new()
.set_id(Some(&aid))
.set_activity_type(Some(apb::ActivityType::Accept(apb::AcceptType::Accept)))
.set_actor(apb::Node::link(ctx.base().to_string()))
.set_object(apb::Node::link(activity.id))
.set_to(apb::Node::links(vec![actor.clone()]))
.set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
.set_published(Some(chrono::Utc::now()));
let job = upub::model::job::ActiveModel {
internal: NotSet,
activity: Set(aid.clone()),
job_type: Set(upub::model::job::JobType::Outbound),
actor: Set(ctx.base().to_string()),
target: Set(None),
payload: Set(Some(payload)),
attempt: Set(0),
published: Set(chrono::Utc::now()),
not_before: Set(chrono::Utc::now()),
};
tracing::info!("accepting relay {actor}");
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
},
RelayCommand::Remove { .. } => todo!(),
RelayCommand::Unfollow { .. } => todo!(),
}
Ok(()) Ok(())
} }

View file

@ -97,10 +97,6 @@ impl Query {
.filter(condition) .filter(condition)
.select_only(); .select_only();
for column in model::relation::Column::iter() {
select = select.select_column_as(column, format!("{}{}", model::relation::Entity.table_name(), column.to_string()));
}
for column in model::actor::Column::iter() { for column in model::actor::Column::iter() {
select = select.select_column_as(column, format!("{}{}", model::actor::Entity.table_name(), column.to_string())); select = select.select_column_as(column, format!("{}{}", model::actor::Entity.table_name(), column.to_string()));
} }