From bb26ff763d7c951bb25a82cd888417aaf5e449d1 Mon Sep 17 00:00:00 2001 From: alemi Date: Tue, 9 Apr 2024 01:14:48 +0200 Subject: [PATCH] chore: restructured completely kinda MVC: Model -> model View -> routes Controller -> server --- src/errors.rs | 19 + src/main.rs | 18 +- src/model/faker.rs | 4 +- src/model/user.rs | 2 +- src/{ => routes}/activitypub/README.md | 0 src/{ => routes}/activitypub/activity.rs | 0 src/{ => routes}/activitypub/inbox.rs | 2 +- src/{ => routes}/activitypub/jsonld.rs | 0 src/{ => routes}/activitypub/mod.rs | 1 + src/{ => routes}/activitypub/object.rs | 0 src/{ => routes}/activitypub/outbox.rs | 0 src/{ => routes/activitypub}/router.rs | 2 +- .../activitypub/user/following.rs | 2 +- src/{ => routes}/activitypub/user/inbox.rs | 2 +- src/{ => routes}/activitypub/user/mod.rs | 0 src/{ => routes}/activitypub/user/outbox.rs | 2 +- src/{ => routes}/activitypub/well_known.rs | 0 src/{ => routes}/mastodon/README.md | 0 src/{ => routes}/mastodon/mod.rs | 0 src/routes/mod.rs | 7 + src/server.rs | 608 ------------------ src/{ => server}/auth.rs | 0 src/server/context.rs | 229 +++++++ src/{ => server}/dispatcher.rs | 10 +- src/{ => server}/fetcher.rs | 0 src/server/inbox.rs | 163 +++++ src/server/mod.rs | 8 + src/server/outbox.rs | 220 +++++++ src/server/server.rs | 0 29 files changed, 667 insertions(+), 632 deletions(-) rename src/{ => routes}/activitypub/README.md (100%) rename src/{ => routes}/activitypub/activity.rs (100%) rename src/{ => routes}/activitypub/inbox.rs (93%) rename src/{ => routes}/activitypub/jsonld.rs (100%) rename src/{ => routes}/activitypub/mod.rs (99%) rename src/{ => routes}/activitypub/object.rs (100%) rename src/{ => routes}/activitypub/outbox.rs (100%) rename src/{ => routes/activitypub}/router.rs (98%) rename src/{ => routes}/activitypub/user/following.rs (94%) rename src/{ => routes}/activitypub/user/inbox.rs (95%) rename src/{ => routes}/activitypub/user/mod.rs (100%) rename src/{ => routes}/activitypub/user/outbox.rs (94%) rename src/{ => routes}/activitypub/well_known.rs (100%) rename src/{ => routes}/mastodon/README.md (100%) rename src/{ => routes}/mastodon/mod.rs (100%) create mode 100644 src/routes/mod.rs delete mode 100644 src/server.rs rename src/{ => server}/auth.rs (100%) create mode 100644 src/server/context.rs rename src/{ => server}/dispatcher.rs (96%) rename src/{ => server}/fetcher.rs (100%) create mode 100644 src/server/inbox.rs create mode 100644 src/server/mod.rs create mode 100644 src/server/outbox.rs create mode 100644 src/server/server.rs diff --git a/src/errors.rs b/src/errors.rs index 840a588..da309d2 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -12,6 +12,9 @@ pub enum UpubError { #[error("openssl error: {0}")] OpenSSL(#[from] openssl::error::ErrorStack), + #[error("invalid UTF8 in key: {0}")] + OpenSSLParse(#[from] std::str::Utf8Error), + #[error("fetch error: {0}")] Reqwest(#[from] reqwest::Error), } @@ -24,6 +27,22 @@ impl UpubError { pub fn unprocessable() -> Self { Self::Status(axum::http::StatusCode::UNPROCESSABLE_ENTITY) } + + pub fn not_found() -> Self { + Self::Status(axum::http::StatusCode::NOT_FOUND) + } + + pub fn forbidden() -> Self { + Self::Status(axum::http::StatusCode::FORBIDDEN) + } + + pub fn not_modified() -> Self { + Self::Status(axum::http::StatusCode::NOT_MODIFIED) + } + + pub fn internal_server_error() -> Self { + Self::Status(axum::http::StatusCode::INTERNAL_SERVER_ERROR) + } } pub type UpubResult = Result; diff --git a/src/main.rs b/src/main.rs index ae07f44..5468897 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,8 @@ -pub mod activitypub; - -mod model; -mod server; -mod router; -mod errors; -mod auth; -mod dispatcher; -mod fetcher; +pub mod server; +pub mod model; +pub mod routes; +pub mod errors; #[cfg(feature = "migrations")] mod migrations; @@ -15,9 +10,6 @@ mod migrations; #[cfg(feature = "migrations")] use sea_orm_migration::MigratorTrait; -#[cfg(feature = "mastodon")] -mod mastodon; - use clap::{Parser, Subcommand}; use sea_orm::{ConnectOptions, Database, EntityTrait, IntoActiveModel}; @@ -92,7 +84,7 @@ async fn main() { .await.expect("error connecting to db"); match args.command { - CliCommand::Serve => router::serve(db, args.domain) + CliCommand::Serve => routes::activitypub::router::serve(db, args.domain) .await, #[cfg(feature = "migrations")] diff --git a/src/model/faker.rs b/src/model/faker.rs index 587b73b..68db65d 100644 --- a/src/model/faker.rs +++ b/src/model/faker.rs @@ -1,4 +1,4 @@ -use crate::{activitypub::PUBLIC_TARGET, model::{config, credential}}; +use crate::{routes::activitypub::PUBLIC_TARGET, model::{config, credential}}; use super::{activity, object, user, Audience}; use openssl::rsa::Rsa; use sea_orm::IntoActiveModel; @@ -10,7 +10,7 @@ pub async fn faker(db: &sea_orm::DatabaseConnection, domain: String, count: u64) let test_user = super::user::Model { id: format!("{domain}/users/test"), name: Some("μpub".into()), - domain: crate::activitypub::domain(&domain), + domain: crate::routes::activitypub::domain(&domain), preferred_username: "test".to_string(), summary: Some("hello world! i'm manually generated but served dynamically from db! check progress at https://git.alemi.dev/upub.git".to_string()), following: None, diff --git a/src/model/user.rs b/src/model/user.rs index 2859278..c090444 100644 --- a/src/model/user.rs +++ b/src/model/user.rs @@ -1,7 +1,7 @@ use sea_orm::entity::prelude::*; use apb::{Collection, Actor, PublicKey, ActorType}; -use crate::activitypub; +use crate::routes::activitypub; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "users")] diff --git a/src/activitypub/README.md b/src/routes/activitypub/README.md similarity index 100% rename from src/activitypub/README.md rename to src/routes/activitypub/README.md diff --git a/src/activitypub/activity.rs b/src/routes/activitypub/activity.rs similarity index 100% rename from src/activitypub/activity.rs rename to src/routes/activitypub/activity.rs diff --git a/src/activitypub/inbox.rs b/src/routes/activitypub/inbox.rs similarity index 93% rename from src/activitypub/inbox.rs rename to src/routes/activitypub/inbox.rs index 514df3d..409031c 100644 --- a/src/activitypub/inbox.rs +++ b/src/routes/activitypub/inbox.rs @@ -1,7 +1,7 @@ use axum::{extract::{Query, State}, http::StatusCode}; use sea_orm::{ColumnTrait, Condition, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect}; -use crate::{auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; +use crate::{server::auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; use super::{activity::ap_activity, jsonld::LD, JsonLD, Pagination, PUBLIC_TARGET}; diff --git a/src/activitypub/jsonld.rs b/src/routes/activitypub/jsonld.rs similarity index 100% rename from src/activitypub/jsonld.rs rename to src/routes/activitypub/jsonld.rs diff --git a/src/activitypub/mod.rs b/src/routes/activitypub/mod.rs similarity index 99% rename from src/activitypub/mod.rs rename to src/routes/activitypub/mod.rs index 0a103ab..7e726a7 100644 --- a/src/activitypub/mod.rs +++ b/src/routes/activitypub/mod.rs @@ -4,6 +4,7 @@ pub mod outbox; pub mod object; pub mod activity; pub mod well_known; +pub mod router; pub mod jsonld; pub use jsonld::JsonLD; diff --git a/src/activitypub/object.rs b/src/routes/activitypub/object.rs similarity index 100% rename from src/activitypub/object.rs rename to src/routes/activitypub/object.rs diff --git a/src/activitypub/outbox.rs b/src/routes/activitypub/outbox.rs similarity index 100% rename from src/activitypub/outbox.rs rename to src/routes/activitypub/outbox.rs diff --git a/src/router.rs b/src/routes/activitypub/router.rs similarity index 98% rename from src/router.rs rename to src/routes/activitypub/router.rs index 24141c5..45adeac 100644 --- a/src/router.rs +++ b/src/routes/activitypub/router.rs @@ -1,6 +1,6 @@ use axum::{routing::{get, post}, Router}; use sea_orm::DatabaseConnection; -use crate::activitypub as ap; +use crate::routes::activitypub as ap; pub async fn serve(db: DatabaseConnection, domain: String) { // build our application with a single route diff --git a/src/activitypub/user/following.rs b/src/routes/activitypub/user/following.rs similarity index 94% rename from src/activitypub/user/following.rs rename to src/routes/activitypub/user/following.rs index 4f0d760..c66683c 100644 --- a/src/activitypub/user/following.rs +++ b/src/routes/activitypub/user/following.rs @@ -1,7 +1,7 @@ use axum::{extract::{Path, Query, State}, http::StatusCode}; use sea_orm::{ColumnTrait, Condition, EntityTrait, PaginatorTrait, QueryFilter, QuerySelect, SelectColumns}; -use crate::{activitypub::{jsonld::LD, JsonLD, Pagination}, model, server::Context, url}; +use crate::{routes::activitypub::{jsonld::LD, JsonLD, Pagination}, model, server::Context, url}; use model::relation::Column::{Following, Follower}; diff --git a/src/activitypub/user/inbox.rs b/src/routes/activitypub/user/inbox.rs similarity index 95% rename from src/activitypub/user/inbox.rs rename to src/routes/activitypub/user/inbox.rs index 5059db2..a53c93c0 100644 --- a/src/activitypub/user/inbox.rs +++ b/src/routes/activitypub/user/inbox.rs @@ -2,7 +2,7 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; use sea_orm::{ColumnTrait, Condition, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect}; use apb::{ActivityType, ObjectType, Base, BaseType}; -use crate::{activitypub::{activity::ap_activity, jsonld::LD, APInbox, JsonLD, Pagination}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; +use crate::{routes::activitypub::{activity::ap_activity, jsonld::LD, APInbox, JsonLD, Pagination}, server::{Context, auth::{AuthIdentity, Identity}}, errors::UpubError, model, url}; pub async fn get( State(ctx): State, diff --git a/src/activitypub/user/mod.rs b/src/routes/activitypub/user/mod.rs similarity index 100% rename from src/activitypub/user/mod.rs rename to src/routes/activitypub/user/mod.rs diff --git a/src/activitypub/user/outbox.rs b/src/routes/activitypub/user/outbox.rs similarity index 94% rename from src/activitypub/user/outbox.rs rename to src/routes/activitypub/user/outbox.rs index 81767ad..30f1279 100644 --- a/src/activitypub/user/outbox.rs +++ b/src/routes/activitypub/user/outbox.rs @@ -2,7 +2,7 @@ use axum::{extract::{Path, Query, State}, http::StatusCode, Json}; use sea_orm::{EntityTrait, Order, QueryOrder, QuerySelect}; use apb::{AcceptType, ActivityMut, ActivityType, Base, BaseType, Node, ObjectType, RejectType}; -use crate::{activitypub::{jsonld::LD, APOutbox, CreationResult, JsonLD, Pagination}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; +use crate::{routes::activitypub::{jsonld::LD, APOutbox, CreationResult, JsonLD, Pagination}, server::auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url}; pub async fn get( State(ctx): State, diff --git a/src/activitypub/well_known.rs b/src/routes/activitypub/well_known.rs similarity index 100% rename from src/activitypub/well_known.rs rename to src/routes/activitypub/well_known.rs diff --git a/src/mastodon/README.md b/src/routes/mastodon/README.md similarity index 100% rename from src/mastodon/README.md rename to src/routes/mastodon/README.md diff --git a/src/mastodon/mod.rs b/src/routes/mastodon/mod.rs similarity index 100% rename from src/mastodon/mod.rs rename to src/routes/mastodon/mod.rs diff --git a/src/routes/mod.rs b/src/routes/mod.rs new file mode 100644 index 0000000..4095bfb --- /dev/null +++ b/src/routes/mod.rs @@ -0,0 +1,7 @@ +pub mod activitypub; + +#[cfg(feature = "web")] +pub mod web; + +#[cfg(feature = "mastodon")] +pub mod mastodon; diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index 23debd0..0000000 --- a/src/server.rs +++ /dev/null @@ -1,608 +0,0 @@ -use std::{str::Utf8Error, sync::Arc}; - -use openssl::rsa::Rsa; -use reqwest::StatusCode; -use sea_orm::{sea_query::Expr, ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set}; - -use crate::{activitypub::{jsonld::LD, APInbox, APOutbox, Addressed, PUBLIC_TARGET}, dispatcher::Dispatcher, errors::{LoggableError, UpubError}, fetcher::Fetcher, model}; -use apb::{Activity, ActivityMut, Base, BaseMut, CollectionMut, CollectionPageMut, CollectionType, Node, Object, ObjectMut}; - -#[derive(Clone)] -pub struct Context(Arc); -struct ContextInner { - db: DatabaseConnection, - domain: String, - protocol: String, - fetcher: Fetcher, - dispatcher: Dispatcher, - // TODO keep these pre-parsed - app: model::application::Model, -} - -#[macro_export] -macro_rules! url { - ($ctx:expr, $($args: tt)*) => { - format!("{}{}{}", $ctx.protocol(), $ctx.base(), format!($($args)*)) - }; -} - -#[derive(Debug, thiserror::Error)] -pub enum ContextError { - #[error("database error: {0}")] - Db(#[from] DbErr), - - #[error("openssl error: {0}")] - OpenSSL(#[from] openssl::error::ErrorStack), - - #[error("invalid UTF8 PEM key: {0}")] - UTF8Error(#[from] Utf8Error) -} - -impl Context { - - // TODO slim constructor down, maybe make a builder? - pub async fn new(db: DatabaseConnection, mut domain: String) -> Result { - let protocol = if domain.starts_with("http://") - { "http://" } else { "https://" }.to_string(); - if domain.ends_with('/') { - domain.replace_range(domain.len()-1.., ""); - } - if domain.starts_with("http") { - domain = domain.replace("https://", "").replace("http://", ""); - } - let dispatcher = Dispatcher::new(); - for _ in 0..1 { // TODO customize delivery workers amount - dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! - } - let app = match model::application::Entity::find().one(&db).await? { - Some(model) => model, - None => { - tracing::info!("generating application keys"); - let rsa = Rsa::generate(2048)?; - let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string(); - let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string(); - let system = model::application::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - private_key: sea_orm::ActiveValue::Set(privk.clone()), - public_key: sea_orm::ActiveValue::Set(pubk.clone()), - created: sea_orm::ActiveValue::Set(chrono::Utc::now()), - }; - model::application::Entity::insert(system).exec(&db).await?; - // sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time - model::application::Entity::find().one(&db).await?.expect("could not find app config just inserted") - } - }; - - let fetcher = Fetcher::new(db.clone(), domain.clone(), app.private_key.clone()); - - Ok(Context(Arc::new(ContextInner { - db, domain, protocol, app, fetcher, dispatcher, - }))) - } - - pub fn app(&self) -> &model::application::Model { - &self.0.app - } - - pub fn db(&self) -> &DatabaseConnection { - &self.0.db - } - - pub fn base(&self) -> &str { - &self.0.domain - } - - pub fn protocol(&self) -> &str { - &self.0.protocol - } - - pub fn uri(&self, entity: &str, id: String) -> String { - if id.starts_with("http") { id } else { - format!("{}{}/{}/{}", self.0.protocol, self.0.domain, entity, id) - } - } - - pub fn fetch(&self) -> &Fetcher { - &self.0.fetcher - } - - /// get full user id uri - pub fn uid(&self, id: String) -> String { - self.uri("users", id) - } - - /// get full object id uri - pub fn oid(&self, id: String) -> String { - self.uri("objects", id) - } - - /// get full activity id uri - pub fn aid(&self, id: String) -> String { - self.uri("activities", id) - } - - /// get bare id, usually an uuid but unspecified - pub fn id(&self, id: String) -> String { - if id.starts_with(&self.0.domain) { - id.split('/').last().unwrap_or("").to_string() - } else { - id - } - } - - pub fn server(id: &str) -> String { - id - .replace("https://", "") - .replace("http://", "") - .split('/') - .next() - .unwrap_or("") - .to_string() - } - - pub async fn expand_addressing(&self, uid: &str, mut targets: Vec) -> Result, DbErr> { - let following_addr = format!("{uid}/followers"); - if let Some(i) = targets.iter().position(|x| x == &following_addr) { - targets.remove(i); - model::relation::Entity::find() - .filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string()))) - .select_only() - .select_column(model::relation::Column::Follower) - .into_tuple::() - .all(self.db()) - .await? - .into_iter() - .for_each(|x| targets.push(x)); - } - Ok(targets) - } - - pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> Result<(), DbErr> { - let addressings : Vec = targets - .iter() - .filter(|to| !to.is_empty()) - .filter(|to| !to.ends_with("/followers")) - .map(|to| model::addressing::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - server: Set(Context::server(to)), - actor: Set(to.to_string()), - activity: Set(aid.to_string()), - object: Set(oid.map(|x| x.to_string())), - published: Set(chrono::Utc::now()), - }) - .collect(); - - if !addressings.is_empty() { - model::addressing::Entity::insert_many(addressings) - .exec(self.db()) - .await?; - } - - Ok(()) - } - - pub async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> { - let deliveries : Vec = targets - .iter() - .filter(|to| !to.is_empty()) - .filter(|to| Context::server(to) != self.base()) - .filter(|to| to != &PUBLIC_TARGET) - .map(|to| model::delivery::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - actor: Set(from.to_string()), - // TODO we should resolve each user by id and check its inbox because we can't assume - // it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now - target: Set(format!("{}/inbox", to)), - activity: Set(aid.to_string()), - created: Set(chrono::Utc::now()), - not_before: Set(chrono::Utc::now()), - attempt: Set(0), - }) - .collect(); - - if !deliveries.is_empty() { - model::delivery::Entity::insert_many(deliveries) - .exec(self.db()) - .await?; - } - - self.0.dispatcher.wakeup(); - - Ok(()) - } - - // TODO should probs not be here - pub fn ap_collection(&self, id: &str, total_items: Option) -> serde_json::Value { - serde_json::Value::new_object() - .set_id(Some(id)) - .set_collection_type(Some(CollectionType::OrderedCollection)) - .set_first(Node::link(format!("{id}/page"))) - .set_total_items(total_items) - } - - // TODO should probs not be here - pub fn ap_collection_page(&self, id: &str, offset: u64, limit: u64, items: Vec) -> serde_json::Value { - serde_json::Value::new_object() - .set_id(Some(&format!("{id}?offset={offset}"))) - .set_collection_type(Some(CollectionType::OrderedCollectionPage)) - .set_part_of(Node::link(id.replace("/page", ""))) - .set_next(Node::link(format!("{id}?offset={}", offset+limit))) - .set_ordered_items(Node::Array(items)) - } - - pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { - let addressed = self.expand_addressing(uid, activity_targets).await?; - self.address_to(aid, oid, &addressed).await?; - self.deliver_to(aid, uid, &addressed).await?; - Ok(()) - } -} - -#[axum::async_trait] -impl APOutbox for Context { - async fn create_note(&self, uid: String, object: serde_json::Value) -> crate::Result { - let oid = self.oid(uuid::Uuid::new_v4().to_string()); - let aid = self.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = object.addressed(); - let object_model = model::object::Model::new( - &object - .set_id(Some(&oid)) - .set_attributed_to(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - let activity_model = model::activity::Model { - id: aid.clone(), - activity_type: apb::ActivityType::Create, - actor: uid.clone(), - object: Some(oid.clone()), - target: None, - cc: object_model.cc.clone(), - bcc: object_model.bcc.clone(), - to: object_model.to.clone(), - bto: object_model.bto.clone(), - published: object_model.published, - }; - - model::object::Entity::insert(object_model.into_active_model()) - .exec(self.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()).await?; - - self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; - - Ok(aid) - } - - async fn create(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let Some(object) = activity.object().extract() else { - return Err(UpubError::bad_request()); - }; - - let oid = self.oid(uuid::Uuid::new_v4().to_string()); - let aid = self.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let mut object_model = model::object::Model::new( - &object - .set_id(Some(&oid)) - .set_attributed_to(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - let mut activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - object_model.to = activity_model.to.clone(); - object_model.bto = activity_model.bto.clone(); - object_model.cc = activity_model.cc.clone(); - object_model.bcc = activity_model.bcc.clone(); - activity_model.object = Some(oid.clone()); - - model::object::Entity::insert(object_model.into_active_model()) - .exec(self.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()).await?; - - self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; - - Ok(aid) - } - - - async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - let Some(oid) = activity.object().id() else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_published(Some(chrono::Utc::now())) - .set_actor(Node::link(uid.clone())) - )?; - - let like_model = model::like::ActiveModel { - actor: Set(uid.clone()), - likes: Set(oid), - date: Set(chrono::Utc::now()), - ..Default::default() - }; - model::like::Entity::insert(like_model).exec(self.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()).await?; - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } - - async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - if activity.object().id().is_none() { - return Err(StatusCode::BAD_REQUEST.into()); - } - - let activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()).await?; - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } - - async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - if activity.object().id().is_none() { - return Err(StatusCode::BAD_REQUEST.into()); - } - let Some(accepted_id) = activity.object().id() else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let Some(accepted_activity) = model::activity::Entity::find_by_id(accepted_id) - .one(self.db()).await? - else { - return Err(StatusCode::NOT_FOUND.into()); - }; - - match accepted_activity.activity_type { - apb::ActivityType::Follow => { - model::relation::Entity::insert( - model::relation::ActiveModel { - follower: Set(accepted_activity.actor), following: Set(uid.clone()), - ..Default::default() - } - ).exec(self.db()).await?; - }, - t => tracing::warn!("no side effects implemented for accepting {t:?}"), - } - - let activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()).await?; - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } - - async fn reject(&self, _uid: String, _activity: serde_json::Value) -> crate::Result { - todo!() - } - - async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result { - let aid = self.aid(uuid::Uuid::new_v4().to_string()); - let activity_targets = activity.addressed(); - { - let Some(old_aid) = activity.object().id() else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let Some(old_activity) = model::activity::Entity::find_by_id(old_aid) - .one(self.db()).await? - else { - return Err(StatusCode::NOT_FOUND.into()); - }; - if old_activity.actor != uid { - return Err(StatusCode::FORBIDDEN.into()); - } - match old_activity.activity_type { - apb::ActivityType::Like => { - model::like::Entity::delete(model::like::ActiveModel { - actor: Set(old_activity.actor), likes: Set(old_activity.object.unwrap_or("".into())), - ..Default::default() - }).exec(self.db()).await?; - }, - apb::ActivityType::Follow => { - model::relation::Entity::delete(model::relation::ActiveModel { - follower: Set(old_activity.actor), following: Set(old_activity.object.unwrap_or("".into())), - ..Default::default() - }).exec(self.db()).await?; - }, - t => tracing::warn!("extra side effects for activity {t:?} not implemented"), - } - } - let activity_model = model::activity::Model::new( - &activity - .set_id(Some(&aid)) - .set_actor(Node::link(uid.clone())) - .set_published(Some(chrono::Utc::now())) - )?; - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()) - .await?; - - self.dispatch(&uid, activity_targets, &aid, None).await?; - - Ok(aid) - } -} - -#[axum::async_trait] -impl APInbox for Context { - async fn create(&self, activity: serde_json::Value) -> crate::Result<()> { - let activity_model = model::activity::Model::new(&activity)?; - let activity_targets = activity.addressed(); - let Some(object_node) = activity.object().extract() else { - // TODO we could process non-embedded activities or arrays but im lazy rn - tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); - return Err(StatusCode::UNPROCESSABLE_ENTITY.into()); - }; - let object_model = model::object::Model::new(&object_node)?; - let aid = activity_model.id.clone(); - let oid = object_model.id.clone(); - model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?; - model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; - self.address_to(&aid, Some(&oid), &activity_targets).await?; - tracing::info!("{} posted {}", aid, oid); - Ok(()) - } - - async fn like(&self, activity: serde_json::Value) -> crate::Result<()> { - let aid = activity.actor().id().ok_or(StatusCode::BAD_REQUEST)?; - let oid = activity.object().id().ok_or(StatusCode::BAD_REQUEST)?; - let like = model::like::ActiveModel { - id: sea_orm::ActiveValue::NotSet, - actor: sea_orm::Set(aid.clone()), - likes: sea_orm::Set(oid.clone()), - date: sea_orm::Set(chrono::Utc::now()), - }; - match model::like::Entity::insert(like).exec(self.db()).await { - Err(sea_orm::DbErr::RecordNotInserted) => Err(StatusCode::NOT_MODIFIED.into()), - Err(sea_orm::DbErr::Exec(_)) => Err(StatusCode::NOT_MODIFIED.into()), // bad fix for sqlite - Err(e) => { - tracing::error!("unexpected error procesing like from {aid} to {oid}: {e}"); - Err(StatusCode::INTERNAL_SERVER_ERROR.into()) - } - Ok(_) => { - model::object::Entity::update_many() - .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) - .filter(model::object::Column::Id.eq(oid.clone())) - .exec(self.db()) - .await?; - tracing::info!("{} liked {}", aid, oid); - Ok(()) - }, - } - } - - async fn follow(&self, activity: serde_json::Value) -> crate::Result<()> { - let activity_targets = activity.addressed(); - let activity_model = model::activity::Model::new(&activity)?; - let aid = activity_model.id.clone(); - tracing::info!("{} wants to follow {}", activity_model.actor, activity_model.object.as_deref().unwrap_or("")); - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(self.db()).await?; - self.address_to(&aid, None, &activity_targets).await?; - Ok(()) - } - - async fn accept(&self, activity: serde_json::Value) -> crate::Result<()> { - // TODO what about TentativeAccept - let activity_model = model::activity::Model::new(&activity)?; - let Some(follow_request_id) = activity_model.object else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) - .one(self.db()).await? - else { - return Err(StatusCode::NOT_FOUND.into()); - }; - if follow_activity.object.unwrap_or("".into()) != activity_model.actor { - return Err(StatusCode::FORBIDDEN.into()); - } - - tracing::info!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor); - - model::relation::Entity::insert( - model::relation::ActiveModel { - follower: Set(follow_activity.actor), - following: Set(activity_model.actor), - ..Default::default() - } - ).exec(self.db()).await?; - - self.address_to(&activity_model.id, None, &activity.addressed()).await?; - Ok(()) - } - - async fn reject(&self, activity: serde_json::Value) -> crate::Result<()> { - // TODO what about TentativeReject? - let activity_model = model::activity::Model::new(&activity)?; - let Some(follow_request_id) = activity_model.object else { - return Err(StatusCode::BAD_REQUEST.into()); - }; - let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) - .one(self.db()).await? - else { - return Err(StatusCode::NOT_FOUND.into()); - }; - if follow_activity.object.unwrap_or("".into()) != activity_model.actor { - return Err(StatusCode::FORBIDDEN.into()); - } - tracing::info!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor); - self.address_to(&activity_model.id, None, &activity.addressed()).await?; - Ok(()) - } - - async fn delete(&self, activity: serde_json::Value) -> crate::Result<()> { - // TODO verify the signature before just deleting lmao - let oid = activity.object().id().ok_or(StatusCode::BAD_REQUEST)?; - // TODO maybe we should keep the tombstone? - model::user::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from users"); - model::activity::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from activities"); - model::object::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from objects"); - Ok(()) - } - - async fn update(&self, activity: serde_json::Value) -> crate::Result<()> { - let activity_model = model::activity::Model::new(&activity)?; - let activity_targets = activity.addressed(); - let Some(object_node) = activity.object().extract() else { - // TODO we could process non-embedded activities or arrays but im lazy rn - tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); - return Err(UpubError::unprocessable()); - }; - let aid = activity_model.id.clone(); - let Some(oid) = object_node.id().map(|x| x.to_string()) else { - return Err(UpubError::bad_request()); - }; - model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; - match object_node.object_type() { - Some(apb::ObjectType::Actor(_)) => { - // TODO oof here is an example of the weakness of this model, we have to go all the way - // back up to serde_json::Value because impl Object != impl Actor - let actor_model = model::user::Model::new(&object_node)?; - model::user::Entity::update(actor_model.into_active_model()) - .exec(self.db()).await?; - }, - Some(apb::ObjectType::Note) => { - let object_model = model::object::Model::new(&object_node)?; - model::object::Entity::update(object_model.into_active_model()) - .exec(self.db()).await?; - }, - Some(t) => tracing::warn!("no side effects implemented for update type {t:?}"), - None => tracing::warn!("empty type on embedded updated object"), - } - self.address_to(&aid, Some(&oid), &activity_targets).await?; - tracing::info!("{} updated {}", aid, oid); - Ok(()) - } - - async fn undo(&self, _activity: serde_json::Value) -> crate::Result<()> { - todo!() - } -} diff --git a/src/auth.rs b/src/server/auth.rs similarity index 100% rename from src/auth.rs rename to src/server/auth.rs diff --git a/src/server/context.rs b/src/server/context.rs new file mode 100644 index 0000000..b1af279 --- /dev/null +++ b/src/server/context.rs @@ -0,0 +1,229 @@ +use std::sync::Arc; + +use apb::{BaseMut, CollectionMut, CollectionPageMut}; +use openssl::rsa::Rsa; +use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set}; + +use crate::{model, routes::activitypub::{jsonld::LD, PUBLIC_TARGET}}; + +use super::{dispatcher::Dispatcher, fetcher::Fetcher}; + + +#[derive(Clone)] +pub struct Context(Arc); +struct ContextInner { + db: DatabaseConnection, + domain: String, + protocol: String, + fetcher: Fetcher, + dispatcher: Dispatcher, + // TODO keep these pre-parsed + app: model::application::Model, +} + +#[macro_export] +macro_rules! url { + ($ctx:expr, $($args: tt)*) => { + format!("{}{}{}", $ctx.protocol(), $ctx.base(), format!($($args)*)) + }; +} + +impl Context { + + // TODO slim constructor down, maybe make a builder? + pub async fn new(db: DatabaseConnection, mut domain: String) -> crate::Result { + let protocol = if domain.starts_with("http://") + { "http://" } else { "https://" }.to_string(); + if domain.ends_with('/') { + domain.replace_range(domain.len()-1.., ""); + } + if domain.starts_with("http") { + domain = domain.replace("https://", "").replace("http://", ""); + } + let dispatcher = Dispatcher::new(); + for _ in 0..1 { // TODO customize delivery workers amount + dispatcher.spawn(db.clone(), domain.clone(), 30); // TODO ew don't do it this deep and secretly!! + } + let app = match model::application::Entity::find().one(&db).await? { + Some(model) => model, + None => { + tracing::info!("generating application keys"); + let rsa = Rsa::generate(2048)?; + let privk = std::str::from_utf8(&rsa.private_key_to_pem()?)?.to_string(); + let pubk = std::str::from_utf8(&rsa.public_key_to_pem()?)?.to_string(); + let system = model::application::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + private_key: sea_orm::ActiveValue::Set(privk.clone()), + public_key: sea_orm::ActiveValue::Set(pubk.clone()), + created: sea_orm::ActiveValue::Set(chrono::Utc::now()), + }; + model::application::Entity::insert(system).exec(&db).await?; + // sqlite doesn't resurn last inserted id so we're better off just querying again, it's just one time + model::application::Entity::find().one(&db).await?.expect("could not find app config just inserted") + } + }; + + let fetcher = Fetcher::new(db.clone(), domain.clone(), app.private_key.clone()); + + Ok(Context(Arc::new(ContextInner { + db, domain, protocol, app, fetcher, dispatcher, + }))) + } + + pub fn app(&self) -> &model::application::Model { + &self.0.app + } + + pub fn db(&self) -> &DatabaseConnection { + &self.0.db + } + + pub fn base(&self) -> &str { + &self.0.domain + } + + pub fn protocol(&self) -> &str { + &self.0.protocol + } + + pub fn uri(&self, entity: &str, id: String) -> String { + if id.starts_with("http") { id } else { + format!("{}{}/{}/{}", self.0.protocol, self.0.domain, entity, id) + } + } + + pub fn fetch(&self) -> &Fetcher { + &self.0.fetcher + } + + /// get full user id uri + pub fn uid(&self, id: String) -> String { + self.uri("users", id) + } + + /// get full object id uri + pub fn oid(&self, id: String) -> String { + self.uri("objects", id) + } + + /// get full activity id uri + pub fn aid(&self, id: String) -> String { + self.uri("activities", id) + } + + /// get bare id, usually an uuid but unspecified + pub fn id(&self, id: String) -> String { + if id.starts_with(&self.0.domain) { + id.split('/').last().unwrap_or("").to_string() + } else { + id + } + } + + pub fn server(id: &str) -> String { + id + .replace("https://", "") + .replace("http://", "") + .split('/') + .next() + .unwrap_or("") + .to_string() + } + + pub async fn expand_addressing(&self, uid: &str, mut targets: Vec) -> crate::Result> { + let following_addr = format!("{uid}/followers"); + if let Some(i) = targets.iter().position(|x| x == &following_addr) { + targets.remove(i); + model::relation::Entity::find() + .filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string()))) + .select_only() + .select_column(model::relation::Column::Follower) + .into_tuple::() + .all(self.db()) + .await? + .into_iter() + .for_each(|x| targets.push(x)); + } + Ok(targets) + } + + pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> crate::Result<()> { + let addressings : Vec = targets + .iter() + .filter(|to| !to.is_empty()) + .filter(|to| !to.ends_with("/followers")) + .map(|to| model::addressing::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + server: Set(Context::server(to)), + actor: Set(to.to_string()), + activity: Set(aid.to_string()), + object: Set(oid.map(|x| x.to_string())), + published: Set(chrono::Utc::now()), + }) + .collect(); + + if !addressings.is_empty() { + model::addressing::Entity::insert_many(addressings) + .exec(self.db()) + .await?; + } + + Ok(()) + } + + pub async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> crate::Result<()> { + let deliveries : Vec = targets + .iter() + .filter(|to| !to.is_empty()) + .filter(|to| Context::server(to) != self.base()) + .filter(|to| to != &PUBLIC_TARGET) + .map(|to| model::delivery::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + actor: Set(from.to_string()), + // TODO we should resolve each user by id and check its inbox because we can't assume + // it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now + target: Set(format!("{}/inbox", to)), + activity: Set(aid.to_string()), + created: Set(chrono::Utc::now()), + not_before: Set(chrono::Utc::now()), + attempt: Set(0), + }) + .collect(); + + if !deliveries.is_empty() { + model::delivery::Entity::insert_many(deliveries) + .exec(self.db()) + .await?; + } + + self.0.dispatcher.wakeup(); + + Ok(()) + } + + // TODO should probs not be here + pub fn ap_collection(&self, id: &str, total_items: Option) -> serde_json::Value { + serde_json::Value::new_object() + .set_id(Some(id)) + .set_collection_type(Some(apb::CollectionType::OrderedCollection)) + .set_first(apb::Node::link(format!("{id}/page"))) + .set_total_items(total_items) + } + + // TODO should probs not be here + pub fn ap_collection_page(&self, id: &str, offset: u64, limit: u64, items: Vec) -> serde_json::Value { + serde_json::Value::new_object() + .set_id(Some(&format!("{id}?offset={offset}"))) + .set_collection_type(Some(apb::CollectionType::OrderedCollectionPage)) + .set_part_of(apb::Node::link(id.replace("/page", ""))) + .set_next(apb::Node::link(format!("{id}?offset={}", offset+limit))) + .set_ordered_items(apb::Node::Array(items)) + } + + pub async fn dispatch(&self, uid: &str, activity_targets: Vec, aid: &str, oid: Option<&str>) -> crate::Result<()> { + let addressed = self.expand_addressing(uid, activity_targets).await?; + self.address_to(aid, oid, &addressed).await?; + self.deliver_to(aid, uid, &addressed).await?; + Ok(()) + } +} diff --git a/src/dispatcher.rs b/src/server/dispatcher.rs similarity index 96% rename from src/dispatcher.rs rename to src/server/dispatcher.rs index e3ada1d..a926f94 100644 --- a/src/dispatcher.rs +++ b/src/server/dispatcher.rs @@ -5,17 +5,21 @@ use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, Qu use tokio::{sync::broadcast, task::JoinHandle}; use apb::{ActivityMut, Node}; -use crate::{activitypub::{activity::ap_activity, object::ap_object}, errors::UpubError, model, server::Context, VERSION}; +use crate::{routes::activitypub::{activity::ap_activity, object::ap_object}, errors::UpubError, model, server::Context, VERSION}; pub struct Dispatcher { waker: broadcast::Sender<()>, } -impl Dispatcher { - pub fn new() -> Self { +impl Default for Dispatcher { + fn default() -> Self { let (waker, _) = broadcast::channel(1); Dispatcher { waker } } +} + +impl Dispatcher { + pub fn new() -> Self { Dispatcher::default() } pub fn spawn(&self, db: DatabaseConnection, domain: String, poll_interval: u64) -> JoinHandle<()> { let waker = self.waker.subscribe(); diff --git a/src/fetcher.rs b/src/server/fetcher.rs similarity index 100% rename from src/fetcher.rs rename to src/server/fetcher.rs diff --git a/src/server/inbox.rs b/src/server/inbox.rs new file mode 100644 index 0000000..3268101 --- /dev/null +++ b/src/server/inbox.rs @@ -0,0 +1,163 @@ +use apb::{Activity, Base, Object}; +use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, Set}; + +use crate::{errors::{LoggableError, UpubError}, model, routes::activitypub::{APInbox, Addressed}}; + +use super::Context; + + +#[axum::async_trait] +impl APInbox for Context { + async fn create(&self, activity: serde_json::Value) -> crate::Result<()> { + let activity_model = model::activity::Model::new(&activity)?; + let activity_targets = activity.addressed(); + let Some(object_node) = activity.object().extract() else { + // TODO we could process non-embedded activities or arrays but im lazy rn + tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); + return Err(UpubError::unprocessable()); + }; + let object_model = model::object::Model::new(&object_node)?; + let aid = activity_model.id.clone(); + let oid = object_model.id.clone(); + model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?; + model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; + self.address_to(&aid, Some(&oid), &activity_targets).await?; + tracing::info!("{} posted {}", aid, oid); + Ok(()) + } + + async fn like(&self, activity: serde_json::Value) -> crate::Result<()> { + let aid = activity.actor().id().ok_or(UpubError::bad_request())?; + let oid = activity.object().id().ok_or(UpubError::bad_request())?; + let like = model::like::ActiveModel { + id: sea_orm::ActiveValue::NotSet, + actor: sea_orm::Set(aid.clone()), + likes: sea_orm::Set(oid.clone()), + date: sea_orm::Set(chrono::Utc::now()), + }; + match model::like::Entity::insert(like).exec(self.db()).await { + Err(sea_orm::DbErr::RecordNotInserted) => Err(UpubError::not_modified()), + Err(sea_orm::DbErr::Exec(_)) => Err(UpubError::not_modified()), // bad fix for sqlite + Err(e) => { + tracing::error!("unexpected error procesing like from {aid} to {oid}: {e}"); + Err(UpubError::internal_server_error()) + } + Ok(_) => { + model::object::Entity::update_many() + .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) + .filter(model::object::Column::Id.eq(oid.clone())) + .exec(self.db()) + .await?; + tracing::info!("{} liked {}", aid, oid); + Ok(()) + }, + } + } + + async fn follow(&self, activity: serde_json::Value) -> crate::Result<()> { + let activity_targets = activity.addressed(); + let activity_model = model::activity::Model::new(&activity)?; + let aid = activity_model.id.clone(); + tracing::info!("{} wants to follow {}", activity_model.actor, activity_model.object.as_deref().unwrap_or("")); + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + self.address_to(&aid, None, &activity_targets).await?; + Ok(()) + } + + async fn accept(&self, activity: serde_json::Value) -> crate::Result<()> { + // TODO what about TentativeAccept + let activity_model = model::activity::Model::new(&activity)?; + let Some(follow_request_id) = activity_model.object else { + return Err(UpubError::bad_request()); + }; + let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) + .one(self.db()).await? + else { + return Err(UpubError::not_found()); + }; + if follow_activity.object.unwrap_or("".into()) != activity_model.actor { + return Err(UpubError::forbidden()); + } + + tracing::info!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor); + + model::relation::Entity::insert( + model::relation::ActiveModel { + follower: Set(follow_activity.actor), + following: Set(activity_model.actor), + ..Default::default() + } + ).exec(self.db()).await?; + + self.address_to(&activity_model.id, None, &activity.addressed()).await?; + Ok(()) + } + + async fn reject(&self, activity: serde_json::Value) -> crate::Result<()> { + // TODO what about TentativeReject? + let activity_model = model::activity::Model::new(&activity)?; + let Some(follow_request_id) = activity_model.object else { + return Err(UpubError::bad_request()); + }; + let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) + .one(self.db()).await? + else { + return Err(UpubError::not_found()); + }; + if follow_activity.object.unwrap_or("".into()) != activity_model.actor { + return Err(UpubError::forbidden()); + } + tracing::info!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor); + self.address_to(&activity_model.id, None, &activity.addressed()).await?; + Ok(()) + } + + async fn delete(&self, activity: serde_json::Value) -> crate::Result<()> { + // TODO verify the signature before just deleting lmao + let oid = activity.object().id().ok_or(UpubError::bad_request())?; + // TODO maybe we should keep the tombstone? + model::user::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from users"); + model::activity::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from activities"); + model::object::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from objects"); + Ok(()) + } + + async fn update(&self, activity: serde_json::Value) -> crate::Result<()> { + let activity_model = model::activity::Model::new(&activity)?; + let activity_targets = activity.addressed(); + let Some(object_node) = activity.object().extract() else { + // TODO we could process non-embedded activities or arrays but im lazy rn + tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); + return Err(UpubError::unprocessable()); + }; + let aid = activity_model.id.clone(); + let Some(oid) = object_node.id().map(|x| x.to_string()) else { + return Err(UpubError::bad_request()); + }; + model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; + match object_node.object_type() { + Some(apb::ObjectType::Actor(_)) => { + // TODO oof here is an example of the weakness of this model, we have to go all the way + // back up to serde_json::Value because impl Object != impl Actor + let actor_model = model::user::Model::new(&object_node)?; + model::user::Entity::update(actor_model.into_active_model()) + .exec(self.db()).await?; + }, + Some(apb::ObjectType::Note) => { + let object_model = model::object::Model::new(&object_node)?; + model::object::Entity::update(object_model.into_active_model()) + .exec(self.db()).await?; + }, + Some(t) => tracing::warn!("no side effects implemented for update type {t:?}"), + None => tracing::warn!("empty type on embedded updated object"), + } + self.address_to(&aid, Some(&oid), &activity_targets).await?; + tracing::info!("{} updated {}", aid, oid); + Ok(()) + } + + async fn undo(&self, _activity: serde_json::Value) -> crate::Result<()> { + todo!() + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..1d5985d --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,8 @@ +pub mod context; +pub mod dispatcher; +pub mod fetcher; +pub mod inbox; +pub mod outbox; +pub mod auth; + +pub use context::Context; diff --git a/src/server/outbox.rs b/src/server/outbox.rs new file mode 100644 index 0000000..bbfe7fd --- /dev/null +++ b/src/server/outbox.rs @@ -0,0 +1,220 @@ +use apb::{Activity, ActivityMut, BaseMut, Node, ObjectMut}; +use sea_orm::{EntityTrait, IntoActiveModel, Set}; + +use crate::{errors::UpubError, model, routes::activitypub::{APOutbox, Addressed}}; + +use super::Context; + + +#[axum::async_trait] +impl APOutbox for Context { + async fn create_note(&self, uid: String, object: serde_json::Value) -> crate::Result { + let oid = self.oid(uuid::Uuid::new_v4().to_string()); + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = object.addressed(); + let object_model = model::object::Model::new( + &object + .set_id(Some(&oid)) + .set_attributed_to(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + let activity_model = model::activity::Model { + id: aid.clone(), + activity_type: apb::ActivityType::Create, + actor: uid.clone(), + object: Some(oid.clone()), + target: None, + cc: object_model.cc.clone(), + bcc: object_model.bcc.clone(), + to: object_model.to.clone(), + bto: object_model.bto.clone(), + published: object_model.published, + }; + + model::object::Entity::insert(object_model.into_active_model()) + .exec(self.db()).await?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; + + Ok(aid) + } + + async fn create(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let Some(object) = activity.object().extract() else { + return Err(UpubError::bad_request()); + }; + + let oid = self.oid(uuid::Uuid::new_v4().to_string()); + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + let mut object_model = model::object::Model::new( + &object + .set_id(Some(&oid)) + .set_attributed_to(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + let mut activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_actor(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + object_model.to = activity_model.to.clone(); + object_model.bto = activity_model.bto.clone(); + object_model.cc = activity_model.cc.clone(); + object_model.bcc = activity_model.bcc.clone(); + activity_model.object = Some(oid.clone()); + + model::object::Entity::insert(object_model.into_active_model()) + .exec(self.db()).await?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?; + + Ok(aid) + } + + + async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + let Some(oid) = activity.object().id() else { + return Err(UpubError::bad_request()); + }; + let activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_published(Some(chrono::Utc::now())) + .set_actor(Node::link(uid.clone())) + )?; + + let like_model = model::like::ActiveModel { + actor: Set(uid.clone()), + likes: Set(oid), + date: Set(chrono::Utc::now()), + ..Default::default() + }; + model::like::Entity::insert(like_model).exec(self.db()).await?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, None).await?; + + Ok(aid) + } + + async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + if activity.object().id().is_none() { + return Err(UpubError::bad_request()); + } + + let activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_actor(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, None).await?; + + Ok(aid) + } + + async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + if activity.object().id().is_none() { + return Err(UpubError::bad_request()); + } + let Some(accepted_id) = activity.object().id() else { + return Err(UpubError::bad_request()); + }; + let Some(accepted_activity) = model::activity::Entity::find_by_id(accepted_id) + .one(self.db()).await? + else { + return Err(UpubError::not_found()); + }; + + match accepted_activity.activity_type { + apb::ActivityType::Follow => { + model::relation::Entity::insert( + model::relation::ActiveModel { + follower: Set(accepted_activity.actor), following: Set(uid.clone()), + ..Default::default() + } + ).exec(self.db()).await?; + }, + t => tracing::warn!("no side effects implemented for accepting {t:?}"), + } + + let activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_actor(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()).await?; + + self.dispatch(&uid, activity_targets, &aid, None).await?; + + Ok(aid) + } + + async fn reject(&self, _uid: String, _activity: serde_json::Value) -> crate::Result { + todo!() + } + + async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result { + let aid = self.aid(uuid::Uuid::new_v4().to_string()); + let activity_targets = activity.addressed(); + { + let Some(old_aid) = activity.object().id() else { + return Err(UpubError::bad_request()); + }; + let Some(old_activity) = model::activity::Entity::find_by_id(old_aid) + .one(self.db()).await? + else { + return Err(UpubError::not_found()); + }; + if old_activity.actor != uid { + return Err(UpubError::forbidden()); + } + match old_activity.activity_type { + apb::ActivityType::Like => { + model::like::Entity::delete(model::like::ActiveModel { + actor: Set(old_activity.actor), likes: Set(old_activity.object.unwrap_or("".into())), + ..Default::default() + }).exec(self.db()).await?; + }, + apb::ActivityType::Follow => { + model::relation::Entity::delete(model::relation::ActiveModel { + follower: Set(old_activity.actor), following: Set(old_activity.object.unwrap_or("".into())), + ..Default::default() + }).exec(self.db()).await?; + }, + t => tracing::warn!("extra side effects for activity {t:?} not implemented"), + } + } + let activity_model = model::activity::Model::new( + &activity + .set_id(Some(&aid)) + .set_actor(Node::link(uid.clone())) + .set_published(Some(chrono::Utc::now())) + )?; + model::activity::Entity::insert(activity_model.into_active_model()) + .exec(self.db()) + .await?; + + self.dispatch(&uid, activity_targets, &aid, None).await?; + + Ok(aid) + } +} diff --git a/src/server/server.rs b/src/server/server.rs new file mode 100644 index 0000000..e69de29