Compare commits
2 commits
fde3372bcc
...
9fce61ea78
Author | SHA1 | Date | |
---|---|---|---|
9fce61ea78 | |||
54b619dffc |
3 changed files with 131 additions and 41 deletions
|
@ -40,14 +40,11 @@ pub enum CliCommand {
|
||||||
save: bool,
|
save: bool,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// follow a remote relay
|
/// act on remote relay actors at instance level
|
||||||
Relay {
|
Relay {
|
||||||
/// actor url, same as with pleroma
|
#[clap(subcommand)]
|
||||||
actor: String,
|
/// action to take against this relay
|
||||||
|
action: RelayCommand,
|
||||||
#[arg(long, default_value_t = false)]
|
|
||||||
/// instead of sending a follow request, send an accept
|
|
||||||
accept: bool
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/// run db maintenance tasks
|
/// run db maintenance tasks
|
||||||
|
@ -122,8 +119,8 @@ pub async fn run(ctx: upub::Context, command: CliCommand) -> Result<(), Box<dyn
|
||||||
Ok(faker(ctx, count as i64).await?),
|
Ok(faker(ctx, count as i64).await?),
|
||||||
CliCommand::Fetch { uri, save } =>
|
CliCommand::Fetch { uri, save } =>
|
||||||
Ok(fetch(ctx, uri, save).await?),
|
Ok(fetch(ctx, uri, save).await?),
|
||||||
CliCommand::Relay { actor, accept } =>
|
CliCommand::Relay { action } =>
|
||||||
Ok(relay(ctx, actor, accept).await?),
|
Ok(relay(ctx, action).await?),
|
||||||
CliCommand::Fix { likes, shares, replies } =>
|
CliCommand::Fix { likes, shares, replies } =>
|
||||||
Ok(fix(ctx, likes, shares, replies).await?),
|
Ok(fix(ctx, likes, shares, replies).await?),
|
||||||
CliCommand::Update { days } =>
|
CliCommand::Update { days } =>
|
||||||
|
|
|
@ -1,40 +1,129 @@
|
||||||
use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QueryOrder};
|
use apb::{ActivityMut, BaseMut, ObjectMut};
|
||||||
|
use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait};
|
||||||
|
use upub::traits::fetch::PullError;
|
||||||
|
|
||||||
pub async fn relay(ctx: upub::Context, actor: String, accept: bool) -> Result<(), sea_orm::DbErr> {
|
#[derive(Debug, Clone, clap::Subcommand)]
|
||||||
let aid = ctx.aid(&uuid::Uuid::new_v4().to_string());
|
/// available actions to take on relays
|
||||||
|
pub enum RelayCommand {
|
||||||
|
/// get all current pending and accepted relays
|
||||||
|
Status,
|
||||||
|
/// request to follow a specific relay
|
||||||
|
Follow {
|
||||||
|
/// relay actor to follow (must be full AP id, like for pleroma)
|
||||||
|
actor: String,
|
||||||
|
},
|
||||||
|
/// accept a pending relay request
|
||||||
|
Accept {
|
||||||
|
/// relay actor to accept (must be full AP id, like for pleroma)
|
||||||
|
actor: String,
|
||||||
|
},
|
||||||
|
/// retract a follow relation to a relay, stopping receiving content
|
||||||
|
Unfollow {
|
||||||
|
/// relay actor to unfollow (must be full AP id, like for pleroma)
|
||||||
|
actor: String,
|
||||||
|
},
|
||||||
|
/// remove a follow relation from a relay, stopping sending content
|
||||||
|
Remove {
|
||||||
|
/// relay actor to unfollow (must be full AP id, like for pleroma)
|
||||||
|
actor: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
let mut activity_model = upub::model::activity::ActiveModel {
|
pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), PullError> {
|
||||||
internal: NotSet,
|
match action {
|
||||||
id: Set(aid.clone()),
|
RelayCommand::Status => {
|
||||||
activity_type: Set(apb::ActivityType::Follow),
|
let internal_actor = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
|
||||||
actor: Set(ctx.base().to_string()),
|
.await?
|
||||||
object: Set(Some(actor.clone())),
|
.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
|
||||||
target: Set(None),
|
|
||||||
published: Set(chrono::Utc::now()),
|
|
||||||
to: Set(upub::model::Audience(vec![actor.clone()])),
|
|
||||||
bto: Set(upub::model::Audience::default()),
|
|
||||||
cc: Set(upub::model::Audience(vec![apb::target::PUBLIC.to_string()])),
|
|
||||||
bcc: Set(upub::model::Audience::default()),
|
|
||||||
};
|
|
||||||
|
|
||||||
if accept {
|
tracing::info!("active sinks:");
|
||||||
let follow_req = upub::model::activity::Entity::find()
|
for sink in upub::Query::related(None, Some(internal_actor), false)
|
||||||
.filter(upub::model::activity::Column::ActivityType.eq("Follow"))
|
.into_model::<upub::model::actor::Model>()
|
||||||
.filter(upub::model::activity::Column::Actor.eq(&actor))
|
.all(ctx.db())
|
||||||
.filter(upub::model::activity::Column::Object.eq(ctx.base()))
|
.await?
|
||||||
.order_by_desc(upub::model::activity::Column::Published)
|
{
|
||||||
.one(ctx.db())
|
tracing::info!("[>>] {} {}", sink.name.unwrap_or_default(), sink.id);
|
||||||
.await?
|
}
|
||||||
.expect("no follow request to accept");
|
|
||||||
activity_model.activity_type = Set(apb::ActivityType::Accept(apb::AcceptType::Accept));
|
|
||||||
activity_model.object = Set(Some(follow_req.id));
|
|
||||||
};
|
|
||||||
|
|
||||||
upub::model::activity::Entity::insert(activity_model)
|
tracing::info!("active sources:");
|
||||||
.exec(ctx.db()).await?;
|
for source in upub::Query::related(Some(internal_actor), None, false)
|
||||||
|
.into_model::<upub::model::actor::Model>()
|
||||||
|
.all(ctx.db())
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
tracing::info!("[<<] {} {}", source.name.unwrap_or_default(), source.id);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
// TODO!!!
|
RelayCommand::Follow { actor } => {
|
||||||
// ctx.dispatch(ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await?;
|
let aid = ctx.aid(&upub::Context::new_id());
|
||||||
|
let payload = apb::new()
|
||||||
|
.set_id(Some(&aid))
|
||||||
|
.set_activity_type(Some(apb::ActivityType::Follow))
|
||||||
|
.set_actor(apb::Node::link(ctx.base().to_string()))
|
||||||
|
.set_object(apb::Node::link(actor.clone()))
|
||||||
|
.set_to(apb::Node::links(vec![actor.clone()]))
|
||||||
|
.set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
|
||||||
|
.set_published(Some(chrono::Utc::now()));
|
||||||
|
let job = upub::model::job::ActiveModel {
|
||||||
|
internal: NotSet,
|
||||||
|
activity: Set(aid.clone()),
|
||||||
|
job_type: Set(upub::model::job::JobType::Outbound),
|
||||||
|
actor: Set(ctx.base().to_string()),
|
||||||
|
target: Set(None),
|
||||||
|
payload: Set(Some(payload)),
|
||||||
|
attempt: Set(0),
|
||||||
|
published: Set(chrono::Utc::now()),
|
||||||
|
not_before: Set(chrono::Utc::now()),
|
||||||
|
};
|
||||||
|
tracing::info!("following relay {actor}");
|
||||||
|
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||||
|
},
|
||||||
|
|
||||||
|
RelayCommand::Accept { actor } => {
|
||||||
|
let their_internal = upub::model::actor::Entity::ap_to_internal(&actor, ctx.db())
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbErr::RecordNotFound(actor.clone()))?;
|
||||||
|
let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
|
||||||
|
let relation = upub::Query::related(Some(their_internal), Some(my_internal), true)
|
||||||
|
.into_model::<upub::model::relation::Model>()
|
||||||
|
.one(ctx.db())
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbErr::RecordNotFound(format!("relation-{their_internal}-{my_internal}")))?;
|
||||||
|
let activity = upub::model::activity::Entity::find_by_id(relation.activity)
|
||||||
|
.one(ctx.db())
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", relation.activity)))?;
|
||||||
|
let aid = ctx.aid(&upub::Context::new_id());
|
||||||
|
let payload = apb::new()
|
||||||
|
.set_id(Some(&aid))
|
||||||
|
.set_activity_type(Some(apb::ActivityType::Accept(apb::AcceptType::Accept)))
|
||||||
|
.set_actor(apb::Node::link(ctx.base().to_string()))
|
||||||
|
.set_object(apb::Node::link(activity.id))
|
||||||
|
.set_to(apb::Node::links(vec![actor.clone()]))
|
||||||
|
.set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
|
||||||
|
.set_published(Some(chrono::Utc::now()));
|
||||||
|
let job = upub::model::job::ActiveModel {
|
||||||
|
internal: NotSet,
|
||||||
|
activity: Set(aid.clone()),
|
||||||
|
job_type: Set(upub::model::job::JobType::Outbound),
|
||||||
|
actor: Set(ctx.base().to_string()),
|
||||||
|
target: Set(None),
|
||||||
|
payload: Set(Some(payload)),
|
||||||
|
attempt: Set(0),
|
||||||
|
published: Set(chrono::Utc::now()),
|
||||||
|
not_before: Set(chrono::Utc::now()),
|
||||||
|
};
|
||||||
|
tracing::info!("accepting relay {actor}");
|
||||||
|
upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
|
||||||
|
},
|
||||||
|
|
||||||
|
RelayCommand::Remove { .. } => todo!(),
|
||||||
|
|
||||||
|
RelayCommand::Unfollow { .. } => todo!(),
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,6 +97,10 @@ impl Query {
|
||||||
.filter(condition)
|
.filter(condition)
|
||||||
.select_only();
|
.select_only();
|
||||||
|
|
||||||
|
for column in model::relation::Column::iter() {
|
||||||
|
select = select.select_column_as(column, format!("{}{}", model::relation::Entity.table_name(), column.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
for column in model::actor::Column::iter() {
|
for column in model::actor::Column::iter() {
|
||||||
select = select.select_column_as(column, format!("{}{}", model::actor::Entity.table_name(), column.to_string()));
|
select = select.select_column_as(column, format!("{}{}", model::actor::Entity.table_name(), column.to_string()));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue