diff --git a/upub/core/src/context.rs b/upub/core/src/context.rs index d7c83e7..5318d73 100644 --- a/upub/core/src/context.rs +++ b/upub/core/src/context.rs @@ -171,8 +171,30 @@ impl Context { .await } + pub async fn find_internal(&self, id: &str) -> Result, DbErr> { + if let Some(internal) = model::object::Entity::ap_to_internal(id, self.db()).await? { + return Ok(Some(Internal::Object(internal))); + } + + if let Some(internal) = model::activity::Entity::ap_to_internal(id, self.db()).await? { + return Ok(Some(Internal::Activity(internal))); + } + + if let Some(internal) = model::actor::Entity::ap_to_internal(id, self.db()).await? { + return Ok(Some(Internal::Actor(internal))); + } + + Ok(None) + } + #[allow(unused)] pub fn is_relay(&self, id: &str) -> bool { self.0.relay.sources.contains(id) || self.0.relay.sinks.contains(id) } } + +pub enum Internal { + Object(i64), + Activity(i64), + Actor(i64), +} diff --git a/upub/core/src/traits/fetch.rs b/upub/core/src/traits/fetch.rs index ba9703f..d59d7fa 100644 --- a/upub/core/src/traits/fetch.rs +++ b/upub/core/src/traits/fetch.rs @@ -95,7 +95,7 @@ pub trait Fetcher { async fn resolve_activity(&self, activity: serde_json::Value) -> Result; async fn fetch_object(&self, id: &str) -> Result { self.fetch_object_r(id, 0).await } - #[allow(unused)] async fn resolve_object(&self, object: serde_json::Value) -> Result { self.resolve_object_r(object, 0).await } + async fn resolve_object(&self, object: serde_json::Value) -> Result { self.resolve_object_r(object, 0).await } async fn fetch_object_r(&self, id: &str, depth: u32) -> Result; async fn resolve_object_r(&self, object: serde_json::Value, depth: u32) -> Result; diff --git a/upub/core/src/traits/process.rs b/upub/core/src/traits/process.rs index 36420da..b7867fa 100644 --- a/upub/core/src/traits/process.rs +++ b/upub/core/src/traits/process.rs @@ -1,6 +1,6 @@ use apb::{target::Addressed, Activity, Base, Object}; use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{ext::{AnyQuery, LoggableError}, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}}; +use crate::{ext::{AnyQuery, LoggableError}, model, traits::{fetch::Pull, Addresser, Fetcher, Normalizer}}; #[derive(Debug, thiserror::Error)] pub enum ProcessorError { @@ -353,12 +353,34 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res let announced_id = activity.object().id()?.to_string(); let published = activity.published().unwrap_or(chrono::Utc::now()); let addressed = activity.addressed(); - - match ctx.pull(&announced_id).await? { - Pull::Actor(_) => Err(ProcessorError::Unprocessable), - Pull::Object(object) => { - let object_model = ctx.resolve_object(object).await?; + match match ctx.find_internal(&announced_id).await? { + // if we already have this activity, skip it + Some(crate::context::Internal::Activity(_)) => return Ok(()), // already processed + // actors and objects which we already have + Some(x) => x, + // something new, fetch it! + None => { + match ctx.pull(&announced_id).await? { + // if we receive a remote activity, process it directly + Pull::Activity(x) => return ctx.process(x).await, + // actors are not processable at all + Pull::Actor(_) => return Err(ProcessorError::Unprocessable), + // objects are processed down below, make a mock Internal::Object(internal) + Pull::Object(x) => + crate::context::Internal::Object( + ctx.resolve_object(x).await?.internal + ), + } + } + } { + crate::context::Internal::Actor(_) => Err(ProcessorError::Unprocessable), + crate::context::Internal::Activity(_) => Err(ProcessorError::AlreadyProcessed), // ??? + crate::context::Internal::Object(internal) => { + let object_model = model::object::Entity::find_by_id(internal) + .one(ctx.db()) + .await? + .ok_or_else(|| sea_orm::DbErr::RecordNotFound(internal.to_string()))?; let activity_model = ctx.insert_activity(activity).await?; // relays send us objects as Announce, but we don't really want to count those towards the @@ -388,19 +410,5 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity) -> Res tracing::info!("{} shared {}", activity_model.actor, announced_id); Ok(()) }, - Pull::Activity(activity) => { - // groups update all members of other things that happen inside, process those - match activity.activity_type()? { - apb::ActivityType::Like | apb::ActivityType::EmojiReact => Ok(like(ctx, activity).await?), - apb::ActivityType::Create => Ok(create(ctx, activity).await?), - apb::ActivityType::Undo => Ok(undo(ctx, activity).await?), - apb::ActivityType::Delete => Ok(delete(ctx, activity).await?), - apb::ActivityType::Update => Ok(update(ctx, activity).await?), - x => { - tracing::warn!("ignoring unhandled announced activity of type {x:?}"); - Err(ProcessorError::Unprocessable) - }, - } - }, } }