1
0
Fork 0
forked from alemi/upub

fix: announces processing

basically dont fetch every time, check if we have it already before
This commit is contained in:
əlemi 2024-06-06 03:34:29 +02:00
parent 90e4454d3e
commit 797837f2a1
Signed by: alemi
GPG key ID: A4895B84D311642C
3 changed files with 51 additions and 21 deletions

View file

@ -171,8 +171,30 @@ impl Context {
.await .await
} }
pub async fn find_internal(&self, id: &str) -> Result<Option<Internal>, DbErr> {
if let Some(internal) = model::object::Entity::ap_to_internal(id, self.db()).await? {
return Ok(Some(Internal::Object(internal)));
}
if let Some(internal) = model::activity::Entity::ap_to_internal(id, self.db()).await? {
return Ok(Some(Internal::Activity(internal)));
}
if let Some(internal) = model::actor::Entity::ap_to_internal(id, self.db()).await? {
return Ok(Some(Internal::Actor(internal)));
}
Ok(None)
}
#[allow(unused)] #[allow(unused)]
pub fn is_relay(&self, id: &str) -> bool { pub fn is_relay(&self, id: &str) -> bool {
self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id) self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id)
} }
} }
pub enum Internal {
Object(i64),
Activity(i64),
Actor(i64),
}

View file

@ -95,7 +95,7 @@ pub trait Fetcher {
async fn resolve_activity(&self, activity: serde_json::Value) -> Result<crate::model::activity::Model, PullError>; async fn resolve_activity(&self, activity: serde_json::Value) -> Result<crate::model::activity::Model, PullError>;
async fn fetch_object(&self, id: &str) -> Result<crate::model::object::Model, PullError> { self.fetch_object_r(id, 0).await } async fn fetch_object(&self, id: &str) -> Result<crate::model::object::Model, PullError> { self.fetch_object_r(id, 0).await }
#[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result<crate::model::object::Model, PullError> { self.resolve_object_r(object, 0).await } async fn resolve_object(&self, object: serde_json::Value) -> Result<crate::model::object::Model, PullError> { self.resolve_object_r(object, 0).await }
async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<crate::model::object::Model, PullError>; async fn fetch_object_r(&self, id: &str, depth: u32) -> Result<crate::model::object::Model, PullError>;
async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<crate::model::object::Model, PullError>; async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result<crate::model::object::Model, PullError>;

View file

@ -1,6 +1,6 @@
use apb::{target::Addressed, Activity, Base, Object}; use apb::{target::Addressed, Activity, Base, Object};
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
use crate::{ext::{AnyQuery, LoggableError}, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}}; use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ProcessorError { pub enum ProcessorError {
@ -354,11 +354,33 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res
let published = activity.published().unwrap_or(chrono::Utc::now()); let published = activity.published().unwrap_or(chrono::Utc::now());
let addressed = activity.addressed(); let addressed = activity.addressed();
match match ctx.find_internal(&announced_id).await? {
// if we already have this activity, skip it
Some(crate::context::Internal::Activity(_)) => return Ok(()), // already processed
// actors and objects which we already have
Some(x) => x,
// something new, fetch it!
None => {
match ctx.pull(&announced_id).await? { match ctx.pull(&announced_id).await? {
Pull::Actor(_) => Err(ProcessorError::Unprocessable), // if we receive a remote activity, process it directly
Pull::Object(object) => { Pull::Activity(x) => return ctx.process(x).await,
// actors are not processable at all
let object_model = ctx.resolve_object(object).await?; Pull::Actor(_) => return Err(ProcessorError::Unprocessable),
// objects are processed down below, make a mock Internal::Object(internal)
Pull::Object(x) =>
crate::context::Internal::Object(
ctx.resolve_object(x).await?.internal
),
}
}
} {
crate::context::Internal::Actor(_) => Err(ProcessorError::Unprocessable),
crate::context::Internal::Activity(_) => Err(ProcessorError::AlreadyProcessed), // ???
crate::context::Internal::Object(internal) => {
let object_model = model::object::Entity::find_by_id(internal)
.one(ctx.db())
.await?
.ok_or_else(|| sea_orm::DbErr::RecordNotFound(internal.to_string()))?;
let activity_model = ctx.insert_activity(activity).await?; let activity_model = ctx.insert_activity(activity).await?;
// relays send us objects as Announce, but we don't really want to count those towards the // relays send us objects as Announce, but we don't really want to count those towards the
@ -388,19 +410,5 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res
tracing::info!("{} shared {}", activity_model.actor, announced_id); tracing::info!("{} shared {}", activity_model.actor, announced_id);
Ok(()) Ok(())
}, },
Pull::Activity(activity) => {
// groups update all members of other things that happen inside, process those
match activity.activity_type()? {
apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(like(ctx, activity).await?),
apb::ActivityType::Create => Ok(create(ctx, activity).await?),
apb::ActivityType::Undo => Ok(undo(ctx, activity).await?),
apb::ActivityType::Delete => Ok(delete(ctx, activity).await?),
apb::ActivityType::Update => Ok(update(ctx, activity).await?),
x => {
tracing::warn!("ignoring unhandled announced activity of type {x:?}");
Err(ProcessorError::Unprocessable)
},
}
},
} }
} }