use std::sync::Arc; use rusqlite::{named_params, params, Connection, OptionalExtension}; use tokio::sync::Mutex; pub type Event = (i64, Option); #[derive(Clone)] pub struct Database(Arc>); impl Database { pub fn open(path: Option<&str>) -> rusqlite::Result { let db = match path { Some(p) => Connection::open(p)?, None => Connection::open_in_memory()?, }; db.execute( "CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, service INTEGER NOT NULL, time BIGINT NOT NULL, value BIGINT NULL )", params![] )?; db.execute( "CREATE INDEX IF NOT EXISTS ordered_event_per_service ON events (service, time DESC)", params![], )?; db.execute( "CREATE TABLE IF NOT EXISTS services ( id INTEGER PRIMARY KEY AUTOINCREMENT, name STRING NOT NULL )", params![] )?; db.execute( "CREATE INDEX IF NOT EXISTS services_names_lookup ON services (name)", params![], )?; Ok(Self(Arc::new(Mutex::new(db)))) } pub async fn services(&self) -> rusqlite::Result> { let db = self.0.lock().await; let mut stmt = db.prepare("SELECT id, name FROM services")?; let res = stmt.query_map( params![], |row| Ok((row.get(0)?, row.get(1)?)) )?; Ok(res.filter_map(|x| x.ok()).collect()) } pub async fn insert(&self, sid: i64, value: Option) -> rusqlite::Result<()> { self.0.lock().await.execute( "INSERT INTO events(service, time, value) VALUES (:sid, :time, :value)", named_params! { ":sid": sid, ":time": chrono::Utc::now().timestamp(), ":value": value } )?; Ok(()) } pub async fn get(&self, sid: i64, limit: i64) -> rusqlite::Result> { let db = self.0.lock().await; let mut stmt = db.prepare("SELECT time, value FROM events WHERE service = :sid ORDER BY time DESC LIMIT :limit")?; let results = stmt.query_map( named_params! { ":sid": sid, ":limit": limit }, |row| Ok((row.get(0)?, row.get(1)?)), )?; Ok( results .filter_map(|x| x.ok()) .collect() ) } #[async_recursion::async_recursion] // TODO can we not??? pub async fn sid(&self, service: &str, upsert: bool) -> rusqlite::Result { let res = { let db = self.0.lock().await; let mut stmt = db.prepare("SELECT id FROM services WHERE name = ?")?; stmt.query_row(params![service], |row| row.get(0)).optional()? }; match res { Some(sid) => Ok(sid), None => { if upsert { self.0.lock().await.execute("INSERT INTO services(name) VALUES (?)", params![service])?; self.sid(service, upsert).await } else { Err(rusqlite::Error::QueryReturnedNoRows) } } } } pub async fn up(&self, sid: i64, since: Option) -> rusqlite::Result> { let db = self.0.lock().await; let (mut stmt, param) = match since { Some(t) => ( db.prepare("SELECT value FROM events WHERE service = :sid AND time > :time ORDER BY time DESC")?, named_params! { ":sid": sid, ":time": t.clone() }, // TODO what's going on here? why is .clone() needed??? ), None => ( db.prepare("SELECT value FROM events WHERE service = :sid ORDER BY time DESC")?, named_params! { ":sid": sid } ), }; stmt.query_row(param, |row| row.get::>(0)) } }