Compare commits

..

2 commits

Author SHA1 Message Date
d7d450e942
chore(web): unused stuff 2024-11-21 03:23:18 +01:00
2b6037f860
fix: users update task cloaks and fetches follows
it now uses pull+resolve user, and then does an update by hand. before
it had a custom update flow so it skipped all the additions we made into
user normalizer and insertion. this is a bad ad-hoc change since
resolve_user is now different from all other resolves, but oh well
2024-11-21 03:22:05 +01:00
3 changed files with 23 additions and 19 deletions

View file

@ -1,11 +1,12 @@
use futures::TryStreamExt; use futures::TryStreamExt;
use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter, ModelTrait}; use sea_orm::{ActiveModelTrait, ActiveValue::{Unchanged, Set}, ColumnTrait, EntityTrait, ModelTrait, QueryFilter, QueryOrder};
use upub::traits::Fetcher; use upub::traits::Fetcher;
pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) -> Result<(), sea_orm::DbErr> { pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) -> Result<(), sea_orm::DbErr> {
let mut count = 0; let mut count = 0;
let mut stream = upub::model::actor::Entity::find() let mut stream = upub::model::actor::Entity::find()
.filter(upub::model::actor::Column::Updated.lt(chrono::Utc::now() - chrono::Duration::days(days))) .filter(upub::model::actor::Column::Updated.lt(chrono::Utc::now() - chrono::Duration::days(days)))
.order_by_asc(upub::model::actor::Column::Updated)
.stream(ctx.db()) .stream(ctx.db())
.await?; .await?;
@ -30,9 +31,10 @@ pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) ->
} }
}, },
Err(e) => tracing::warn!("could not fetch user {}: {e}", user.id), Err(e) => tracing::warn!("could not fetch user {}: {e}", user.id),
Ok(doc) => match upub::AP::actor_q(&doc, Some(user.internal)) { Ok(doc) => match ctx.resolve_user(doc, ctx.db()).await {
Ok(mut u) => { Ok(mut u) => {
tracing::info!("updating user {}", user.id); tracing::info!("updating user {}", user.id);
u.internal = Unchanged(user.internal);
u.updated = Set(chrono::Utc::now()); u.updated = Set(chrono::Utc::now());
u.update(ctx.db()).await?; u.update(ctx.db()).await?;
count += 1; count += 1;

View file

@ -89,7 +89,7 @@ pub trait Fetcher {
async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError>; async fn fetch_domain(&self, domain: &str, tx: &impl ConnectionTrait) -> Result<crate::model::instance::Model, RequestError>;
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>; async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>;
async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError>; async fn resolve_user(&self, actor: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::ActiveModel, RequestError>;
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>; async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>; async fn resolve_activity(&self, activity: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError>;
@ -291,7 +291,7 @@ impl Fetcher for crate::Context {
Ok(instance_model) Ok(instance_model)
} }
async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> { async fn resolve_user(&self, mut document: serde_json::Value, tx: &impl ConnectionTrait) -> Result<crate::model::actor::ActiveModel, RequestError> {
let id = document.id()?.to_string(); let id = document.id()?.to_string();
let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?; let _domain = self.fetch_domain(&crate::Context::server(&id), tx).await?;
@ -340,18 +340,7 @@ impl Fetcher for crate::Context {
} }
} }
// TODO this may fail: while fetching, remote server may fetch our service actor. Ok(user_model)
// if it does so with http signature, we will fetch that actor in background
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
crate::model::actor::Entity::insert(user_model).exec(tx).await?;
// TODO fetch it back to get the internal id
Ok(
crate::model::actor::Entity::find_by_ap_id(&id)
.one(tx)
.await?
.ok_or_else(|| DbErr::RecordNotFound(id.to_string()))?
)
} }
async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> { async fn fetch_user(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::actor::Model, RequestError> {
@ -367,7 +356,20 @@ impl Fetcher for crate::Context {
} }
} }
self.resolve_user(document, tx).await let active_model = self.resolve_user(document, tx).await?;
// TODO this may fail: while fetching, remote server may fetch our service actor.
// if it does so with http signature, we will fetch that actor in background
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
crate::model::actor::Entity::insert(active_model).exec(tx).await?;
// TODO fetch it back to get the internal id
Ok(
crate::model::actor::Entity::find_by_ap_id(id)
.one(tx)
.await?
.ok_or_else(|| DbErr::RecordNotFound(id.to_string()))?
)
} }
async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> { async fn fetch_activity(&self, id: &str, tx: &impl ConnectionTrait) -> Result<crate::model::activity::Model, RequestError> {

View file

@ -3,7 +3,7 @@ use leptos::*;
use leptos_router::*; use leptos_router::*;
use crate::{app::FeedRoute, prelude::*, Config}; use crate::{app::FeedRoute, prelude::*, Config};
use apb::{Base, Object}; use apb::Object;
#[component] #[component]
pub fn ObjectView() -> impl IntoView { pub fn ObjectView() -> impl IntoView {
@ -16,7 +16,7 @@ pub fn ObjectView() -> impl IntoView {
let id = Signal::derive(move || params.get().get("id").cloned().unwrap_or_default()); let id = Signal::derive(move || params.get().get("id").cloned().unwrap_or_default());
let object = create_local_resource( let object = create_local_resource(
move || (id.get(), loading.get()), move || (id.get(), loading.get()),
move |(oid, loading)| async move { move |(oid, _loading)| async move {
tracing::info!("rerunning fetcher"); tracing::info!("rerunning fetcher");
let obj = cache::OBJECTS.resolve(&oid, U::Object, auth).await?; let obj = cache::OBJECTS.resolve(&oid, U::Object, auth).await?;
if let Ok(author) = obj.attributed_to().id() { if let Ok(author) = obj.attributed_to().id() {