fix: insert activities, expand addressing

remote activities for Likes and Follows should still be added to our
database so we know where to verify those and we can present them in a
timeline for notifications
This commit is contained in:
əlemi 2024-04-12 18:40:24 +02:00
parent 9e67eead69
commit 3a874ed3e7
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 57 additions and 26 deletions

View file

@ -130,28 +130,39 @@ impl Context {
.to_string() .to_string()
} }
pub async fn expand_addressing(&self, uid: &str, mut targets: Vec<String>) -> crate::Result<Vec<String>> { pub fn is_local(&self, id: &str) -> bool {
let following_addr = format!("{uid}/followers"); // TODO consider precalculating once this format!
if let Some(i) = targets.iter().position(|x| x == &following_addr) { id.starts_with(&format!("{}{}", self.0.protocol, self.0.domain))
targets.remove(i); }
model::relation::Entity::find()
.filter(Condition::all().add(model::relation::Column::Following.eq(uid.to_string()))) pub async fn expand_addressing(&self, targets: Vec<String>) -> crate::Result<Vec<String>> {
.select_only() let mut out = Vec::new();
.select_column(model::relation::Column::Follower) for target in targets {
.into_tuple::<String>() if target.ends_with("/followers") {
.all(self.db()) let target_id = target.replace("/followers", "");
.await? model::relation::Entity::find()
.into_iter() .filter(Condition::all().add(model::relation::Column::Following.eq(target_id)))
.for_each(|x| targets.push(x)); .select_only()
.select_column(model::relation::Column::Follower)
.into_tuple::<String>()
.all(self.db())
.await?
.into_iter()
.for_each(|x| out.push(x));
} else {
out.push(target);
}
} }
Ok(targets) Ok(out)
} }
pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> crate::Result<()> { pub async fn address_to(&self, aid: &str, oid: Option<&str>, targets: &[String]) -> crate::Result<()> {
let local_activity = self.is_local(aid);
let addressings : Vec<model::addressing::ActiveModel> = targets let addressings : Vec<model::addressing::ActiveModel> = targets
.iter() .iter()
.filter(|to| !to.is_empty()) .filter(|to| !to.is_empty())
.filter(|to| !to.ends_with("/followers")) .filter(|to| !to.ends_with("/followers"))
.filter(|to| local_activity || self.is_local(to))
.map(|to| model::addressing::ActiveModel { .map(|to| model::addressing::ActiveModel {
id: sea_orm::ActiveValue::NotSet, id: sea_orm::ActiveValue::NotSet,
server: Set(Context::server(to)), server: Set(Context::server(to)),
@ -221,7 +232,7 @@ impl Context {
} }
pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> { pub async fn dispatch(&self, uid: &str, activity_targets: Vec<String>, aid: &str, oid: Option<&str>) -> crate::Result<()> {
let addressed = self.expand_addressing(uid, activity_targets).await?; let addressed = self.expand_addressing(activity_targets).await?;
self.address_to(aid, oid, &addressed).await?; self.address_to(aid, oid, &addressed).await?;
self.deliver_to(aid, uid, &addressed).await?; self.deliver_to(aid, uid, &addressed).await?;
Ok(()) Ok(())

View file

@ -13,7 +13,6 @@ impl apb::server::Inbox for Context {
async fn create(&self, activity: serde_json::Value) -> crate::Result<()> { async fn create(&self, activity: serde_json::Value) -> crate::Result<()> {
let activity_model = model::activity::Model::new(&activity)?; let activity_model = model::activity::Model::new(&activity)?;
let activity_targets = activity.addressed();
let Some(object_node) = activity.object().extract() else { let Some(object_node) = activity.object().extract() else {
// TODO we could process non-embedded activities or arrays but im lazy rn // TODO we could process non-embedded activities or arrays but im lazy rn
tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap());
@ -24,7 +23,8 @@ impl apb::server::Inbox for Context {
let oid = object_model.id.clone(); let oid = object_model.id.clone();
model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?; model::object::Entity::insert(object_model.into_active_model()).exec(self.db()).await?;
model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?; model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?;
self.address_to(&aid, Some(&oid), &activity_targets).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&aid, Some(&oid), &expanded_addressing).await?;
tracing::info!("{} posted {}", aid, oid); tracing::info!("{} posted {}", aid, oid);
Ok(()) Ok(())
} }
@ -46,6 +46,12 @@ impl apb::server::Inbox for Context {
Err(UpubError::internal_server_error()) Err(UpubError::internal_server_error())
} }
Ok(_) => { Ok(_) => {
let activity_model = model::activity::Model::new(&activity)?.into_active_model();
model::activity::Entity::insert(activity_model)
.exec(self.db())
.await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&aid, None, &expanded_addressing).await?;
model::object::Entity::update_many() model::object::Entity::update_many()
.col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1)) .col_expr(model::object::Column::Likes, Expr::col(model::object::Column::Likes).add(1))
.filter(model::object::Column::Id.eq(oid.clone())) .filter(model::object::Column::Id.eq(oid.clone()))
@ -58,20 +64,20 @@ impl apb::server::Inbox for Context {
} }
async fn follow(&self, activity: serde_json::Value) -> crate::Result<()> { async fn follow(&self, activity: serde_json::Value) -> crate::Result<()> {
let activity_targets = activity.addressed();
let activity_model = model::activity::Model::new(&activity)?; let activity_model = model::activity::Model::new(&activity)?;
let aid = activity_model.id.clone(); let aid = activity_model.id.clone();
tracing::info!("{} wants to follow {}", activity_model.actor, activity_model.object.as_deref().unwrap_or("<no-one???>")); tracing::info!("{} wants to follow {}", activity_model.actor, activity_model.object.as_deref().unwrap_or("<no-one???>"));
model::activity::Entity::insert(activity_model.into_active_model()) model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db()).await?; .exec(self.db()).await?;
self.address_to(&aid, None, &activity_targets).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&aid, None, &expanded_addressing).await?;
Ok(()) Ok(())
} }
async fn accept(&self, activity: serde_json::Value) -> crate::Result<()> { async fn accept(&self, activity: serde_json::Value) -> crate::Result<()> {
// TODO what about TentativeAccept // TODO what about TentativeAccept
let activity_model = model::activity::Model::new(&activity)?; let activity_model = model::activity::Model::new(&activity)?;
let Some(follow_request_id) = activity_model.object else { let Some(follow_request_id) = &activity_model.object else {
return Err(UpubError::bad_request()); return Err(UpubError::bad_request());
}; };
let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id)
@ -85,6 +91,9 @@ impl apb::server::Inbox for Context {
tracing::info!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor); tracing::info!("{} accepted follow request by {}", activity_model.actor, follow_activity.actor);
model::activity::Entity::insert(activity_model.clone().into_active_model())
.exec(self.db())
.await?;
model::relation::Entity::insert( model::relation::Entity::insert(
model::relation::ActiveModel { model::relation::ActiveModel {
follower: Set(follow_activity.actor), follower: Set(follow_activity.actor),
@ -93,14 +102,15 @@ impl apb::server::Inbox for Context {
} }
).exec(self.db()).await?; ).exec(self.db()).await?;
self.address_to(&activity_model.id, None, &activity.addressed()).await?; let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&activity_model.id, None, &expanded_addressing).await?;
Ok(()) Ok(())
} }
async fn reject(&self, activity: serde_json::Value) -> crate::Result<()> { async fn reject(&self, activity: serde_json::Value) -> crate::Result<()> {
// TODO what about TentativeReject? // TODO what about TentativeReject?
let activity_model = model::activity::Model::new(&activity)?; let activity_model = model::activity::Model::new(&activity)?;
let Some(follow_request_id) = activity_model.object else { let Some(follow_request_id) = &activity_model.object else {
return Err(UpubError::bad_request()); return Err(UpubError::bad_request());
}; };
let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id) let Some(follow_activity) = model::activity::Entity::find_by_id(follow_request_id)
@ -111,8 +121,15 @@ impl apb::server::Inbox for Context {
if follow_activity.object.unwrap_or("".into()) != activity_model.actor { if follow_activity.object.unwrap_or("".into()) != activity_model.actor {
return Err(UpubError::forbidden()); return Err(UpubError::forbidden());
} }
tracing::info!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor); tracing::info!("{} rejected follow request by {}", activity_model.actor, follow_activity.actor);
self.address_to(&activity_model.id, None, &activity.addressed()).await?;
model::activity::Entity::insert(activity_model.clone().into_active_model())
.exec(self.db())
.await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&activity_model.id, None, &expanded_addressing).await?;
Ok(()) Ok(())
} }
@ -128,7 +145,6 @@ impl apb::server::Inbox for Context {
async fn update(&self, activity: serde_json::Value) -> crate::Result<()> { async fn update(&self, activity: serde_json::Value) -> crate::Result<()> {
let activity_model = model::activity::Model::new(&activity)?; let activity_model = model::activity::Model::new(&activity)?;
let activity_targets = activity.addressed();
let Some(object_node) = activity.object().extract() else { let Some(object_node) = activity.object().extract() else {
// TODO we could process non-embedded activities or arrays but im lazy rn // TODO we could process non-embedded activities or arrays but im lazy rn
tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap()); tracing::error!("refusing to process activity without embedded object: {}", serde_json::to_string_pretty(&activity).unwrap());
@ -138,7 +154,6 @@ impl apb::server::Inbox for Context {
let Some(oid) = object_node.id().map(|x| x.to_string()) else { let Some(oid) = object_node.id().map(|x| x.to_string()) else {
return Err(UpubError::bad_request()); return Err(UpubError::bad_request());
}; };
model::activity::Entity::insert(activity_model.into_active_model()).exec(self.db()).await?;
match object_node.object_type() { match object_node.object_type() {
Some(apb::ObjectType::Actor(_)) => { Some(apb::ObjectType::Actor(_)) => {
// TODO oof here is an example of the weakness of this model, we have to go all the way // TODO oof here is an example of the weakness of this model, we have to go all the way
@ -155,8 +170,13 @@ impl apb::server::Inbox for Context {
Some(t) => tracing::warn!("no side effects implemented for update type {t:?}"), Some(t) => tracing::warn!("no side effects implemented for update type {t:?}"),
None => tracing::warn!("empty type on embedded updated object"), None => tracing::warn!("empty type on embedded updated object"),
} }
self.address_to(&aid, Some(&oid), &activity_targets).await?;
tracing::info!("{} updated {}", aid, oid); tracing::info!("{} updated {}", aid, oid);
model::activity::Entity::insert(activity_model.into_active_model())
.exec(self.db())
.await?;
let expanded_addressing = self.expand_addressing(activity.addressed()).await?;
self.address_to(&aid, Some(&oid), &expanded_addressing).await?;
Ok(()) Ok(())
} }