fix: db fix tasks would get stuck on sqlite

i think because the db would end up locked, added brackets to be sure to
drop the streaming reference. also it could crash when updating records
not found, just spit a warn
This commit is contained in:
əlemi 2024-04-29 21:23:12 +02:00
parent e4b92584db
commit a6e12468b7
Signed by: alemi
GPG key ID: A4895B84D311642C

View file

@ -239,63 +239,78 @@ async fn fix(db: sea_orm::DatabaseConnection, likes: bool, shares: bool, replies
if likes { if likes {
tracing::info!("fixing likes..."); tracing::info!("fixing likes...");
let mut store = std::collections::HashMap::new(); let mut store = std::collections::HashMap::new();
let mut stream = model::like::Entity::find().stream(&db).await?; {
while let Some(like) = stream.try_next().await? { let mut stream = model::like::Entity::find().stream(&db).await?;
store.insert(like.likes.clone(), store.get(&like.likes).unwrap_or(&0) + 1); while let Some(like) = stream.try_next().await? {
store.insert(like.likes.clone(), store.get(&like.likes).unwrap_or(&0) + 1);
}
} }
for (k, v) in store { for (k, v) in store {
let m = model::object::ActiveModel { let m = model::object::ActiveModel {
id: sea_orm::Set(k), id: sea_orm::Set(k.clone()),
likes: sea_orm::Set(v), likes: sea_orm::Set(v),
..Default::default() ..Default::default()
}; };
model::object::Entity::update(m) if let Err(e) = model::object::Entity::update(m)
.exec(&db) .exec(&db)
.await?; .await
{
tracing::warn!("record not updated ({k}): {e}");
}
} }
} }
if shares { if shares {
tracing::info!("fixing shares..."); tracing::info!("fixing shares...");
let mut store = std::collections::HashMap::new(); let mut store = std::collections::HashMap::new();
let mut stream = model::share::Entity::find().stream(&db).await?; {
while let Some(share) = stream.try_next().await? { let mut stream = model::share::Entity::find().stream(&db).await?;
store.insert(share.shares.clone(), store.get(&share.shares).unwrap_or(&0) + 1); while let Some(share) = stream.try_next().await? {
store.insert(share.shares.clone(), store.get(&share.shares).unwrap_or(&0) + 1);
}
} }
for (k, v) in store { for (k, v) in store {
let m = model::object::ActiveModel { let m = model::object::ActiveModel {
id: sea_orm::Set(k), id: sea_orm::Set(k.clone()),
shares: sea_orm::Set(v), shares: sea_orm::Set(v),
..Default::default() ..Default::default()
}; };
model::object::Entity::update(m) if let Err(e) = model::object::Entity::update(m)
.exec(&db) .exec(&db)
.await?; .await
{
tracing::warn!("record not updated ({k}): {e}");
}
} }
} }
if replies { if replies {
tracing::info!("fixing replies..."); tracing::info!("fixing replies...");
let mut store = std::collections::HashMap::new(); let mut store = std::collections::HashMap::new();
let mut stream = model::object::Entity::find().stream(&db).await?; {
while let Some(object) = stream.try_next().await? { let mut stream = model::object::Entity::find().stream(&db).await?;
if let Some(reply) = object.in_reply_to { while let Some(object) = stream.try_next().await? {
let before = store.get(&reply).unwrap_or(&0); if let Some(reply) = object.in_reply_to {
store.insert(reply, before + 1); let before = store.get(&reply).unwrap_or(&0);
store.insert(reply, before + 1);
}
} }
} }
for (k, v) in store { for (k, v) in store {
let m = model::object::ActiveModel { let m = model::object::ActiveModel {
id: sea_orm::Set(k), id: sea_orm::Set(k.clone()),
comments: sea_orm::Set(v), comments: sea_orm::Set(v),
..Default::default() ..Default::default()
}; };
model::object::Entity::update(m) if let Err(e) = model::object::Entity::update(m)
.exec(&db) .exec(&db)
.await?; .await
{
tracing::warn!("record not updated ({k}): {e}");
}
} }
} }