1
0
Fork 0
forked from alemi/upub

chore: separated outbox business logic in methods

dumped all in server but temporary, now can be properly modularized
also fixed endpoints
This commit is contained in:
əlemi 2024-04-08 02:30:43 +02:00
parent 9d3376a1f4
commit 7ce872cfff
Signed by: alemi
GPG key ID: A4895B84D311642C
5 changed files with 262 additions and 214 deletions

View file

@ -129,3 +129,14 @@ pub async fn auth(State(ctx): State<Context>, Json(login): Json<LoginForm>) -> R
}
}
}
#[axum::async_trait]
pub trait APOutbox {
async fn post_note(&self, uid: String, object: serde_json::Value) -> crate::Result<String>;
async fn post_activity(&self, uid: String, activity: serde_json::Value) -> crate::Result<String>;
async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result<String>;
async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result<String>;
async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result<String>;
async fn reject(&self, _uid: String, _activity: serde_json::Value) -> crate::Result<String>;
async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result<String>;
}

View file

@ -41,7 +41,7 @@ pub fn ap_user(user: model::user::Model) -> serde_json::Value {
.set_public_key_pem(&user.public_key)
))
.set_discoverable(Some(true))
.set_endpoints(None)
.set_endpoints(Node::Empty)
}
pub async fn view(State(ctx) : State<Context>, Path(id): Path<String>) -> Result<JsonLD<serde_json::Value>, StatusCode> {

View file

@ -1,8 +1,8 @@
use axum::{extract::{Path, Query, State}, http::StatusCode, Json};
use sea_orm::{EntityTrait, IntoActiveModel, Order, QueryOrder, QuerySelect, Set};
use sea_orm::{EntityTrait, Order, QueryOrder, QuerySelect};
use apb::{AcceptType, Activity, ActivityMut, ActivityType, ObjectMut, Base, BaseMut, BaseType, Node, ObjectType};
use crate::{activitypub::{jsonld::LD, Addressed, CreationResult, JsonLD, Pagination}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url};
use apb::{AcceptType, ActivityMut, ActivityType, Base, BaseType, Node, ObjectType, RejectType};
use crate::{activitypub::{jsonld::LD, APOutbox, CreationResult, JsonLD, Pagination}, auth::{AuthIdentity, Identity}, errors::UpubError, model, server::Context, url};
pub async fn get(
State(ctx): State<Context>,
@ -75,219 +75,29 @@ pub async fn post(
Identity::Local(uid) => if ctx.uid(id.clone()) == uid {
match activity.base_type() {
None => Err(StatusCode::BAD_REQUEST.into()),
Some(BaseType::Link(_)) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()),
Some(BaseType::Object(ObjectType::Note)) => {
let oid = ctx.oid(uuid::Uuid::new_v4().to_string());
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
let object_model = model::object::Model::new(
&activity
.set_id(Some(&oid))
.set_attributed_to(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
let activity_model = model::activity::Model {
id: aid.clone(),
activity_type: ActivityType::Create,
actor: uid.clone(),
object: Some(oid.clone()),
target: None,
cc: object_model.cc.clone(),
bcc: object_model.bcc.clone(),
to: object_model.to.clone(),
bto: object_model.bto.clone(),
published: object_model.published,
};
Some(BaseType::Object(ObjectType::Note)) =>
Ok(CreationResult(ctx.post_note(uid, activity).await?)),
model::object::Entity::insert(object_model.into_active_model())
.exec(ctx.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(ctx.db()).await?;
Some(BaseType::Object(ObjectType::Activity(ActivityType::Create))) =>
Ok(CreationResult(ctx.post_activity(uid, activity).await?)),
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
ctx.address_to(&aid, Some(&oid), &addressed).await?;
ctx.deliver_to(&aid, &uid, &addressed).await?;
Some(BaseType::Object(ObjectType::Activity(ActivityType::Like))) =>
Ok(CreationResult(ctx.like(uid, activity).await?)),
Ok(CreationResult(aid))
},
Some(BaseType::Object(ObjectType::Activity(ActivityType::Follow))) =>
Ok(CreationResult(ctx.follow(uid, activity).await?)),
Some(BaseType::Object(ObjectType::Activity(ActivityType::Create))) => {
let Some(object) = activity.object().extract() else {
return Err(StatusCode::BAD_REQUEST.into());
};
let oid = ctx.oid(uuid::Uuid::new_v4().to_string());
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
let mut object_model = model::object::Model::new(
&object
.set_id(Some(&oid))
.set_attributed_to(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
let mut activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
object_model.to = activity_model.to.clone();
object_model.bto = activity_model.bto.clone();
object_model.cc = activity_model.cc.clone();
object_model.bcc = activity_model.bcc.clone();
activity_model.object = Some(oid.clone());
Some(BaseType::Object(ObjectType::Activity(ActivityType::Undo))) =>
Ok(CreationResult(ctx.undo(uid, activity).await?)),
model::object::Entity::insert(object_model.into_active_model())
.exec(ctx.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(ctx.db()).await?;
Some(BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept)))) =>
Ok(CreationResult(ctx.accept(uid, activity).await?)),
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
ctx.address_to(&aid, Some(&oid), &addressed).await?;
ctx.deliver_to(&aid, &uid, &addressed).await?;
Ok(CreationResult(aid))
},
Some(BaseType::Object(ObjectType::Activity(ActivityType::Like))) => {
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
let Some(oid) = activity.object().id() else {
return Err(StatusCode::BAD_REQUEST.into());
};
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_published(Some(chrono::Utc::now()))
.set_actor(Node::link(uid.clone()))
)?;
let like_model = model::like::ActiveModel {
actor: Set(uid.clone()),
likes: Set(oid),
date: Set(chrono::Utc::now()),
..Default::default()
};
model::like::Entity::insert(like_model).exec(ctx.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(ctx.db()).await?;
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
ctx.address_to(&aid, None, &addressed).await?;
ctx.deliver_to(&aid, &uid, &addressed).await?;
Ok(CreationResult(aid))
},
Some(BaseType::Object(ObjectType::Activity(ActivityType::Follow))) => {
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
if activity.object().id().is_none() {
return Err(StatusCode::BAD_REQUEST.into());
}
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(ctx.db()).await?;
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
ctx.address_to(&aid, None, &addressed).await?;
ctx.deliver_to(&aid, &uid, &addressed).await?;
Ok(CreationResult(aid))
},
Some(BaseType::Object(ObjectType::Activity(ActivityType::Undo))) => {
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
{
let Some(old_aid) = activity.object().id() else {
return Err(StatusCode::BAD_REQUEST.into());
};
let Some(old_activity) = model::activity::Entity::find_by_id(old_aid)
.one(ctx.db()).await?
else {
return Err(StatusCode::NOT_FOUND.into());
};
if old_activity.actor != uid {
return Err(StatusCode::FORBIDDEN.into());
}
match old_activity.activity_type {
ActivityType::Like => {
model::like::Entity::delete(model::like::ActiveModel {
actor: Set(old_activity.actor), likes: Set(old_activity.object.unwrap_or("".into())),
..Default::default()
}).exec(ctx.db()).await?;
},
ActivityType::Follow => {
model::relation::Entity::delete(model::relation::ActiveModel {
follower: Set(old_activity.actor), following: Set(old_activity.object.unwrap_or("".into())),
..Default::default()
}).exec(ctx.db()).await?;
},
t => tracing::warn!("extra side effects for activity {t:?} not implemented"),
}
}
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
model::activity::Entity::insert(activity_model.into_active_model()).exec(ctx.db()).await?;
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
ctx.address_to(&aid, None, &addressed).await?;
ctx.deliver_to(&aid, &uid, &addressed).await?;
Ok(CreationResult(aid))
},
Some(BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept)))) => {
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
if activity.object().id().is_none() {
return Err(StatusCode::BAD_REQUEST.into());
}
let Some(accepted_id) = activity.object().id() else {
return Err(StatusCode::BAD_REQUEST.into());
};
let Some(accepted_activity) = model::activity::Entity::find_by_id(accepted_id)
.one(ctx.db()).await?
else {
return Err(StatusCode::NOT_FOUND.into());
};
match accepted_activity.activity_type {
ActivityType::Follow => {
model::relation::Entity::insert(
model::relation::ActiveModel {
follower: Set(accepted_activity.actor), following: Set(uid.clone()),
..Default::default()
}
).exec(ctx.db()).await?;
},
t => tracing::warn!("no side effects implemented for accepting {t:?}"),
}
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(ctx.db()).await?;
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
ctx.address_to(&aid, None, &addressed).await?;
ctx.deliver_to(&aid, &uid, &addressed).await?;
Ok(CreationResult(aid))
},
// Some(BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject)))) => {
// },
Some(BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject)))) =>
Ok(CreationResult(ctx.reject(uid, activity).await?)),
Some(_) => Err(StatusCode::NOT_IMPLEMENTED.into()),
}

View file

@ -1,3 +1,5 @@
use reqwest::StatusCode;
#[derive(Debug, thiserror::Error)]
pub enum UpubError {
#[error("database error: {0}")]
@ -17,8 +19,8 @@ pub enum UpubError {
}
impl UpubError {
pub fn code(code: axum::http::StatusCode) -> Self {
UpubError::Status(code)
pub fn bad_request() -> Self {
Self::Status(StatusCode::BAD_REQUEST)
}
}

View file

@ -1,10 +1,11 @@
use std::{str::Utf8Error, sync::Arc};
use openssl::rsa::Rsa;
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set};
use reqwest::StatusCode;
use sea_orm::{ColumnTrait, Condition, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set};
use crate::{activitypub::{jsonld::LD, PUBLIC_TARGET}, dispatcher::Dispatcher, fetcher::Fetcher, model};
use apb::{CollectionPageMut, CollectionMut, CollectionType, BaseMut, Node};
use crate::{activitypub::{jsonld::LD, APOutbox, Addressed, CreationResult, PUBLIC_TARGET}, dispatcher::Dispatcher, errors::UpubError, fetcher::Fetcher, model};
use apb::{Activity, ActivityMut, BaseMut, CollectionMut, CollectionPageMut, CollectionType, Node, ObjectMut};
#[derive(Clone)]
pub struct Context(Arc<ContextInner>);
@ -38,6 +39,8 @@ pub enum ContextError {
}
impl Context {
// TODO slim constructor down, maybe make a builder?
pub async fn new(db: DatabaseConnection, mut domain: String) -> Result<Self, ContextError> {
let protocol = if domain.starts_with("http://")
{ "http://" } else { "https://" }.to_string();
@ -208,6 +211,7 @@ impl Context {
Ok(())
}
// TODO should probs not be here
pub fn ap_collection(&self, id: &str, total_items: Option<u64>) -> serde_json::Value {
serde_json::Value::new_object()
.set_id(Some(id))
@ -216,6 +220,7 @@ impl Context {
.set_total_items(total_items)
}
// TODO should probs not be here
pub fn ap_collection_page(&self, id: &str, offset: u64, limit: u64, items: Vec<serde_json::Value>) -> serde_json::Value {
serde_json::Value::new_object()
.set_id(Some(&format!("{id}?offset={offset}")))
@ -224,4 +229,224 @@ impl Context {
.set_next(Node::link(format!("{id}?offset={}", offset+limit)))
.set_ordered_items(Node::Array(items))
}
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
let addressed = self.expand_addressing(uid, activity_targets).await?;
self.address_to(aid, oid, &addressed).await?;
self.deliver_to(aid, uid, &addressed).await?;
Ok(())
}
}
#[axum::async_trait]
impl APOutbox for Context {
async fn post_note(&self, uid: String, object: serde_json::Value) -> crate::Result<String> {
let oid = self.oid(uuid::Uuid::new_v4().to_string());
let aid = self.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = object.addressed();
let object_model = model::object::Model::new(
&object
.set_id(Some(&oid))
.set_attributed_to(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
let activity_model = model::activity::Model {
id: aid.clone(),
activity_type: apb::ActivityType::Create,
actor: uid.clone(),
object: Some(oid.clone()),
target: None,
cc: object_model.cc.clone(),
bcc: object_model.bcc.clone(),
to: object_model.to.clone(),
bto: object_model.bto.clone(),
published: object_model.published,
};
model::object::Entity::insert(object_model.into_active_model())
.exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?;
self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?;
Ok(aid)
}
async fn post_activity(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let Some(object) = activity.object().extract() else {
return Err(UpubError::bad_request());
};
let oid = self.oid(uuid::Uuid::new_v4().to_string());
let aid = self.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
let mut object_model = model::object::Model::new(
&object
.set_id(Some(&oid))
.set_attributed_to(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
let mut activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
object_model.to = activity_model.to.clone();
object_model.bto = activity_model.bto.clone();
object_model.cc = activity_model.cc.clone();
object_model.bcc = activity_model.bcc.clone();
activity_model.object = Some(oid.clone());
model::object::Entity::insert(object_model.into_active_model())
.exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?;
self.dispatch(&uid, activity_targets, &aid, Some(&oid)).await?;
Ok(aid)
}
async fn like(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
let Some(oid) = activity.object().id() else {
return Err(StatusCode::BAD_REQUEST.into());
};
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_published(Some(chrono::Utc::now()))
.set_actor(Node::link(uid.clone()))
)?;
let like_model = model::like::ActiveModel {
actor: Set(uid.clone()),
likes: Set(oid),
date: Set(chrono::Utc::now()),
..Default::default()
};
model::like::Entity::insert(like_model).exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?;
self.dispatch(&uid, activity_targets, &aid, None).await?;
Ok(aid)
}
async fn follow(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
if activity.object().id().is_none() {
return Err(StatusCode::BAD_REQUEST.into());
}
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?;
self.dispatch(&uid, activity_targets, &aid, None).await?;
Ok(aid)
}
async fn accept(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
if activity.object().id().is_none() {
return Err(StatusCode::BAD_REQUEST.into());
}
let Some(accepted_id) = activity.object().id() else {
return Err(StatusCode::BAD_REQUEST.into());
};
let Some(accepted_activity) = model::activity::Entity::find_by_id(accepted_id)
.one(self.db()).await?
else {
return Err(StatusCode::NOT_FOUND.into());
};
match accepted_activity.activity_type {
apb::ActivityType::Follow => {
model::relation::Entity::insert(
model::relation::ActiveModel {
follower: Set(accepted_activity.actor), following: Set(uid.clone()),
..Default::default()
}
).exec(self.db()).await?;
},
t => tracing::warn!("no side effects implemented for accepting {t:?}"),
}
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?;
self.dispatch(&uid, activity_targets, &aid, None).await?;
Ok(aid)
}
async fn reject(&self, _uid: String, _activity: serde_json::Value) -> crate::Result<String> {
todo!()
}
async fn undo(&self, uid: String, activity: serde_json::Value) -> crate::Result<String> {
let aid = self.aid(uuid::Uuid::new_v4().to_string());
let activity_targets = activity.addressed();
{
let Some(old_aid) = activity.object().id() else {
return Err(StatusCode::BAD_REQUEST.into());
};
let Some(old_activity) = model::activity::Entity::find_by_id(old_aid)
.one(self.db()).await?
else {
return Err(StatusCode::NOT_FOUND.into());
};
if old_activity.actor != uid {
return Err(StatusCode::FORBIDDEN.into());
}
match old_activity.activity_type {
apb::ActivityType::Like => {
model::like::Entity::delete(model::like::ActiveModel {
actor: Set(old_activity.actor), likes: Set(old_activity.object.unwrap_or("".into())),
..Default::default()
}).exec(self.db()).await?;
},
apb::ActivityType::Follow => {
model::relation::Entity::delete(model::relation::ActiveModel {
follower: Set(old_activity.actor), following: Set(old_activity.object.unwrap_or("".into())),
..Default::default()
}).exec(self.db()).await?;
},
t => tracing::warn!("extra side effects for activity {t:?} not implemented"),
}
}
let activity_model = model::activity::Model::new(
&activity
.set_id(Some(&aid))
.set_actor(Node::link(uid.clone()))
.set_published(Some(chrono::Utc::now()))
)?;
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db())
.await?;
self.dispatch(&uid, activity_targets, &aid, None).await?;
Ok(aid)
}
}