@ -71,25 +71,9 @@ pub async fn create(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
tracing::warn!("failed fetching replies for received object: {e}");
let notified = object_node.tag()
.filter_map(|x| Some(x.id().ok()?.to_string()))
let object_model = ctx.insert_object(object_node, tx).await?;
let activity_model = ctx.insert_activity(activity, tx).await?;
for uid in notified {
if !ctx.is_local(&uid) { continue }
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&uid, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
ctx.insert_activity(activity, tx).await?;
tracing::debug!("{} posted {}", object_model.attributed_to.as_deref().unwrap_or("<anonymous>"), object_model.id);
@ -119,20 +103,9 @@ pub async fn like(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
// likes without addressing are "silent likes", process them but dont store activity or notify
// likes without addressing are "silent likes", process them but dont store activity
if !activity.addressed().is_empty() {
let activity_model = ctx.insert_activity(activity, tx).await?;
// TODO check that object author is in this like addressing!!! otherwise skip notification
if let Some(ref attributed_to) = obj.attributed_to {
if ctx.is_local(attributed_to) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
ctx.insert_activity(activity, tx).await?;
tracing::debug!("{} liked {}", actor.id, obj.id);
@ -161,18 +134,7 @@ pub async fn dislike(ctx: &crate::Context, activity: impl apb::Activity, tx: &Da
// dislikes without addressing are "silent dislikes", process them but dont store activity
if !activity.addressed().is_empty() {
let activity_model = ctx.insert_activity(activity, tx).await?;
// TODO check that object author is in this like addressing!!! otherwise skip notification
if let Some(ref attributed_to) = obj.attributed_to {
if ctx.is_local(attributed_to) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
ctx.insert_activity(activity, tx).await?;
tracing::debug!("{} disliked {}", actor.id, obj.id);
@ -187,12 +149,6 @@ pub async fn follow(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
let target_actor = ctx.fetch_user(activity.object().id()?, tx).await?;
let activity_model = ctx.insert_activity(activity, tx).await?;
if ctx.is_local(&target_actor.id) {
crate::Query::notify(activity_model.internal, target_actor.internal)
if let Some(relation) = crate::model::relation::Entity::find()
@ -250,14 +206,6 @@ pub async fn accept(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
let activity_model = ctx.insert_activity(activity, tx).await?;
if ctx.is_local(&follow_activity.actor) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
let follower = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx)
@ -308,17 +256,6 @@ pub async fn reject(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
let activity_model = ctx.insert_activity(activity, tx).await?;
// TODO most software doesn't show this, but i think instead we should?? if someone rejects it's
// better to know it clearly rather than not knowing if it got lost and maybe retry (being more
// annoying)
if ctx.is_local(&follow_activity.actor) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(&follow_activity.actor, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
@ -339,7 +276,6 @@ pub async fn delete(ctx: &crate::Context, activity: impl apb::Activity, tx: &Dat
if !activity.addressed().is_empty() {
ctx.insert_activity(activity, tx).await?;
// TODO we should delete notifications from CREATEs related to objects we deleted
tracing::debug!("deleted '{oid}'");
@ -462,38 +398,24 @@ pub async fn undo(ctx: &crate::Context, activity: impl apb::Activity, tx: &Datab
_ => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
// TODO we should store undos to make local delete deliveries work and relations make sense
// we should store undos to make local delete deliveries work and relations make sense
// except when they have empty addressing
// so that also remote "secret" undos dont get stored
if !activity.addressed().is_empty() {
ctx.insert_activity(activity, tx).await?;
if let Some(internal) = crate::model::activity::Entity::ap_to_internal(undone_activity.id()?, tx).await? {
pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &DatabaseTransaction) -> Result<(), ProcessorError> {
let announced_id = activity.object().id()?.to_string();
let object = match ctx.find_internal(&announced_id).await? {
match match ctx.find_internal(&announced_id).await? {
// if we already have this activity, skip it
Some(crate::context::Internal::Activity(_)) => return Ok(()), // already processed
// actors and objects which we already have
Some(crate::context::Internal::Actor(_)) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
// objects that we already have
Some(crate::context::Internal::Object(internal)) => {
.ok_or_else(|| sea_orm::DbErr::RecordNotFound(format!("object#{internal}")))?
Some(x) => x,
// something new, fetch it!
None => {
match ctx.pull(&announced_id).await? {
@ -502,58 +424,55 @@ pub async fn announce(ctx: &crate::Context, activity: impl apb::Activity, tx: &D
// actors are not processable at all
Pull::Actor(_) => return Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
// objects are processed down below, make a mock Internal::Object(internal)
Pull::Object(x) => ctx.resolve_object(x, tx).await?,
Pull::Object(x) =>
ctx.resolve_object(x, tx).await?.internal
} {
crate::context::Internal::Activity(_) => unreachable!(),
crate::context::Internal::Actor(_) => Err(ProcessorError::Unprocessable(activity.id()?.to_string())),
crate::context::Internal::Object(internal) => {
let actor = ctx.fetch_user(activity.actor().id()?, tx).await?;
let actor = ctx.fetch_user(activity.actor().id()?, tx).await?;
// we only care about announces produced by "Person" actors, because there's intention
// anything shared by groups, services or applications is automated: fetch it and be done
if actor.actor_type == apb::ActorType::Person {
// if this user never "boosted" this object before, increase its counter
if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, object.internal)
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
let share = crate::model::announce::ActiveModel {
internal: NotSet,
actor: Set(actor.internal),
object: Set(object.internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
// TODO we should probably insert an activity, otherwise this won't appear on timelines!!
// or maybe go update all addressing records for this object, pushing them up
// or maybe create new addressing rows with more recent dates
// or maybe create fake objects that reference the original one
// idk!!!!
if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) {
let activity_model = ctx.insert_activity(activity, tx).await?;
if let Some(ref attributed_to) = object.attributed_to {
if ctx.is_local(attributed_to) {
if let Some(actor_internal) = crate::model::actor::Entity::ap_to_internal(attributed_to, tx).await? {
crate::Query::notify(activity_model.internal, actor_internal)
// we only care about announces produced by "Person" actors, because there's intention
// anything shared by groups, services or applications is automated: fetch it and be done
if actor.actor_type == apb::ActorType::Person {
// if this user never "boosted" this object before, increase its counter
if !crate::model::announce::Entity::find_by_uid_oid(actor.internal, internal)
.col_expr(crate::model::object::Column::Announces, Expr::col(crate::model::object::Column::Announces).add(1))
tracing::debug!("{} shared {}", actor.id, announced_id);
let share = crate::model::announce::ActiveModel {
internal: NotSet,
actor: Set(actor.internal),
object: Set(internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
// TODO we should probably insert an activity, otherwise this won't appear on timelines!!
// or maybe go update all addressing records for this object, pushing them up
// or maybe create new addressing rows with more recent dates
// or maybe create fake objects that reference the original one
// idk!!!!
if actor.actor_type == apb::ActorType::Person || ctx.is_local(&actor.id) {
ctx.insert_activity(activity, tx).await?;
tracing::debug!("{} shared {}", actor.id, announced_id);