first ugly impl of background callbacks with uuid dict
This commit is contained in:
parent
cf35e2cf24
commit
a8240c8df3
1 changed files with 25 additions and 9 deletions
|
@ -1,5 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import uuid
|
||||||
|
|
||||||
from asyncio import Task
|
from asyncio import Task
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
|
@ -33,6 +35,7 @@ class Client:
|
||||||
_processing : bool
|
_processing : bool
|
||||||
_authenticated : bool
|
_authenticated : bool
|
||||||
_worker : Task
|
_worker : Task
|
||||||
|
_callbacks = Dict[str, Task]
|
||||||
|
|
||||||
_packet_callbacks : Dict[Type[Packet], List[Callable]]
|
_packet_callbacks : Dict[Type[Packet], List[Callable]]
|
||||||
_logger : logging.Logger
|
_logger : logging.Logger
|
||||||
|
@ -65,6 +68,7 @@ class Client:
|
||||||
self._authenticated = False
|
self._authenticated = False
|
||||||
|
|
||||||
self._packet_callbacks = {}
|
self._packet_callbacks = {}
|
||||||
|
self._callbacks = {}
|
||||||
|
|
||||||
self._logger = LOGGER.getChild(f"{self.host}:{self.port}")
|
self._logger = LOGGER.getChild(f"{self.host}:{self.port}")
|
||||||
|
|
||||||
|
@ -76,6 +80,18 @@ class Client:
|
||||||
def connected(self) -> bool:
|
def connected(self) -> bool:
|
||||||
return self.started and self.dispatcher.connected
|
return self.started and self.dispatcher.connected
|
||||||
|
|
||||||
|
def _run_async(self, func, pkt:Packet):
|
||||||
|
key = str(uuid.uuid4()) # ugly!
|
||||||
|
|
||||||
|
async def wrapper(packet:Packet):
|
||||||
|
try:
|
||||||
|
await func(packet)
|
||||||
|
except Exception as e:
|
||||||
|
self._logger.error("Exception in callback %s for packet %s | %s", func.__name__, str(packet), str(e))
|
||||||
|
self._callbacks.pop(key, None)
|
||||||
|
|
||||||
|
self._callbacks[key] = asyncio.get_event_loop().create_task(wrapper(pkt))
|
||||||
|
|
||||||
def on_packet(self, packet:Type[Packet], *args) -> Callable: # receive *args for retro compatibility
|
def on_packet(self, packet:Type[Packet], *args) -> Callable: # receive *args for retro compatibility
|
||||||
def wrapper(fun):
|
def wrapper(fun):
|
||||||
if packet not in self._packet_callbacks:
|
if packet not in self._packet_callbacks:
|
||||||
|
@ -133,23 +149,26 @@ class Client:
|
||||||
loop.run_forever(idle())
|
loop.run_forever(idle())
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
self._logger.info("Received SIGINT, stopping...")
|
self._logger.info("Received SIGINT, stopping...")
|
||||||
else:
|
try:
|
||||||
self._logger.warning("Client terminating...")
|
|
||||||
|
|
||||||
loop.run_until_complete(self.stop())
|
loop.run_until_complete(self.stop())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
self._logger.info("Received SIGINT, stopping for real")
|
||||||
|
loop.run_until_complete(self.stop(wait_tasks=False))
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
self._processing = True
|
self._processing = True
|
||||||
self._worker = asyncio.get_event_loop().create_task(self._client_worker())
|
self._worker = asyncio.get_event_loop().create_task(self._client_worker())
|
||||||
self._logger.info("Minecraft client started")
|
self._logger.info("Minecraft client started")
|
||||||
|
|
||||||
async def stop(self, block=True):
|
async def stop(self, block=True, wait_tasks=True):
|
||||||
self._processing = False
|
self._processing = False
|
||||||
if self.dispatcher.connected:
|
if self.dispatcher.connected:
|
||||||
await self.dispatcher.disconnect(block=block)
|
await self.dispatcher.disconnect(block=block)
|
||||||
if block:
|
if block:
|
||||||
await self._worker
|
await self._worker
|
||||||
self._logger.info("Minecraft client stopped")
|
self._logger.info("Minecraft client stopped")
|
||||||
|
if block and wait_tasks:
|
||||||
|
await asyncio.gather(*list(self._callbacks.values()))
|
||||||
|
|
||||||
async def _client_worker(self):
|
async def _client_worker(self):
|
||||||
while self._processing:
|
while self._processing:
|
||||||
|
@ -251,8 +270,5 @@ class Client:
|
||||||
for packet_type in (Packet, type(packet)): # check both callbacks for base class and instance class
|
for packet_type in (Packet, type(packet)): # check both callbacks for base class and instance class
|
||||||
if packet_type in self._packet_callbacks:
|
if packet_type in self._packet_callbacks:
|
||||||
for cb in self._packet_callbacks[packet_type]:
|
for cb in self._packet_callbacks[packet_type]:
|
||||||
try: # TODO run in executor to not block
|
self._run_async(cb, packet)
|
||||||
await cb(packet)
|
|
||||||
except Exception as e:
|
|
||||||
self._logger.exception("Exception while handling callback")
|
|
||||||
return False
|
return False
|
||||||
|
|
Loading…
Reference in a new issue