diff --git a/src/activitypub/mod.rs b/src/activitypub/mod.rs index f63fdf5..58a3fe6 100644 --- a/src/activitypub/mod.rs +++ b/src/activitypub/mod.rs @@ -129,3 +129,14 @@ pub async fn auth(State(ctx): State, Json(login): Json) -> R } } } + +#[axum::async_trait] +pub trait APOutbox { + async fn post_note(&self, uid: String, object: serde_json::Value) -> crate::Result; + async fn post_activity(&self, uid: String, activity: serde_json::Value) -> crate::Result; + async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result; + async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result; + async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result; + async fn reject(&self, _uid: String, _activity: serde_json::Value) -> crate::Result; + async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result; +} diff --git a/src/activitypub/user/mod.rs b/src/activitypub/user/mod.rs index 94238ba..f835b6e 100644 --- a/src/activitypub/user/mod.rs +++ b/src/activitypub/user/mod.rs @@ -41,7 +41,7 @@ pub fn ap_user(user: model::user::Model) -> serde_json::Value { .set_public_key_pem(&user.public_key) )) .set_discoverable(Some(true)) - .set_endpoints(None) + .set_endpoints(Node::Empty) } pub async fn view(State(ctx) : State, Path(id): Path) -> Result, StatusCode> { diff --git a/src/activitypub/user/outbox.rs b/src/activitypub/user/outbox.rs index e28fccc..2a64ed6 100644 --- a/src/activitypub/user/outbox.rs +++ b/src/activitypub/user/outbox.rs @@ -1,8 +1,8 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; -use sea_orm::{EntityTrait, IntoActiveModel, Order, QueryOrder, QuerySelect, Set}; +use sea_orm::{EntityTrait, Order, QueryOrder, QuerySelect}; -use apb::{AcceptType, Activity, ActivityMut, ActivityType, ObjectMut, Base, BaseMut, BaseType, Node, ObjectType}; -use crate::{activitypub::{jsonld::LD, Addressed, CreationResult, JsonLD, Pagination}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; +use apb::{AcceptType, ActivityMut, ActivityType, Base, BaseType, Node, ObjectType, RejectType}; +use crate::{activitypub::{jsonld::LD, APOutbox, CreationResult, JsonLD, Pagination}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; pub async fn get( State(ctx): State, @@ -75,219 +75,29 @@ pub async fn post( Identity::Local(uid) => if ctx.uid(id.clone()) == uid { match activity.base_type() { None => Err(StatusCode::BAD_REQUEST.into()), + Some(BaseType::Link(_)) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()), - Some(BaseType::Object(ObjectType::Note)) => { - let oid = ctx.oid(uuid::Uuid::new_v4().to_string()); - let aid = ctx.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let object_model = model::object::Model::new( - &activity - .set_id(Some(&oid)) - .set_attributed_to(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - let activity_model = model::activity::Model { - id: aid.clone(), - activity_type: ActivityType::Create, - actor: uid.clone(), - object: Some(oid.clone()), - target: None, - cc: object_model.cc.clone(), - bcc: object_model.bcc.clone(), - to: object_model.to.clone(), - bto: object_model.bto.clone(), - published: object_model.published, - }; + Some(BaseType::Object(ObjectType::Note)) => + Ok(CreationResult(ctx.post_note(uid, activity).await?)), - model::object::Entity::insert(object_model.into_active_model()) - .exec(ctx.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(ctx.db()).await?; + Some(BaseType::Object(ObjectType::Activity(ActivityType::Create))) => + Ok(CreationResult(ctx.post_activity(uid, activity).await?)), - let addressed = ctx.expand_addressing(&uid, activity_targets).await?; - ctx.address_to(&aid, Some(&oid), &addressed).await?; - ctx.deliver_to(&aid, &uid, &addressed).await?; + Some(BaseType::Object(ObjectType::Activity(ActivityType::Like))) => + Ok(CreationResult(ctx.like(uid, activity).await?)), - Ok(CreationResult(aid)) - }, + Some(BaseType::Object(ObjectType::Activity(ActivityType::Follow))) => + Ok(CreationResult(ctx.follow(uid, activity).await?)), - Some(BaseType::Object(ObjectType::Activity(ActivityType::Create))) => { - let Some(object) = activity.object().extract() else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let oid = ctx.oid(uuid::Uuid::new_v4().to_string()); - let aid = ctx.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let mut object_model = model::object::Model::new( - &object - .set_id(Some(&oid)) - .set_attributed_to(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - let mut activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - object_model.to = activity_model.to.clone(); - object_model.bto = activity_model.bto.clone(); - object_model.cc = activity_model.cc.clone(); - object_model.bcc = activity_model.bcc.clone(); - activity_model.object = Some(oid.clone()); + Some(BaseType::Object(ObjectType::Activity(ActivityType::Undo))) => + Ok(CreationResult(ctx.undo(uid, activity).await?)), - model::object::Entity::insert(object_model.into_active_model()) - .exec(ctx.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(ctx.db()).await?; + Some(BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept)))) => + Ok(CreationResult(ctx.accept(uid, activity).await?)), - let addressed = ctx.expand_addressing(&uid, activity_targets).await?; - ctx.address_to(&aid, Some(&oid), &addressed).await?; - ctx.deliver_to(&aid, &uid, &addressed).await?; - Ok(CreationResult(aid)) - }, - - Some(BaseType::Object(ObjectType::Activity(ActivityType::Like))) => { - let aid = ctx.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let Some(oid) = activity.object().id() else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_published(Some(chrono::Utc::now())) - .set_actor(Node::link(uid.clone())) - )?; - - let like_model = model::like::ActiveModel { - actor: Set(uid.clone()), - likes: Set(oid), - date: Set(chrono::Utc::now()), - ..Default::default() - }; - model::like::Entity::insert(like_model).exec(ctx.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(ctx.db()).await?; - - let addressed = ctx.expand_addressing(&uid, activity_targets).await?; - ctx.address_to(&aid, None, &addressed).await?; - ctx.deliver_to(&aid, &uid, &addressed).await?; - Ok(CreationResult(aid)) - }, - - Some(BaseType::Object(ObjectType::Activity(ActivityType::Follow))) => { - let aid = ctx.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - if activity.object().id().is_none() { - return Err(StatusCode::BAD_REQUEST.into()); - } - - let activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(ctx.db()).await?; - - let addressed = ctx.expand_addressing(&uid, activity_targets).await?; - ctx.address_to(&aid, None, &addressed).await?; - ctx.deliver_to(&aid, &uid, &addressed).await?; - Ok(CreationResult(aid)) - }, - - Some(BaseType::Object(ObjectType::Activity(ActivityType::Undo))) => { - let aid = ctx.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - { - let Some(old_aid) = activity.object().id() else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let Some(old_activity) = model::activity::Entity::find_by_id(old_aid) - .one(ctx.db()).await? - else { - return Err(StatusCode::NOT_FOUND.into()); - }; - if old_activity.actor != uid { - return Err(StatusCode::FORBIDDEN.into()); - } - match old_activity.activity_type { - ActivityType::Like => { - model::like::Entity::delete(model::like::ActiveModel { - actor: Set(old_activity.actor), likes: Set(old_activity.object.unwrap_or("".into())), - ..Default::default() - }).exec(ctx.db()).await?; - }, - ActivityType::Follow => { - model::relation::Entity::delete(model::relation::ActiveModel { - follower: Set(old_activity.actor), following: Set(old_activity.object.unwrap_or("".into())), - ..Default::default() - }).exec(ctx.db()).await?; - }, - t => tracing::warn!("extra side effects for activity {t:?} not implemented"), - } - } - let activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model.into_active_model()).exec(ctx.db()).await?; - - let addressed = ctx.expand_addressing(&uid, activity_targets).await?; - ctx.address_to(&aid, None, &addressed).await?; - ctx.deliver_to(&aid, &uid, &addressed).await?; - Ok(CreationResult(aid)) - }, - - Some(BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept)))) => { - let aid = ctx.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - if activity.object().id().is_none() { - return Err(StatusCode::BAD_REQUEST.into()); - } - let Some(accepted_id) = activity.object().id() else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let Some(accepted_activity) = model::activity::Entity::find_by_id(accepted_id) - .one(ctx.db()).await? - else { - return Err(StatusCode::NOT_FOUND.into()); - }; - - match accepted_activity.activity_type { - ActivityType::Follow => { - model::relation::Entity::insert( - model::relation::ActiveModel { - follower: Set(accepted_activity.actor), following: Set(uid.clone()), - ..Default::default() - } - ).exec(ctx.db()).await?; - }, - t => tracing::warn!("no side effects implemented for accepting {t:?}"), - } - - let activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(ctx.db()).await?; - - let addressed = ctx.expand_addressing(&uid, activity_targets).await?; - ctx.address_to(&aid, None, &addressed).await?; - ctx.deliver_to(&aid, &uid, &addressed).await?; - Ok(CreationResult(aid)) - }, - - // Some(BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject)))) => { - // }, + Some(BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject)))) => + Ok(CreationResult(ctx.reject(uid, activity).await?)), Some(_) => Err(StatusCode::NOT_IMPLEMENTED.into()), } diff --git a/src/errors.rs b/src/errors.rs index 1ffd980..ef561ea 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,3 +1,5 @@ +use reqwest::StatusCode; + #[derive(Debug, thiserror::Error)] pub enum UpubError { #[error("database error: {0}")] @@ -17,8 +19,8 @@ pub enum UpubError { } impl UpubError { - pub fn code(code: axum::http::StatusCode) -> Self { - UpubError::Status(code) + pub fn bad_request() -> Self { + Self::Status(StatusCode::BAD_REQUEST) } } diff --git a/src/server.rs b/src/server.rs index 9c5b23b..32dd71a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,11 @@ use std::{str::Utf8Error, sync::Arc}; use openssl::rsa::Rsa; -use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; +use reqwest::StatusCode; +use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set}; -use crate::{activitypub::{jsonld::LD, PUBLIC_TARGET}, dispatcher::Dispatcher, fetcher::Fetcher, model}; -use apb::{CollectionPageMut, CollectionMut, CollectionType, BaseMut, Node}; +use crate::{activitypub::{jsonld::LD, APOutbox, Addressed, CreationResult, PUBLIC_TARGET}, dispatcher::Dispatcher, errors::UpubError, fetcher::Fetcher, model}; +use apb::{Activity, ActivityMut, BaseMut, CollectionMut, CollectionPageMut, CollectionType, Node, ObjectMut}; #[derive(Clone)] pub struct Context(Arc); @@ -38,6 +39,8 @@ pub enum ContextError { } impl Context { + + // TODO slim constructor down, maybe make a builder? pub async fn new(db: DatabaseConnection, mut domain: String) -> Result { let protocol = if domain.starts_with("http://") { "http://" } else { "https://" }.to_string(); @@ -208,6 +211,7 @@ impl Context { Ok(()) } + // TODO should probs not be here pub fn ap_collection(&self, id: &str, total_items: Option) -> serde_json::Value { serde_json::Value::new_object() .set_id(Some(id)) @@ -216,6 +220,7 @@ impl Context { .set_total_items(total_items) } + // TODO should probs not be here pub fn ap_collection_page(&self, id: &str, offset: u64, limit: u64, items: Vec) -> serde_json::Value { serde_json::Value::new_object() .set_id(Some(&format!("{id}?offset={offset}"))) @@ -224,4 +229,224 @@ impl Context { .set_next(Node::link(format!("{id}?offset={}", offset+limit))) .set_ordered_items(Node::Array(items)) } + + pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { + let addressed = self.expand_addressing(uid, activity_targets).await?; + self.address_to(aid, oid, &addressed).await?; + self.deliver_to(aid, uid, &addressed).await?; + Ok(()) + } +} + +#[axum::async_trait] +impl APOutbox for Context { + async fn post_note(&self, uid: String, object: serde_json::Value) -> crate::Result { + let oid = self.oid(uuid::Uuid::new_v4().to_string()); + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = object.addressed(); + let object_model = model::object::Model::new( + &object + .set_id(Some(&oid)) + .set_attributed_to(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + let activity_model = model::activity::Model { + id: aid.clone(), + activity_type: apb::ActivityType::Create, + actor: uid.clone(), + object: Some(oid.clone()), + target: None, + cc: object_model.cc.clone(), + bcc: object_model.bcc.clone(), + to: object_model.to.clone(), + bto: object_model.bto.clone(), + published: object_model.published, + }; + + model::object::Entity::insert(object_model.into_active_model()) + .exec(self.db()).await?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; + + Ok(aid) + } + + async fn post_activity(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let Some(object) = activity.object().extract() else { + return Err(UpubError::bad_request()); + }; + + let oid = self.oid(uuid::Uuid::new_v4().to_string()); + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + let mut object_model = model::object::Model::new( + &object + .set_id(Some(&oid)) + .set_attributed_to(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + let mut activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_actor(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + object_model.to = activity_model.to.clone(); + object_model.bto = activity_model.bto.clone(); + object_model.cc = activity_model.cc.clone(); + object_model.bcc = activity_model.bcc.clone(); + activity_model.object = Some(oid.clone()); + + model::object::Entity::insert(object_model.into_active_model()) + .exec(self.db()).await?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; + + Ok(aid) + } + + + async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + let Some(oid) = activity.object().id() else { + return Err(StatusCode::BAD_REQUEST.into()); + }; + let activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_published(Some(chrono::Utc::now())) + .set_actor(Node::link(uid.clone())) + )?; + + let like_model = model::like::ActiveModel { + actor: Set(uid.clone()), + likes: Set(oid), + date: Set(chrono::Utc::now()), + ..Default::default() + }; + model::like::Entity::insert(like_model).exec(self.db()).await?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, None).await?; + + Ok(aid) + } + + async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + if activity.object().id().is_none() { + return Err(StatusCode::BAD_REQUEST.into()); + } + + let activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_actor(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, None).await?; + + Ok(aid) + } + + async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + if activity.object().id().is_none() { + return Err(StatusCode::BAD_REQUEST.into()); + } + let Some(accepted_id) = activity.object().id() else { + return Err(StatusCode::BAD_REQUEST.into()); + }; + let Some(accepted_activity) = model::activity::Entity::find_by_id(accepted_id) + .one(self.db()).await? + else { + return Err(StatusCode::NOT_FOUND.into()); + }; + + match accepted_activity.activity_type { + apb::ActivityType::Follow => { + model::relation::Entity::insert( + model::relation::ActiveModel { + follower: Set(accepted_activity.actor), following: Set(uid.clone()), + ..Default::default() + } + ).exec(self.db()).await?; + }, + t => tracing::warn!("no side effects implemented for accepting {t:?}"), + } + + let activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_actor(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, None).await?; + + Ok(aid) + } + + async fn reject(&self, _uid: String, _activity: serde_json::Value) -> crate::Result { + todo!() + } + + async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + { + let Some(old_aid) = activity.object().id() else { + return Err(StatusCode::BAD_REQUEST.into()); + }; + let Some(old_activity) = model::activity::Entity::find_by_id(old_aid) + .one(self.db()).await? + else { + return Err(StatusCode::NOT_FOUND.into()); + }; + if old_activity.actor != uid { + return Err(StatusCode::FORBIDDEN.into()); + } + match old_activity.activity_type { + apb::ActivityType::Like => { + model::like::Entity::delete(model::like::ActiveModel { + actor: Set(old_activity.actor), likes: Set(old_activity.object.unwrap_or("".into())), + ..Default::default() + }).exec(self.db()).await?; + }, + apb::ActivityType::Follow => { + model::relation::Entity::delete(model::relation::ActiveModel { + follower: Set(old_activity.actor), following: Set(old_activity.object.unwrap_or("".into())), + ..Default::default() + }).exec(self.db()).await?; + }, + t => tracing::warn!("extra side effects for activity {t:?} not implemented"), + } + } + let activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_actor(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()) + .await?; + + self.dispatch(&uid, activity_targets, &aid, None).await?; + + Ok(aid) + } }