forked from alemi/upub
chore: separate fetcher fetch from pull
fetch uses db and inserts, pull just gets the remote resource
This commit is contained in:
parent
01c02c5258
commit
9bc7ddb0ae
1 changed files with 42 additions and 26 deletions
|
@ -11,23 +11,15 @@ use super::{auth::HttpSignature, Context};
|
||||||
|
|
||||||
#[axum::async_trait]
|
#[axum::async_trait]
|
||||||
pub trait Fetcher {
|
pub trait Fetcher {
|
||||||
async fn request(
|
|
||||||
method: reqwest::Method,
|
|
||||||
url: &str,
|
|
||||||
payload: Option<&str>,
|
|
||||||
from: &str,
|
|
||||||
key: &str,
|
|
||||||
domain: &str,
|
|
||||||
) -> crate::Result<Response>;
|
|
||||||
|
|
||||||
async fn fetch_user(&self, id: &str) -> crate::Result<model::user::Model>;
|
async fn fetch_user(&self, id: &str) -> crate::Result<model::user::Model>;
|
||||||
|
async fn pull_user(&self, id: &str) -> crate::Result<model::user::Model>;
|
||||||
|
|
||||||
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model>;
|
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model>;
|
||||||
|
async fn pull_object(&self, id: &str) -> crate::Result<model::object::Model>;
|
||||||
|
|
||||||
async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
|
async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
|
||||||
}
|
async fn pull_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
|
||||||
|
|
||||||
|
|
||||||
#[axum::async_trait]
|
|
||||||
impl Fetcher for Context {
|
|
||||||
async fn request(
|
async fn request(
|
||||||
method: reqwest::Method,
|
method: reqwest::Method,
|
||||||
url: &str,
|
url: &str,
|
||||||
|
@ -75,12 +67,28 @@ impl Fetcher for Context {
|
||||||
.error_for_status()?
|
.error_for_status()?
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[axum::async_trait]
|
||||||
|
impl Fetcher for Context {
|
||||||
async fn fetch_user(&self, id: &str) -> crate::Result<model::user::Model> {
|
async fn fetch_user(&self, id: &str) -> crate::Result<model::user::Model> {
|
||||||
if let Some(x) = model::user::Entity::find_by_id(id).one(self.db()).await? {
|
if let Some(x) = model::user::Entity::find_by_id(id).one(self.db()).await? {
|
||||||
return Ok(x); // already in db, easy
|
return Ok(x); // already in db, easy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let user_model = self.pull_user(id).await?;
|
||||||
|
|
||||||
|
// TODO this may fail: while fetching, remote server may fetch our service actor.
|
||||||
|
// if it does so with http signature, we will fetch that actor in background
|
||||||
|
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
|
||||||
|
model::user::Entity::insert(user_model.clone().into_active_model())
|
||||||
|
.exec(self.db()).await?;
|
||||||
|
|
||||||
|
Ok(user_model)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn pull_user(&self, id: &str) -> crate::Result<model::user::Model> {
|
||||||
let user = Self::request(
|
let user = Self::request(
|
||||||
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
|
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
|
||||||
).await?.json::<serde_json::Value>().await?;
|
).await?.json::<serde_json::Value>().await?;
|
||||||
|
@ -115,12 +123,6 @@ impl Fetcher for Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO this may fail: while fetching, remote server may fetch our service actor.
|
|
||||||
// if it does so with http signature, we will fetch that actor in background
|
|
||||||
// meaning that, once we reach here, it's already inserted and returns an UNIQUE error
|
|
||||||
model::user::Entity::insert(user_model.clone().into_active_model())
|
|
||||||
.exec(self.db()).await?;
|
|
||||||
|
|
||||||
Ok(user_model)
|
Ok(user_model)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,6 +131,19 @@ impl Fetcher for Context {
|
||||||
return Ok(x); // already in db, easy
|
return Ok(x); // already in db, easy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let activity_model = self.pull_activity(id).await?;
|
||||||
|
|
||||||
|
model::activity::Entity::insert(activity_model.clone().into_active_model())
|
||||||
|
.exec(self.db()).await?;
|
||||||
|
|
||||||
|
let addressed = activity_model.addressed();
|
||||||
|
let expanded_addresses = self.expand_addressing(addressed).await?;
|
||||||
|
self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?;
|
||||||
|
|
||||||
|
Ok(activity_model)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn pull_activity(&self, id: &str) -> crate::Result<model::activity::Model> {
|
||||||
let activity = Self::request(
|
let activity = Self::request(
|
||||||
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
|
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
|
||||||
).await?.json::<serde_json::Value>().await?;
|
).await?.json::<serde_json::Value>().await?;
|
||||||
|
@ -145,21 +160,22 @@ impl Fetcher for Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let addressed = activity.addressed();
|
|
||||||
let activity_model = model::activity::Model::new(&activity)?;
|
let activity_model = model::activity::Model::new(&activity)?;
|
||||||
|
|
||||||
model::activity::Entity::insert(activity_model.clone().into_active_model())
|
|
||||||
.exec(self.db()).await?;
|
|
||||||
|
|
||||||
let expanded_addresses = self.expand_addressing(addressed).await?;
|
|
||||||
self.address_to(Some(&activity_model.id), None, &expanded_addresses).await?;
|
|
||||||
|
|
||||||
Ok(activity_model)
|
Ok(activity_model)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> {
|
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> {
|
||||||
fetch_object_inner(self, id, 0).await
|
fetch_object_inner(self, id, 0).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn pull_object(&self, id: &str) -> crate::Result<model::object::Model> {
|
||||||
|
let object = Context::request(
|
||||||
|
Method::GET, id, None, &format!("https://{}", self.domain()), &self.app().private_key, self.domain(),
|
||||||
|
).await?.json::<serde_json::Value>().await?;
|
||||||
|
|
||||||
|
Ok(model::object::Model::new(&object)?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_recursion::async_recursion]
|
#[async_recursion::async_recursion]
|
||||||
|
|
Loading…
Reference in a new issue