From ccf4a64c15a219cecf356a435f2b66dace639c6b Mon Sep 17 00:00:00 2001 From: alemi Date: Mon, 6 Jun 2022 04:42:06 +0200 Subject: [PATCH] fix: spawn tokio runtime in another thread --- Cargo.toml | 1 + src/app/data/source.rs | 201 +++++++++++++++++++++++++++++++++++++++++ src/main.rs | 33 +++++-- src/util/worker.rs | 33 ++++--- 4 files changed, 249 insertions(+), 19 deletions(-) create mode 100644 src/app/data/source.rs diff --git a/Cargo.toml b/Cargo.toml index c186629..77b64f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ path = "src/main.rs" crate-type = ["cdylib", "rlib"] [dependencies] +dirs = "4" rand = "0.8" chrono = { version = "0.4", features = ["wasmbind"] } eframe = { version = "0.18", features = ["persistence"] } diff --git a/src/app/data/source.rs b/src/app/data/source.rs new file mode 100644 index 0000000..f7eb211 --- /dev/null +++ b/src/app/data/source.rs @@ -0,0 +1,201 @@ +use std::sync::{Arc, Mutex}; +use rand::Rng; +use std::io::{Write, Read}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize, de::{DeserializeOwned}}; +use eframe::egui::{plot::Value, Context}; + +pub fn native_save(name: &str, data:String) -> std::io::Result<()> { + let mut file = std::fs::File::create(name)?; + file.write_all(data.as_bytes())?; + return Ok(()); +} +pub struct DataSource { + data : Arc>>, +} + +#[derive(Serialize, Deserialize)] +struct SerializableValue { + x : f64, + y : f64, +} + +impl DataSource { + pub fn new() -> Self { + Self{ data: Arc::new(Mutex::new(Vec::new())) } + } + + pub fn view(&self) -> Vec { // TODO handle errors + return self.data.lock().unwrap().clone(); + } + + pub fn serialize(&self) -> String { + let mut out : Vec = Vec::new(); + for value in self.view() { + out.push(SerializableValue { x: value.x, y: value.y }); + } + return serde_json::to_string(&out).unwrap(); + } +} + +pub trait PlotValue { + fn as_value(&self) -> Value; +} + +pub trait Data { + fn load_remote(&mut self, url:&str, ctx:Context); + fn load_local(&mut self, file:&str, ctx:Context); + + fn read(&mut self, file:&str, storage:Arc>>, ctx:Context) -> std::io::Result<()> { + let mut file = std::fs::File::open(file)?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + let data : Vec = serde_json::from_str(contents.as_str())?; + for v in data { + storage.lock().unwrap().push(Value { x: v.x, y: v.y }); + } + ctx.request_repaint(); + Ok(()) + } + + fn fetch(&mut self, base:&str, endpoint:&str, storage:Arc>>, ctx:Context) + where T : DeserializeOwned + PlotValue { + let request = ehttp::Request::get(format!("{}/{}", base, endpoint)); + ehttp::fetch(request, move |result: ehttp::Result| { + let data : T = serde_json::from_slice(result.unwrap().bytes.as_slice()).unwrap(); + storage.lock().unwrap().push(data.as_value()); + ctx.request_repaint(); + }); + + } +} + +pub struct TpsData { + pub ds: DataSource, + load_interval : i64, + last_load : DateTime, +} + +#[derive(Serialize, Deserialize)] +struct TpsResponseData { + tps: f64 +} + +impl PlotValue for TpsResponseData { + fn as_value(&self) -> Value { + Value { x: Utc::now().timestamp() as f64, y: self.tps } + } +} + +impl TpsData { + pub fn new(load_interval:i64) -> Self { + Self { ds: DataSource::new() , last_load: Utc::now(), load_interval } + } +} + +impl Data for TpsData{ + fn load_remote(&mut self, url:&str, ctx:Context) { + if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; } + self.last_load = Utc::now(); + self.fetch::(url, "tps", self.ds.data.clone(), ctx); + } + + fn load_local(&mut self, file:&str, ctx:Context) { + self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file)); + } +} + +pub struct ChatData { + pub ds : DataSource, + load_interval : i64, + last_load : DateTime, +} + +#[derive(Serialize, Deserialize)] +struct ChatResponseData { + volume: f64 +} + +impl PlotValue for ChatResponseData { + fn as_value(&self) -> Value { + Value { x:Utc::now().timestamp() as f64, y: self.volume } + } +} + +impl ChatData { + pub fn new(load_interval:i64) -> Self { + Self { ds: DataSource::new() , last_load: Utc::now(), load_interval } + } +} + +impl Data for ChatData{ + fn load_remote(&mut self, url:&str, ctx:Context) { + if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; } + self.last_load = Utc::now(); + self.fetch::(url, "chat_activity", self.ds.data.clone(), ctx); + } + + fn load_local(&mut self, file:&str, ctx:Context) { + self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file)); + } +} + +pub struct PlayerCountData { + pub ds : DataSource, + load_interval : i64, + last_load : DateTime, +} + +#[derive(Serialize, Deserialize)] +struct PlayerCountResponseData { + count: i32 +} + +impl PlotValue for PlayerCountResponseData { + fn as_value(&self) -> Value { + Value { x:Utc::now().timestamp() as f64, y: self.count as f64 } + } +} + +impl PlayerCountData { + pub fn new(load_interval:i64) -> Self { + Self { ds: DataSource::new() , last_load: Utc::now(), load_interval } + } +} + +impl Data for PlayerCountData{ + fn load_remote(&mut self, url:&str, ctx:Context) { + if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; } + self.last_load = Utc::now(); + self.fetch::(url, "player_count", self.ds.data.clone(), ctx); + } + + fn load_local(&mut self, file:&str, ctx:Context) { + self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file)); + } +} + +pub struct RandomData { + pub ds : DataSource, + load_interval : i64, + last_load : DateTime, + rng: rand::rngs::ThreadRng, +} + +impl RandomData { + #[allow(dead_code)] + pub fn new(load_interval:i64) -> Self { + Self { ds: DataSource::new() , last_load: Utc::now(), load_interval, rng : rand::thread_rng() } + } +} + +impl Data for RandomData{ + fn load_remote(&mut self, _url:&str, ctx:Context) { + if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; } + self.last_load = Utc::now(); + self.ds.data.lock().unwrap().push(Value {x:Utc::now().timestamp() as f64, y:self.rng.gen()}); + ctx.request_repaint(); + } + + fn load_local(&mut self, _file:&str, _ctx:Context) {} +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 4c46ef4..c38b3db 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,42 @@ mod app; mod util; -use app::App; - +use std::sync::{Arc, Mutex}; +use tokio::time::{sleep, Duration}; use crate::util::worker::{BackgroundWorker, NativeBackgroundWorker}; +use crate::app::{App, data::store::{SQLiteDataStore, DataStorage}}; // When compiling natively: #[cfg(not(target_arch = "wasm32"))] -fn main() { +fn main() -> ! { let native_options = eframe::NativeOptions::default(); + + let mut store_path = dirs::data_dir().unwrap_or(std::path::PathBuf::from(".")); // TODO get cwd more consistently? + store_path.push("dashboard.db"); - let worker = NativeBackgroundWorker::start(); + println!("{}", store_path.as_path().to_str().unwrap()); + + let store = Arc::new( + SQLiteDataStore::new(store_path) + .unwrap() + ); eframe::run_native( // TODO replace this with a loop that ends so we can cleanly exit the background worker - "2b2t queue stats", + "dashboard", native_options, - Box::new(|cc| Box::new(App::new(cc))), + Box::new(move |cc| { + let worker = NativeBackgroundWorker::start(); + let ctx = cc.egui_ctx.clone(); + worker.task(async move { + loop { + sleep(Duration::from_secs(1)).await; + ctx.request_repaint(); + // tokio::spawn(async move {store2.fetch_all().await}); + } + }); + + Box::new(App::new(cc, store)) + }), ); // worker.stop(); diff --git a/src/util/worker.rs b/src/util/worker.rs index cfb3557..e7f34b7 100644 --- a/src/util/worker.rs +++ b/src/util/worker.rs @@ -1,4 +1,4 @@ -use tokio::{runtime::Runtime, sync::oneshot::Sender}; +use tokio::{runtime::{Handle, Runtime, Builder}, sync::oneshot::Sender, time::{sleep, Duration}}; pub(crate) trait BackgroundWorker { fn start() -> Self; // TODO make it return an error? Can we even do anything without a background worker @@ -7,24 +7,31 @@ pub(crate) trait BackgroundWorker { } pub(crate) struct NativeBackgroundWorker { - runtime : Runtime, - end_tx : Sender, + runtime : Handle, + // end_tx : Sender, worker : std::thread::JoinHandle, } impl BackgroundWorker for NativeBackgroundWorker { fn start() -> Self { - let runtime = Runtime::new().expect("Failed creating Tokio runtime"); - let (end_tx, end_rx) = tokio::sync::oneshot::channel::(); - let r_handle = runtime.handle().clone(); - let worker = std::thread::spawn(move ||{ - r_handle.block_on(async { - end_rx.await.expect("Error shutting down") + let (rt_tx, rt_rx) = tokio::sync::oneshot::channel::(); + let worker = std::thread::spawn(|| { + let runtime = Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + rt_tx.send(runtime.handle().clone()).unwrap(); + runtime.block_on(async { + loop { + println!("keepalive loop"); + sleep(Duration::from_secs(1)).await; + } }) }); NativeBackgroundWorker { - runtime : runtime, - end_tx : end_tx, + runtime : rt_rx.blocking_recv().unwrap(), + // end_tx : end_tx, worker : worker, } } @@ -34,7 +41,7 @@ impl BackgroundWorker for NativeBackgroundWorker { } fn stop(self) { - self.end_tx.send(true).expect("Failed signaling termination"); - self.worker.join().expect("Failed joining main worker thread"); + // self.end_tx.send(true).expect("Failed signaling termination"); + // self.worker.join().expect("Failed joining main worker thread"); } } \ No newline at end of file