feat: spawn run_forever loop on a separate thread which will receive tasks

to run.
feat: add a special function to run blocking functions on the worker thread
fix: wait for the thread to finish when stopping.

Former-commit-id: 33a10f6a22be1a46f272da198f6e672a798ee2c6
This commit is contained in:
cschen 2024-08-10 19:03:44 +02:00
parent 472321cb2d
commit 1b9809e167

View file

@ -55,9 +55,13 @@ class sublimeWorkerThreadExecutor(concurrent.futures.Executor):
class Runtime: class Runtime:
def __init__(self): def __init__(self):
self.tasks = [] self.tasks = []
self.loop = asyncio.get_event_loop() self.loop = asyncio.new_event_loop()
self.loop.set_default_executor(sublimeWorkerThreadExecutor()) self.loop.set_default_executor(sublimeWorkerThreadExecutor())
self.loop.set_debug(True) self.loop.set_debug(True)
self.thread = threading.Thread(
target=self.loop.run_forever, name="codemp-asyncio-loop"
)
self.thread.start()
def __del__(self): def __del__(self):
logger.debug("closing down the event loop") logger.debug("closing down the event loop")
@ -69,16 +73,15 @@ class Runtime:
except Exception as e: except Exception as e:
logger.error(f"Unexpected crash while shutting down event loop: {e}") logger.error(f"Unexpected crash while shutting down event loop: {e}")
self.loop.close() self.stop_loop()
self.thread.join()
def start(self):
pass
def stop_loop(self): def stop_loop(self):
self.loop.call_soon_threadsafe(lambda: asyncio.get_running_loop().stop()) self.loop.call_soon_threadsafe(lambda: asyncio.get_running_loop().stop())
self.thread.join()
def block_on(self, fut): def run_blocking(self, fut, *args, **kwargs):
return self.loop.run_until_complete(fut) return self.loop.run_in_executor(None, fut, *args, **kwargs)
def dispatch(self, coro, name=None): def dispatch(self, coro, name=None):
logging.debug("dispatching coroutine...") logging.debug("dispatching coroutine...")
@ -97,7 +100,7 @@ class Runtime:
def stop_task(self, name): def stop_task(self, name):
task = self.get_task(name) task = self.get_task(name)
if task is not None: if task is not None:
self.block_on(self.wait_for_cancel(task)) self.dispatch(self.wait_for_cancel(task))
async def wait_for_cancel(self, task): async def wait_for_cancel(self, task):
task.cancel() # cancelling a task, merely requests a cancellation. task.cancel() # cancelling a task, merely requests a cancellation.