diff --git a/Cargo.toml b/Cargo.toml index 54f9cb08..2e50e2cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,6 @@ time = { version = "0.3", features = ["serde"], optional = true } async-recursion = "1.1" [features] -default = ["faker", "migrations", "mastodon"] -faker = [] +default = ["migrations"] migrations = ["dep:sea-orm-migration"] mastodon = ["dep:mastodon-async-entities", "dep:time"] diff --git a/src/model/faker.rs b/src/cli/faker.rs similarity index 96% rename from src/model/faker.rs rename to src/cli/faker.rs index 2f273205..6f6edd14 100644 --- a/src/model/faker.rs +++ b/src/cli/faker.rs @@ -1,5 +1,4 @@ -use crate::model::{addressing, config, credential}; -use super::{activity, object, user, Audience}; +use crate::model::{addressing, config, credential, activity, object, user, Audience}; use openssl::rsa::Rsa; use sea_orm::IntoActiveModel; @@ -7,7 +6,7 @@ pub async fn faker(db: &sea_orm::DatabaseConnection, domain: String, count: u64) use sea_orm::{EntityTrait, Set}; let key = Rsa::generate(2048).unwrap(); - let test_user = super::user::Model { + let test_user = user::Model { id: format!("{domain}/users/test"), name: Some("μpub".into()), domain: clean_domain(&domain), diff --git a/src/cli/fetch.rs b/src/cli/fetch.rs new file mode 100644 index 00000000..b674995e --- /dev/null +++ b/src/cli/fetch.rs @@ -0,0 +1,42 @@ +use sea_orm::{EntityTrait, IntoActiveModel}; + +use crate::server::fetcher::Fetchable; + +pub async fn fetch(db: sea_orm::DatabaseConnection, domain: String, uri: String, save: bool) -> crate::Result<()> { + use apb::Base; + + let ctx = crate::server::Context::new(db, domain) + .await.expect("failed creating server context"); + + let mut node = apb::Node::link(uri.to_string()); + node.fetch(&ctx).await?; + + let obj = node.get().expect("node still empty after fetch?"); + + if save { + match obj.base_type() { + Some(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { + crate::model::user::Entity::insert( + crate::model::user::Model::new(obj).unwrap().into_active_model() + ).exec(ctx.db()).await.unwrap(); + }, + Some(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { + crate::model::activity::Entity::insert( + crate::model::activity::Model::new(obj).unwrap().into_active_model() + ).exec(ctx.db()).await.unwrap(); + }, + Some(apb::BaseType::Object(apb::ObjectType::Note)) => { + crate::model::object::Entity::insert( + crate::model::object::Model::new(obj).unwrap().into_active_model() + ).exec(ctx.db()).await.unwrap(); + }, + Some(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), + Some(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), + None => tracing::error!("no type on object"), + } + } + + println!("{}", serde_json::to_string_pretty(&obj).unwrap()); + + Ok(()) +} diff --git a/src/cli/fix.rs b/src/cli/fix.rs new file mode 100644 index 00000000..027d0b7b --- /dev/null +++ b/src/cli/fix.rs @@ -0,0 +1,87 @@ +use sea_orm::EntityTrait; + + +pub async fn fix(db: sea_orm::DatabaseConnection, likes: bool, shares: bool, replies: bool) -> crate::Result<()> { + use futures::TryStreamExt; + + if likes { + tracing::info!("fixing likes..."); + let mut store = std::collections::HashMap::new(); + { + let mut stream = crate::model::like::Entity::find().stream(&db).await?; + while let Some(like) = stream.try_next().await? { + store.insert(like.likes.clone(), store.get(&like.likes).unwrap_or(&0) + 1); + } + } + + for (k, v) in store { + let m = crate::model::object::ActiveModel { + id: sea_orm::Set(k.clone()), + likes: sea_orm::Set(v), + ..Default::default() + }; + if let Err(e) = crate::model::object::Entity::update(m) + .exec(&db) + .await + { + tracing::warn!("record not updated ({k}): {e}"); + } + } + } + + if shares { + tracing::info!("fixing shares..."); + let mut store = std::collections::HashMap::new(); + { + let mut stream = crate::model::share::Entity::find().stream(&db).await?; + while let Some(share) = stream.try_next().await? { + store.insert(share.shares.clone(), store.get(&share.shares).unwrap_or(&0) + 1); + } + } + + for (k, v) in store { + let m = crate::model::object::ActiveModel { + id: sea_orm::Set(k.clone()), + shares: sea_orm::Set(v), + ..Default::default() + }; + if let Err(e) = crate::model::object::Entity::update(m) + .exec(&db) + .await + { + tracing::warn!("record not updated ({k}): {e}"); + } + } + } + + if replies { + tracing::info!("fixing replies..."); + let mut store = std::collections::HashMap::new(); + { + let mut stream = crate::model::object::Entity::find().stream(&db).await?; + while let Some(object) = stream.try_next().await? { + if let Some(reply) = object.in_reply_to { + let before = store.get(&reply).unwrap_or(&0); + store.insert(reply, before + 1); + } + } + } + + for (k, v) in store { + let m = crate::model::object::ActiveModel { + id: sea_orm::Set(k.clone()), + comments: sea_orm::Set(v), + ..Default::default() + }; + if let Err(e) = crate::model::object::Entity::update(m) + .exec(&db) + .await + { + tracing::warn!("record not updated ({k}): {e}"); + } + } + } + + tracing::info!("done running fix tasks"); + Ok(()) +} diff --git a/src/cli/mod.rs b/src/cli/mod.rs new file mode 100644 index 00000000..ef23ff0e --- /dev/null +++ b/src/cli/mod.rs @@ -0,0 +1,14 @@ +mod fix; +pub use fix::*; + +mod fetch; +pub use fetch::*; + +mod faker; +pub use faker::*; + +mod relay; +pub use relay::*; + +mod register; +pub use register::*; diff --git a/src/cli/register.rs b/src/cli/register.rs new file mode 100644 index 00000000..2f84d801 --- /dev/null +++ b/src/cli/register.rs @@ -0,0 +1,44 @@ +use openssl::rsa::Rsa; +use sea_orm::{EntityTrait, IntoActiveModel}; + +pub async fn register( + db: sea_orm::DatabaseConnection, + domain: String, +) -> crate::Result<()> { + let key = Rsa::generate(2048).unwrap(); + let test_user = crate::model::user::Model { + id: format!("{domain}/users/test"), + name: Some("μpub".into()), + domain: clean_domain(&domain), + preferred_username: "test".to_string(), + summary: Some("hello world! i'm manually generated but served dynamically from db! check progress at https://git.alemi.dev/upub.git".to_string()), + following: None, + following_count: 0, + followers: None, + followers_count: 0, + statuses_count: 0, + icon: Some("https://cdn.alemi.dev/social/circle-square.png".to_string()), + image: Some("https://cdn.alemi.dev/social/someriver-xs.jpg".to_string()), + inbox: None, + shared_inbox: None, + outbox: None, + actor_type: apb::ActorType::Person, + created: chrono::Utc::now(), + updated: chrono::Utc::now(), + private_key: Some(std::str::from_utf8(&key.private_key_to_pem().unwrap()).unwrap().to_string()), + // TODO generate a fresh one every time + public_key: std::str::from_utf8(&key.public_key_to_pem().unwrap()).unwrap().to_string(), + }; + + crate::model::user::Entity::insert(test_user.clone().into_active_model()).exec(&db).await?; + + Ok(()) +} + +// TODO duplicated, make an util?? idk +fn clean_domain(domain: &str) -> String { + domain + .replace("http://", "") + .replace("https://", "") + .replace('/', "") +} diff --git a/src/cli/relay.rs b/src/cli/relay.rs new file mode 100644 index 00000000..888853de --- /dev/null +++ b/src/cli/relay.rs @@ -0,0 +1,40 @@ +use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder}; + +pub async fn relay(db: sea_orm::DatabaseConnection, domain: String, actor: String, accept: bool) -> crate::Result<()> { + let ctx = crate::server::Context::new(db, domain).await?; + + let aid = ctx.aid(uuid::Uuid::new_v4().to_string()); + + let mut activity_model = crate::model::activity::Model { + id: aid.clone(), + activity_type: apb::ActivityType::Follow, + actor: ctx.base(), + object: Some(actor.clone()), + target: None, + published: chrono::Utc::now(), + to: crate::model::Audience(vec![actor.clone()]), + bto: crate::model::Audience::default(), + cc: crate::model::Audience(vec![apb::target::PUBLIC.to_string()]), + bcc: crate::model::Audience::default(), + }; + + if accept { + let follow_req = crate::model::activity::Entity::find() + .filter(crate::model::activity::Column::ActivityType.eq("Follow")) + .filter(crate::model::activity::Column::Actor.eq(&actor)) + .filter(crate::model::activity::Column::Object.eq(ctx.base())) + .order_by_desc(crate::model::activity::Column::Published) + .one(ctx.db()) + .await? + .expect("no follow request to accept"); + activity_model.activity_type = apb::ActivityType::Accept(apb::AcceptType::Accept); + activity_model.object = Some(follow_req.id); + }; + + crate::model::activity::Entity::insert(activity_model.into_active_model()) + .exec(ctx.db()).await?; + + ctx.dispatch(&ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await?; + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 9ce1af57..d26b318a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ pub mod server; pub mod model; pub mod routes; +pub mod cli; pub mod errors; @@ -11,13 +12,11 @@ mod migrations; use sea_orm_migration::MigratorTrait; use clap::{Parser, Subcommand}; -use sea_orm::{ColumnTrait, ConnectOptions, Database, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder}; +use sea_orm::{ConnectOptions, Database}; pub use errors::UpubResult as Result; use tower_http::{cors::CorsLayer, trace::TraceLayer}; -use crate::server::fetcher::Fetchable; - pub const VERSION: &str = env!("CARGO_PKG_VERSION"); #[derive(Parser)] @@ -113,58 +112,25 @@ async fn main() { match args.command { #[cfg(feature = "migrations")] - CliCommand::Migrate => migrations::Migrator::up(&db, None) - .await.expect("error applying migrations"), + CliCommand::Migrate => + migrations::Migrator::up(&db, None) + .await.expect("error applying migrations"), #[cfg(feature = "faker")] - CliCommand::Faker { count } => model::faker::faker(&db, args.domain, count) - .await.expect("error creating fake entities"), + CliCommand::Faker { count } => + cli::faker(&db, args.domain, count) + .await.expect("error creating fake entities"), - CliCommand::Fetch { uri, save } => fetch(db, args.domain, uri, save) - .await.expect("error fetching object"), + CliCommand::Fetch { uri, save } => + cli::fetch(db, args.domain, uri, save) + .await.expect("error fetching object"), - CliCommand::Relay { actor, accept } => { - let ctx = server::Context::new(db, args.domain) - .await.expect("failed creating server context"); - - let aid = ctx.aid(uuid::Uuid::new_v4().to_string()); - - let mut activity_model = model::activity::Model { - id: aid.clone(), - activity_type: apb::ActivityType::Follow, - actor: ctx.base(), - object: Some(actor.clone()), - target: None, - published: chrono::Utc::now(), - to: model::Audience(vec![actor.clone()]), - bto: model::Audience::default(), - cc: model::Audience(vec![apb::target::PUBLIC.to_string()]), - bcc: model::Audience::default(), - }; - - if accept { - let follow_req = model::activity::Entity::find() - .filter(model::activity::Column::ActivityType.eq("Follow")) - .filter(model::activity::Column::Actor.eq(&actor)) - .filter(model::activity::Column::Object.eq(ctx.base())) - .order_by_desc(model::activity::Column::Published) - .one(ctx.db()) - .await - .expect("failed querying db for relay follow req") - .expect("no follow request to accept"); - activity_model.activity_type = apb::ActivityType::Accept(apb::AcceptType::Accept); - activity_model.object = Some(follow_req.id); - }; - - model::activity::Entity::insert(activity_model.into_active_model()) - .exec(ctx.db()).await.expect("could not insert activity in db"); - - ctx.dispatch(&ctx.base(), vec![actor, apb::target::PUBLIC.to_string()], &aid, None).await - .expect("could not dispatch relay activity"); - }, + CliCommand::Relay { actor, accept } => + cli::relay(db, args.domain, actor, accept) + .await.expect("error registering/accepting relay"), CliCommand::Fix { likes, shares, replies } => - fix(db, likes, shares, replies) + cli::fix(db, likes, shares, replies) .await .expect("failed running fix task"), @@ -192,128 +158,3 @@ async fn main() { }, } } - - -async fn fetch(db: sea_orm::DatabaseConnection, domain: String, uri: String, save: bool) -> crate::Result<()> { - use apb::Base; - - let ctx = server::Context::new(db, domain) - .await.expect("failed creating server context"); - - let mut node = apb::Node::link(uri.to_string()); - node.fetch(&ctx).await?; - - let obj = node.get().expect("node still empty after fetch?"); - - if save { - match obj.base_type() { - Some(apb::BaseType::Object(apb::ObjectType::Actor(_))) => { - model::user::Entity::insert( - model::user::Model::new(obj).unwrap().into_active_model() - ).exec(ctx.db()).await.unwrap(); - }, - Some(apb::BaseType::Object(apb::ObjectType::Activity(_))) => { - model::activity::Entity::insert( - model::activity::Model::new(obj).unwrap().into_active_model() - ).exec(ctx.db()).await.unwrap(); - }, - Some(apb::BaseType::Object(apb::ObjectType::Note)) => { - model::object::Entity::insert( - model::object::Model::new(obj).unwrap().into_active_model() - ).exec(ctx.db()).await.unwrap(); - }, - Some(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t), - Some(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"), - None => tracing::error!("no type on object"), - } - } - - println!("{}", serde_json::to_string_pretty(&obj).unwrap()); - - Ok(()) -} - -async fn fix(db: sea_orm::DatabaseConnection, likes: bool, shares: bool, replies: bool) -> crate::Result<()> { - use futures::TryStreamExt; - - if likes { - tracing::info!("fixing likes..."); - let mut store = std::collections::HashMap::new(); - { - let mut stream = model::like::Entity::find().stream(&db).await?; - while let Some(like) = stream.try_next().await? { - store.insert(like.likes.clone(), store.get(&like.likes).unwrap_or(&0) + 1); - } - } - - for (k, v) in store { - let m = model::object::ActiveModel { - id: sea_orm::Set(k.clone()), - likes: sea_orm::Set(v), - ..Default::default() - }; - if let Err(e) = model::object::Entity::update(m) - .exec(&db) - .await - { - tracing::warn!("record not updated ({k}): {e}"); - } - } - } - - if shares { - tracing::info!("fixing shares..."); - let mut store = std::collections::HashMap::new(); - { - let mut stream = model::share::Entity::find().stream(&db).await?; - while let Some(share) = stream.try_next().await? { - store.insert(share.shares.clone(), store.get(&share.shares).unwrap_or(&0) + 1); - } - } - - for (k, v) in store { - let m = model::object::ActiveModel { - id: sea_orm::Set(k.clone()), - shares: sea_orm::Set(v), - ..Default::default() - }; - if let Err(e) = model::object::Entity::update(m) - .exec(&db) - .await - { - tracing::warn!("record not updated ({k}): {e}"); - } - } - } - - if replies { - tracing::info!("fixing replies..."); - let mut store = std::collections::HashMap::new(); - { - let mut stream = model::object::Entity::find().stream(&db).await?; - while let Some(object) = stream.try_next().await? { - if let Some(reply) = object.in_reply_to { - let before = store.get(&reply).unwrap_or(&0); - store.insert(reply, before + 1); - } - } - } - - for (k, v) in store { - let m = model::object::ActiveModel { - id: sea_orm::Set(k.clone()), - comments: sea_orm::Set(v), - ..Default::default() - }; - if let Err(e) = model::object::Entity::update(m) - .exec(&db) - .await - { - tracing::warn!("record not updated ({k}): {e}"); - } - } - } - - tracing::info!("done running fix tasks"); - Ok(()) -} diff --git a/src/model/mod.rs b/src/model/mod.rs index 8ed7aef2..04bdb6d5 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -14,9 +14,6 @@ pub mod delivery; pub mod attachment; pub mod application; -#[cfg(feature = "faker")] -pub mod faker; - #[derive(Debug, Clone, thiserror::Error)] #[error("missing required field: '{0}'")] pub struct FieldError(pub &'static str);