From a6e12468b79967e50c0883b316cfbce3f5b3ff13 Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 29 Apr 2024 21:23:12 +0200 Subject: [PATCH] fix: db fix tasks would get stuck on sqlite i think because the db would end up locked, added brackets to be sure to drop the streaming reference. also it could crash when updating records not found, just spit a warn --- src/main.rs | 55 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/src/main.rs b/src/main.rs index 4f059c2..9ce1af5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -239,63 +239,78 @@ async fn fix(db: sea_orm::DatabaseConnection, likes: bool, shares: bool, replies if likes { tracing::info!("fixing likes..."); let mut store = std::collections::HashMap::new(); - let mut stream = model::like::Entity::find().stream(&db).await?; - while let Some(like) = stream.try_next().await? { - store.insert(like.likes.clone(), store.get(&like.likes).unwrap_or(&0) + 1); + { + let mut stream = model::like::Entity::find().stream(&db).await?; + while let Some(like) = stream.try_next().await? { + store.insert(like.likes.clone(), store.get(&like.likes).unwrap_or(&0) + 1); + } } for (k, v) in store { let m = model::object::ActiveModel { - id: sea_orm::Set(k), + id: sea_orm::Set(k.clone()), likes: sea_orm::Set(v), ..Default::default() }; - model::object::Entity::update(m) + if let Err(e) = model::object::Entity::update(m) .exec(&db) - .await?; + .await + { + tracing::warn!("record not updated ({k}): {e}"); + } } } if shares { tracing::info!("fixing shares..."); let mut store = std::collections::HashMap::new(); - let mut stream = model::share::Entity::find().stream(&db).await?; - while let Some(share) = stream.try_next().await? { - store.insert(share.shares.clone(), store.get(&share.shares).unwrap_or(&0) + 1); + { + let mut stream = model::share::Entity::find().stream(&db).await?; + while let Some(share) = stream.try_next().await? { + store.insert(share.shares.clone(), store.get(&share.shares).unwrap_or(&0) + 1); + } } for (k, v) in store { let m = model::object::ActiveModel { - id: sea_orm::Set(k), + id: sea_orm::Set(k.clone()), shares: sea_orm::Set(v), ..Default::default() }; - model::object::Entity::update(m) + if let Err(e) = model::object::Entity::update(m) .exec(&db) - .await?; + .await + { + tracing::warn!("record not updated ({k}): {e}"); + } } } if replies { tracing::info!("fixing replies..."); let mut store = std::collections::HashMap::new(); - let mut stream = model::object::Entity::find().stream(&db).await?; - while let Some(object) = stream.try_next().await? { - if let Some(reply) = object.in_reply_to { - let before = store.get(&reply).unwrap_or(&0); - store.insert(reply, before + 1); + { + let mut stream = model::object::Entity::find().stream(&db).await?; + while let Some(object) = stream.try_next().await? { + if let Some(reply) = object.in_reply_to { + let before = store.get(&reply).unwrap_or(&0); + store.insert(reply, before + 1); + } } } for (k, v) in store { let m = model::object::ActiveModel { - id: sea_orm::Set(k), + id: sea_orm::Set(k.clone()), comments: sea_orm::Set(v), ..Default::default() }; - model::object::Entity::update(m) + if let Err(e) = model::object::Entity::update(m) .exec(&db) - .await?; + .await + { + tracing::warn!("record not updated ({k}): {e}"); + } } }