mirror of
https://github.com/hexedtech/codemp-sublime.git
synced 2024-11-23 23:34:48 +01:00
feat: wrote executor on the sublime worker thread, (should be correct)
started working on the runtime, first attepts were not very good... xD Former-commit-id: e3f07d5fc4fb6189931f6673cab9295eaebfb05d
This commit is contained in:
parent
d3388bd6d3
commit
472321cb2d
4 changed files with 175 additions and 65 deletions
24
plugin.py
24
plugin.py
|
@ -5,7 +5,7 @@ import sublime_plugin
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
|
|
||||||
from Codemp.src.task_manager import tm
|
from Codemp.src.task_manager import rt
|
||||||
from Codemp.src.client import client, VirtualClient
|
from Codemp.src.client import client, VirtualClient
|
||||||
from Codemp.src.logger import inner_logger
|
from Codemp.src.logger import inner_logger
|
||||||
from Codemp.src.utils import safe_listener_detach
|
from Codemp.src.utils import safe_listener_detach
|
||||||
|
@ -37,18 +37,19 @@ def plugin_loaded():
|
||||||
|
|
||||||
# instantiate and start a global asyncio event loop.
|
# instantiate and start a global asyncio event loop.
|
||||||
# pass in the exit_handler coroutine that will be called upon relasing the event loop.
|
# pass in the exit_handler coroutine that will be called upon relasing the event loop.
|
||||||
tm.acquire(disconnect_client)
|
# tm.acquire(disconnect_client)
|
||||||
tm.dispatch(inner_logger.listen(), "codemp-logger")
|
rt.start()
|
||||||
|
rt.dispatch(inner_logger.listen(), "codemp-logger")
|
||||||
|
|
||||||
TEXT_LISTENER = CodempClientTextChangeListener()
|
TEXT_LISTENER = CodempClientTextChangeListener()
|
||||||
|
|
||||||
logger.debug("plugin loaded")
|
logger.debug("plugin loaded")
|
||||||
|
|
||||||
|
|
||||||
async def disconnect_client():
|
def disconnect_client():
|
||||||
global TEXT_LISTENER
|
global TEXT_LISTENER
|
||||||
|
|
||||||
tm.stop_all()
|
# rt.stop_all()
|
||||||
|
|
||||||
if TEXT_LISTENER is not None:
|
if TEXT_LISTENER is not None:
|
||||||
safe_listener_detach(TEXT_LISTENER)
|
safe_listener_detach(TEXT_LISTENER)
|
||||||
|
@ -63,14 +64,17 @@ def plugin_unloaded():
|
||||||
# releasing the runtime, runs the disconnect callback defined when acquiring the event loop.
|
# releasing the runtime, runs the disconnect callback defined when acquiring the event loop.
|
||||||
logger.debug("unloading")
|
logger.debug("unloading")
|
||||||
package_logger.removeHandler(handler)
|
package_logger.removeHandler(handler)
|
||||||
tm.release(False)
|
disconnect_client()
|
||||||
|
rt.stop_loop()
|
||||||
|
|
||||||
|
# tm.release(False)
|
||||||
|
|
||||||
|
|
||||||
# Listeners
|
# Listeners
|
||||||
##############################################################################
|
##############################################################################
|
||||||
class EventListener(sublime_plugin.EventListener):
|
class EventListener(sublime_plugin.EventListener):
|
||||||
def on_exit(self):
|
def on_exit(self):
|
||||||
tm.release(True)
|
disconnect_client()
|
||||||
|
|
||||||
def on_pre_close_window(self, window):
|
def on_pre_close_window(self, window):
|
||||||
if client.active_workspace is None:
|
if client.active_workspace is None:
|
||||||
|
@ -244,7 +248,7 @@ class CodempJoinCommand(sublime_plugin.WindowCommand):
|
||||||
print(workspace_id, buffer_id)
|
print(workspace_id, buffer_id)
|
||||||
if buffer_id == "* Don't Join Any":
|
if buffer_id == "* Don't Join Any":
|
||||||
buffer_id = ""
|
buffer_id = ""
|
||||||
tm.dispatch(JoinCommand(client, workspace_id, buffer_id))
|
rt.dispatch(JoinCommand(client, workspace_id, buffer_id))
|
||||||
|
|
||||||
def is_enabled(self) -> bool:
|
def is_enabled(self) -> bool:
|
||||||
return client.handle is not None
|
return client.handle is not None
|
||||||
|
@ -286,7 +290,7 @@ class JoinWorkspaceIdList(sublime_plugin.ListInputHandler):
|
||||||
|
|
||||||
wid = args["workspace_id"]
|
wid = args["workspace_id"]
|
||||||
if wid != "":
|
if wid != "":
|
||||||
vws = tm.sync(client.join_workspace(wid))
|
vws = rt.block_on(client.join_workspace(wid))
|
||||||
else:
|
else:
|
||||||
vws = None
|
vws = None
|
||||||
try:
|
try:
|
||||||
|
@ -384,7 +388,7 @@ class CodempDisconnectCommand(sublime_plugin.WindowCommand):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
tm.sync(disconnect_client())
|
disconnect_client()
|
||||||
|
|
||||||
|
|
||||||
# Leave Workspace Command
|
# Leave Workspace Command
|
||||||
|
|
|
@ -5,7 +5,7 @@ from asyncio import CancelledError
|
||||||
|
|
||||||
from codemp import BufferController
|
from codemp import BufferController
|
||||||
from Codemp.src import globals as g
|
from Codemp.src import globals as g
|
||||||
from Codemp.src.task_manager import tm
|
from Codemp.src.task_manager import rt
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ class VirtualBuffer:
|
||||||
self.view.retarget(self.tmpfile)
|
self.view.retarget(self.tmpfile)
|
||||||
self.view.set_scratch(True)
|
self.view.set_scratch(True)
|
||||||
|
|
||||||
tm.dispatch(
|
rt.dispatch(
|
||||||
self.apply_bufferchange_task(),
|
self.apply_bufferchange_task(),
|
||||||
f"{g.BUFFCTL_TASK_PREFIX}-{self.codemp_id}",
|
f"{g.BUFFCTL_TASK_PREFIX}-{self.codemp_id}",
|
||||||
)
|
)
|
||||||
|
@ -57,7 +57,7 @@ class VirtualBuffer:
|
||||||
del s[g.CODEMP_WORKSPACE_ID]
|
del s[g.CODEMP_WORKSPACE_ID]
|
||||||
self.view.erase_status(g.SUBLIME_STATUS_ID)
|
self.view.erase_status(g.SUBLIME_STATUS_ID)
|
||||||
|
|
||||||
tm.stop(f"{g.BUFFCTL_TASK_PREFIX}-{self.codemp_id}")
|
rt.stop_task(f"{g.BUFFCTL_TASK_PREFIX}-{self.codemp_id}")
|
||||||
logger.info(f"cleaning up virtual buffer '{self.codemp_id}'")
|
logger.info(f"cleaning up virtual buffer '{self.codemp_id}'")
|
||||||
|
|
||||||
async def apply_bufferchange_task(self):
|
async def apply_bufferchange_task(self):
|
||||||
|
|
|
@ -1,76 +1,182 @@
|
||||||
from typing import Optional
|
from typing import Optional, Callable, Any
|
||||||
|
import sublime
|
||||||
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
from ..ext import sublime_asyncio as rt
|
import threading
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
|
# from ..ext import sublime_asyncio as rt
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TaskManager:
|
class sublimeWorkerThreadExecutor(concurrent.futures.Executor):
|
||||||
|
def __init__(self):
|
||||||
|
self._futures_pending = 0
|
||||||
|
self._shutting_down = False
|
||||||
|
|
||||||
|
# reentrant lock: we either increment from the main thread (submit calls)
|
||||||
|
# or we decrement from the worker thread (futures)
|
||||||
|
self._condvar = threading.Condition()
|
||||||
|
|
||||||
|
def submit(
|
||||||
|
self, fn: Callable[..., Any], *args: Any, **kwargs: Any
|
||||||
|
) -> concurrent.futures.Future:
|
||||||
|
if self._shutting_down:
|
||||||
|
raise RuntimeError("Executor is shutting down")
|
||||||
|
|
||||||
|
with self._condvar:
|
||||||
|
self._futures_pending += 1
|
||||||
|
|
||||||
|
logger.debug("Spawning a future in the main thread")
|
||||||
|
future = concurrent.futures.Future()
|
||||||
|
|
||||||
|
def coro() -> None:
|
||||||
|
logger.debug("Running a future from the worker thread")
|
||||||
|
try:
|
||||||
|
future.set_result(fn(*args, **kwargs))
|
||||||
|
except BaseException as e:
|
||||||
|
future.set_exception(e)
|
||||||
|
with self._condvar:
|
||||||
|
self._futures_pending -= 1
|
||||||
|
|
||||||
|
sublime.set_timeout_async(coro)
|
||||||
|
return future
|
||||||
|
|
||||||
|
def shutdown(self, wait: bool = True) -> None:
|
||||||
|
self._shutting_down = True
|
||||||
|
if not wait:
|
||||||
|
return
|
||||||
|
|
||||||
|
with self._condvar:
|
||||||
|
self._condvar.wait_for(lambda: self._futures_pending == 0)
|
||||||
|
|
||||||
|
|
||||||
|
class Runtime:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.tasks = []
|
self.tasks = []
|
||||||
self.runtime = rt
|
self.loop = asyncio.get_event_loop()
|
||||||
self.exit_handler_id = None
|
self.loop.set_default_executor(sublimeWorkerThreadExecutor())
|
||||||
|
self.loop.set_debug(True)
|
||||||
|
|
||||||
def acquire(self, exit_handler):
|
def __del__(self):
|
||||||
if self.exit_handler_id is None:
|
logger.debug("closing down the event loop")
|
||||||
# don't allow multiple exit handlers
|
for task in self.tasks:
|
||||||
self.exit_handler_id = self.runtime.acquire(exit_handler)
|
task.cancel()
|
||||||
|
|
||||||
return self.exit_handler_id
|
try:
|
||||||
|
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Unexpected crash while shutting down event loop: {e}")
|
||||||
|
|
||||||
def release(self, at_exit):
|
self.loop.close()
|
||||||
self.runtime.release(at_exit=at_exit, exit_handler_id=self.exit_handler_id)
|
|
||||||
self.exit_handler_id = None
|
def start(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def stop_loop(self):
|
||||||
|
self.loop.call_soon_threadsafe(lambda: asyncio.get_running_loop().stop())
|
||||||
|
|
||||||
|
def block_on(self, fut):
|
||||||
|
return self.loop.run_until_complete(fut)
|
||||||
|
|
||||||
def dispatch(self, coro, name=None):
|
def dispatch(self, coro, name=None):
|
||||||
self.runtime.dispatch(coro, self.store_named_lambda(name))
|
logging.debug("dispatching coroutine...")
|
||||||
|
|
||||||
def sync(self, coro):
|
def make_task():
|
||||||
return self.runtime.sync(coro)
|
logging.debug("creating task on the loop.")
|
||||||
|
task = self.loop.create_task(coro)
|
||||||
def remove_stopped(self):
|
|
||||||
self.tasks = list(filter(lambda T: not T.cancelled(), self.tasks))
|
|
||||||
|
|
||||||
def store(self, task, name=None):
|
|
||||||
if name is not None:
|
|
||||||
task.set_name(name)
|
task.set_name(name)
|
||||||
self.tasks.append(task)
|
self.tasks.append(task)
|
||||||
self.remove_stopped()
|
|
||||||
|
|
||||||
def store_named_lambda(self, name=None):
|
self.loop.call_soon_threadsafe(make_task)
|
||||||
def _store(task):
|
|
||||||
self.store(task, name)
|
|
||||||
|
|
||||||
return _store
|
|
||||||
|
|
||||||
def get_task(self, name) -> Optional[asyncio.Task]:
|
def get_task(self, name) -> Optional[asyncio.Task]:
|
||||||
return next((t for t in self.tasks if t.get_name() == name), None)
|
return next((t for t in self.tasks if t.get_name() == name), None)
|
||||||
|
|
||||||
def get_task_idx(self, name) -> Optional[int]:
|
def stop_task(self, name):
|
||||||
return next(
|
task = self.get_task(name)
|
||||||
(i for (i, t) in enumerate(self.tasks) if t.get_name() == name), None
|
if task is not None:
|
||||||
)
|
self.block_on(self.wait_for_cancel(task))
|
||||||
|
|
||||||
def pop_task(self, name) -> Optional[asyncio.Task]:
|
async def wait_for_cancel(self, task):
|
||||||
idx = self.get_task_idx(name)
|
|
||||||
if id is not None:
|
|
||||||
return self.tasks.pop(idx)
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def _stop(self, task):
|
|
||||||
task.cancel() # cancelling a task, merely requests a cancellation.
|
task.cancel() # cancelling a task, merely requests a cancellation.
|
||||||
try:
|
try:
|
||||||
await task
|
await task
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
return
|
return
|
||||||
|
|
||||||
def stop(self, name):
|
|
||||||
t = self.get_task(name)
|
|
||||||
if t is not None:
|
|
||||||
self.runtime.dispatch(self._stop(t))
|
|
||||||
|
|
||||||
def stop_all(self):
|
# class TaskManager:
|
||||||
for task in self.tasks:
|
# def __init__(self):
|
||||||
self.runtime.dispatch(self._stop(task))
|
# self.tasks = []
|
||||||
|
# self.runtime = rt
|
||||||
|
# self.exit_handler_id = None
|
||||||
|
|
||||||
|
# def acquire(self, exit_handler):
|
||||||
|
# if self.exit_handler_id is None:
|
||||||
|
# # don't allow multiple exit handlers
|
||||||
|
# self.exit_handler_id = self.runtime.acquire(exit_handler)
|
||||||
|
|
||||||
|
# return self.exit_handler_id
|
||||||
|
|
||||||
|
# def release(self, at_exit):
|
||||||
|
# self.runtime.release(at_exit=at_exit, exit_handler_id=self.exit_handler_id)
|
||||||
|
# self.exit_handler_id = None
|
||||||
|
|
||||||
|
# def dispatch(self, coro, name=None):
|
||||||
|
# self.runtime.dispatch(coro, self.store_named_lambda(name))
|
||||||
|
|
||||||
|
# def sync(self, coro):
|
||||||
|
# return self.runtime.sync(coro)
|
||||||
|
|
||||||
|
# def remove_stopped(self):
|
||||||
|
# self.tasks = list(filter(lambda T: not T.cancelled(), self.tasks))
|
||||||
|
|
||||||
|
# def store(self, task, name=None):
|
||||||
|
# if name is not None:
|
||||||
|
# task.set_name(name)
|
||||||
|
# self.tasks.append(task)
|
||||||
|
# self.remove_stopped()
|
||||||
|
|
||||||
|
# def store_named_lambda(self, name=None):
|
||||||
|
# def _store(task):
|
||||||
|
# self.store(task, name)
|
||||||
|
|
||||||
|
# return _store
|
||||||
|
|
||||||
|
# def get_task(self, name) -> Optional[asyncio.Task]:
|
||||||
|
# return next((t for t in self.tasks if t.get_name() == name), None)
|
||||||
|
|
||||||
|
# def get_task_idx(self, name) -> Optional[int]:
|
||||||
|
# return next(
|
||||||
|
# (i for (i, t) in enumerate(self.tasks) if t.get_name() == name), None
|
||||||
|
# )
|
||||||
|
|
||||||
|
# def pop_task(self, name) -> Optional[asyncio.Task]:
|
||||||
|
# idx = self.get_task_idx(name)
|
||||||
|
# if id is not None:
|
||||||
|
# return self.tasks.pop(idx)
|
||||||
|
# return None
|
||||||
|
|
||||||
|
# async def _stop(self, task):
|
||||||
|
# task.cancel() # cancelling a task, merely requests a cancellation.
|
||||||
|
# try:
|
||||||
|
# await task
|
||||||
|
# except asyncio.CancelledError:
|
||||||
|
# return
|
||||||
|
|
||||||
|
# def stop(self, name):
|
||||||
|
# t = self.get_task(name)
|
||||||
|
# if t is not None:
|
||||||
|
# self.runtime.dispatch(self._stop(t))
|
||||||
|
|
||||||
|
# def stop_all(self):
|
||||||
|
# for task in self.tasks:
|
||||||
|
# self.runtime.dispatch(self._stop(task))
|
||||||
|
|
||||||
|
|
||||||
# singleton instance
|
# # singleton instance
|
||||||
tm = TaskManager()
|
# tm = TaskManager()
|
||||||
|
|
||||||
|
rt = Runtime()
|
||||||
|
|
|
@ -10,7 +10,7 @@ from codemp import Workspace
|
||||||
|
|
||||||
from Codemp.src import globals as g
|
from Codemp.src import globals as g
|
||||||
from Codemp.src.buffers import VirtualBuffer
|
from Codemp.src.buffers import VirtualBuffer
|
||||||
from Codemp.src.task_manager import tm
|
from Codemp.src.task_manager import rt
|
||||||
from Codemp.src.utils import rowcol_to_region
|
from Codemp.src.utils import rowcol_to_region
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -95,7 +95,7 @@ class VirtualWorkspace:
|
||||||
self.materialized = True
|
self.materialized = True
|
||||||
|
|
||||||
def activate(self):
|
def activate(self):
|
||||||
tm.dispatch(
|
rt.dispatch(
|
||||||
self.move_cursor_task(),
|
self.move_cursor_task(),
|
||||||
f"{g.CURCTL_TASK_PREFIX}-{self.id}",
|
f"{g.CURCTL_TASK_PREFIX}-{self.id}",
|
||||||
)
|
)
|
||||||
|
@ -103,7 +103,7 @@ class VirtualWorkspace:
|
||||||
|
|
||||||
def deactivate(self):
|
def deactivate(self):
|
||||||
if self.isactive:
|
if self.isactive:
|
||||||
tm.stop(f"{g.CURCTL_TASK_PREFIX}-{self.id}")
|
rt.stop_task(f"{g.CURCTL_TASK_PREFIX}-{self.id}")
|
||||||
|
|
||||||
self.isactive = False
|
self.isactive = False
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue