diff --git a/plugin.py b/plugin.py index 47cd1cd..e2fbe54 100644 --- a/plugin.py +++ b/plugin.py @@ -5,7 +5,7 @@ import sublime_plugin import logging import random -from Codemp.src.task_manager import tm +from Codemp.src.task_manager import rt from Codemp.src.client import client, VirtualClient from Codemp.src.logger import inner_logger from Codemp.src.utils import safe_listener_detach @@ -37,18 +37,19 @@ def plugin_loaded(): # instantiate and start a global asyncio event loop. # pass in the exit_handler coroutine that will be called upon relasing the event loop. - tm.acquire(disconnect_client) - tm.dispatch(inner_logger.listen(), "codemp-logger") + # tm.acquire(disconnect_client) + rt.start() + rt.dispatch(inner_logger.listen(), "codemp-logger") TEXT_LISTENER = CodempClientTextChangeListener() logger.debug("plugin loaded") -async def disconnect_client(): +def disconnect_client(): global TEXT_LISTENER - tm.stop_all() + # rt.stop_all() if TEXT_LISTENER is not None: safe_listener_detach(TEXT_LISTENER) @@ -63,14 +64,17 @@ def plugin_unloaded(): # releasing the runtime, runs the disconnect callback defined when acquiring the event loop. logger.debug("unloading") package_logger.removeHandler(handler) - tm.release(False) + disconnect_client() + rt.stop_loop() + + # tm.release(False) # Listeners ############################################################################## class EventListener(sublime_plugin.EventListener): def on_exit(self): - tm.release(True) + disconnect_client() def on_pre_close_window(self, window): if client.active_workspace is None: @@ -244,7 +248,7 @@ class CodempJoinCommand(sublime_plugin.WindowCommand): print(workspace_id, buffer_id) if buffer_id == "* Don't Join Any": buffer_id = "" - tm.dispatch(JoinCommand(client, workspace_id, buffer_id)) + rt.dispatch(JoinCommand(client, workspace_id, buffer_id)) def is_enabled(self) -> bool: return client.handle is not None @@ -286,7 +290,7 @@ class JoinWorkspaceIdList(sublime_plugin.ListInputHandler): wid = args["workspace_id"] if wid != "": - vws = tm.sync(client.join_workspace(wid)) + vws = rt.block_on(client.join_workspace(wid)) else: vws = None try: @@ -384,7 +388,7 @@ class CodempDisconnectCommand(sublime_plugin.WindowCommand): return False def run(self): - tm.sync(disconnect_client()) + disconnect_client() # Leave Workspace Command diff --git a/src/buffers.py b/src/buffers.py index 9825eb3..eaf7866 100644 --- a/src/buffers.py +++ b/src/buffers.py @@ -5,7 +5,7 @@ from asyncio import CancelledError from codemp import BufferController from Codemp.src import globals as g -from Codemp.src.task_manager import tm +from Codemp.src.task_manager import rt logger = logging.getLogger(__name__) @@ -36,7 +36,7 @@ class VirtualBuffer: self.view.retarget(self.tmpfile) self.view.set_scratch(True) - tm.dispatch( + rt.dispatch( self.apply_bufferchange_task(), f"{g.BUFFCTL_TASK_PREFIX}-{self.codemp_id}", ) @@ -57,7 +57,7 @@ class VirtualBuffer: del s[g.CODEMP_WORKSPACE_ID] self.view.erase_status(g.SUBLIME_STATUS_ID) - tm.stop(f"{g.BUFFCTL_TASK_PREFIX}-{self.codemp_id}") + rt.stop_task(f"{g.BUFFCTL_TASK_PREFIX}-{self.codemp_id}") logger.info(f"cleaning up virtual buffer '{self.codemp_id}'") async def apply_bufferchange_task(self): diff --git a/src/task_manager.py b/src/task_manager.py index 2ee7226..e995968 100644 --- a/src/task_manager.py +++ b/src/task_manager.py @@ -1,76 +1,182 @@ -from typing import Optional +from typing import Optional, Callable, Any +import sublime +import logging import asyncio -from ..ext import sublime_asyncio as rt +import threading +import concurrent.futures + +# from ..ext import sublime_asyncio as rt + +logger = logging.getLogger(__name__) -class TaskManager: +class sublimeWorkerThreadExecutor(concurrent.futures.Executor): + def __init__(self): + self._futures_pending = 0 + self._shutting_down = False + + # reentrant lock: we either increment from the main thread (submit calls) + # or we decrement from the worker thread (futures) + self._condvar = threading.Condition() + + def submit( + self, fn: Callable[..., Any], *args: Any, **kwargs: Any + ) -> concurrent.futures.Future: + if self._shutting_down: + raise RuntimeError("Executor is shutting down") + + with self._condvar: + self._futures_pending += 1 + + logger.debug("Spawning a future in the main thread") + future = concurrent.futures.Future() + + def coro() -> None: + logger.debug("Running a future from the worker thread") + try: + future.set_result(fn(*args, **kwargs)) + except BaseException as e: + future.set_exception(e) + with self._condvar: + self._futures_pending -= 1 + + sublime.set_timeout_async(coro) + return future + + def shutdown(self, wait: bool = True) -> None: + self._shutting_down = True + if not wait: + return + + with self._condvar: + self._condvar.wait_for(lambda: self._futures_pending == 0) + + +class Runtime: def __init__(self): self.tasks = [] - self.runtime = rt - self.exit_handler_id = None + self.loop = asyncio.get_event_loop() + self.loop.set_default_executor(sublimeWorkerThreadExecutor()) + self.loop.set_debug(True) - def acquire(self, exit_handler): - if self.exit_handler_id is None: - # don't allow multiple exit handlers - self.exit_handler_id = self.runtime.acquire(exit_handler) + def __del__(self): + logger.debug("closing down the event loop") + for task in self.tasks: + task.cancel() - return self.exit_handler_id + try: + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + except Exception as e: + logger.error(f"Unexpected crash while shutting down event loop: {e}") - def release(self, at_exit): - self.runtime.release(at_exit=at_exit, exit_handler_id=self.exit_handler_id) - self.exit_handler_id = None + self.loop.close() + + def start(self): + pass + + def stop_loop(self): + self.loop.call_soon_threadsafe(lambda: asyncio.get_running_loop().stop()) + + def block_on(self, fut): + return self.loop.run_until_complete(fut) def dispatch(self, coro, name=None): - self.runtime.dispatch(coro, self.store_named_lambda(name)) + logging.debug("dispatching coroutine...") - def sync(self, coro): - return self.runtime.sync(coro) - - def remove_stopped(self): - self.tasks = list(filter(lambda T: not T.cancelled(), self.tasks)) - - def store(self, task, name=None): - if name is not None: + def make_task(): + logging.debug("creating task on the loop.") + task = self.loop.create_task(coro) task.set_name(name) - self.tasks.append(task) - self.remove_stopped() + self.tasks.append(task) - def store_named_lambda(self, name=None): - def _store(task): - self.store(task, name) - - return _store + self.loop.call_soon_threadsafe(make_task) def get_task(self, name) -> Optional[asyncio.Task]: return next((t for t in self.tasks if t.get_name() == name), None) - def get_task_idx(self, name) -> Optional[int]: - return next( - (i for (i, t) in enumerate(self.tasks) if t.get_name() == name), None - ) + def stop_task(self, name): + task = self.get_task(name) + if task is not None: + self.block_on(self.wait_for_cancel(task)) - def pop_task(self, name) -> Optional[asyncio.Task]: - idx = self.get_task_idx(name) - if id is not None: - return self.tasks.pop(idx) - return None - - async def _stop(self, task): + async def wait_for_cancel(self, task): task.cancel() # cancelling a task, merely requests a cancellation. try: await task except asyncio.CancelledError: return - def stop(self, name): - t = self.get_task(name) - if t is not None: - self.runtime.dispatch(self._stop(t)) - def stop_all(self): - for task in self.tasks: - self.runtime.dispatch(self._stop(task)) +# class TaskManager: +# def __init__(self): +# self.tasks = [] +# self.runtime = rt +# self.exit_handler_id = None + +# def acquire(self, exit_handler): +# if self.exit_handler_id is None: +# # don't allow multiple exit handlers +# self.exit_handler_id = self.runtime.acquire(exit_handler) + +# return self.exit_handler_id + +# def release(self, at_exit): +# self.runtime.release(at_exit=at_exit, exit_handler_id=self.exit_handler_id) +# self.exit_handler_id = None + +# def dispatch(self, coro, name=None): +# self.runtime.dispatch(coro, self.store_named_lambda(name)) + +# def sync(self, coro): +# return self.runtime.sync(coro) + +# def remove_stopped(self): +# self.tasks = list(filter(lambda T: not T.cancelled(), self.tasks)) + +# def store(self, task, name=None): +# if name is not None: +# task.set_name(name) +# self.tasks.append(task) +# self.remove_stopped() + +# def store_named_lambda(self, name=None): +# def _store(task): +# self.store(task, name) + +# return _store + +# def get_task(self, name) -> Optional[asyncio.Task]: +# return next((t for t in self.tasks if t.get_name() == name), None) + +# def get_task_idx(self, name) -> Optional[int]: +# return next( +# (i for (i, t) in enumerate(self.tasks) if t.get_name() == name), None +# ) + +# def pop_task(self, name) -> Optional[asyncio.Task]: +# idx = self.get_task_idx(name) +# if id is not None: +# return self.tasks.pop(idx) +# return None + +# async def _stop(self, task): +# task.cancel() # cancelling a task, merely requests a cancellation. +# try: +# await task +# except asyncio.CancelledError: +# return + +# def stop(self, name): +# t = self.get_task(name) +# if t is not None: +# self.runtime.dispatch(self._stop(t)) + +# def stop_all(self): +# for task in self.tasks: +# self.runtime.dispatch(self._stop(task)) -# singleton instance -tm = TaskManager() +# # singleton instance +# tm = TaskManager() + +rt = Runtime() diff --git a/src/workspace.py b/src/workspace.py index 13a62a6..5776667 100644 --- a/src/workspace.py +++ b/src/workspace.py @@ -10,7 +10,7 @@ from codemp import Workspace from Codemp.src import globals as g from Codemp.src.buffers import VirtualBuffer -from Codemp.src.task_manager import tm +from Codemp.src.task_manager import rt from Codemp.src.utils import rowcol_to_region logger = logging.getLogger(__name__) @@ -95,7 +95,7 @@ class VirtualWorkspace: self.materialized = True def activate(self): - tm.dispatch( + rt.dispatch( self.move_cursor_task(), f"{g.CURCTL_TASK_PREFIX}-{self.id}", ) @@ -103,7 +103,7 @@ class VirtualWorkspace: def deactivate(self): if self.isactive: - tm.stop(f"{g.CURCTL_TASK_PREFIX}-{self.id}") + rt.stop_task(f"{g.CURCTL_TASK_PREFIX}-{self.id}") self.isactive = False