hopeful fix for self-stop

This commit is contained in:
əlemi 2022-04-19 11:54:13 +02:00
parent c87dbd63e2
commit 5527d50ff3
No known key found for this signature in database
GPG key ID: BBCBFE5D7244634E
2 changed files with 9 additions and 12 deletions

View file

@ -10,7 +10,7 @@ from ..events.base import BaseEvent
class CallbacksHolder: class CallbacksHolder:
_callbacks : Dict[Any, List[Callable]] _callbacks : Dict[Any, List[Callable]]
_tasks : Dict[uuid.UUID, asyncio.Event] _tasks : Dict[uuid.UUID, asyncio.Task]
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@ -34,23 +34,20 @@ class CallbacksHolder:
def _wrap(self, cb:Callable, uid:uuid.UUID) -> Callable: def _wrap(self, cb:Callable, uid:uuid.UUID) -> Callable:
async def wrapper(*args): async def wrapper(*args):
try: try:
ret = await cb(*args) return await cb(*args)
except Exception: except Exception:
logging.exception("Exception processing callback") logging.exception("Exception processing callback")
ret = None return None
self._tasks[uid].set() finally:
self._tasks.pop(uid) self._tasks.pop(uid)
return ret
return wrapper return wrapper
def run_callbacks(self, key:Any, *args) -> None: def run_callbacks(self, key:Any, *args) -> None:
for cb in self.trigger(key): for cb in self.trigger(key):
task_id = uuid.uuid4() task_id = uuid.uuid4()
self._tasks[task_id] = asyncio.Event() self._tasks[task_id] = asyncio.get_event_loop().create_task(self._wrap(cb, task_id)(*args))
asyncio.get_event_loop().create_task(self._wrap(cb, task_id)(*args))
async def join_callbacks(self): async def join_callbacks(self):
await asyncio.gather(*list(t.wait() for t in self._tasks.values())) await asyncio.gather(*list(self._tasks.values()))
self._tasks.clear() self._tasks.clear()

View file

@ -221,9 +221,9 @@ class Treepuncher(
await self.dispatcher.disconnect(block=not force) await self.dispatcher.disconnect(block=not force)
if not force: if not force:
await self._worker await self._worker
await self.join_callbacks()
for m in self.modules: for m in self.modules:
await m.cleanup() await m.cleanup()
await self.join_callbacks()
await super().stop() await super().stop()
self.logger.info("Treepuncher stopped") self.logger.info("Treepuncher stopped")