fix: spawn tokio runtime in another thread

This commit is contained in:
əlemi 2022-06-06 04:42:06 +02:00
parent 5ded746434
commit ccf4a64c15
Signed by: alemi
GPG key ID: A4895B84D311642C
4 changed files with 249 additions and 19 deletions

View file

@ -13,6 +13,7 @@ path = "src/main.rs"
crate-type = ["cdylib", "rlib"] crate-type = ["cdylib", "rlib"]
[dependencies] [dependencies]
dirs = "4"
rand = "0.8" rand = "0.8"
chrono = { version = "0.4", features = ["wasmbind"] } chrono = { version = "0.4", features = ["wasmbind"] }
eframe = { version = "0.18", features = ["persistence"] } eframe = { version = "0.18", features = ["persistence"] }

201
src/app/data/source.rs Normal file
View file

@ -0,0 +1,201 @@
use std::sync::{Arc, Mutex};
use rand::Rng;
use std::io::{Write, Read};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize, de::{DeserializeOwned}};
use eframe::egui::{plot::Value, Context};
pub fn native_save(name: &str, data:String) -> std::io::Result<()> {
let mut file = std::fs::File::create(name)?;
file.write_all(data.as_bytes())?;
return Ok(());
}
pub struct DataSource {
data : Arc<Mutex<Vec<Value>>>,
}
#[derive(Serialize, Deserialize)]
struct SerializableValue {
x : f64,
y : f64,
}
impl DataSource {
pub fn new() -> Self {
Self{ data: Arc::new(Mutex::new(Vec::new())) }
}
pub fn view(&self) -> Vec<Value> { // TODO handle errors
return self.data.lock().unwrap().clone();
}
pub fn serialize(&self) -> String {
let mut out : Vec<SerializableValue> = Vec::new();
for value in self.view() {
out.push(SerializableValue { x: value.x, y: value.y });
}
return serde_json::to_string(&out).unwrap();
}
}
pub trait PlotValue {
fn as_value(&self) -> Value;
}
pub trait Data {
fn load_remote(&mut self, url:&str, ctx:Context);
fn load_local(&mut self, file:&str, ctx:Context);
fn read(&mut self, file:&str, storage:Arc<Mutex<Vec<Value>>>, ctx:Context) -> std::io::Result<()> {
let mut file = std::fs::File::open(file)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let data : Vec<SerializableValue> = serde_json::from_str(contents.as_str())?;
for v in data {
storage.lock().unwrap().push(Value { x: v.x, y: v.y });
}
ctx.request_repaint();
Ok(())
}
fn fetch<T>(&mut self, base:&str, endpoint:&str, storage:Arc<Mutex<Vec<Value>>>, ctx:Context)
where T : DeserializeOwned + PlotValue {
let request = ehttp::Request::get(format!("{}/{}", base, endpoint));
ehttp::fetch(request, move |result: ehttp::Result<ehttp::Response>| {
let data : T = serde_json::from_slice(result.unwrap().bytes.as_slice()).unwrap();
storage.lock().unwrap().push(data.as_value());
ctx.request_repaint();
});
}
}
pub struct TpsData {
pub ds: DataSource,
load_interval : i64,
last_load : DateTime<Utc>,
}
#[derive(Serialize, Deserialize)]
struct TpsResponseData {
tps: f64
}
impl PlotValue for TpsResponseData {
fn as_value(&self) -> Value {
Value { x: Utc::now().timestamp() as f64, y: self.tps }
}
}
impl TpsData {
pub fn new(load_interval:i64) -> Self {
Self { ds: DataSource::new() , last_load: Utc::now(), load_interval }
}
}
impl Data for TpsData{
fn load_remote(&mut self, url:&str, ctx:Context) {
if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; }
self.last_load = Utc::now();
self.fetch::<TpsResponseData>(url, "tps", self.ds.data.clone(), ctx);
}
fn load_local(&mut self, file:&str, ctx:Context) {
self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file));
}
}
pub struct ChatData {
pub ds : DataSource,
load_interval : i64,
last_load : DateTime<Utc>,
}
#[derive(Serialize, Deserialize)]
struct ChatResponseData {
volume: f64
}
impl PlotValue for ChatResponseData {
fn as_value(&self) -> Value {
Value { x:Utc::now().timestamp() as f64, y: self.volume }
}
}
impl ChatData {
pub fn new(load_interval:i64) -> Self {
Self { ds: DataSource::new() , last_load: Utc::now(), load_interval }
}
}
impl Data for ChatData{
fn load_remote(&mut self, url:&str, ctx:Context) {
if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; }
self.last_load = Utc::now();
self.fetch::<ChatResponseData>(url, "chat_activity", self.ds.data.clone(), ctx);
}
fn load_local(&mut self, file:&str, ctx:Context) {
self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file));
}
}
pub struct PlayerCountData {
pub ds : DataSource,
load_interval : i64,
last_load : DateTime<Utc>,
}
#[derive(Serialize, Deserialize)]
struct PlayerCountResponseData {
count: i32
}
impl PlotValue for PlayerCountResponseData {
fn as_value(&self) -> Value {
Value { x:Utc::now().timestamp() as f64, y: self.count as f64 }
}
}
impl PlayerCountData {
pub fn new(load_interval:i64) -> Self {
Self { ds: DataSource::new() , last_load: Utc::now(), load_interval }
}
}
impl Data for PlayerCountData{
fn load_remote(&mut self, url:&str, ctx:Context) {
if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; }
self.last_load = Utc::now();
self.fetch::<PlayerCountResponseData>(url, "player_count", self.ds.data.clone(), ctx);
}
fn load_local(&mut self, file:&str, ctx:Context) {
self.read(file, self.ds.data.clone(), ctx).unwrap_or_else(|_err| println!("Could not load {}", file));
}
}
pub struct RandomData {
pub ds : DataSource,
load_interval : i64,
last_load : DateTime<Utc>,
rng: rand::rngs::ThreadRng,
}
impl RandomData {
#[allow(dead_code)]
pub fn new(load_interval:i64) -> Self {
Self { ds: DataSource::new() , last_load: Utc::now(), load_interval, rng : rand::thread_rng() }
}
}
impl Data for RandomData{
fn load_remote(&mut self, _url:&str, ctx:Context) {
if (Utc::now() - self.last_load).num_seconds() < self.load_interval { return; }
self.last_load = Utc::now();
self.ds.data.lock().unwrap().push(Value {x:Utc::now().timestamp() as f64, y:self.rng.gen()});
ctx.request_repaint();
}
fn load_local(&mut self, _file:&str, _ctx:Context) {}
}

View file

@ -1,21 +1,42 @@
mod app; mod app;
mod util; mod util;
use app::App; use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};
use crate::util::worker::{BackgroundWorker, NativeBackgroundWorker}; use crate::util::worker::{BackgroundWorker, NativeBackgroundWorker};
use crate::app::{App, data::store::{SQLiteDataStore, DataStorage}};
// When compiling natively: // When compiling natively:
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
fn main() { fn main() -> ! {
let native_options = eframe::NativeOptions::default(); let native_options = eframe::NativeOptions::default();
let worker = NativeBackgroundWorker::start(); let mut store_path = dirs::data_dir().unwrap_or(std::path::PathBuf::from(".")); // TODO get cwd more consistently?
store_path.push("dashboard.db");
println!("{}", store_path.as_path().to_str().unwrap());
let store = Arc::new(
SQLiteDataStore::new(store_path)
.unwrap()
);
eframe::run_native( // TODO replace this with a loop that ends so we can cleanly exit the background worker eframe::run_native( // TODO replace this with a loop that ends so we can cleanly exit the background worker
"2b2t queue stats", "dashboard",
native_options, native_options,
Box::new(|cc| Box::new(App::new(cc))), Box::new(move |cc| {
let worker = NativeBackgroundWorker::start();
let ctx = cc.egui_ctx.clone();
worker.task(async move {
loop {
sleep(Duration::from_secs(1)).await;
ctx.request_repaint();
// tokio::spawn(async move {store2.fetch_all().await});
}
});
Box::new(App::new(cc, store))
}),
); );
// worker.stop(); // worker.stop();

View file

@ -1,4 +1,4 @@
use tokio::{runtime::Runtime, sync::oneshot::Sender}; use tokio::{runtime::{Handle, Runtime, Builder}, sync::oneshot::Sender, time::{sleep, Duration}};
pub(crate) trait BackgroundWorker { pub(crate) trait BackgroundWorker {
fn start() -> Self; // TODO make it return an error? Can we even do anything without a background worker fn start() -> Self; // TODO make it return an error? Can we even do anything without a background worker
@ -7,24 +7,31 @@ pub(crate) trait BackgroundWorker {
} }
pub(crate) struct NativeBackgroundWorker { pub(crate) struct NativeBackgroundWorker {
runtime : Runtime, runtime : Handle,
end_tx : Sender<bool>, // end_tx : Sender<bool>,
worker : std::thread::JoinHandle<bool>, worker : std::thread::JoinHandle<bool>,
} }
impl BackgroundWorker for NativeBackgroundWorker { impl BackgroundWorker for NativeBackgroundWorker {
fn start() -> Self { fn start() -> Self {
let runtime = Runtime::new().expect("Failed creating Tokio runtime"); let (rt_tx, rt_rx) = tokio::sync::oneshot::channel::<Handle>();
let (end_tx, end_rx) = tokio::sync::oneshot::channel::<bool>(); let worker = std::thread::spawn(|| {
let r_handle = runtime.handle().clone(); let runtime = Builder::new_multi_thread()
let worker = std::thread::spawn(move ||{ .worker_threads(1)
r_handle.block_on(async { .enable_all()
end_rx.await.expect("Error shutting down") .build()
.unwrap();
rt_tx.send(runtime.handle().clone()).unwrap();
runtime.block_on(async {
loop {
println!("keepalive loop");
sleep(Duration::from_secs(1)).await;
}
}) })
}); });
NativeBackgroundWorker { NativeBackgroundWorker {
runtime : runtime, runtime : rt_rx.blocking_recv().unwrap(),
end_tx : end_tx, // end_tx : end_tx,
worker : worker, worker : worker,
} }
} }
@ -34,7 +41,7 @@ impl BackgroundWorker for NativeBackgroundWorker {
} }
fn stop(self) { fn stop(self) {
self.end_tx.send(true).expect("Failed signaling termination"); // self.end_tx.send(true).expect("Failed signaling termination");
self.worker.join().expect("Failed joining main worker thread"); // self.worker.join().expect("Failed joining main worker thread");
} }
} }