feat: add sql drivers with sqlx

had to compromise on a lot of stuff because sqlx 0.7+ broke the AnyDb
driver, but ehhh it works
This commit is contained in:
əlemi 2024-01-03 03:32:03 +01:00
parent b424579002
commit 0afc79934d
Signed by: alemi
GPG key ID: A4895B84D311642C
6 changed files with 182 additions and 137 deletions

View file

@ -6,24 +6,34 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
async-trait = "0.1.73" # core
axum = "0.6.20"
clap = { version = "4.4.6", features = ["derive"] }
html-escape = "0.2.13"
lazy_static = "1.4.0"
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0.107"
tokio = { version = "1.33.0", features = ["macros", "rt-multi-thread"] }
tracing = "0.1.39" tracing = "0.1.39"
tracing-subscriber = "0.3.17" tracing-subscriber = "0.3.17"
thiserror = "1.0.51" thiserror = "1.0.51"
chrono = { version = "0.4.31", features = ["serde"] }
uuid = { version = "1.6.1", features = ["v4", "fast-rng"] }
md-5 = "0.10.6" md-5 = "0.10.6"
# telegram provider
teloxide = { version = "0.12.2", features = ["macros"], optional = true }
toml = "0.8.8" toml = "0.8.8"
uuid = { version = "1.6.1", features = ["v4", "fast-rng"] }
chrono = { version = "0.4.31", features = ["serde"] }
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0.107"
async-trait = "0.1.73"
html-escape = "0.2.13"
clap = { version = "4.4.6", features = ["derive"] }
tokio = { version = "1.33.0", features = ["macros", "rt-multi-thread"] }
axum = "0.6.20"
# db providers
sqlx = { version = "0.7.3", features = ["runtime-tokio", "tls-rustls", "any"] }
# notification providers
teloxide = { version = "0.12.2", features = ["macros"], optional = true }
[features] [features]
default = ["telegram"] default = [
"mysql", "sqlite", "postgres",
"telegram",
]
# db drivers
mysql = ["sqlx/mysql"]
sqlite = ["sqlx/sqlite"]
postgres = ["sqlx/postgres"]
# notifier providers
telegram = ["dep:teloxide"] telegram = ["dep:teloxide"]

View file

@ -13,7 +13,8 @@ pub struct ConfigOverrides {
pub public: Option<bool>, pub public: Option<bool>,
pub date: Option<String>, #[serde(default = "_true")]
pub date: bool,
} }
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
@ -31,3 +32,5 @@ pub enum ConfigNotifierProvider {
chat_id: i64, chat_id: i64,
}, },
} }
fn _true() -> bool { true }

View file

@ -1,7 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use crate::{storage::JsonFileStorageStrategy, routes::Context, notifications::console::ConsoleTracingNotifier, config::{Config, ConfigNotifierProvider}}; use crate::{storage::StorageProvider, routes::Context, notifications::console::ConsoleTracingNotifier, config::{Config, ConfigNotifierProvider}};
mod notifications; mod notifications;
@ -20,7 +20,7 @@ struct CliArgs {
action: CliAction, action: CliAction,
/// connection string of storage database /// connection string of storage database
#[arg(long, default_value = "./storage.json")] // file://./guestbook.db #[arg(long, default_value = "sqlite://./guestbook.db")]
db: String, db: String,
#[arg(long, default_value_t = false)] #[arg(long, default_value_t = false)]
@ -53,9 +53,6 @@ async fn main() {
.with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO }) .with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO })
.init(); .init();
// TODO more (and better) storage solutions! sqlx to the rescue...
let storage = Box::new(JsonFileStorageStrategy::new(&args.db));
match args.action { match args.action {
CliAction::Default => { CliAction::Default => {
let mut cfg = Config::default(); let mut cfg = Config::default();
@ -75,16 +72,21 @@ async fn main() {
} }
}; };
let mut state = Context::new(storage, config.overrides); sqlx::any::install_default_drivers(); // must install all available drivers before connecting
let storage = StorageProvider::connect(&args.db, config.overrides).await.unwrap();
let mut state = Context::new(storage);
for notifier in config.notifiers.providers { for notifier in config.notifiers.providers {
match notifier { match notifier {
ConfigNotifierProvider::ConsoleNotifier => { ConfigNotifierProvider::ConsoleNotifier => {
tracing::info!("registering console notifier");
state.register(Box::new(ConsoleTracingNotifier {})); state.register(Box::new(ConsoleTracingNotifier {}));
}, },
#[cfg(feature = "telegram")] #[cfg(feature = "telegram")]
ConfigNotifierProvider::TelegramNotifier { token, chat_id } => { ConfigNotifierProvider::TelegramNotifier { token, chat_id } => {
tracing::info!("registering telegram notifier for chat {}", chat_id);
state.register(Box::new( state.register(Box::new(
notifications::telegram::TGNotifier::new(&token, chat_id) notifications::telegram::TGNotifier::new(&token, chat_id)
)); ));

View file

@ -1,6 +1,7 @@
use md5::{Md5, Digest}; use md5::{Md5, Digest};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sqlx::Row;
use uuid::Uuid; use uuid::Uuid;
use crate::config::ConfigOverrides; use crate::config::ConfigOverrides;
@ -11,13 +12,37 @@ const BODY_MAX_CHARS: usize = 4096;
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Page { pub struct Page {
pub id: i64,
pub author: String, pub author: String,
pub contact: Option<String>, pub contact: Option<String>,
pub body: String, pub body: String,
pub date: DateTime<Utc>, pub timestamp: i64, // sqlx::Any db doesn't support DateTime<Utc>
pub public: bool, pub public: bool,
} }
// TODO
// deserializing Option<T> values on AnyDriver is broken, pr to fix is in progress
// https://github.com/launchbadge/sqlx/issues/2416
// https://github.com/launchbadge/sqlx/pull/2716
// until this is merged, must implement by hand
// once this is merged, just do #[derive(sqlx::FromRow)]
// also what the fuck is going on with bools???
// https://github.com/launchbadge/sqlx/issues/2778
impl<'r> sqlx::FromRow<'r, sqlx::any::AnyRow> for Page {
fn from_row(row: &'r sqlx::any::AnyRow) -> Result<Self, sqlx::Error> {
Ok(
Page {
id: row.get(0),
author: row.get(1),
contact: row.try_get(2).ok(),
body: row.get(3),
timestamp: row.get(4),
public: row.get::<i32, usize>(5) > 0,
}
)
}
}
@ -31,8 +56,8 @@ pub struct PageView {
pub date: DateTime<Utc>, pub date: DateTime<Utc>,
} }
impl From<Page> for PageView { impl From<&Page> for PageView {
fn from(page: Page) -> Self { fn from(page: &Page) -> Self {
let mut hasher = Md5::new(); let mut hasher = Md5::new();
hasher.update(page.contact.as_deref().unwrap_or(&Uuid::new_v4().to_string()).as_bytes()); hasher.update(page.contact.as_deref().unwrap_or(&Uuid::new_v4().to_string()).as_bytes());
let avatar = format!("{:x}", hasher.finalize()); let avatar = format!("{:x}", hasher.finalize());
@ -52,84 +77,56 @@ impl From<Page> for PageView {
PageView { PageView {
url, avatar, url, avatar,
author: page.author, author: page.author.clone(),
contact: page.contact, contact: page.contact.clone(),
body: page.body, body: page.body.clone(),
date: page.date, date: DateTime::from_timestamp(page.timestamp, 0).unwrap_or(DateTime::UNIX_EPOCH),
} }
} }
} }
#[derive(Debug, Clone, Default, Deserialize)] #[derive(Debug, Clone, Default, Deserialize)]
pub struct PageInsertion { pub struct PageInsertion {
#[serde(deserialize_with = "non_empty_str")]
pub author: Option<String>,
#[serde(deserialize_with = "non_empty_str")]
pub contact: Option<String>,
pub body: String, pub body: String,
pub author: Option<String>,
pub contact: Option<String>,
pub public: Option<bool>, pub public: Option<bool>,
pub date: Option<DateTime<Utc>>, pub date: Option<DateTime<Utc>>,
} }
impl PageInsertion { impl PageInsertion {
fn trim_and_escape(input: &str, len: usize) -> String {
html_escape::encode_safe(&input.chars().take(len).collect::<String>()).to_string()
}
pub fn sanitize(&mut self) { pub fn sanitize(&mut self) {
self.author = self.author.as_mut().map(|x| html_escape::encode_safe(&x.chars().take(AUTHOR_MAX_CHARS).collect::<String>()).to_string()); if let Some(author) = self.author.as_mut() {
self.contact = self.contact.as_mut().map(|x| html_escape::encode_safe(&x.chars().take(CONTACT_MAX_CHARS).collect::<String>()).to_string()); *author = Self::trim_and_escape(author, AUTHOR_MAX_CHARS);
self.body = html_escape::encode_safe(&self.body.chars().take(BODY_MAX_CHARS).collect::<String>()).to_string(); }
if self.author.is_some() && self.author.as_deref().unwrap().is_empty() {
self.author = None;
}
if let Some(contact) = self.contact.as_mut() {
*contact = Self::trim_and_escape(contact, CONTACT_MAX_CHARS);
}
if self.contact.is_some() && self.contact.as_deref().unwrap().is_empty() {
self.contact = None;
}
self.body = Self::trim_and_escape(&self.body, BODY_MAX_CHARS);
} }
pub fn convert(mut self, overrides: &ConfigOverrides) -> Page { pub fn overrides(&mut self, overrides: &ConfigOverrides) {
self.sanitize(); if let Some(public) = overrides.public { self.public = Some(public) };
if let Some(author) = &overrides.author { self.author = Some(author.clone()) };
let mut page = Page { if overrides.date { self.date = Some(Utc::now()) };
author: self.author.unwrap_or("".into()),
contact: self.contact,
body: self.body,
date: self.date.unwrap_or(Utc::now()),
public: self.public.unwrap_or(true),
};
if let Some(author) = &overrides.author {
page.author = author.to_string();
}
if let Some(public) = overrides.public {
page.public = public;
}
if let Some(date) = &overrides.date {
if date.to_lowercase() == "now" {
page.date = Utc::now();
} else {
page.date = DateTime::parse_from_rfc3339(date).unwrap().into();
}
}
page
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Acknowledgement {
Sent(String),
Refused(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PageOptions { pub struct PageOptions {
pub offset: Option<usize>, pub offset: Option<i32>,
pub limit: Option<usize>, pub limit: Option<i32>,
}
fn non_empty_str<'de, D: serde::Deserializer<'de>>(d: D) -> Result<Option<String>, D::Error> {
Ok(Option::deserialize(d)?.filter(|s: &String| !s.is_empty()))
} }

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use axum::{Json, Form, Router, routing::{put, post, get}, extract::{State, Query}, response::Redirect}; use axum::{Json, Form, Router, routing::{put, post, get}, extract::{State, Query}, response::Redirect};
use crate::{notifications::NotificationProcessor, model::{Page, PageOptions, PageInsertion}, storage::StorageStrategy, config::ConfigOverrides}; use crate::{notifications::NotificationProcessor, model::{Page, PageOptions, PageInsertion, PageView}, storage::StorageProvider};
pub fn create_router_with_app_routes(state: Context) -> Router { pub fn create_router_with_app_routes(state: Context) -> Router {
Router::new() Router::new()
@ -14,13 +14,12 @@ pub fn create_router_with_app_routes(state: Context) -> Router {
pub struct Context { pub struct Context {
providers: Vec<Box<dyn NotificationProcessor<Page>>>, providers: Vec<Box<dyn NotificationProcessor<Page>>>,
storage: Box<dyn StorageStrategy<Page>>, storage: StorageProvider,
overrides: ConfigOverrides,
} }
impl Context { impl Context {
pub fn new(storage: Box<dyn StorageStrategy<Page>>, overrides: ConfigOverrides) -> Self { pub fn new(storage: StorageProvider) -> Self {
Context { providers: Vec::new(), storage, overrides } Context { providers: Vec::new(), storage }
} }
pub fn register(&mut self, notifier: Box<dyn NotificationProcessor<Page>>) { pub fn register(&mut self, notifier: Box<dyn NotificationProcessor<Page>>) {
@ -29,13 +28,15 @@ impl Context {
} }
async fn send_suggestion(payload: PageInsertion, state: Arc<Context>) -> Result<Redirect, String> { async fn send_suggestion(payload: PageInsertion, state: Arc<Context>) -> Result<Redirect, String> {
let page = payload.convert(&state.overrides); tracing::debug!("processing insertion {:?}", payload);
match state.storage.archive(payload).await {
Err(e) => Err(e.to_string()),
Ok(page) => {
for p in state.providers.iter() { for p in state.providers.iter() {
p.process(&page).await; p.process(&page).await;
} }
match state.storage.archive(page).await { Ok(Redirect::to("/"))
Ok(()) => Ok(Redirect::to("/")), },
Err(e) => Err(e.to_string()),
} }
} }
@ -43,9 +44,10 @@ async fn send_suggestion_json(State(state): State<Arc<Context>>, Json(payload):
async fn send_suggestion_form(State(state): State<Arc<Context>>, Form(payload): Form<PageInsertion>) -> Result<Redirect, String> { send_suggestion(payload, state).await } async fn send_suggestion_form(State(state): State<Arc<Context>>, Form(payload): Form<PageInsertion>) -> Result<Redirect, String> { send_suggestion(payload, state).await }
async fn get_suggestion(State(state): State<Arc<Context>>, Query(page): Query<PageOptions>) -> Result<Json<Vec<Page>>, String> { async fn get_suggestion(State(state): State<Arc<Context>>, Query(page): Query<PageOptions>) -> Result<Json<Vec<PageView>>, String> {
let offset = page.offset.unwrap_or(0); let offset = page.offset.unwrap_or(0);
let limit = std::cmp::min(page.limit.unwrap_or(20), 20); let limit = std::cmp::min(page.limit.unwrap_or(20), 20);
tracing::debug!("serving suggestions (offset {} limit {}", offset, limit);
match state.storage.extract(offset, limit).await { match state.storage.extract(offset, limit).await {
Ok(x) => Ok(Json(x)), Ok(x) => Ok(Json(x)),

View file

@ -1,57 +1,88 @@
use tokio::sync::RwLock; use chrono::Utc;
use crate::model::Page; use crate::{model::{PageView, PageInsertion, Page}, config::ConfigOverrides};
#[derive(Debug)]
#[derive(Debug, thiserror::Error)] pub struct StorageProvider {
pub enum StorageStrategyError { db: sqlx::Pool<sqlx::Any>,
#[error("could not interact with filesystem: {0}")] overrides: ConfigOverrides,
IOError(#[from] std::io::Error),
#[error("could not serialize/deserialize data: {0}")]
JsonSerializeError(#[from] serde_json::Error),
} }
#[async_trait::async_trait] // TODO bool type is not supported in Any driver?????
pub trait StorageStrategy<T> : Send + Sync { // so the `public` field is an integer which is ridicolous
async fn archive(&self, payload: T) -> Result<(), StorageStrategyError>; // but literally cannot get it to work ffs
async fn extract(&self, offset: usize, window: usize) -> Result<Vec<T>, StorageStrategyError>; //
} // https://github.com/launchbadge/sqlx/issues/2778
const SQLITE_SCHEMA : &str = "
CREATE TABLE IF NOT EXISTS pages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
author VARCHAR NOT NULL,
contact VARCHAR,
body VARCHAR NOT NULL,
timestamp INTEGER NOT NULL,
public INTEGER NOT NULL
);
";
/// this strategy is rather inefficient since it has to iterate the whole file every time, but it const POSTGRES_SCHEMA : &str = "
/// requires literally zero effort CREATE TABLE IF NOT EXISTS pages (
pub struct JsonFileStorageStrategy { id SERIAL PRIMARY KEY,
path: RwLock<String>, // only needed to prevent race conditions on insertion author TEXT NOT NULL,
} contact TEXT,
body TEXT NOT NULL,
timestamp INTEGER NOT NULL,
public INTEGER NOT NULL
);
";
impl JsonFileStorageStrategy { impl StorageProvider {
pub fn new(path: &str) -> Self { pub async fn connect(dest: &str, overrides: ConfigOverrides) -> sqlx::Result<Self> {
JsonFileStorageStrategy { path: RwLock::new(path.to_string()) } let db = sqlx::AnyPool::connect(dest).await?;
}
}
match db.acquire().await?.backend_name() {
#[async_trait::async_trait] "PostgreSQL" => { sqlx::query(POSTGRES_SCHEMA).execute(&db).await?; },
impl StorageStrategy<Page> for JsonFileStorageStrategy { "SQLite" => { sqlx::query(SQLITE_SCHEMA).execute(&db).await?; },
async fn archive(&self, payload: Page) -> Result<(), StorageStrategyError> { "MySQL" => { sqlx::query(SQLITE_SCHEMA).execute(&db).await?; }, // TODO will this work?
let path = self.path.write().await; _ => tracing::warn!("could not ensure schema: unsupported database type"),
let file_content = std::fs::read_to_string(&*path)?;
let mut current_content : Vec<Page> = serde_json::from_str(&file_content)?;
current_content.push(payload);
let updated_content = serde_json::to_string(&current_content)?;
std::fs::write(&*path, updated_content)?;
Ok(())
} }
async fn extract(&self, offset: usize, window: usize) -> Result<Vec<Page>, StorageStrategyError> { Ok(StorageProvider { db, overrides })
let path = self.path.read().await;
let file_content = std::fs::read_to_string(&*path)?;
let current_content : Vec<Page> = serde_json::from_str(&file_content)?;
let mut out = Vec::new();
for sugg in current_content.iter().rev().skip(offset) {
out.push(sugg.clone());
if out.len() >= window { break };
} }
pub async fn archive(&self, mut page: PageInsertion) -> sqlx::Result<Page> {
page.sanitize();
page.overrides(&self.overrides);
let result = sqlx::query("INSERT INTO pages (author, contact, body, timestamp, public) VALUES ($1, $2, $3, $4, $5)")
.bind(page.author.as_deref().unwrap_or("anonymous").to_string())
.bind(page.contact.clone())
.bind(page.body.clone())
.bind(page.date.unwrap_or(Utc::now()).timestamp())
.bind(if page.public.unwrap_or(true) { 1 } else { 0 })
.execute(&self.db)
.await?;
Ok(
Page {
id: result.last_insert_id().unwrap_or(-1),
author: page.author.unwrap_or("anonymous".into()),
contact: page.contact,
body: page.body,
timestamp: page.date.unwrap_or(Utc::now()).timestamp(),
public: page.public.unwrap_or(true),
}
)
}
pub async fn extract(&self, offset: i32, window: i32) -> sqlx::Result<Vec<PageView>> {
// TODO since AnyPool won't handle booleans we compare with an integer
let out = sqlx::query_as("SELECT * FROM pages WHERE public = 1 LIMIT $1 OFFSET $2")
.bind(window)
.bind(offset)
.fetch_all(&self.db)
.await?
.iter()
.map(PageView::from)
.collect();
Ok(out) Ok(out)
} }
} }