diff --git a/upub/cli/src/update.rs b/upub/cli/src/update.rs index 05ac9cc..b51b50b 100644 --- a/upub/cli/src/update.rs +++ b/upub/cli/src/update.rs @@ -1,11 +1,12 @@ use futures::TryStreamExt; -use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter, ModelTrait}; +use sea_orm::{ActiveModelTrait, ActiveValue::{Unchanged, Set}, ColumnTrait, EntityTrait, ModelTrait, QueryFilter, QueryOrder}; use upub::traits::Fetcher; pub async fn update_users(ctx: upub::Context, days: i64, limit: Option) -> Result<(), sea_orm::DbErr> { let mut count = 0; let mut stream = upub::model::actor::Entity::find() .filter(upub::model::actor::Column::Updated.lt(chrono::Utc::now() - chrono::Duration::days(days))) + .order_by_asc(upub::model::actor::Column::Updated) .stream(ctx.db()) .await?; @@ -30,9 +31,10 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option) -> } }, Err(e) => tracing::warn!("could not fetch user {}: {e}", user.id), - Ok(doc) => match upub::AP::actor_q(&doc, Some(user.internal)) { + Ok(doc) => match ctx.resolve_user(doc, ctx.db()).await { Ok(mut u) => { tracing::info!("updating user {}", user.id); + u.internal = Unchanged(user.internal); u.updated = Set(chrono::Utc::now()); u.update(ctx.db()).await?; count += 1; diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index 7c822f8..1aca561 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -89,7 +89,7 @@ pub trait Fetcher { async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result; async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result; - async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result; + async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result; async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result; async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result; @@ -291,7 +291,7 @@ impl Fetcher for crate::Context { Ok(instance_model) } - async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result { + async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result { let id = document.id()?.to_string(); let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?; @@ -340,18 +340,7 @@ impl Fetcher for crate::Context { } } - // TODO this may fail: while fetching, remote server may fetch our service actor. - // if it does so with http signature, we will fetch that actor in background - // meaning that, once we reach here, it's already inserted and returns an UNIQUE error - crate::model::actor::Entity::insert(user_model).exec(tx).await?; - - // TODO fetch it back to get the internal id - Ok( - crate::model::actor::Entity::find_by_ap_id(&id) - .one(tx) - .await? - .ok_or_else(|| DbErr::RecordNotFound(id.to_string()))? - ) + Ok(user_model) } async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result { @@ -367,7 +356,20 @@ impl Fetcher for crate::Context { } } - self.resolve_user(document, tx).await + let active_model = self.resolve_user(document, tx).await?; + + // TODO this may fail: while fetching, remote server may fetch our service actor. + // if it does so with http signature, we will fetch that actor in background + // meaning that, once we reach here, it's already inserted and returns an UNIQUE error + crate::model::actor::Entity::insert(active_model).exec(tx).await?; + + // TODO fetch it back to get the internal id + Ok( + crate::model::actor::Entity::find_by_ap_id(id) + .one(tx) + .await? + .ok_or_else(|| DbErr::RecordNotFound(id.to_string()))? + ) } async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result {