feat: crude implementation for most outbox methods
This commit is contained in:
parent
f83103ba72
commit
c3face463e
1 changed files with 206 additions and 122 deletions
|
@ -1,7 +1,7 @@
|
||||||
use axum::{extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, Json};
|
use axum::{extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, Json};
|
||||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, IntoActiveModel, Order, QueryFilter, QueryOrder, QuerySelect, SelectColumns, Set};
|
use sea_orm::{ColumnTrait, Condition, DbErr, EntityTrait, IntoActiveModel, Order, QueryFilter, QueryOrder, QuerySelect, SelectColumns, Set};
|
||||||
|
|
||||||
use crate::{activitypub::{jsonld::LD, JsonLD, Pagination, PUBLIC_TARGET}, activitystream::{object::{activity::{Activity, ActivityMut, ActivityType}, collection::{page::CollectionPageMut, CollectionMut, CollectionType}, Addressed}, Base, BaseMut, BaseType, Node, ObjectType}, auth::{AuthIdentity, Identity}, model::{self, activity, object, FieldError}, server::Context, url};
|
use crate::{activitypub::{jsonld::LD, JsonLD, Pagination, PUBLIC_TARGET}, activitystream::{object::{activity::{accept::AcceptType, Activity, ActivityMut, ActivityType}, collection::{page::CollectionPageMut, CollectionMut, CollectionType}, Addressed, ObjectMut}, Base, BaseMut, BaseType, Node, ObjectType}, auth::{AuthIdentity, Identity}, model::{self, activity, object, FieldError}, server::Context, url};
|
||||||
|
|
||||||
pub async fn get(
|
pub async fn get(
|
||||||
State(ctx): State<Context>,
|
State(ctx): State<Context>,
|
||||||
|
@ -121,95 +121,58 @@ pub async fn post(
|
||||||
match activity.base_type() {
|
match activity.base_type() {
|
||||||
None => Err(StatusCode::BAD_REQUEST.into()),
|
None => Err(StatusCode::BAD_REQUEST.into()),
|
||||||
Some(BaseType::Link(_)) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()),
|
Some(BaseType::Link(_)) => Err(StatusCode::UNPROCESSABLE_ENTITY.into()),
|
||||||
|
|
||||||
// Some(BaseType::Object(ObjectType::Note)) => {
|
// Some(BaseType::Object(ObjectType::Note)) => {
|
||||||
// },
|
// },
|
||||||
|
|
||||||
Some(BaseType::Object(ObjectType::Activity(ActivityType::Create))) => {
|
Some(BaseType::Object(ObjectType::Activity(ActivityType::Create))) => {
|
||||||
let Some(object) = activity.object().get().map(|x| x.underlying_json_object()) else {
|
let Some(object) = activity.object().get().map(|x| x.underlying_json_object()) else {
|
||||||
return Err(StatusCode::BAD_REQUEST.into());
|
return Err(StatusCode::BAD_REQUEST.into());
|
||||||
};
|
};
|
||||||
let oid = ctx.oid(uuid::Uuid::new_v4().to_string());
|
let oid = ctx.oid(uuid::Uuid::new_v4().to_string());
|
||||||
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
|
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
|
||||||
let mut object_model = model::object::Model::new(&object)?;
|
let activity_targets = activity.addressed();
|
||||||
let mut activity_model = model::activity::Model::new(&activity)?;
|
let mut object_model = model::object::Model::new(
|
||||||
object_model.id = oid.clone();
|
&object
|
||||||
|
.set_id(Some(&oid))
|
||||||
|
.set_attributed_to(Node::link(uid.clone()))
|
||||||
|
.set_published(Some(chrono::Utc::now()))
|
||||||
|
)?;
|
||||||
|
let mut activity_model = model::activity::Model::new(
|
||||||
|
&activity
|
||||||
|
.set_id(Some(&aid))
|
||||||
|
.set_actor(Node::link(uid.clone()))
|
||||||
|
.set_published(Some(chrono::Utc::now()))
|
||||||
|
)?;
|
||||||
object_model.to = activity_model.to.clone();
|
object_model.to = activity_model.to.clone();
|
||||||
object_model.bto = activity_model.bto.clone();
|
object_model.bto = activity_model.bto.clone();
|
||||||
object_model.cc = activity_model.cc.clone();
|
object_model.cc = activity_model.cc.clone();
|
||||||
object_model.bcc = activity_model.bcc.clone();
|
object_model.bcc = activity_model.bcc.clone();
|
||||||
object_model.attributed_to = Some(uid.clone());
|
|
||||||
object_model.published = chrono::Utc::now();
|
|
||||||
activity_model.id = aid.clone();
|
|
||||||
activity_model.published = chrono::Utc::now();
|
|
||||||
activity_model.actor = uid.clone();
|
|
||||||
activity_model.object = Some(oid.clone());
|
activity_model.object = Some(oid.clone());
|
||||||
|
|
||||||
model::object::Entity::insert(object_model.into_active_model())
|
model::object::Entity::insert(object_model.into_active_model())
|
||||||
.exec(ctx.db()).await?;
|
.exec(ctx.db()).await?;
|
||||||
|
|
||||||
model::activity::Entity::insert(activity_model.into_active_model())
|
model::activity::Entity::insert(activity_model.into_active_model())
|
||||||
.exec(ctx.db()).await?;
|
.exec(ctx.db()).await?;
|
||||||
|
|
||||||
let mut addressed = activity.addressed();
|
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
|
||||||
let followers = url!(ctx, "/users/{id}/followers"); // TODO maybe can be done better?
|
ctx.address_to(&aid, Some(&oid), &addressed).await?;
|
||||||
if let Some(i) = addressed.iter().position(|x| x == &followers) {
|
ctx.deliver_to(&aid, &uid, &addressed).await?;
|
||||||
addressed.remove(i);
|
|
||||||
model::relation::Entity::find()
|
|
||||||
.filter(Condition::all().add(model::relation::Column::Following.eq(uid.clone())))
|
|
||||||
.select_column(model::relation::Column::Follower)
|
|
||||||
.into_tuple::<String>()
|
|
||||||
.all(ctx.db())
|
|
||||||
.await?
|
|
||||||
.into_iter()
|
|
||||||
.for_each(|x| addressed.push(x));
|
|
||||||
}
|
|
||||||
|
|
||||||
let addressings : Vec<model::addressing::ActiveModel> = addressed
|
|
||||||
.iter()
|
|
||||||
.map(|to| model::addressing::ActiveModel {
|
|
||||||
server: Set(Context::server(to)),
|
|
||||||
actor: Set(to.to_string()),
|
|
||||||
activity: Set(aid.clone()),
|
|
||||||
object: Set(Some(oid.clone())),
|
|
||||||
published: Set(chrono::Utc::now()),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
model::addressing::Entity::insert_many(addressings)
|
|
||||||
.exec(ctx.db()).await?;
|
|
||||||
|
|
||||||
let deliveries : Vec<model::delivery::ActiveModel> = addressed
|
|
||||||
.iter()
|
|
||||||
.filter(|to| Context::server(to) != ctx.base())
|
|
||||||
.filter(|to| to != &PUBLIC_TARGET)
|
|
||||||
.map(|to| model::delivery::ActiveModel {
|
|
||||||
// TODO we should resolve each user by id and check its inbox because we can't assume
|
|
||||||
// it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now
|
|
||||||
actor: Set(uid.clone()),
|
|
||||||
target: Set(format!("{}/inbox", to)),
|
|
||||||
activity: Set(aid.clone()),
|
|
||||||
created: Set(chrono::Utc::now()),
|
|
||||||
not_before: Set(chrono::Utc::now()),
|
|
||||||
attempt: Set(0),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
model::delivery::Entity::insert_many(deliveries)
|
|
||||||
.exec(ctx.db())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(CreationResult(aid))
|
Ok(CreationResult(aid))
|
||||||
},
|
},
|
||||||
|
|
||||||
Some(BaseType::Object(ObjectType::Activity(ActivityType::Like))) => {
|
Some(BaseType::Object(ObjectType::Activity(ActivityType::Like))) => {
|
||||||
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
|
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
|
||||||
|
let activity_targets = activity.addressed();
|
||||||
let Some(oid) = activity.object().id().map(|x| x.to_string()) else {
|
let Some(oid) = activity.object().id().map(|x| x.to_string()) else {
|
||||||
return Err(StatusCode::BAD_REQUEST.into());
|
return Err(StatusCode::BAD_REQUEST.into());
|
||||||
};
|
};
|
||||||
let mut activity_model = model::activity::Model::new(&activity)?;
|
let activity_model = model::activity::Model::new(
|
||||||
activity_model.id = aid.clone();
|
&activity
|
||||||
activity_model.published = chrono::Utc::now();
|
.set_id(Some(&aid))
|
||||||
activity_model.actor = uid.clone();
|
.set_published(Some(chrono::Utc::now()))
|
||||||
|
.set_actor(Node::link(uid.clone()))
|
||||||
|
)?;
|
||||||
|
|
||||||
let like_model = model::like::ActiveModel {
|
let like_model = model::like::ActiveModel {
|
||||||
actor: Set(uid.clone()),
|
actor: Set(uid.clone()),
|
||||||
|
@ -221,67 +184,124 @@ pub async fn post(
|
||||||
model::activity::Entity::insert(activity_model.into_active_model())
|
model::activity::Entity::insert(activity_model.into_active_model())
|
||||||
.exec(ctx.db()).await?;
|
.exec(ctx.db()).await?;
|
||||||
|
|
||||||
let mut addressed = activity.addressed();
|
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
|
||||||
let followers = url!(ctx, "/users/{id}/followers"); // TODO maybe can be done better?
|
ctx.address_to(&aid, None, &addressed).await?;
|
||||||
if let Some(i) = addressed.iter().position(|x| x == &followers) {
|
ctx.deliver_to(&aid, &uid, &addressed).await?;
|
||||||
addressed.remove(i);
|
|
||||||
model::relation::Entity::find()
|
|
||||||
.filter(Condition::all().add(model::relation::Column::Following.eq(uid.clone())))
|
|
||||||
.select_column(model::relation::Column::Follower)
|
|
||||||
.into_tuple::<String>()
|
|
||||||
.all(ctx.db())
|
|
||||||
.await?
|
|
||||||
.into_iter()
|
|
||||||
.for_each(|x| addressed.push(x));
|
|
||||||
}
|
|
||||||
|
|
||||||
let addressings : Vec<model::addressing::ActiveModel> = addressed
|
|
||||||
.iter()
|
|
||||||
.map(|to| model::addressing::ActiveModel {
|
|
||||||
server: Set(Context::server(to)),
|
|
||||||
actor: Set(to.to_string()),
|
|
||||||
activity: Set(aid.clone()),
|
|
||||||
object: Set(None),
|
|
||||||
published: Set(chrono::Utc::now()),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
model::addressing::Entity::insert_many(addressings)
|
|
||||||
.exec(ctx.db())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let deliveries : Vec<model::delivery::ActiveModel> = addressed
|
|
||||||
.iter()
|
|
||||||
.filter(|to| Context::server(to) != ctx.base())
|
|
||||||
.filter(|to| to != &PUBLIC_TARGET)
|
|
||||||
.map(|to| model::delivery::ActiveModel {
|
|
||||||
// TODO we should resolve each user by id and check its inbox because we can't assume
|
|
||||||
// it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now
|
|
||||||
actor: Set(uid.clone()),
|
|
||||||
target: Set(format!("{}/inbox", to)),
|
|
||||||
activity: Set(aid.clone()),
|
|
||||||
created: Set(chrono::Utc::now()),
|
|
||||||
not_before: Set(chrono::Utc::now()),
|
|
||||||
attempt: Set(0),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
model::delivery::Entity::insert_many(deliveries)
|
|
||||||
.exec(ctx.db())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(CreationResult(aid))
|
Ok(CreationResult(aid))
|
||||||
},
|
},
|
||||||
// Some(BaseType::Object(ObjectType::Activity(ActivityType::Follow))) => {
|
|
||||||
// },
|
Some(BaseType::Object(ObjectType::Activity(ActivityType::Follow))) => {
|
||||||
// Some(BaseType::Object(ObjectType::Activity(ActivityType::Undo))) => {
|
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
|
||||||
// },
|
let activity_targets = activity.addressed();
|
||||||
// Some(BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept)))) => {
|
if activity.object().id().is_none() {
|
||||||
// },
|
return Err(StatusCode::BAD_REQUEST.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let activity_model = model::activity::Model::new(
|
||||||
|
&activity
|
||||||
|
.set_id(Some(&aid))
|
||||||
|
.set_actor(Node::link(uid.clone()))
|
||||||
|
.set_published(Some(chrono::Utc::now()))
|
||||||
|
)?;
|
||||||
|
model::activity::Entity::insert(activity_model.into_active_model())
|
||||||
|
.exec(ctx.db()).await?;
|
||||||
|
|
||||||
|
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
|
||||||
|
ctx.address_to(&aid, None, &addressed).await?;
|
||||||
|
ctx.deliver_to(&aid, &uid, &addressed).await?;
|
||||||
|
Ok(CreationResult(aid))
|
||||||
|
},
|
||||||
|
|
||||||
|
Some(BaseType::Object(ObjectType::Activity(ActivityType::Undo))) => {
|
||||||
|
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
|
||||||
|
let activity_targets = activity.addressed();
|
||||||
|
{
|
||||||
|
let Some(old_aid) = activity.object().id() else {
|
||||||
|
return Err(StatusCode::BAD_REQUEST.into());
|
||||||
|
};
|
||||||
|
let Some(old_activity) = model::activity::Entity::find_by_id(old_aid)
|
||||||
|
.one(ctx.db()).await?
|
||||||
|
else {
|
||||||
|
return Err(StatusCode::NOT_FOUND.into());
|
||||||
|
};
|
||||||
|
if old_activity.actor != uid {
|
||||||
|
return Err(StatusCode::FORBIDDEN.into());
|
||||||
|
}
|
||||||
|
match old_activity.activity_type {
|
||||||
|
ActivityType::Like => {
|
||||||
|
model::like::Entity::delete(model::like::ActiveModel {
|
||||||
|
actor: Set(old_activity.actor), likes: Set(old_activity.object.unwrap_or("".into())),
|
||||||
|
..Default::default()
|
||||||
|
}).exec(ctx.db()).await?;
|
||||||
|
},
|
||||||
|
ActivityType::Follow => {
|
||||||
|
model::relation::Entity::delete(model::relation::ActiveModel {
|
||||||
|
follower: Set(old_activity.actor), following: Set(old_activity.object.unwrap_or("".into())),
|
||||||
|
..Default::default()
|
||||||
|
}).exec(ctx.db()).await?;
|
||||||
|
},
|
||||||
|
t => tracing::warn!("extra side effects for activity {t:?} not implemented"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let activity_model = model::activity::Model::new(
|
||||||
|
&activity
|
||||||
|
.set_id(Some(&aid))
|
||||||
|
.set_actor(Node::link(uid.clone()))
|
||||||
|
.set_published(Some(chrono::Utc::now()))
|
||||||
|
)?;
|
||||||
|
model::activity::Entity::insert(activity_model.into_active_model()).exec(ctx.db()).await?;
|
||||||
|
|
||||||
|
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
|
||||||
|
ctx.address_to(&aid, None, &addressed).await?;
|
||||||
|
ctx.deliver_to(&aid, &uid, &addressed).await?;
|
||||||
|
Ok(CreationResult(aid))
|
||||||
|
},
|
||||||
|
|
||||||
|
Some(BaseType::Object(ObjectType::Activity(ActivityType::Accept(AcceptType::Accept)))) => {
|
||||||
|
let aid = ctx.aid(uuid::Uuid::new_v4().to_string());
|
||||||
|
let activity_targets = activity.addressed();
|
||||||
|
if activity.object().id().is_none() {
|
||||||
|
return Err(StatusCode::BAD_REQUEST.into());
|
||||||
|
}
|
||||||
|
let Some(accepted_id) = activity.object().id() else {
|
||||||
|
return Err(StatusCode::BAD_REQUEST.into());
|
||||||
|
};
|
||||||
|
let Some(accepted_activity) = model::activity::Entity::find_by_id(accepted_id)
|
||||||
|
.one(ctx.db()).await?
|
||||||
|
else {
|
||||||
|
return Err(StatusCode::NOT_FOUND.into());
|
||||||
|
};
|
||||||
|
|
||||||
|
match accepted_activity.activity_type {
|
||||||
|
ActivityType::Follow => {
|
||||||
|
model::relation::Entity::insert(
|
||||||
|
model::relation::ActiveModel {
|
||||||
|
follower: Set(accepted_activity.actor), following: Set(uid.clone()),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
).exec(ctx.db()).await?;
|
||||||
|
},
|
||||||
|
t => tracing::warn!("no side effects implemented for accepting {t:?}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
let activity_model = model::activity::Model::new(
|
||||||
|
&activity
|
||||||
|
.set_id(Some(&aid))
|
||||||
|
.set_actor(Node::link(uid.clone()))
|
||||||
|
.set_published(Some(chrono::Utc::now()))
|
||||||
|
)?;
|
||||||
|
model::activity::Entity::insert(activity_model.into_active_model())
|
||||||
|
.exec(ctx.db()).await?;
|
||||||
|
|
||||||
|
let addressed = ctx.expand_addressing(&uid, activity_targets).await?;
|
||||||
|
ctx.address_to(&aid, None, &addressed).await?;
|
||||||
|
ctx.deliver_to(&aid, &uid, &addressed).await?;
|
||||||
|
Ok(CreationResult(aid))
|
||||||
|
},
|
||||||
|
|
||||||
// Some(BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject)))) => {
|
// Some(BaseType::Object(ObjectType::Activity(ActivityType::Reject(RejectType::Reject)))) => {
|
||||||
// },
|
// },
|
||||||
|
|
||||||
Some(_) => Err(StatusCode::NOT_IMPLEMENTED.into()),
|
Some(_) => Err(StatusCode::NOT_IMPLEMENTED.into()),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -289,3 +309,67 @@ pub async fn post(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Context {
|
||||||
|
async fn expand_addressing(&self, uid: &str, mut targets: Vec<String>) -> Result<Vec<String>, DbErr> {
|
||||||
|
let following_addr = format!("{uid}/followers");
|
||||||
|
if let Some(i) = targets.iter().position(|x| x == &following_addr) {
|
||||||
|
targets.remove(i);
|
||||||
|
model::relation::Entity::find()
|
||||||
|
.filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string())))
|
||||||
|
.select_column(model::relation::Column::Follower)
|
||||||
|
.into_tuple::<String>()
|
||||||
|
.all(self.db())
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.for_each(|x| targets.push(x));
|
||||||
|
}
|
||||||
|
Ok(targets)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> Result<(), DbErr> {
|
||||||
|
let addressings : Vec<model::addressing::ActiveModel> = targets
|
||||||
|
.iter()
|
||||||
|
.map(|to| model::addressing::ActiveModel {
|
||||||
|
server: Set(Context::server(to)),
|
||||||
|
actor: Set(to.to_string()),
|
||||||
|
activity: Set(aid.to_string()),
|
||||||
|
object: Set(oid.map(|x| x.to_string())),
|
||||||
|
published: Set(chrono::Utc::now()),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
model::addressing::Entity::insert_many(addressings)
|
||||||
|
.exec(self.db())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn deliver_to(&self, aid: &str, from: &str, targets: &[String]) -> Result<(), DbErr> {
|
||||||
|
let deliveries : Vec<model::delivery::ActiveModel> = targets
|
||||||
|
.iter()
|
||||||
|
.filter(|to| Context::server(to) != self.base())
|
||||||
|
.filter(|to| to != &PUBLIC_TARGET)
|
||||||
|
.map(|to| model::delivery::ActiveModel {
|
||||||
|
actor: Set(from.to_string()),
|
||||||
|
// TODO we should resolve each user by id and check its inbox because we can't assume
|
||||||
|
// it's /users/{id}/inbox for every software, but oh well it's waaaaay easier now
|
||||||
|
target: Set(format!("{}/inbox", to)),
|
||||||
|
activity: Set(aid.to_string()),
|
||||||
|
created: Set(chrono::Utc::now()),
|
||||||
|
not_before: Set(chrono::Utc::now()),
|
||||||
|
attempt: Set(0),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
model::delivery::Entity::insert_many(deliveries)
|
||||||
|
.exec(self.db())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue