diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 40a7ee5..e9b2a08 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -11,23 +11,15 @@ use super::{auth::HttpSignature, Context}; #[axum::async_trait] pub trait Fetcher { - async fn request( - method: reqwest::Method, - url: &str, - payload: Option<&str>, - from: &str, - key: &str, - domain: &str, - ) -> crate::Result; - async fn fetch_user(&self, id: &str) -> crate::Result; + async fn pull_user(&self, id: &str) -> crate::Result; + async fn fetch_object(&self, id: &str) -> crate::Result; + async fn pull_object(&self, id: &str) -> crate::Result; + async fn fetch_activity(&self, id: &str) -> crate::Result; -} + async fn pull_activity(&self, id: &str) -> crate::Result; - -#[axum::async_trait] -impl Fetcher for Context { async fn request( method: reqwest::Method, url: &str, @@ -75,12 +67,28 @@ impl Fetcher for Context { .error_for_status()? ) } +} + +#[axum::async_trait] +impl Fetcher for Context { async fn fetch_user(&self, id: &str) -> crate::Result { if let Some(x) = model::user::Entity::find_by_id(id).one(self.db()).await? { return Ok(x); // already in db, easy } + let user_model = self.pull_user(id).await?; + + // TODO this may fail: while fetching, remote server may fetch our service actor. + // if it does so with http signature, we will fetch that actor in background + // meaning that, once we reach here, it's already inserted and returns an UNIQUE error + model::user::Entity::insert(user_model.clone().into_active_model()) + .exec(self.db()).await?; + + Ok(user_model) + } + + async fn pull_user(&self, id: &str) -> crate::Result { let user = Self::request( Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), ).await?.json::().await?; @@ -115,12 +123,6 @@ impl Fetcher for Context { } } - // TODO this may fail: while fetching, remote server may fetch our service actor. - // if it does so with http signature, we will fetch that actor in background - // meaning that, once we reach here, it's already inserted and returns an UNIQUE error - model::user::Entity::insert(user_model.clone().into_active_model()) - .exec(self.db()).await?; - Ok(user_model) } @@ -129,6 +131,19 @@ impl Fetcher for Context { return Ok(x); // already in db, easy } + let activity_model = self.pull_activity(id).await?; + + model::activity::Entity::insert(activity_model.clone().into_active_model()) + .exec(self.db()).await?; + + let addressed = activity_model.addressed(); + let expanded_addresses = self.expand_addressing(addressed).await?; + self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?; + + Ok(activity_model) + } + + async fn pull_activity(&self, id: &str) -> crate::Result { let activity = Self::request( Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), ).await?.json::().await?; @@ -145,21 +160,22 @@ impl Fetcher for Context { } } - let addressed = activity.addressed(); let activity_model = model::activity::Model::new(&activity)?; - model::activity::Entity::insert(activity_model.clone().into_active_model()) - .exec(self.db()).await?; - - let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?; - Ok(activity_model) } async fn fetch_object(&self, id: &str) -> crate::Result { fetch_object_inner(self, id, 0).await } + + async fn pull_object(&self, id: &str) -> crate::Result { + let object = Context::request( + Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(), + ).await?.json::().await?; + + Ok(model::object::Model::new(&object)?) + } } #[async_recursion::async_recursion]