diff --git a/upub/cli/src/lib.rs b/upub/cli/src/lib.rs index 4371ff50..0787a4bc 100644 --- a/upub/cli/src/lib.rs +++ b/upub/cli/src/lib.rs @@ -19,6 +19,9 @@ pub use update::*; mod nuke; pub use nuke::*; +mod thread; +pub use thread::*; + #[derive(Debug, Clone, clap::Subcommand)] pub enum CliCommand { /// generate fake user, note and activity @@ -104,6 +107,11 @@ pub enum CliCommand { /// also send Delete activities for all local objects #[arg(long, default_value_t = false)] delete_objects: bool, + }, + + /// attempt to fix broken threads and completely gather their context + Thread { + } } @@ -124,5 +132,7 @@ pub async fn run(ctx: upub::Context, command: CliCommand) -> Result<(), Box Ok(nuke(ctx, for_real, delete_objects).await?), + CliCommand::Thread { } => + Ok(thread(ctx).await?), } } diff --git a/upub/cli/src/thread.rs b/upub/cli/src/thread.rs new file mode 100644 index 00000000..b22ddd0d --- /dev/null +++ b/upub/cli/src/thread.rs @@ -0,0 +1,34 @@ +use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; +use upub::traits::{fetch::PullError, Fetcher}; + +pub async fn thread(ctx: upub::Context) -> Result<(), PullError> { + use futures::TryStreamExt; + let db = ctx.db(); + + tracing::info!("fixing contexts..."); + let mut stream = upub::model::object::Entity::find() + .filter(upub::model::object::Column::Context.is_null()) + .stream(db) + .await?; + + while let Some(mut object) = stream.try_next().await? { + match object.in_reply_to { + None => object.context = Some(object.id.clone()), + Some(ref in_reply_to) => { + let reply = ctx.fetch_object(in_reply_to, ctx.db()).await?; + if let Some(context) = reply.context { + object.context = Some(context); + } else { + continue; + } + }, + } + tracing::info!("updating context of {}", object.id); + upub::model::object::Entity::update(object.into_active_model()) + .exec(ctx.db()) + .await?; + } + + tracing::info!("done fixing contexts"); + Ok(()) +}