fix: users update task cloaks and fetches follows
it now uses pull+resolve user, and then does an update by hand. before it had a custom update flow so it skipped all the additions we made into user normalizer and insertion. this is a bad ad-hoc change since resolve_user is now different from all other resolves, but oh well
This commit is contained in:
parent
d8af116667
commit
2b6037f860
2 changed files with 21 additions and 17 deletions
|
@ -1,11 +1,12 @@
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter, ModelTrait};
|
use sea_orm::{ActiveModelTrait, ActiveValue::{Unchanged, Set}, ColumnTrait, EntityTrait, ModelTrait, QueryFilter, QueryOrder};
|
||||||
use upub::traits::Fetcher;
|
use upub::traits::Fetcher;
|
||||||
|
|
||||||
pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) -> Result<(), sea_orm::DbErr> {
|
pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) -> Result<(), sea_orm::DbErr> {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
let mut stream = upub::model::actor::Entity::find()
|
let mut stream = upub::model::actor::Entity::find()
|
||||||
.filter(upub::model::actor::Column::Updated.lt(chrono::Utc::now() - chrono::Duration::days(days)))
|
.filter(upub::model::actor::Column::Updated.lt(chrono::Utc::now() - chrono::Duration::days(days)))
|
||||||
|
.order_by_asc(upub::model::actor::Column::Updated)
|
||||||
.stream(ctx.db())
|
.stream(ctx.db())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -30,9 +31,10 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(e) => tracing::warn!("could not fetch user {}: {e}", user.id),
|
Err(e) => tracing::warn!("could not fetch user {}: {e}", user.id),
|
||||||
Ok(doc) => match upub::AP::actor_q(&doc, Some(user.internal)) {
|
Ok(doc) => match ctx.resolve_user(doc, ctx.db()).await {
|
||||||
Ok(mut u) => {
|
Ok(mut u) => {
|
||||||
tracing::info!("updating user {}", user.id);
|
tracing::info!("updating user {}", user.id);
|
||||||
|
u.internal = Unchanged(user.internal);
|
||||||
u.updated = Set(chrono::Utc::now());
|
u.updated = Set(chrono::Utc::now());
|
||||||
u.update(ctx.db()).await?;
|
u.update(ctx.db()).await?;
|
||||||
count += 1;
|
count += 1;
|
||||||
|
|
|
@ -89,7 +89,7 @@ pub trait Fetcher {
|
||||||
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError>;
|
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError>;
|
||||||
|
|
||||||
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
|
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
|
||||||
async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
|
async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::ActiveModel, RequestError>;
|
||||||
|
|
||||||
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
|
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
|
||||||
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
|
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
|
||||||
|
@ -291,7 +291,7 @@ impl Fetcher for crate::Context {
|
||||||
Ok(instance_model)
|
Ok(instance_model)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
|
async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::ActiveModel, RequestError> {
|
||||||
let id = document.id()?.to_string();
|
let id = document.id()?.to_string();
|
||||||
|
|
||||||
let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?;
|
let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?;
|
||||||
|
@ -340,18 +340,7 @@ impl Fetcher for crate::Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO this may fail: while fetching, remote server may fetch our service actor.
|
Ok(user_model)
|
||||||
// if it does so with http signature, we will fetch that actor in background
|
|
||||||
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
|
|
||||||
crate::model::actor::Entity::insert(user_model).exec(tx).await?;
|
|
||||||
|
|
||||||
// TODO fetch it back to get the internal id
|
|
||||||
Ok(
|
|
||||||
crate::model::actor::Entity::find_by_ap_id(&id)
|
|
||||||
.one(tx)
|
|
||||||
.await?
|
|
||||||
.ok_or_else(|| DbErr::RecordNotFound(id.to_string()))?
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
|
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
|
||||||
|
@ -367,7 +356,20 @@ impl Fetcher for crate::Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.resolve_user(document, tx).await
|
let active_model = self.resolve_user(document, tx).await?;
|
||||||
|
|
||||||
|
// TODO this may fail: while fetching, remote server may fetch our service actor.
|
||||||
|
// if it does so with http signature, we will fetch that actor in background
|
||||||
|
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
|
||||||
|
crate::model::actor::Entity::insert(active_model).exec(tx).await?;
|
||||||
|
|
||||||
|
// TODO fetch it back to get the internal id
|
||||||
|
Ok(
|
||||||
|
crate::model::actor::Entity::find_by_ap_id(id)
|
||||||
|
.one(tx)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbErr::RecordNotFound(id.to_string()))?
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {
|
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {
|
||||||
|
|
Loading…
Reference in a new issue