1
0
Fork 0
forked from alemi/upub

fix: no more errors! no more warnings!!

finished upgrading inbox to new schema, there's ton of space for
improvement but lets first see if it works
This commit is contained in:
əlemi 2024-05-27 05:38:51 +02:00
parent 9a04a67d39
commit fea7c1ecdf
Signed by: alemi
GPG key ID: A4895B84D311642C
17 changed files with 241 additions and 230 deletions

View file

@ -1,6 +1,6 @@
use sea_orm::EntityTrait;
use crate::server::fetcher::Fetchable;
use crate::server::{fetcher::Fetchable, normalizer::Normalizer, Context};
pub async fn fetch(ctx: crate::server::Context, uri: String, save: bool) -> crate::Result<()> {
use apb::Base;
@ -8,24 +8,23 @@ pub async fn fetch(ctx: crate::server::Context, uri: String, save: bool) -> crat
let mut node = apb::Node::link(uri.to_string());
node.fetch(&ctx).await?;
let obj = node.get().expect("node still empty after fetch?");
let obj = node.extract().expect("node still empty after fetch?");
let server = Context::server(&uri);
println!("{}", serde_json::to_string_pretty(&obj).unwrap());
if save {
match obj.base_type() {
Some(apb::BaseType::Object(apb::ObjectType::Actor(_))) => {
crate::model::actor::Entity::insert(
crate::model::actor::ActiveModel::new(obj).unwrap()
crate::model::actor::ActiveModel::new(&obj).unwrap()
).exec(ctx.db()).await.unwrap();
},
Some(apb::BaseType::Object(apb::ObjectType::Activity(_))) => {
crate::model::activity::Entity::insert(
crate::model::activity::ActiveModel::new(obj).unwrap()
).exec(ctx.db()).await.unwrap();
ctx.insert_activity(obj, Some(server)).await.unwrap();
},
Some(apb::BaseType::Object(apb::ObjectType::Note)) => {
crate::model::object::Entity::insert(
crate::model::object::ActiveModel::new(obj).unwrap()
).exec(ctx.db()).await.unwrap();
ctx.insert_object(obj, Some(server)).await.unwrap();
},
Some(apb::BaseType::Object(t)) => tracing::warn!("not implemented: {:?}", t),
Some(apb::BaseType::Link(_)) => tracing::error!("fetched another link?"),
@ -33,7 +32,5 @@ pub async fn fetch(ctx: crate::server::Context, uri: String, save: bool) -> crat
}
}
println!("{}", serde_json::to_string_pretty(&obj).unwrap());
Ok(())
}

View file

@ -36,7 +36,7 @@ pub async fn fix(ctx: crate::server::Context, likes: bool, shares: bool, replies
{
let mut stream = crate::model::announce::Entity::find().stream(db).await?;
while let Some(share) = stream.try_next().await? {
store.insert(share.object.clone(), store.get(&share.object).unwrap_or(&0) + 1);
store.insert(share.object, store.get(&share.object).unwrap_or(&0) + 1);
}
}

View file

@ -62,6 +62,10 @@ impl UpubError {
pub fn internal_server_error() -> Self {
Self::Status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
}
pub fn field(field: &'static str) -> Self {
Self::Field(crate::model::FieldError(field))
}
}
pub type UpubResult<T> = Result<T, UpubError>;

View file

@ -89,6 +89,7 @@ impl Entity {
}
impl ActiveModel {
//#[deprecated = "should remove this, get models thru normalizer"]
pub fn new(activity: &impl apb::Activity) -> Result<Self, super::FieldError> {
Ok(ActiveModel {
internal: sea_orm::ActiveValue::NotSet,

View file

@ -97,15 +97,6 @@ pub enum Event {
impl Event {
pub fn id(&self) -> &str {
match self {
Event::Tombstone => "",
Event::Activity(x) => x.id.as_str(),
Event::StrayObject { object, liked: _ } => object.id.as_str(),
Event::DeepActivity { activity: _, liked: _, object } => object.id.as_str(),
}
}
pub fn internal(&self) -> i64 {
match self {
Event::Tombstone => 0,

View file

@ -72,10 +72,9 @@ impl BatchFillable for &[Event] {
let mut out : std::collections::BTreeMap<i64, Vec<Model>> = std::collections::BTreeMap::new();
for attach in attachments.into_iter().flatten() {
if out.contains_key(&attach.object) {
out.get_mut(&attach.object).expect("contains but get failed?").push(attach);
} else {
out.insert(attach.object, vec![attach]);
match out.entry(attach.object) {
std::collections::btree_map::Entry::Vacant(a) => { a.insert(vec![attach]); },
std::collections::btree_map::Entry::Occupied(mut e) => { e.get_mut().push(attach); },
}
}

View file

@ -63,7 +63,7 @@ pub async fn post(
return Err(UpubError::bad_request());
};
if !(server == Context::server(&actor)) {
if server != Context::server(&actor) {
return Err(UpubError::unauthorized());
}

View file

@ -40,11 +40,11 @@ impl Identity {
}
}
pub fn is(&self, id: &str) -> bool {
pub fn is(&self, uid: &str) -> bool {
match self {
Identity::Anonymous => false,
Identity::Remote { .. } => false, // TODO per-actor server auth should check this
Identity::Local { id, .. } => id.as_str() == id
Identity::Local { id, .. } => id.as_str() == uid
}
}

View file

@ -1,7 +1,7 @@
use std::{collections::BTreeSet, sync::Arc};
use openssl::rsa::Rsa;
use sea_orm::{ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, RelationTrait, SelectColumns, Set};
use sea_orm::{ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, SelectColumns, Set};
use crate::{config::Config, errors::UpubError, model, server::fetcher::Fetcher};
use uriproxy::UriClass;
@ -150,6 +150,7 @@ impl Context {
}
// TODO remove this!!
//#[deprecated = "context is id of first post in thread"]
pub fn context_id(&self, id: &str) -> String {
if id.starts_with("tag:") {
return id.to_string();
@ -227,8 +228,8 @@ impl Context {
{
let (server, actor) = if target == apb::target::PUBLIC { (None, None) } else {
(
Some(model::instance::Entity::domain_to_internal(&Context::server(&target), self.db()).await?),
Some(model::actor::Entity::ap_to_internal(&target, self.db()).await?),
Some(model::instance::Entity::domain_to_internal(&Context::server(target), self.db()).await?),
Some(model::actor::Entity::ap_to_internal(target, self.db()).await?),
)
};
addressing.push(
@ -290,6 +291,7 @@ impl Context {
Ok(())
}
//#[deprecated = "should probably directly invoke address_to() since we most likely have internal ids at this point"]
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
let addressed = self.expand_addressing(activity_targets).await?;
let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?;

View file

@ -180,20 +180,13 @@ impl Fetcher for Context {
}
let activity_document = self.pull_activity(id).await?;
let activity_model = model::activity::ActiveModel::new(&activity_document)?;
let activity_model = self.insert_activity(activity_document, Some(Context::server(id))).await?;
model::activity::Entity::insert(activity_model)
.exec(self.db()).await?;
// TODO fetch it back to get the internal id
let activity = model::activity::Entity::find_by_ap_id(id)
.one(self.db()).await?.ok_or_else(UpubError::internal_server_error)?;
let addressed = activity.addressed();
let addressed = activity_model.addressed();
let expanded_addresses = self.expand_addressing(addressed).await?;
self.address_to(Some(activity.internal), None, &expanded_addresses).await?;
self.address_to(Some(activity_model.internal), None, &expanded_addresses).await?;
Ok(activity)
Ok(activity_model)
}
async fn pull_activity(&self, id: &str) -> crate::Result<serde_json::Value> {
@ -216,7 +209,7 @@ impl Fetcher for Context {
Ok(activity)
}
async fn fetch_thread(&self, id: &str) -> crate::Result<()> {
async fn fetch_thread(&self, _id: &str) -> crate::Result<()> {
// crawl_replies(self, id, 0).await
todo!()
}

View file

@ -1,8 +1,8 @@
use apb::{target::Addressed, Activity, Base, Object};
use reqwest::StatusCode;
use sea_orm::{sea_query::Expr, ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, SelectColumns, Set};
use sea_orm::{sea_query::Expr, ActiveValue::{Set, NotSet}, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
use crate::{errors::{LoggableError, UpubError}, model::{self, FieldError}, server::normalizer::Normalizer};
use crate::{errors::{LoggableError, UpubError}, model, server::normalizer::Normalizer};
use super::{fetcher::Fetcher, Context};
@ -13,48 +13,43 @@ impl apb::server::Inbox for Context {
type Activity = serde_json::Value;
async fn create(&self, server: String, activity: serde_json::Value) -> crate::Result<()> {
let activity_model = model::activity::Model::new(&activity)?;
let aid = activity_model.id.clone();
let Some(object_node) = activity.object().extract() else {
// TODO we could process non-embedded activities or arrays but im lazy rn
tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap());
return Err(UpubError::unprocessable());
};
let activity_model = self.insert_activity(activity, Some(server.clone())).await?;
let object_model = self.insert_object(object_node, Some(server)).await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(Some(&aid), Some(&object_model.id), &expanded_addressing).await?;
tracing::info!("{} posted {}", aid, object_model.id);
let expanded_addressing = self.expand_addressing(activity_model.addressed()).await?;
self.address_to(Some(activity_model.internal), Some(object_model.internal), &expanded_addressing).await?;
tracing::info!("{} posted {}", activity_model.id, object_model.id);
Ok(())
}
async fn like(&self, _: String, activity: serde_json::Value) -> crate::Result<()> {
let aid = activity.id().ok_or(UpubError::bad_request())?;
async fn like(&self, server: String, activity: serde_json::Value) -> crate::Result<()> {
let uid = activity.actor().id().ok_or(UpubError::bad_request())?;
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let object_uri = activity.object().id().ok_or(UpubError::bad_request())?;
let obj = self.fetch_object(&object_uri).await?;
let oid = obj.id;
let like = model::like::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
actor: sea_orm::Set(uid.clone()),
likes: sea_orm::Set(oid.clone()),
date: sea_orm::Set(activity.published().unwrap_or(chrono::Utc::now())),
internal: NotSet,
actor: Set(internal_uid),
object: Set(obj.internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
};
match model::like::Entity::insert(like).exec(self.db()).await {
Err(sea_orm::DbErr::RecordNotInserted) => Err(UpubError::not_modified()),
Err(sea_orm::DbErr::Exec(_)) => Err(UpubError::not_modified()), // bad fix for sqlite
Err(e) => {
tracing::error!("unexpected error procesing like from {uid} to {oid}: {e}");
tracing::error!("unexpected error procesing like from {uid} to {}: {e}", obj.id);
Err(UpubError::internal_server_error())
}
Ok(_) => {
let activity_model = model::activity::Model::new(&activity)?.into_active_model();
model::activity::Entity::insert(activity_model)
.exec(self.db())
.await?;
let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?;
let activity_model = self.insert_activity(activity, Some(server)).await?;
let mut expanded_addressing = self.expand_addressing(activity_model.addressed()).await?;
if expanded_addressing.is_empty() { // WHY MASTODON!!!!!!!
expanded_addressing.push(
model::object::Entity::find_by_id(&oid)
model::object::Entity::find_by_id(obj.internal)
.select_only()
.select_column(model::object::Column::AttributedTo)
.into_tuple::<String>()
@ -63,190 +58,188 @@ impl apb::server::Inbox for Context {
.ok_or_else(UpubError::not_found)?
);
}
self.address_to(Some(aid), None, &expanded_addressing).await?;
self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
model::object::Entity::update_many()
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1))
.filter(model::object::Column::Id.eq(oid.clone()))
.filter(model::object::Column::Internal.eq(obj.internal))
.exec(self.db())
.await?;
tracing::info!("{} liked {}", uid, oid);
tracing::info!("{} liked {}", uid, obj.id);
Ok(())
},
}
}
async fn follow(&self, _: String, activity: serde_json::Value) -> crate::Result<()> {
let activity_model = model::activity::Model::new(&activity)?;
let aid = activity_model.id.clone();
let target_user_uri = activity_model.object
.as_deref()
.ok_or_else(UpubError::bad_request)?
.to_string();
let usr = self.fetch_user(&target_user_uri).await?;
let target_user_id = usr.id;
tracing::info!("{} wants to follow {}", activity_model.actor, target_user_id);
model::activity::Entity::insert(activity_model.into_active_model())
let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string();
let source_actor = activity.actor().id().ok_or_else(UpubError::bad_request)?;
let source_actor_internal = model::actor::Entity::ap_to_internal(&source_actor, self.db()).await?;
let target_actor = activity.object().id().ok_or_else(UpubError::bad_request)?;
let usr = self.fetch_user(&target_actor).await?;
let activity_model = model::activity::ActiveModel::new(&activity)?;
model::activity::Entity::insert(activity_model)
.exec(self.db()).await?;
let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
let relation_model = model::relation::ActiveModel {
internal: NotSet,
accept: Set(None),
activity: Set(internal_aid),
follower: Set(source_actor_internal),
following: Set(usr.internal),
};
model::relation::Entity::insert(relation_model)
.exec(self.db()).await?;
let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?;
if !expanded_addressing.contains(&target_user_id) {
expanded_addressing.push(target_user_id);
if !expanded_addressing.contains(&target_actor) {
expanded_addressing.push(target_actor);
}
self.address_to(Some(&aid), None, &expanded_addressing).await?;
self.address_to(Some(internal_aid), None, &expanded_addressing).await?;
tracing::info!("{} wants to follow {}", source_actor, usr.id);
Ok(())
}
async fn accept(&self, _: String, activity: serde_json::Value) -> crate::Result<()> {
// TODO what about TentativeAccept
let activity_model = model::activity::Model::new(&activity)?;
if let Some(mut r) = model::relay::Entity::find_by_id(&activity_model.actor)
let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string();
let target_actor = activity.actor().id().ok_or_else(UpubError::bad_request)?;
let follow_request_id = activity.object().id().ok_or_else(UpubError::bad_request)?;
let follow_activity = model::activity::Entity::find_by_ap_id(&follow_request_id)
.one(self.db())
.await?
{
r.accepted = true;
model::relay::Entity::update(r.into_active_model()).exec(self.db()).await?;
model::activity::Entity::insert(activity_model.clone().into_active_model())
.exec(self.db())
.await?;
tracing::info!("relay {} is now broadcasting to us", activity_model.actor);
return Ok(());
}
.ok_or_else(UpubError::not_found)?;
let Some(follow_request_id) = &activity_model.object else {
return Err(UpubError::bad_request());
};
let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id)
.one(self.db()).await?
else {
return Err(UpubError::not_found());
};
if follow_activity.object.unwrap_or("".into()) != activity_model.actor {
if follow_activity.object.unwrap_or("".into()) != follow_activity.actor {
return Err(UpubError::forbidden());
}
tracing::info!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor);
let activity_model = model::activity::ActiveModel::new(&activity)?;
model::activity::Entity::insert(activity_model)
.exec(self.db())
.await?;
let accept_internal_id = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
model::activity::Entity::insert(activity_model.clone().into_active_model())
.exec(self.db())
.await?;
model::user::Entity::update_many()
model::actor::Entity::update_many()
.col_expr(
model::user::Column::FollowingCount,
Expr::col(model::user::Column::FollowingCount).add(1)
model::actor::Column::FollowingCount,
Expr::col(model::actor::Column::FollowingCount).add(1)
)
.filter(model::user::Column::Id.eq(&follow_activity.actor))
.filter(model::actor::Column::Id.eq(&follow_activity.actor))
.exec(self.db())
.await?;
model::relation::Entity::insert(
model::relation::ActiveModel {
follower: Set(follow_activity.actor.clone()),
following: Set(activity_model.actor),
..Default::default()
}
).exec(self.db()).await?;
model::actor::Entity::update_many()
.col_expr(
model::actor::Column::FollowersCount,
Expr::col(model::actor::Column::FollowersCount).add(1)
)
.filter(model::actor::Column::Id.eq(&follow_activity.actor))
.exec(self.db())
.await?;
model::relation::Entity::update_many()
.col_expr(model::relation::Column::Accept, Expr::value(Some(accept_internal_id)))
.filter(model::relation::Column::Activity.eq(follow_activity.internal))
.exec(self.db()).await?;
tracing::info!("{} accepted follow request by {}", target_actor, follow_activity.actor);
let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?;
if !expanded_addressing.contains(&follow_activity.actor) {
expanded_addressing.push(follow_activity.actor);
}
self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?;
self.address_to(Some(accept_internal_id), None, &expanded_addressing).await?;
Ok(())
}
async fn reject(&self, _: String, activity: serde_json::Value) -> crate::Result<()> {
// TODO what about TentativeReject?
let activity_model = model::activity::Model::new(&activity)?;
let Some(follow_request_id) = &activity_model.object else {
return Err(UpubError::bad_request());
};
let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id)
.one(self.db()).await?
else {
return Err(UpubError::not_found());
};
if follow_activity.object.unwrap_or("".into()) != activity_model.actor {
let aid = activity.id().ok_or_else(UpubError::bad_request)?.to_string();
let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?;
let follow_request_id = activity.object().id().ok_or_else(UpubError::bad_request)?;
let follow_activity = model::activity::Entity::find_by_ap_id(&follow_request_id)
.one(self.db())
.await?
.ok_or_else(UpubError::not_found)?;
if follow_activity.object.unwrap_or("".into()) != uid {
return Err(UpubError::forbidden());
}
tracing::info!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor);
model::activity::Entity::insert(activity_model.clone().into_active_model())
let activity_model = model::activity::ActiveModel::new(&activity)?;
model::activity::Entity::insert(activity_model)
.exec(self.db())
.await?;
let internal_aid = model::activity::Entity::ap_to_internal(&aid, self.db()).await?;
model::relation::Entity::delete_many()
.filter(model::relation::Column::Activity.eq(internal_aid))
.exec(self.db())
.await?;
tracing::info!("{} rejected follow request by {}", uid, follow_activity.actor);
let mut expanded_addressing = self.expand_addressing(activity.addressed()).await?;
if !expanded_addressing.contains(&follow_activity.actor) {
expanded_addressing.push(follow_activity.actor);
}
self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?;
self.address_to(Some(internal_aid), None, &expanded_addressing).await?;
Ok(())
}
async fn delete(&self, _: String, activity: serde_json::Value) -> crate::Result<()> {
// TODO verify the signature before just deleting lmao
let oid = activity.object().id().ok_or(UpubError::bad_request())?;
tracing::debug!("deleting '{oid}'"); // this is so spammy wtf!
// TODO maybe we should keep the tombstone?
model::user::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from users");
model::activity::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from activities");
model::object::Entity::delete_by_id(&oid).exec(self.db()).await.info_failed("failed deleting from objects");
let oid = activity.object().id().ok_or_else(UpubError::bad_request)?;
model::actor::Entity::delete_by_ap_id(&oid).exec(self.db()).await.info_failed("failed deleting from users");
model::object::Entity::delete_by_ap_id(&oid).exec(self.db()).await.info_failed("failed deleting from objects");
tracing::debug!("deleted '{oid}'");
Ok(())
}
async fn update(&self, server: String, activity: serde_json::Value) -> crate::Result<()> {
let activity_model = model::activity::Model::new(&activity)?;
let aid = activity_model.id.clone();
async fn update(&self, _server: String, activity: serde_json::Value) -> crate::Result<()> {
let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?;
let aid = activity.id().ok_or_else(UpubError::bad_request)?;
let Some(object_node) = activity.object().extract() else {
// TODO we could process non-embedded activities or arrays but im lazy rn
tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap());
return Err(UpubError::unprocessable());
};
let Some(oid) = object_node.id().map(|x| x.to_string()) else {
return Err(UpubError::bad_request());
};
// make sure we're allowed to edit this object
if let Some(object_author) = object_node.attributed_to().id() {
if server != Context::server(&object_author) {
return Err(UpubError::forbidden());
}
} else if server != Context::server(&oid) {
return Err(UpubError::forbidden());
};
match object_node.object_type() {
Some(apb::ObjectType::Actor(_)) => {
// TODO oof here is an example of the weakness of this model, we have to go all the way
// back up to serde_json::Value because impl Object != impl Actor
let actor_model = model::user::Model::new(&object_node)?;
let mut update_model = actor_model.into_active_model();
update_model.updated = sea_orm::Set(chrono::Utc::now());
update_model.reset(model::user::Column::Name);
update_model.reset(model::user::Column::Summary);
update_model.reset(model::user::Column::Image);
update_model.reset(model::user::Column::Icon);
model::user::Entity::update(update_model)
.exec(self.db()).await?;
},
Some(apb::ObjectType::Note) => {
let object_model = model::object::Model::new(&object_node)?;
let mut update_model = object_model.into_active_model();
update_model.updated = sea_orm::Set(Some(chrono::Utc::now()));
update_model.reset(model::object::Column::Name);
update_model.reset(model::object::Column::Summary);
update_model.reset(model::object::Column::Content);
update_model.reset(model::object::Column::Sensitive);
model::object::Entity::update(update_model)
.exec(self.db()).await?;
},
Some(t) => tracing::warn!("no side effects implemented for update type {t:?}"),
None => tracing::warn!("empty type on embedded updated object"),
}
let oid = object_node.id().ok_or_else(UpubError::bad_request)?.to_string();
tracing::info!("{} updated {}", aid, oid);
model::activity::Entity::insert(activity_model.into_active_model())
let activity_model = model::activity::ActiveModel::new(&activity)?;
model::activity::Entity::insert(activity_model)
.exec(self.db())
.await?;
let internal_aid = model::activity::Entity::ap_to_internal(aid, self.db()).await?;
let internal_oid = match object_node.object_type().ok_or_else(UpubError::bad_request)? {
apb::ObjectType::Actor(_) => {
let internal_uid = model::actor::Entity::ap_to_internal(&oid, self.db()).await?;
let mut actor_model = model::actor::ActiveModel::new(&object_node)?;
actor_model.internal = Set(internal_uid);
actor_model.updated = Set(chrono::Utc::now());
model::actor::Entity::update(actor_model)
.exec(self.db())
.await?;
Some(internal_uid)
},
apb::ObjectType::Note => {
let internal_oid = model::object::Entity::ap_to_internal(&oid, self.db()).await?;
let mut object_model = model::object::ActiveModel::new(&object_node)?;
object_model.internal = Set(internal_oid);
object_model.updated = Set(chrono::Utc::now());
model::object::Entity::update(object_model)
.exec(self.db())
.await?;
Some(internal_oid)
},
t => {
tracing::warn!("no side effects implemented for update type {t:?}");
None
},
};
tracing::info!("{} updated {}", uid, oid);
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(Some(&aid), Some(&oid), &expanded_addressing).await?;
self.address_to(Some(internal_aid), internal_oid, &expanded_addressing).await?;
Ok(())
}
@ -254,9 +247,8 @@ impl apb::server::Inbox for Context {
let uid = activity.actor().id().ok_or_else(UpubError::bad_request)?;
// TODO in theory we could work with just object_id but right now only accept embedded
let undone_activity = activity.object().extract().ok_or_else(UpubError::bad_request)?;
let undone_aid = undone_activity.id().ok_or_else(UpubError::bad_request)?;
let undone_object_uri = undone_activity.object().id().ok_or_else(UpubError::bad_request)?;
let activity_type = undone_activity.activity_type().ok_or_else(UpubError::bad_request)?;
let undone_object_id = undone_activity.object().id().ok_or_else(UpubError::bad_request)?;
let undone_activity_author = undone_activity.actor().id().ok_or_else(UpubError::bad_request)?;
// can't undo activities from remote actors!
@ -264,32 +256,36 @@ impl apb::server::Inbox for Context {
return Err(UpubError::forbidden());
};
let obj = self.fetch_object(&undone_object_uri).await?;
let undone_object_id = obj.id;
self.insert_activity(activity.clone(), Some(server)).await?;
match activity_type {
apb::ActivityType::Like => {
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let internal_oid = model::object::Entity::ap_to_internal(&undone_object_id, self.db()).await?;
model::like::Entity::delete_many()
.filter(
Condition::all()
.add(model::like::Column::Actor.eq(&uid))
.add(model::like::Column::Likes.eq(&undone_object_id))
.add(model::like::Column::Actor.eq(internal_uid))
.add(model::like::Column::Object.eq(internal_oid))
)
.exec(self.db())
.await?;
model::object::Entity::update_many()
.filter(model::object::Column::Id.eq(&undone_object_id))
.filter(model::object::Column::Internal.eq(internal_oid))
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).sub(1))
.exec(self.db())
.await?;
},
apb::ActivityType::Follow => {
let undone_aid = undone_activity.id().ok_or_else(UpubError::bad_request)?;
let internal_aid = model::activity::Entity::ap_to_internal(undone_aid, self.db()).await?;
model::relation::Entity::delete_many()
.filter(
Condition::all()
.add(model::relation::Column::Follower.eq(&uid))
.add(model::relation::Column::Following.eq(&undone_object_id))
)
.filter(model::relation::Column::Activity.eq(internal_aid))
.exec(self.db())
.await?;
model::actor::Entity::update_many()
.filter(model::actor::Column::Id.eq(&undone_object_id))
.col_expr(model::actor::Column::FollowersCount, Expr::col(model::actor::Column::FollowersCount).sub(1))
.exec(self.db())
.await?;
},
@ -299,48 +295,41 @@ impl apb::server::Inbox for Context {
},
}
model::activity::Entity::delete_by_id(undone_aid).exec(self.db()).await?;
Ok(())
}
async fn announce(&self, _: String, activity: serde_json::Value) -> crate::Result<()> {
let activity_model = model::activity::Model::new(&activity)?;
let Some(object_uri) = &activity_model.object else {
return Err(FieldError("object").into());
};
let obj = self.fetch_object(object_uri).await?;
let oid = obj.id;
async fn announce(&self, server: String, activity: serde_json::Value) -> crate::Result<()> {
let uid = activity.actor().id().ok_or_else(|| UpubError::field("actor"))?;
let internal_uid = model::actor::Entity::ap_to_internal(&uid, self.db()).await?;
let announced_id = activity.object().id().ok_or_else(|| UpubError::field("object"))?;
let activity_model = self.insert_activity(activity.clone(), Some(server)).await?;
let announced = self.fetch_object(&announced_id).await?;
// relays send us activities as Announce, but we don't really want to count those towards the
// total shares count of an object, so just fetch the object and be done with it
if self.is_relay(&activity_model.actor) {
tracing::info!("relay {} broadcasted {}", activity_model.actor, oid);
tracing::info!("relay {} broadcasted {}", activity_model.actor, announced_id);
return Ok(())
}
let share = model::share::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
actor: sea_orm::Set(activity_model.actor.clone()),
shares: sea_orm::Set(oid.clone()),
date: sea_orm::Set(activity.published().unwrap_or(chrono::Utc::now())),
let share = model::announce::ActiveModel {
internal: NotSet,
actor: Set(internal_uid),
object: Set(announced.internal),
published: Set(activity.published().unwrap_or(chrono::Utc::now())),
};
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(Some(&activity_model.id), None, &expanded_addressing).await?;
model::share::Entity::insert(share)
self.address_to(Some(activity_model.internal), None, &expanded_addressing).await?;
model::announce::Entity::insert(share)
.exec(self.db()).await?;
model::activity::Entity::insert(activity_model.clone().into_active_model())
.exec(self.db())
.await?;
model::object::Entity::update_many()
.col_expr(model::object::Column::Shares, Expr::col(model::object::Column::Shares).add(1))
.filter(model::object::Column::Id.eq(oid.clone()))
.col_expr(model::object::Column::Announces, Expr::col(model::object::Column::Announces).add(1))
.filter(model::object::Column::Internal.eq(announced.internal))
.exec(self.db())
.await?;
tracing::info!("{} shared {}", activity_model.actor, oid);
tracing::info!("{} shared {}", activity_model.actor, announced.id);
Ok(())
}
}

View file

@ -1,5 +1,5 @@
use apb::{Node, Base, Object, Document};
use sea_orm::{sea_query::Expr, ActiveValue::Set, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
use sea_orm::{sea_query::Expr, ActiveValue::{NotSet, Set}, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
use crate::{errors::UpubError, model, server::Context};
use super::fetcher::Fetcher;
@ -7,6 +7,7 @@ use super::fetcher::Fetcher;
#[axum::async_trait]
pub trait Normalizer {
async fn insert_object(&self, obj: impl apb::Object, server: Option<String>) -> crate::Result<model::object::Model>;
async fn insert_activity(&self, act: impl apb::Activity, server: Option<String>) -> crate::Result<model::activity::Model>;
}
#[axum::async_trait]
@ -129,4 +130,35 @@ impl Normalizer for super::Context {
Ok(object)
}
async fn insert_activity(&self, activity: impl apb::Activity, server: Option<String>) -> crate::Result<model::activity::Model> {
let mut activity_model = model::activity::Model {
internal: 0,
id: activity.id().ok_or_else(|| UpubError::field("id"))?.to_string(),
activity_type: activity.activity_type().ok_or_else(|| UpubError::field("type"))?,
actor: activity.actor().id().ok_or_else(|| UpubError::field("actor"))?,
object: activity.object().id(),
target: activity.target().id(),
published: activity.published().unwrap_or(chrono::Utc::now()),
to: activity.to().into(),
bto: activity.bto().into(),
cc: activity.cc().into(),
bcc: activity.bcc().into(),
};
if let Some(server) = server {
if Context::server(&activity_model.actor) != server
|| Context::server(&activity_model.id) != server {
return Err(UpubError::forbidden());
}
}
let mut active_model = activity_model.clone().into_active_model();
active_model.internal = NotSet;
model::activity::Entity::insert(active_model)
.exec(self.db())
.await?;
let internal = model::activity::Entity::ap_to_internal(&activity_model.id, self.db()).await?;
activity_model.internal = internal;
Ok(activity_model)
}
}

View file

@ -367,8 +367,10 @@ impl apb::server::Outbox for Context {
return Err(UpubError::forbidden());
}
let mut new_actor_model = model::actor::ActiveModel::default();
new_actor_model.internal = Unchanged(old_actor_model.internal);
let mut new_actor_model = model::actor::ActiveModel {
internal: Unchanged(old_actor_model.internal),
..Default::default()
};
if let Some(name) = object_node.name() {
new_actor_model.name = Set(Some(name.to_string()));
@ -398,8 +400,10 @@ impl apb::server::Outbox for Context {
return Err(UpubError::forbidden());
}
let mut new_object_model = model::object::ActiveModel::default();
new_object_model.internal = Unchanged(old_object_model.internal);
let mut new_object_model = model::object::ActiveModel {
internal: Unchanged(old_object_model.internal),
..Default::default()
};
if let Some(name) = object_node.name() {
new_object_model.name = Set(Some(name.to_string()));

View file

@ -1,7 +1,6 @@
use apb::{ActivityMut, Base, BaseMut, Object, ObjectMut};
use leptos::*;
use leptos_use::DebounceOptions;
use crate::{prelude::*, WEBFINGER};
#[derive(Debug, Clone, Copy, Default)]

View file

@ -2,7 +2,7 @@ use std::{collections::BTreeSet, pin::Pin, sync::Arc};
use apb::{Activity, ActivityMut, Base, Object};
use leptos::*;
use leptos_use::{signal_debounced, signal_throttled, use_display_media, use_document_visibility, use_element_size, use_infinite_scroll_with_options, use_scroll, use_scroll_with_options, use_window, use_window_scroll, UseDisplayMediaReturn, UseElementSizeReturn, UseInfiniteScrollOptions, UseScrollOptions, UseScrollReturn};
use leptos_use::{signal_throttled, use_element_size, use_window_scroll, UseElementSizeReturn};
use crate::prelude::*;
#[derive(Debug, Clone, Copy)]

View file

@ -99,7 +99,7 @@ impl WebfingerCache {
Ok(res) => match res.error_for_status() {
Ok(res) => match res.json::<jrd::JsonResourceDescriptor>().await {
Ok(doc) => {
if let Some(uid) = doc.links.into_iter().find(|x| x.rel == "self").map(|x| x.href).flatten() {
if let Some(uid) = doc.links.into_iter().find(|x| x.rel == "self").and_then(|x| x.href) {
self.0.insert(query, LookupStatus::Found(uid));
} else {
self.0.insert(query, LookupStatus::NotFound);

View file

@ -19,13 +19,13 @@ pub fn RegisterPage() -> impl IntoView {
<form on:submit=move|ev| {
ev.prevent_default();
logging::log!("registering new user...");
let email = username_ref.get().map(|x| x.value()).unwrap_or("".into());
let password = password_ref.get().map(|x| x.value()).unwrap_or("".into());
let _email = username_ref.get().map(|x| x.value()).unwrap_or("".into());
let _password = password_ref.get().map(|x| x.value()).unwrap_or("".into());
spawn_local(async move {
match Http::request::<()>(
Method::PUT, &format!("{URL_BASE}/auth"), None, auth
).await {
Ok(x) => {},
Ok(_x) => {},
Err(e) => set_error.set(Some(
view! { <blockquote>{e.to_string()}</blockquote> }
)),