mirror of
https://git.alemi.dev/dashboard.git
synced 2024-11-22 23:44:55 +01:00
fix: spawn tokio runtime in another thread
This commit is contained in:
parent
eb460fe3db
commit
78c7982911
4 changed files with 249 additions and 19 deletions
|
@ -13,6 +13,7 @@ path = "src/main.rs"
|
||||||
crate-type = ["cdylib", "rlib"]
|
crate-type = ["cdylib", "rlib"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
dirs = "4"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
chrono = { version = "0.4", features = ["wasmbind"] }
|
chrono = { version = "0.4", features = ["wasmbind"] }
|
||||||
eframe = { version = "0.18", features = ["persistence"] }
|
eframe = { version = "0.18", features = ["persistence"] }
|
||||||
|
|
201
src/app/data/source.rs
Normal file
201
src/app/data/source.rs
Normal file
|
@ -0,0 +1,201 @@
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use rand::Rng;
|
||||||
|
use std::io::{Write, Read};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use serde::{Deserialize, Serialize, de::{DeserializeOwned}};
|
||||||
|
use eframe::egui::{plot::Value, Context};
|
||||||
|
|
||||||
|
pub fn native_save(name: &str, data:String) -> std::io::Result<()> {
|
||||||
|
let mut file = std::fs::File::create(name)?;
|
||||||
|
file.write_all(data.as_bytes())?;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
pub struct DataSource {
|
||||||
|
data : Arc<Mutex<Vec<Value>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct SerializableValue {
|
||||||
|
x : f64,
|
||||||
|
y : f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DataSource {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self{ data: Arc::new(Mutex::new(Vec::new())) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn view(&self) -> Vec<Value> { // TODO handle errors
|
||||||
|
return self.data.lock().unwrap().clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn serialize(&self) -> String {
|
||||||
|
let mut out : Vec<SerializableValue> = Vec::new();
|
||||||
|
for value in self.view() {
|
||||||
|
out.push(SerializableValue { x: value.x, y: value.y });
|
||||||
|
}
|
||||||
|
return serde_json::to_string(&out).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait PlotValue {
|
||||||
|
fn as_value(&self) -> Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Data {
|
||||||
|
fn load_remote(&mut self, url:&str, ctx:Context);
|
||||||
|
fn load_local(&mut self, file:&str, ctx:Context);
|
||||||
|
|
||||||
|
fn read(&mut self, file:&str, storage:Arc<Mutex<Vec<Value>>>, ctx:Context) -> std::io::Result<()> {
|
||||||
|
let mut file = std::fs::File::open(file)?;
|
||||||
|
let mut contents = String::new();
|
||||||
|
file.read_to_string(&mut contents)?;
|
||||||
|
let data : Vec<SerializableValue> = serde_json::from_str(contents.as_str())?;
|
||||||
|
for v in data {
|
||||||
|
storage.lock().unwrap().push(Value { x: v.x, y: v.y });
|
||||||
|
}
|
||||||
|
ctx.request_repaint();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fetch<T>(&mut self, base:&str, endpoint:&str, storage:Arc<Mutex<Vec<Value>>>, ctx:Context)
|
||||||
|
where T : DeserializeOwned + PlotValue {
|
||||||
|
let request = ehttp::Request::get(format!("{}/{}", base, endpoint));
|
||||||
|
ehttp::fetch(request, move |result: ehttp::Result<ehttp::Response>| {
|
||||||
|
let data : T = serde_json::from_slice(result.unwrap().bytes.as_slice()).unwrap();
|
||||||
|
storage.lock().unwrap().push(data.as_value());
|
||||||
|
ctx.request_repaint();
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TpsData {
|
||||||
|
pub ds: DataSource,
|
||||||
|
load_interval : i64,
|
||||||
|
last_load : DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct TpsResponseData {
|
||||||
|
tps: f64
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PlotValue for TpsResponseData {
|
||||||
|
fn as_value(&self) -> Value {
|
||||||
|
Value { x: Utc::now().timestamp() as f64, y: self.tps }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TpsData {
|
||||||
|
pub fn new(load_interval:i64) -> Self {
|
||||||
|
Self { ds: DataSource::new() , last_load: Utc::now(), load_interval }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Data for TpsData{
|
||||||
|
fn load_remote(&mut self, url:&str, ctx:Context) {
|
||||||
|
if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; }
|
||||||
|
self.last_load = Utc::now();
|
||||||
|
self.fetch::<TpsResponseData>(url, "tps", self.ds.data.clone(), ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_local(&mut self, file:&str, ctx:Context) {
|
||||||
|
self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ChatData {
|
||||||
|
pub ds : DataSource,
|
||||||
|
load_interval : i64,
|
||||||
|
last_load : DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct ChatResponseData {
|
||||||
|
volume: f64
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PlotValue for ChatResponseData {
|
||||||
|
fn as_value(&self) -> Value {
|
||||||
|
Value { x:Utc::now().timestamp() as f64, y: self.volume }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChatData {
|
||||||
|
pub fn new(load_interval:i64) -> Self {
|
||||||
|
Self { ds: DataSource::new() , last_load: Utc::now(), load_interval }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Data for ChatData{
|
||||||
|
fn load_remote(&mut self, url:&str, ctx:Context) {
|
||||||
|
if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; }
|
||||||
|
self.last_load = Utc::now();
|
||||||
|
self.fetch::<ChatResponseData>(url, "chat_activity", self.ds.data.clone(), ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_local(&mut self, file:&str, ctx:Context) {
|
||||||
|
self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PlayerCountData {
|
||||||
|
pub ds : DataSource,
|
||||||
|
load_interval : i64,
|
||||||
|
last_load : DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct PlayerCountResponseData {
|
||||||
|
count: i32
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PlotValue for PlayerCountResponseData {
|
||||||
|
fn as_value(&self) -> Value {
|
||||||
|
Value { x:Utc::now().timestamp() as f64, y: self.count as f64 }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PlayerCountData {
|
||||||
|
pub fn new(load_interval:i64) -> Self {
|
||||||
|
Self { ds: DataSource::new() , last_load: Utc::now(), load_interval }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Data for PlayerCountData{
|
||||||
|
fn load_remote(&mut self, url:&str, ctx:Context) {
|
||||||
|
if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; }
|
||||||
|
self.last_load = Utc::now();
|
||||||
|
self.fetch::<PlayerCountResponseData>(url, "player_count", self.ds.data.clone(), ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_local(&mut self, file:&str, ctx:Context) {
|
||||||
|
self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct RandomData {
|
||||||
|
pub ds : DataSource,
|
||||||
|
load_interval : i64,
|
||||||
|
last_load : DateTime<Utc>,
|
||||||
|
rng: rand::rngs::ThreadRng,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RandomData {
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn new(load_interval:i64) -> Self {
|
||||||
|
Self { ds: DataSource::new() , last_load: Utc::now(), load_interval, rng : rand::thread_rng() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Data for RandomData{
|
||||||
|
fn load_remote(&mut self, _url:&str, ctx:Context) {
|
||||||
|
if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; }
|
||||||
|
self.last_load = Utc::now();
|
||||||
|
self.ds.data.lock().unwrap().push(Value {x:Utc::now().timestamp() as f64, y:self.rng.gen()});
|
||||||
|
ctx.request_repaint();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_local(&mut self, _file:&str, _ctx:Context) {}
|
||||||
|
}
|
33
src/main.rs
33
src/main.rs
|
@ -1,21 +1,42 @@
|
||||||
mod app;
|
mod app;
|
||||||
mod util;
|
mod util;
|
||||||
|
|
||||||
use app::App;
|
use std::sync::{Arc, Mutex};
|
||||||
|
use tokio::time::{sleep, Duration};
|
||||||
use crate::util::worker::{BackgroundWorker, NativeBackgroundWorker};
|
use crate::util::worker::{BackgroundWorker, NativeBackgroundWorker};
|
||||||
|
use crate::app::{App, data::store::{SQLiteDataStore, DataStorage}};
|
||||||
|
|
||||||
// When compiling natively:
|
// When compiling natively:
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
fn main() {
|
fn main() -> ! {
|
||||||
let native_options = eframe::NativeOptions::default();
|
let native_options = eframe::NativeOptions::default();
|
||||||
|
|
||||||
|
let mut store_path = dirs::data_dir().unwrap_or(std::path::PathBuf::from(".")); // TODO get cwd more consistently?
|
||||||
|
store_path.push("dashboard.db");
|
||||||
|
|
||||||
let worker = NativeBackgroundWorker::start();
|
println!("{}", store_path.as_path().to_str().unwrap());
|
||||||
|
|
||||||
|
let store = Arc::new(
|
||||||
|
SQLiteDataStore::new(store_path)
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
eframe::run_native( // TODO replace this with a loop that ends so we can cleanly exit the background worker
|
eframe::run_native( // TODO replace this with a loop that ends so we can cleanly exit the background worker
|
||||||
"2b2t queue stats",
|
"dashboard",
|
||||||
native_options,
|
native_options,
|
||||||
Box::new(|cc| Box::new(App::new(cc))),
|
Box::new(move |cc| {
|
||||||
|
let worker = NativeBackgroundWorker::start();
|
||||||
|
let ctx = cc.egui_ctx.clone();
|
||||||
|
worker.task(async move {
|
||||||
|
loop {
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
ctx.request_repaint();
|
||||||
|
// tokio::spawn(async move {store2.fetch_all().await});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::new(App::new(cc, store))
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
// worker.stop();
|
// worker.stop();
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use tokio::{runtime::Runtime, sync::oneshot::Sender};
|
use tokio::{runtime::{Handle, Runtime, Builder}, sync::oneshot::Sender, time::{sleep, Duration}};
|
||||||
|
|
||||||
pub(crate) trait BackgroundWorker {
|
pub(crate) trait BackgroundWorker {
|
||||||
fn start() -> Self; // TODO make it return an error? Can we even do anything without a background worker
|
fn start() -> Self; // TODO make it return an error? Can we even do anything without a background worker
|
||||||
|
@ -7,24 +7,31 @@ pub(crate) trait BackgroundWorker {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct NativeBackgroundWorker {
|
pub(crate) struct NativeBackgroundWorker {
|
||||||
runtime : Runtime,
|
runtime : Handle,
|
||||||
end_tx : Sender<bool>,
|
// end_tx : Sender<bool>,
|
||||||
worker : std::thread::JoinHandle<bool>,
|
worker : std::thread::JoinHandle<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackgroundWorker for NativeBackgroundWorker {
|
impl BackgroundWorker for NativeBackgroundWorker {
|
||||||
fn start() -> Self {
|
fn start() -> Self {
|
||||||
let runtime = Runtime::new().expect("Failed creating Tokio runtime");
|
let (rt_tx, rt_rx) = tokio::sync::oneshot::channel::<Handle>();
|
||||||
let (end_tx, end_rx) = tokio::sync::oneshot::channel::<bool>();
|
let worker = std::thread::spawn(|| {
|
||||||
let r_handle = runtime.handle().clone();
|
let runtime = Builder::new_multi_thread()
|
||||||
let worker = std::thread::spawn(move ||{
|
.worker_threads(1)
|
||||||
r_handle.block_on(async {
|
.enable_all()
|
||||||
end_rx.await.expect("Error shutting down")
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
rt_tx.send(runtime.handle().clone()).unwrap();
|
||||||
|
runtime.block_on(async {
|
||||||
|
loop {
|
||||||
|
println!("keepalive loop");
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
NativeBackgroundWorker {
|
NativeBackgroundWorker {
|
||||||
runtime : runtime,
|
runtime : rt_rx.blocking_recv().unwrap(),
|
||||||
end_tx : end_tx,
|
// end_tx : end_tx,
|
||||||
worker : worker,
|
worker : worker,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,7 +41,7 @@ impl BackgroundWorker for NativeBackgroundWorker {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(self) {
|
fn stop(self) {
|
||||||
self.end_tx.send(true).expect("Failed signaling termination");
|
// self.end_tx.send(true).expect("Failed signaling termination");
|
||||||
self.worker.join().expect("Failed joining main worker thread");
|
// self.worker.join().expect("Failed joining main worker thread");
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in a new issue