diff --git a/src/cli/fetch.rs b/src/cli/fetch.rs index ef2230d2..a589e6b3 100644 --- a/src/cli/fetch.rs +++ b/src/cli/fetch.rs @@ -1,6 +1,6 @@ use sea_orm::EntityTrait; -use crate::server::fetcher::Fetchable; +use crate::server::{fetcher::Fetchable, normalizer::Normalizer, Context}; pub async fn fetch(ctx: crate::server::Context, uri: String, save: bool) -> crate::Result<()> { use apb::Base; @@ -8,24 +8,23 @@ pub async fn fetch(ctx: crate::server::Context, uri: String, save: bool) -> crat let mut node = apb::Node::link(uri.to_string()); node.fetch(&ctx).await?; - let obj = node.get().expect("node still empty after fetch?"); + let obj = node.extract().expect("node still empty after fetch?"); + let server = Context::server(&uri); + + println!("{}", serde_json::to_string_pretty(&obj).unwrap()); if save { match obj.base_type() { Some(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { crate::model::actor::Entity::insert( - crate::model::actor::ActiveModel::new(obj).unwrap() + crate::model::actor::ActiveModel::new(&obj).unwrap() ).exec(ctx.db()).await.unwrap(); }, Some(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { - crate::model::activity::Entity::insert( - crate::model::activity::ActiveModel::new(obj).unwrap() - ).exec(ctx.db()).await.unwrap(); + ctx.insert_activity(obj, Some(server)).await.unwrap(); }, Some(apb::BaseType::Object(apb::ObjectType::Note)) => { - crate::model::object::Entity::insert( - crate::model::object::ActiveModel::new(obj).unwrap() - ).exec(ctx.db()).await.unwrap(); + ctx.insert_object(obj, Some(server)).await.unwrap(); }, Some(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), Some(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), @@ -33,7 +32,5 @@ pub async fn fetch(ctx: crate::server::Context, uri: String, save: bool) -> crat } } - println!("{}", serde_json::to_string_pretty(&obj).unwrap()); - Ok(()) } diff --git a/src/cli/fix.rs b/src/cli/fix.rs index a7d7a1ab..53ede8eb 100644 --- a/src/cli/fix.rs +++ b/src/cli/fix.rs @@ -36,7 +36,7 @@ pub async fn fix(ctx: crate::server::Context, likes: bool, shares: bool, replies { let mut stream = crate::model::announce::Entity::find().stream(db).await?; while let Some(share) = stream.try_next().await? { - store.insert(share.object.clone(), store.get(&share.object).unwrap_or(&0) + 1); + store.insert(share.object, store.get(&share.object).unwrap_or(&0) + 1); } } diff --git a/src/errors.rs b/src/errors.rs index 446fdbdd..a346c898 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -62,6 +62,10 @@ impl UpubError { pub fn internal_server_error() -> Self { Self::Status(axum::http::StatusCode::INTERNAL_SERVER_ERROR) } + + pub fn field(field: &'static str) -> Self { + Self::Field(crate::model::FieldError(field)) + } } pub type UpubResult = Result; diff --git a/src/model/activity.rs b/src/model/activity.rs index eddc86da..44d5aa7e 100644 --- a/src/model/activity.rs +++ b/src/model/activity.rs @@ -89,6 +89,7 @@ impl Entity { } impl ActiveModel { + //#[deprecated = "should remove this, get models thru normalizer"] pub fn new(activity: &impl apb::Activity) -> Result { Ok(ActiveModel { internal: sea_orm::ActiveValue::NotSet, diff --git a/src/model/addressing.rs b/src/model/addressing.rs index 72ac9b39..8bbd139c 100644 --- a/src/model/addressing.rs +++ b/src/model/addressing.rs @@ -97,15 +97,6 @@ pub enum Event { impl Event { - pub fn id(&self) -> &str { - match self { - Event::Tombstone => "", - Event::Activity(x) => x.id.as_str(), - Event::StrayObject { object, liked: _ } => object.id.as_str(), - Event::DeepActivity { activity: _, liked: _, object } => object.id.as_str(), - } - } - pub fn internal(&self) -> i64 { match self { Event::Tombstone => 0, diff --git a/src/model/attachment.rs b/src/model/attachment.rs index ac322463..3956819d 100644 --- a/src/model/attachment.rs +++ b/src/model/attachment.rs @@ -72,10 +72,9 @@ impl BatchFillable for &[Event] { let mut out : std::collections::BTreeMap> = std::collections::BTreeMap::new(); for attach in attachments.into_iter().flatten() { - if out.contains_key(&attach.object) { - out.get_mut(&attach.object).expect("contains but get failed?").push(attach); - } else { - out.insert(attach.object, vec![attach]); + match out.entry(attach.object) { + std::collections::btree_map::Entry::Vacant(a) => { a.insert(vec![attach]); }, + std::collections::btree_map::Entry::Occupied(mut e) => { e.get_mut().push(attach); }, } } diff --git a/src/routes/activitypub/inbox.rs b/src/routes/activitypub/inbox.rs index a0a0a234..32bb379f 100644 --- a/src/routes/activitypub/inbox.rs +++ b/src/routes/activitypub/inbox.rs @@ -63,7 +63,7 @@ pub async fn post( return Err(UpubError::bad_request()); }; - if !(server == Context::server(&actor)) { + if server != Context::server(&actor) { return Err(UpubError::unauthorized()); } diff --git a/src/server/auth.rs b/src/server/auth.rs index 1bba370e..2eddd9d5 100644 --- a/src/server/auth.rs +++ b/src/server/auth.rs @@ -40,11 +40,11 @@ impl Identity { } } - pub fn is(&self, id: &str) -> bool { + pub fn is(&self, uid: &str) -> bool { match self { Identity::Anonymous => false, Identity::Remote { .. } => false, // TODO per-actor server auth should check this - Identity::Local { id, .. } => id.as_str() == id + Identity::Local { id, .. } => id.as_str() == uid } } diff --git a/src/server/context.rs b/src/server/context.rs index 30645689..2bd443b0 100644 --- a/src/server/context.rs +++ b/src/server/context.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeSet, sync::Arc}; use openssl::rsa::Rsa; -use sea_orm::{ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, RelationTrait, SelectColumns, Set}; +use sea_orm::{ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; use crate::{config::Config, errors::UpubError, model, server::fetcher::Fetcher}; use uriproxy::UriClass; @@ -150,6 +150,7 @@ impl Context { } // TODO remove this!! + //#[deprecated = "context is id of first post in thread"] pub fn context_id(&self, id: &str) -> String { if id.starts_with("tag:") { return id.to_string(); @@ -227,8 +228,8 @@ impl Context { { let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else { ( - Some(model::instance::Entity::domain_to_internal(&Context::server(&target), self.db()).await?), - Some(model::actor::Entity::ap_to_internal(&target, self.db()).await?), + Some(model::instance::Entity::domain_to_internal(&Context::server(target), self.db()).await?), + Some(model::actor::Entity::ap_to_internal(target, self.db()).await?), ) }; addressing.push( @@ -290,6 +291,7 @@ impl Context { Ok(()) } + //#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"] pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { let addressed = self.expand_addressing(activity_targets).await?; let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?; diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs index 4762fc1f..6fbc65d1 100644 --- a/src/server/fetcher.rs +++ b/src/server/fetcher.rs @@ -180,20 +180,13 @@ impl Fetcher for Context { } let activity_document = self.pull_activity(id).await?; - let activity_model = model::activity::ActiveModel::new(&activity_document)?; + let activity_model = self.insert_activity(activity_document, Some(Context::server(id))).await?; - model::activity::Entity::insert(activity_model) - .exec(self.db()).await?; - - // TODO fetch it back to get the internal id - let activity = model::activity::Entity::find_by_ap_id(id) - .one(self.db()).await?.ok_or_else(UpubError::internal_server_error)?; - - let addressed = activity.addressed(); + let addressed = activity_model.addressed(); let expanded_addresses = self.expand_addressing(addressed).await?; - self.address_to(Some(activity.internal), None, &expanded_addresses).await?; + self.address_to(Some(activity_model.internal), None, &expanded_addresses).await?; - Ok(activity) + Ok(activity_model) } async fn pull_activity(&self, id: &str) -> crate::Result { @@ -216,7 +209,7 @@ impl Fetcher for Context { Ok(activity) } - async fn fetch_thread(&self, id: &str) -> crate::Result<()> { + async fn fetch_thread(&self, _id: &str) -> crate::Result<()> { // crawl_replies(self, id, 0).await todo!() } diff --git a/src/server/inbox.rs b/src/server/inbox.rs index 1a106504..998980db 100644 --- a/src/server/inbox.rs +++ b/src/server/inbox.rs @@ -1,8 +1,8 @@ use apb::{target::Addressed, Activity, Base, Object}; use reqwest::StatusCode; -use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set}; +use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}, server::normalizer::Normalizer}; +use crate::{errors::{LoggableError, UpubError}, model, server::normalizer::Normalizer}; use super::{fetcher::Fetcher, Context}; @@ -13,48 +13,43 @@ impl apb::server::Inbox for Context { type Activity = serde_json::Value; async fn create(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { - let activity_model = model::activity::Model::new(&activity)?; - let aid = activity_model.id.clone(); let Some(object_node) = activity.object().extract() else { // TODO we could process non-embedded activities or arrays but im lazy rn tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); return Err(UpubError::unprocessable()); }; + let activity_model = self.insert_activity(activity, Some(server.clone())).await?; let object_model = self.insert_object(object_node, Some(server)).await?; - let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(Some(&aid), Some(&object_model.id), &expanded_addressing).await?; - tracing::info!("{} posted {}", aid, object_model.id); + let expanded_addressing = self.expand_addressing(activity_model.addressed()).await?; + self.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing).await?; + tracing::info!("{} posted {}", activity_model.id, object_model.id); Ok(()) } - async fn like(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { - let aid = activity.id().ok_or(UpubError::bad_request())?; + async fn like(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { let uid = activity.actor().id().ok_or(UpubError::bad_request())?; + let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; let object_uri = activity.object().id().ok_or(UpubError::bad_request())?; let obj = self.fetch_object(&object_uri).await?; - let oid = obj.id; let like = model::like::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - actor: sea_orm::Set(uid.clone()), - likes: sea_orm::Set(oid.clone()), - date: sea_orm::Set(activity.published().unwrap_or(chrono::Utc::now())), + internal: NotSet, + actor: Set(internal_uid), + object: Set(obj.internal), + published: Set(activity.published().unwrap_or(chrono::Utc::now())), }; match model::like::Entity::insert(like).exec(self.db()).await { Err(sea_orm::DbErr::RecordNotInserted) => Err(UpubError::not_modified()), Err(sea_orm::DbErr::Exec(_)) => Err(UpubError::not_modified()), // bad fix for sqlite Err(e) => { - tracing::error!("unexpected error procesing like from {uid} to {oid}: {e}"); + tracing::error!("unexpected error procesing like from {uid} to {}: {e}", obj.id); Err(UpubError::internal_server_error()) } Ok(_) => { - let activity_model = model::activity::Model::new(&activity)?.into_active_model(); - model::activity::Entity::insert(activity_model) - .exec(self.db()) - .await?; - let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?; + let activity_model = self.insert_activity(activity, Some(server)).await?; + let mut expanded_addressing = self.expand_addressing(activity_model.addressed()).await?; if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!! expanded_addressing.push( - model::object::Entity::find_by_id(&oid) + model::object::Entity::find_by_id(obj.internal) .select_only() .select_column(model::object::Column::AttributedTo) .into_tuple::() @@ -63,190 +58,188 @@ impl apb::server::Inbox for Context { .ok_or_else(UpubError::not_found)? ); } - self.address_to(Some(aid), None, &expanded_addressing).await?; + self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; model::object::Entity::update_many() .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) - .filter(model::object::Column::Id.eq(oid.clone())) + .filter(model::object::Column::Internal.eq(obj.internal)) .exec(self.db()) .await?; - tracing::info!("{} liked {}", uid, oid); + tracing::info!("{} liked {}", uid, obj.id); Ok(()) }, } } async fn follow(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { - let activity_model = model::activity::Model::new(&activity)?; - let aid = activity_model.id.clone(); - let target_user_uri = activity_model.object - .as_deref() - .ok_or_else(UpubError::bad_request)? - .to_string(); - let usr = self.fetch_user(&target_user_uri).await?; - let target_user_id = usr.id; - tracing::info!("{} wants to follow {}", activity_model.actor, target_user_id); - model::activity::Entity::insert(activity_model.into_active_model()) + let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string(); + let source_actor = activity.actor().id().ok_or_else(UpubError::bad_request)?; + let source_actor_internal = model::actor::Entity::ap_to_internal(&source_actor, self.db()).await?; + let target_actor = activity.object().id().ok_or_else(UpubError::bad_request)?; + let usr = self.fetch_user(&target_actor).await?; + let activity_model = model::activity::ActiveModel::new(&activity)?; + model::activity::Entity::insert(activity_model) + .exec(self.db()).await?; + let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; + let relation_model = model::relation::ActiveModel { + internal: NotSet, + accept: Set(None), + activity: Set(internal_aid), + follower: Set(source_actor_internal), + following: Set(usr.internal), + }; + model::relation::Entity::insert(relation_model) .exec(self.db()).await?; let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?; - if !expanded_addressing.contains(&target_user_id) { - expanded_addressing.push(target_user_id); + if !expanded_addressing.contains(&target_actor) { + expanded_addressing.push(target_actor); } - self.address_to(Some(&aid), None, &expanded_addressing).await?; + self.address_to(Some(internal_aid), None, &expanded_addressing).await?; + tracing::info!("{} wants to follow {}", source_actor, usr.id); Ok(()) } async fn accept(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { // TODO what about TentativeAccept - let activity_model = model::activity::Model::new(&activity)?; - - if let Some(mut r) = model::relay::Entity::find_by_id(&activity_model.actor) + let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string(); + let target_actor = activity.actor().id().ok_or_else(UpubError::bad_request)?; + let follow_request_id = activity.object().id().ok_or_else(UpubError::bad_request)?; + let follow_activity = model::activity::Entity::find_by_ap_id(&follow_request_id) .one(self.db()) .await? - { - r.accepted = true; - model::relay::Entity::update(r.into_active_model()).exec(self.db()).await?; - model::activity::Entity::insert(activity_model.clone().into_active_model()) - .exec(self.db()) - .await?; - tracing::info!("relay {} is now broadcasting to us", activity_model.actor); - return Ok(()); - } + .ok_or_else(UpubError::not_found)?; - let Some(follow_request_id) = &activity_model.object else { - return Err(UpubError::bad_request()); - }; - let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) - .one(self.db()).await? - else { - return Err(UpubError::not_found()); - }; - if follow_activity.object.unwrap_or("".into()) != activity_model.actor { + if follow_activity.object.unwrap_or("".into()) != follow_activity.actor { return Err(UpubError::forbidden()); } - tracing::info!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor); + let activity_model = model::activity::ActiveModel::new(&activity)?; + model::activity::Entity::insert(activity_model) + .exec(self.db()) + .await?; + let accept_internal_id = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; - model::activity::Entity::insert(activity_model.clone().into_active_model()) - .exec(self.db()) - .await?; - model::user::Entity::update_many() + model::actor::Entity::update_many() .col_expr( - model::user::Column::FollowingCount, - Expr::col(model::user::Column::FollowingCount).add(1) + model::actor::Column::FollowingCount, + Expr::col(model::actor::Column::FollowingCount).add(1) ) - .filter(model::user::Column::Id.eq(&follow_activity.actor)) + .filter(model::actor::Column::Id.eq(&follow_activity.actor)) .exec(self.db()) .await?; - model::relation::Entity::insert( - model::relation::ActiveModel { - follower: Set(follow_activity.actor.clone()), - following: Set(activity_model.actor), - ..Default::default() - } - ).exec(self.db()).await?; + model::actor::Entity::update_many() + .col_expr( + model::actor::Column::FollowersCount, + Expr::col(model::actor::Column::FollowersCount).add(1) + ) + .filter(model::actor::Column::Id.eq(&follow_activity.actor)) + .exec(self.db()) + .await?; + + model::relation::Entity::update_many() + .col_expr(model::relation::Column::Accept, Expr::value(Some(accept_internal_id))) + .filter(model::relation::Column::Activity.eq(follow_activity.internal)) + .exec(self.db()).await?; + + tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor); let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?; if !expanded_addressing.contains(&follow_activity.actor) { expanded_addressing.push(follow_activity.actor); } - self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?; + self.address_to(Some(accept_internal_id), None, &expanded_addressing).await?; Ok(()) } async fn reject(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { // TODO what about TentativeReject? - let activity_model = model::activity::Model::new(&activity)?; - let Some(follow_request_id) = &activity_model.object else { - return Err(UpubError::bad_request()); - }; - let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) - .one(self.db()).await? - else { - return Err(UpubError::not_found()); - }; - if follow_activity.object.unwrap_or("".into()) != activity_model.actor { + let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string(); + let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?; + let follow_request_id = activity.object().id().ok_or_else(UpubError::bad_request)?; + let follow_activity = model::activity::Entity::find_by_ap_id(&follow_request_id) + .one(self.db()) + .await? + .ok_or_else(UpubError::not_found)?; + + if follow_activity.object.unwrap_or("".into()) != uid { return Err(UpubError::forbidden()); } - tracing::info!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor); - - model::activity::Entity::insert(activity_model.clone().into_active_model()) + let activity_model = model::activity::ActiveModel::new(&activity)?; + model::activity::Entity::insert(activity_model) .exec(self.db()) .await?; + let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?; + + model::relation::Entity::delete_many() + .filter(model::relation::Column::Activity.eq(internal_aid)) + .exec(self.db()) + .await?; + + tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor); let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?; if !expanded_addressing.contains(&follow_activity.actor) { expanded_addressing.push(follow_activity.actor); } - self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?; + + self.address_to(Some(internal_aid), None, &expanded_addressing).await?; Ok(()) } async fn delete(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { - // TODO verify the signature before just deleting lmao - let oid = activity.object().id().ok_or(UpubError::bad_request())?; - tracing::debug!("deleting '{oid}'"); // this is so spammy wtf! - // TODO maybe we should keep the tombstone? - model::user::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from users"); - model::activity::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from activities"); - model::object::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from objects"); + let oid = activity.object().id().ok_or_else(UpubError::bad_request)?; + model::actor::Entity::delete_by_ap_id(&oid).exec(self.db()).await.info_failed("failed deleting from users"); + model::object::Entity::delete_by_ap_id(&oid).exec(self.db()).await.info_failed("failed deleting from objects"); + tracing::debug!("deleted '{oid}'"); Ok(()) } - async fn update(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { - let activity_model = model::activity::Model::new(&activity)?; - let aid = activity_model.id.clone(); + async fn update(&self, _server: String, activity: serde_json::Value) -> crate::Result<()> { + let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?; + let aid = activity.id().ok_or_else(UpubError::bad_request)?; let Some(object_node) = activity.object().extract() else { // TODO we could process non-embedded activities or arrays but im lazy rn tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); return Err(UpubError::unprocessable()); }; - let Some(oid) = object_node.id().map(|x| x.to_string()) else { - return Err(UpubError::bad_request()); - }; - // make sure we're allowed to edit this object - if let Some(object_author) = object_node.attributed_to().id() { - if server != Context::server(&object_author) { - return Err(UpubError::forbidden()); - } - } else if server != Context::server(&oid) { - return Err(UpubError::forbidden()); - }; - match object_node.object_type() { - Some(apb::ObjectType::Actor(_)) => { - // TODO oof here is an example of the weakness of this model, we have to go all the way - // back up to serde_json::Value because impl Object != impl Actor - let actor_model = model::user::Model::new(&object_node)?; - let mut update_model = actor_model.into_active_model(); - update_model.updated = sea_orm::Set(chrono::Utc::now()); - update_model.reset(model::user::Column::Name); - update_model.reset(model::user::Column::Summary); - update_model.reset(model::user::Column::Image); - update_model.reset(model::user::Column::Icon); - model::user::Entity::update(update_model) - .exec(self.db()).await?; - }, - Some(apb::ObjectType::Note) => { - let object_model = model::object::Model::new(&object_node)?; - let mut update_model = object_model.into_active_model(); - update_model.updated = sea_orm::Set(Some(chrono::Utc::now())); - update_model.reset(model::object::Column::Name); - update_model.reset(model::object::Column::Summary); - update_model.reset(model::object::Column::Content); - update_model.reset(model::object::Column::Sensitive); - model::object::Entity::update(update_model) - .exec(self.db()).await?; - }, - Some(t) => tracing::warn!("no side effects implemented for update type {t:?}"), - None => tracing::warn!("empty type on embedded updated object"), - } + let oid = object_node.id().ok_or_else(UpubError::bad_request)?.to_string(); - tracing::info!("{} updated {}", aid, oid); - model::activity::Entity::insert(activity_model.into_active_model()) + let activity_model = model::activity::ActiveModel::new(&activity)?; + model::activity::Entity::insert(activity_model) .exec(self.db()) .await?; + let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?; + + let internal_oid = match object_node.object_type().ok_or_else(UpubError::bad_request)? { + apb::ObjectType::Actor(_) => { + let internal_uid = model::actor::Entity::ap_to_internal(&oid, self.db()).await?; + let mut actor_model = model::actor::ActiveModel::new(&object_node)?; + actor_model.internal = Set(internal_uid); + actor_model.updated = Set(chrono::Utc::now()); + model::actor::Entity::update(actor_model) + .exec(self.db()) + .await?; + Some(internal_uid) + }, + apb::ObjectType::Note => { + let internal_oid = model::object::Entity::ap_to_internal(&oid, self.db()).await?; + let mut object_model = model::object::ActiveModel::new(&object_node)?; + object_model.internal = Set(internal_oid); + object_model.updated = Set(chrono::Utc::now()); + model::object::Entity::update(object_model) + .exec(self.db()) + .await?; + Some(internal_oid) + }, + t => { + tracing::warn!("no side effects implemented for update type {t:?}"); + None + }, + }; + + tracing::info!("{} updated {}", uid, oid); let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?; + self.address_to(Some(internal_aid), internal_oid, &expanded_addressing).await?; Ok(()) } @@ -254,9 +247,8 @@ impl apb::server::Inbox for Context { let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?; // TODO in theory we could work with just object_id but right now only accept embedded let undone_activity = activity.object().extract().ok_or_else(UpubError::bad_request)?; - let undone_aid = undone_activity.id().ok_or_else(UpubError::bad_request)?; - let undone_object_uri = undone_activity.object().id().ok_or_else(UpubError::bad_request)?; let activity_type = undone_activity.activity_type().ok_or_else(UpubError::bad_request)?; + let undone_object_id = undone_activity.object().id().ok_or_else(UpubError::bad_request)?; let undone_activity_author = undone_activity.actor().id().ok_or_else(UpubError::bad_request)?; // can't undo activities from remote actors! @@ -264,32 +256,36 @@ impl apb::server::Inbox for Context { return Err(UpubError::forbidden()); }; - let obj = self.fetch_object(&undone_object_uri).await?; - let undone_object_id = obj.id; + self.insert_activity(activity.clone(), Some(server)).await?; match activity_type { apb::ActivityType::Like => { + let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; + let internal_oid = model::object::Entity::ap_to_internal(&undone_object_id, self.db()).await?; model::like::Entity::delete_many() .filter( Condition::all() - .add(model::like::Column::Actor.eq(&uid)) - .add(model::like::Column::Likes.eq(&undone_object_id)) + .add(model::like::Column::Actor.eq(internal_uid)) + .add(model::like::Column::Object.eq(internal_oid)) ) .exec(self.db()) .await?; model::object::Entity::update_many() - .filter(model::object::Column::Id.eq(&undone_object_id)) + .filter(model::object::Column::Internal.eq(internal_oid)) .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).sub(1)) .exec(self.db()) .await?; }, apb::ActivityType::Follow => { + let undone_aid = undone_activity.id().ok_or_else(UpubError::bad_request)?; + let internal_aid = model::activity::Entity::ap_to_internal(undone_aid, self.db()).await?; model::relation::Entity::delete_many() - .filter( - Condition::all() - .add(model::relation::Column::Follower.eq(&uid)) - .add(model::relation::Column::Following.eq(&undone_object_id)) - ) + .filter(model::relation::Column::Activity.eq(internal_aid)) + .exec(self.db()) + .await?; + model::actor::Entity::update_many() + .filter(model::actor::Column::Id.eq(&undone_object_id)) + .col_expr(model::actor::Column::FollowersCount, Expr::col(model::actor::Column::FollowersCount).sub(1)) .exec(self.db()) .await?; }, @@ -299,48 +295,41 @@ impl apb::server::Inbox for Context { }, } - model::activity::Entity::delete_by_id(undone_aid).exec(self.db()).await?; - Ok(()) - } - async fn announce(&self, _: String, activity: serde_json::Value) -> crate::Result<()> { - let activity_model = model::activity::Model::new(&activity)?; - let Some(object_uri) = &activity_model.object else { - return Err(FieldError("object").into()); - }; - let obj = self.fetch_object(object_uri).await?; - let oid = obj.id; + async fn announce(&self, server: String, activity: serde_json::Value) -> crate::Result<()> { + let uid = activity.actor().id().ok_or_else(|| UpubError::field("actor"))?; + let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?; + let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?; + let activity_model = self.insert_activity(activity.clone(), Some(server)).await?; + let announced = self.fetch_object(&announced_id).await?; // relays send us activities as Announce, but we don't really want to count those towards the // total shares count of an object, so just fetch the object and be done with it if self.is_relay(&activity_model.actor) { - tracing::info!("relay {} broadcasted {}", activity_model.actor, oid); + tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id); return Ok(()) } - let share = model::share::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - actor: sea_orm::Set(activity_model.actor.clone()), - shares: sea_orm::Set(oid.clone()), - date: sea_orm::Set(activity.published().unwrap_or(chrono::Utc::now())), + let share = model::announce::ActiveModel { + internal: NotSet, + actor: Set(internal_uid), + object: Set(announced.internal), + published: Set(activity.published().unwrap_or(chrono::Utc::now())), }; let expanded_addressing = self.expand_addressing(activity.addressed()).await?; - self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?; - model::share::Entity::insert(share) + self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?; + model::announce::Entity::insert(share) .exec(self.db()).await?; - model::activity::Entity::insert(activity_model.clone().into_active_model()) - .exec(self.db()) - .await?; model::object::Entity::update_many() - .col_expr(model::object::Column::Shares, Expr::col(model::object::Column::Shares).add(1)) - .filter(model::object::Column::Id.eq(oid.clone())) + .col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1)) + .filter(model::object::Column::Internal.eq(announced.internal)) .exec(self.db()) .await?; - tracing::info!("{} shared {}", activity_model.actor, oid); + tracing::info!("{} shared {}", activity_model.actor, announced.id); Ok(()) } } diff --git a/src/server/normalizer.rs b/src/server/normalizer.rs index 4d358c2c..d7253a9e 100644 --- a/src/server/normalizer.rs +++ b/src/server/normalizer.rs @@ -1,5 +1,5 @@ use apb::{Node, Base, Object, Document}; -use sea_orm::{sea_query::Expr, ActiveValue::Set, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; +use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter}; use crate::{errors::UpubError, model, server::Context}; use super::fetcher::Fetcher; @@ -7,6 +7,7 @@ use super::fetcher::Fetcher; #[axum::async_trait] pub trait Normalizer { async fn insert_object(&self, obj: impl apb::Object, server: Option) -> crate::Result; + async fn insert_activity(&self, act: impl apb::Activity, server: Option) -> crate::Result; } #[axum::async_trait] @@ -129,4 +130,35 @@ impl Normalizer for super::Context { Ok(object) } + + async fn insert_activity(&self, activity: impl apb::Activity, server: Option) -> crate::Result { + let mut activity_model = model::activity::Model { + internal: 0, + id: activity.id().ok_or_else(|| UpubError::field("id"))?.to_string(), + activity_type: activity.activity_type().ok_or_else(|| UpubError::field("type"))?, + actor: activity.actor().id().ok_or_else(|| UpubError::field("actor"))?, + object: activity.object().id(), + target: activity.target().id(), + published: activity.published().unwrap_or(chrono::Utc::now()), + to: activity.to().into(), + bto: activity.bto().into(), + cc: activity.cc().into(), + bcc: activity.bcc().into(), + }; + if let Some(server) = server { + if Context::server(&activity_model.actor) != server + || Context::server(&activity_model.id) != server { + return Err(UpubError::forbidden()); + } + } + let mut active_model = activity_model.clone().into_active_model(); + active_model.internal = NotSet; + model::activity::Entity::insert(active_model) + .exec(self.db()) + .await?; + + let internal = model::activity::Entity::ap_to_internal(&activity_model.id, self.db()).await?; + activity_model.internal = internal; + Ok(activity_model) + } } diff --git a/src/server/outbox.rs b/src/server/outbox.rs index 8cb3dc0f..32116523 100644 --- a/src/server/outbox.rs +++ b/src/server/outbox.rs @@ -367,8 +367,10 @@ impl apb::server::Outbox for Context { return Err(UpubError::forbidden()); } - let mut new_actor_model = model::actor::ActiveModel::default(); - new_actor_model.internal = Unchanged(old_actor_model.internal); + let mut new_actor_model = model::actor::ActiveModel { + internal: Unchanged(old_actor_model.internal), + ..Default::default() + }; if let Some(name) = object_node.name() { new_actor_model.name = Set(Some(name.to_string())); @@ -398,8 +400,10 @@ impl apb::server::Outbox for Context { return Err(UpubError::forbidden()); } - let mut new_object_model = model::object::ActiveModel::default(); - new_object_model.internal = Unchanged(old_object_model.internal); + let mut new_object_model = model::object::ActiveModel { + internal: Unchanged(old_object_model.internal), + ..Default::default() + }; if let Some(name) = object_node.name() { new_object_model.name = Set(Some(name.to_string())); diff --git a/web/src/components/post.rs b/web/src/components/post.rs index 3a9b39d2..d6d89fea 100644 --- a/web/src/components/post.rs +++ b/web/src/components/post.rs @@ -1,7 +1,6 @@ use apb::{ActivityMut, Base, BaseMut, Object, ObjectMut}; use leptos::*; -use leptos_use::DebounceOptions; use crate::{prelude::*, WEBFINGER}; #[derive(Debug, Clone, Copy, Default)] diff --git a/web/src/components/timeline.rs b/web/src/components/timeline.rs index 8fee1712..a86b50b8 100644 --- a/web/src/components/timeline.rs +++ b/web/src/components/timeline.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeSet, pin::Pin, sync::Arc}; use apb::{Activity, ActivityMut, Base, Object}; use leptos::*; -use leptos_use::{signal_debounced, signal_throttled, use_display_media, use_document_visibility, use_element_size, use_infinite_scroll_with_options, use_scroll, use_scroll_with_options, use_window, use_window_scroll, UseDisplayMediaReturn, UseElementSizeReturn, UseInfiniteScrollOptions, UseScrollOptions, UseScrollReturn}; +use leptos_use::{signal_throttled, use_element_size, use_window_scroll, UseElementSizeReturn}; use crate::prelude::*; #[derive(Debug, Clone, Copy)] diff --git a/web/src/lib.rs b/web/src/lib.rs index 362dfc0d..b1b68b0f 100644 --- a/web/src/lib.rs +++ b/web/src/lib.rs @@ -99,7 +99,7 @@ impl WebfingerCache { Ok(res) => match res.error_for_status() { Ok(res) => match res.json::().await { Ok(doc) => { - if let Some(uid) = doc.links.into_iter().find(|x| x.rel == "self").map(|x| x.href).flatten() { + if let Some(uid) = doc.links.into_iter().find(|x| x.rel == "self").and_then(|x| x.href) { self.0.insert(query, LookupStatus::Found(uid)); } else { self.0.insert(query, LookupStatus::NotFound); diff --git a/web/src/page/register.rs b/web/src/page/register.rs index 981ed455..395b45de 100644 --- a/web/src/page/register.rs +++ b/web/src/page/register.rs @@ -19,13 +19,13 @@ pub fn RegisterPage() -> impl IntoView {
( Method::PUT, &format!("{URL_BASE}/auth"), None, auth ).await { - Ok(x) => {}, + Ok(_x) => {}, Err(e) => set_error.set(Some( view! {
{e.to_string()}
} )),