pc-monitor/script/cpu-net-monitor.py

109 lines
3.6 KiB
Python
Executable file

#!/usr/bin/env python
import asyncio
import sys
import struct
import logging
from time import sleep
import serial
import psutil
BAR_MAX_HEIGHT = 255
class State:
run: bool
device: str
baudrate: int
loop: asyncio.AbstractEventLoop
packets: asyncio.Queue
_port: serial.Serial
_task: asyncio.Task
def __init__(self, device:str="/dev/ttyUSB0", baudrate=57600):
self.run = True
self.baudrate = baudrate
self.device = device
self.packets = asyncio.Queue()
self.port = None
async def run_port_manager(self):
while self.run:
logging.debug("[*] Connecting to device '%s'", self.device)
try:
self.port = await self._loop.run_in_executor(None, serial.Serial, self.device, self.baudrate)
await self._loop.run_in_executor(None, self.port.write, struct.pack("BB", 0, 0))
while self.run:
pkt = await self.packets.get()
logging.debug("[>] Dispatching packet [ %s ]", str(pkt))
await self._loop.run_in_executor(None, self.port.write, pkt)
except serial.SerialException as e:
logging.error("[!] Error operating with device: %s", str(e))
await asyncio.sleep(30)
except Exception as e:
logging.exception("unhandled exception")
self.run = False
finally:
if self.port:
self.port.close()
async def display_polling(self):
rx = 0
tx = 0
while self.run:
cpu_report = await self._loop.run_in_executor(None, psutil.cpu_percent, 0.1, True)
net = psutil.net_io_counters(pernic=True)
d_rx = sum(v.bytes_recv for k, v in net.items() if k != "lo")
d_tx = sum(v.bytes_sent for k, v in net.items() if k != "lo")
p_rx = min(int(d_rx - rx), 255)
p_tx = min(int(d_tx - tx), 255)
w_rx = min(int((d_rx - rx) / 4096), 255)
w_tx = min(int((d_tx - tx) / 4096), 255)
await self.packets.put(struct.pack("BBBBBB", 1, 4, *( int(((x/100) **2) * BAR_MAX_HEIGHT) for x in cpu_report )))
await self.packets.put(struct.pack("BBBB", 2, 2, min(p_tx, 255), min(p_rx, 255)))
await self.packets.put(struct.pack("BBBBBBBBBB", 3, 8, *( int((x/100) * BAR_MAX_HEIGHT) for x in cpu_report ), p_tx, p_rx, w_tx, w_rx))
rx = d_rx
tx = d_tx
# async def cpu_load_leds(self):
# return
# while self.run:
# cpu_report = await self._loop.run_in_executor(None, psutil.cpu_percent, 0.05, True)
# load = [ int(((x/100) **2) * 255) for x in cpu_report ] # mypy whines but percpu returns a list
# logging.info("CPU [%d|%d|%d|%d]", *load)
# await self.packets.put(struct.pack("BBBBBB", 1, 4, *load))
# async def net_traffic_leds(self):
# return
# rx = 0
# tx = 0
# while self.run:
# net = psutil.net_io_counters(pernic=True)
# d_rx = sum(v.bytes_recv for k, v in net.items() if k != "lo")
# d_tx = sum(v.bytes_sent for k, v in net.items() if k != "lo")
# logging.info("NET [TX %d | %d RX]", d_tx - tx, d_rx - rx)
# await self.packets.put(struct.pack("BBBB", 2, 2, min(d_tx - tx, 255), min(d_rx - rx, 255)))
# rx = d_rx
# tx = d_tx
# await asyncio.sleep(0.01)
async def run_tasks(self):
self._loop = asyncio.get_event_loop()
port_manager = self._loop.create_task(self.run_port_manager())
# cpu_leds = self._loop.create_task(self.cpu_load_leds())
# net_leds = self._loop.create_task(self.net_traffic_leds())
display = self._loop.create_task(self.display_polling())
# await asyncio.gather(port_manager, cpu_leds, net_leds, display)
await asyncio.gather(port_manager, display)
if __name__ == "__main__":
if len(sys.argv) < 2:
logging.error("[!] No device specified")
exit(-1)
logging.basicConfig(level=logging.WARNING)
state = State(sys.argv[1], baudrate=115200)
asyncio.run(state.run_tasks())