diff --git a/Cargo.toml b/Cargo.toml index 04367f5a..ddc31083 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ sea-orm-migration = { version = "0.12", optional = true } # mastodon mastodon-async-entities = { version = "1.1.0", optional = true } time = { version = "0.3", features = ["serde"], optional = true } +async-recursion = "1.1" [features] default = ["faker", "migrations", "mastodon"] diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 42e6078a..1f10ec94 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -120,34 +120,47 @@ impl Fetcher for Context { } async fn fetch_object(&self, id: &str) -> crate::Result { - if let Some(x) = model::object::Entity::find_by_id(id).one(self.db()).await? { - return Ok(x); // already in db, easy - } - - let object = Self::request( - Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), - ).await?.json::().await?; - - let addressed = object.addressed(); - let object_model = model::object::Model::new(&object)?; - - for attachment in object.attachment() { - let attachment_model = model::attachment::ActiveModel::new(&attachment, object_model.id.clone())?; - model::attachment::Entity::insert(attachment_model) - .exec(self.db()) - .await?; - } - - let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(None, Some(&object_model.id), &expanded_addresses).await?; - - model::object::Entity::insert(object_model.clone().into_active_model()) - .exec(self.db()).await?; - - Ok(object_model) + fetch_object_inner(self, id, 0).await } } +#[async_recursion::async_recursion] +async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result { + if let Some(x) = model::object::Entity::find_by_id(id).one(ctx.db()).await? { + return Ok(x); // already in db, easy + } + + let object = Context::request( + Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(), + ).await?.json::().await?; + + let addressed = object.addressed(); + let object_model = model::object::Model::new(&object)?; + + if let Some(reply) = &object_model.in_reply_to { + if depth <= 16 { + fetch_object_inner(ctx, reply, depth + 1).await?; + } else { + tracing::warn!("thread deeper than 16, giving up fetching more replies"); + } + } + + for attachment in object.attachment() { + let attachment_model = model::attachment::ActiveModel::new(&attachment, object_model.id.clone())?; + model::attachment::Entity::insert(attachment_model) + .exec(ctx.db()) + .await?; + } + + let expanded_addresses = ctx.expand_addressing(addressed).await?; + ctx.address_to(None, Some(&object_model.id), &expanded_addresses).await?; + + model::object::Entity::insert(object_model.clone().into_active_model()) + .exec(ctx.db()).await?; + + Ok(object_model) +} + #[axum::async_trait] pub trait Fetchable : Sync + Send { async fn fetch(&mut self, ctx: &crate::server::Context) -> crate::Result<&mut Self>;