diff --git a/src/treepuncher/traits/__init__.py b/src/treepuncher/traits/__init__.py new file mode 100644 index 0000000..298ba13 --- /dev/null +++ b/src/treepuncher/traits/__init__.py @@ -0,0 +1,3 @@ +from .callbacks import CallbacksHolder +from .runnable import Runnable + diff --git a/src/treepuncher/traits/callbacks.py b/src/treepuncher/traits/callbacks.py new file mode 100644 index 0000000..5c833a4 --- /dev/null +++ b/src/treepuncher/traits/callbacks.py @@ -0,0 +1,58 @@ +import asyncio +import uuid +import logging + +from inspect import isclass +from typing import Dict, List, Set, Any, Callable, Type + +from ..events.base import BaseEvent + +class CallbacksHolder: + + _callbacks : Dict[Any, List[Callable]] + _tasks : Dict[uuid.UUID, asyncio.Event] + + _logger : logging.Logger + + def __init__(self): + super().__init__() + self._callbacks = {} + self._tasks = {} + + def callback_keys(self, filter:Type = None) -> Set[Any]: + return set(x for x in self._callbacks.keys() if not filter or (isclass(x) and issubclass(x, filter))) + + def register(self, key:Any, callback:Callable): + if key not in self._callbacks: + self._callbacks[key] = [] + self._callbacks[key].append(callback) + return callback + + def trigger(self, key:Any) -> List[Callable]: + if key not in self._callbacks: + return [] + return self._callbacks[key] + + def _wrap(self, cb:Callable, uid:uuid.UUID) -> Callable: + async def wrapper(*args): + try: + ret = await cb(*args) + except Exception: + logging.exception("Exception processing callback") + ret = None + self._tasks[uid].set() + self._tasks.pop(uid) + return ret + return wrapper + + def run_callbacks(self, key:Any, *args) -> None: + for cb in self.trigger(key): + task_id = uuid.uuid4() + self._tasks[task_id] = asyncio.Event() + + asyncio.get_event_loop().create_task(self._wrap(cb, task_id)(*args)) + + async def join_callbacks(self): + await asyncio.gather(*list(t.wait() for t in self._tasks.values())) + self._tasks.clear() + diff --git a/src/treepuncher/traits/runnable.py b/src/treepuncher/traits/runnable.py new file mode 100644 index 0000000..eee0d29 --- /dev/null +++ b/src/treepuncher/traits/runnable.py @@ -0,0 +1,45 @@ +import asyncio +import logging + +from typing import Optional +from signal import signal, SIGINT, SIGTERM, SIGABRT + +class Runnable: + _is_running : bool + _stop_task : Optional[asyncio.Task] + + def __init__(self): + self._is_running = False + self._stop_task = None + + async def start(self): + self._is_running = True + + async def stop(self, force:bool=False): + self._is_running = False + + def run(self): + logging.info("Starting process") + + def signal_handler(signum, __): + if signum == SIGINT: + if self._stop_task: + self._stop_task.cancel() + logging.info("Received SIGINT, terminating") + else: + logging.info("Received SIGINT, stopping gracefully...") + self._stop_task = asyncio.get_event_loop().create_task(self.stop(force=self._stop_task is not None)) + + signal(SIGINT, signal_handler) + + loop = asyncio.get_event_loop() + + async def main(): + await self.start() + while self._is_running: + await asyncio.sleep(1) + + loop.run_until_complete(main()) + + logging.info("Process finished") +