diff --git a/aiocraft/client.py b/aiocraft/client.py index 7f39d68..968b39c 100644 --- a/aiocraft/client.py +++ b/aiocraft/client.py @@ -1,5 +1,7 @@ import asyncio import logging +import uuid + from asyncio import Task from enum import Enum @@ -33,6 +35,7 @@ class Client: _processing : bool _authenticated : bool _worker : Task + _callbacks = Dict[str, Task] _packet_callbacks : Dict[Type[Packet], List[Callable]] _logger : logging.Logger @@ -65,6 +68,7 @@ class Client: self._authenticated = False self._packet_callbacks = {} + self._callbacks = {} self._logger = LOGGER.getChild(f"{self.host}:{self.port}") @@ -76,6 +80,18 @@ class Client: def connected(self) -> bool: return self.started and self.dispatcher.connected + def _run_async(self, func, pkt:Packet): + key = str(uuid.uuid4()) # ugly! + + async def wrapper(packet:Packet): + try: + await func(packet) + except Exception as e: + self._logger.error("Exception in callback %s for packet %s | %s", func.__name__, str(packet), str(e)) + self._callbacks.pop(key, None) + + self._callbacks[key] = asyncio.get_event_loop().create_task(wrapper(pkt)) + def on_packet(self, packet:Type[Packet], *args) -> Callable: # receive *args for retro compatibility def wrapper(fun): if packet not in self._packet_callbacks: @@ -133,23 +149,26 @@ class Client: loop.run_forever(idle()) except KeyboardInterrupt: self._logger.info("Received SIGINT, stopping...") - else: - self._logger.warning("Client terminating...") - - loop.run_until_complete(self.stop()) + try: + loop.run_until_complete(self.stop()) + except KeyboardInterrupt: + self._logger.info("Received SIGINT, stopping for real") + loop.run_until_complete(self.stop(wait_tasks=False)) async def start(self): self._processing = True self._worker = asyncio.get_event_loop().create_task(self._client_worker()) self._logger.info("Minecraft client started") - async def stop(self, block=True): + async def stop(self, block=True, wait_tasks=True): self._processing = False if self.dispatcher.connected: await self.dispatcher.disconnect(block=block) if block: await self._worker self._logger.info("Minecraft client stopped") + if block and wait_tasks: + await asyncio.gather(*list(self._callbacks.values())) async def _client_worker(self): while self._processing: @@ -251,8 +270,5 @@ class Client: for packet_type in (Packet, type(packet)): # check both callbacks for base class and instance class if packet_type in self._packet_callbacks: for cb in self._packet_callbacks[packet_type]: - try: # TODO run in executor to not block - await cb(packet) - except Exception as e: - self._logger.exception("Exception while handling callback") + self._run_async(cb, packet) return False