diff --git a/devlib/utils/asyn.py b/devlib/utils/asyn.py index c0e415612..66510a46a 100644 --- a/devlib/utils/asyn.py +++ b/devlib/utils/asyn.py @@ -20,23 +20,19 @@ import abc import asyncio +import asyncio.events import functools import itertools import contextlib import pathlib import os.path import inspect +import sys +import threading +from concurrent.futures import ThreadPoolExecutor +from weakref import WeakSet, WeakKeyDictionary -# Allow nesting asyncio loops, which is necessary for: -# * Being able to call the blocking variant of a function from an async -# function for backward compat -# * Critically, run the blocking variant of a function in a Jupyter notebook -# environment, since it also uses asyncio. -# -# Maybe there is still hope for future versions of Python though: -# https://bugs.python.org/issue22239 -import nest_asyncio -nest_asyncio.apply() +from greenlet import greenlet def create_task(awaitable, name=None): @@ -292,12 +288,288 @@ def __set_name__(self, owner, name): self.name = name +class _Genlet(greenlet): + """ + Generator-like object based on ``greenlets``. It allows nested :class:`_Genlet` + to make their parent yield on their behalf, as if callees could decide to + be annotated ``yield from`` without modifying the caller. + """ + @classmethod + def from_coro(cls, coro): + """ + Create a :class:`_Genlet` from a given coroutine, treating it as a + generator. + """ + f = lambda value: self.consume_coro(coro, value) + self = cls(f) + return self + + def consume_coro(self, coro, value): + """ + Send ``value`` to ``coro`` then consume the coroutine, passing all its + yielded actions to the enclosing :class:`_Genlet`. This allows crossing + blocking calls layers as if they were async calls with `await`. + """ + excep = None + while True: + try: + if excep is None: + future = coro.send(value) + else: + future = coro.throw(excep) + + except StopIteration as e: + return e.value + else: + # Switch back to the consumer that returns the values via + # send() + try: + value = self.consumer_genlet.switch(future) + except BaseException as e: + excep = e + value = None + else: + excep = None + + + @classmethod + def get_enclosing(cls): + """ + Get the immediately enclosing :class:`_Genlet` in the callstack or + ``None``. + """ + g = greenlet.getcurrent() + while not (isinstance(g, cls) or g is None): + g = g.parent + return g + + def _send_throw(self, value, excep): + self.consumer_genlet = greenlet.getcurrent() + + # Switch back to the function yielding values + if excep is None: + result = self.switch(value) + else: + result = self.throw(excep) + + if self: + return result + else: + raise StopIteration(result) + + def gen_send(self, x): + """ + Similar to generators' ``send`` method. + """ + return self._send_throw(x, None) + + def gen_throw(self, x): + """ + Similar to generators' ``throw`` method. + """ + return self._send_throw(None, x) + + +class _AwaitableGenlet: + """ + Wrap a coroutine with a :class:`_Genlet` and wrap that to be awaitable. + """ + + @classmethod + def wrap_coro(cls, coro): + if _Genlet.get_enclosing() is None: + # Create a top-level _Genlet that all nested runs will use to yield + # their futures + aw = cls(coro) + async def coro_f(): + return await aw + return coro_f() + else: + return coro + + def __init__(self, coro): + self._coro = coro + + def __await__(self): + coro = self._coro + is_started = inspect.iscoroutine(coro) and coro.cr_running + + def genf(): + gen = _Genlet.from_coro(coro) + value = None + excep = None + + # The coroutine is already started, so we need to dispatch the + # value from the upcoming send() to the gen without running + # gen first. + if is_started: + try: + value = yield + except BaseException as e: + excep = e + + while True: + try: + if excep is None: + future = gen.gen_send(value) + else: + future = gen.gen_throw(excep) + except StopIteration as e: + return e.value + + try: + value = yield future + except BaseException as e: + excep = e + value = None + else: + excep = None + + gen = genf() + if is_started: + # Start the generator so it waits at the first yield point + gen.gen_send(None) + + return gen + + +def allow_nested_run(coro): + """ + Wrap the coroutine ``coro`` such that nested calls to :func:`run` will be + allowed. + + .. warning:: The coroutine needs to be consumed in the same OS thread it + was created in. + """ + return _allow_nested_run(coro, loop=None) + + +def _allow_nested_run(coro, loop=None): + return _do_allow_nested_run(coro) + + +def _do_allow_nested_run(coro): + return _AwaitableGenlet.wrap_coro(coro) + + +# This thread runs coroutines that cannot be ran on the event loop in the +# current thread. Instead, they are scheduled in a separate thread where +# another event loop has been setup, so we can wrap coroutines before +# dispatching them there. +_CORO_THREAD_EXECUTOR = ThreadPoolExecutor(max_workers=1) +def _coro_thread_f(coro): + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + _install_task_factory(loop) + # The coroutine needs to be wrapped in the same thread that will consume it, + coro = _allow_nested_run(coro, loop) + return loop.run_until_complete(coro) + + +def _run_in_thread(coro): + # This is a truly blocking operation, which will block the caller's event + # loop. However, this also prevents most thread safety issues as the + # calling code will not run concurrently with the coroutine. We also don't + # have a choice anyway. + future = _CORO_THREAD_EXECUTOR.submit(_coro_thread_f, coro) + return future.result() + + +_PATCHED_LOOP_LOCK = threading.Lock() +_PATCHED_LOOP = WeakSet() + +def _install_task_factory(loop): + """ + Install a task factory on the given event ``loop`` so that top-level + coroutines are wrapped using :func:`allow_nested_run`. This ensures that + the nested :func:`run` infrastructure will be available. + """ + def install(loop): + if sys.version_info >= (3, 11): + def default_factory(loop, coro, context=None): + return asyncio.Task(coro, loop=loop, context=context) + else: + def default_factory(loop, coro, context=None): + return asyncio.Task(coro, loop=loop) + + make_task = loop.get_task_factory() or default_factory + def factory(loop, coro, context=None): + coro = _allow_nested_run(coro, loop) + return make_task(loop, coro, context=context) + + loop.set_task_factory(factory) + + with _PATCHED_LOOP_LOCK: + if loop in _PATCHED_LOOP: + return + else: + install(loop) + _PATCHED_LOOP.add(loop) + + +def _patch_current_loop(): + try: + loop = asyncio.get_running_loop() + except RuntimeError: + pass + else: + _install_task_factory(loop) + + +# Patch the currently running event loop if any, to increase the chances of not +# having to use the _CORO_THREAD_EXECUTOR +_patch_current_loop() + + def run(coro): """ Similar to :func:`asyncio.run` but can be called while an event loop is - running. + running if a coroutine higher in the callstack has been wrapped using + :func:`allow_nested_run`. """ - return asyncio.run(coro) + + # Ensure we have a fresh coroutine. inspect.getcoroutinestate() does not + # work on all objects that asyncio creates on some version of Python, such + # as iterable_coroutine + assert not (inspect.iscoroutine(coro) and coro.cr_running) + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # We are not currently running an event loop, so it's ok to just use + # asyncio.run() and let it create one. + # Once the coroutine is wrapped, we will be able to yield across + # blocking function boundaries thanks to _Genlet + return asyncio.run(_do_allow_nested_run(coro)) + else: + # Increase the odds that in the future, we have a wrapped coroutine in + # our callstack to avoid the _run_in_thread() path. + _install_task_factory(loop) + + if loop.is_running(): + g = _Genlet.get_enclosing() + if g is None: + # If we are not running under a wrapped coroutine, we don't + # have a choice and we need to run in a separate event loop. We + # cannot just create another event loop and install it, as + # asyncio forbids that, so the only choice is doing this in a + # separate thread that we fully control. + return _run_in_thread(coro) + else: + # This requires that we have an coroutine wrapped with + # allow_nested_run() higher in the callstack, that we will be + # able to use as a conduit to yield the futures. + return g.consume_coro(coro, None) + else: + # In the odd case a loop was installed but is not running, we just + # use it. With _install_task_factory(), we should have the + # top-level Task run an instrumented coroutine (wrapped with + # allow_nested_run()) + return loop.run_until_complete(coro) def asyncf(f): diff --git a/setup.py b/setup.py index 7447af316..cba25a26b 100644 --- a/setup.py +++ b/setup.py @@ -105,6 +105,7 @@ def _load_path(filepath): 'pytest', 'lxml', # More robust xml parsing 'nest_asyncio', # Allows running nested asyncio loops + 'greenlet', # Allows running nested asyncio loops 'future', # for the "past" Python package 'ruamel.yaml >= 0.15.72', # YAML formatted config parsing ],