Skip to content

Commit

Permalink
Restore uvloop.new_event_loop and other missing uvloop members to typ…
Browse files Browse the repository at this point in the history
…ing (#573)
  • Loading branch information
graingert authored Oct 15, 2023
1 parent 5ddf38b commit 5c500ee
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 78 deletions.
140 changes: 79 additions & 61 deletions uvloop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
__all__ = ('new_event_loop', 'install', 'EventLoopPolicy')


_T = _typing.TypeVar("_T")


class Loop(__BaseLoop, __asyncio.AbstractEventLoop): # type: ignore[misc]
pass

Expand All @@ -34,69 +37,84 @@ def install() -> None:
__asyncio.set_event_loop_policy(EventLoopPolicy())


def run(main, *, loop_factory=new_event_loop, debug=None, **run_kwargs):
"""The preferred way of running a coroutine with uvloop."""

async def wrapper():
# If `loop_factory` is provided we want it to return
# either uvloop.Loop or a subtype of it, assuming the user
# is using `uvloop.run()` intentionally.
loop = __asyncio._get_running_loop()
if not isinstance(loop, Loop):
raise TypeError('uvloop.run() uses a non-uvloop event loop')
return await main

vi = _sys.version_info[:2]

if vi <= (3, 10):
# Copied from python/cpython

if __asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")

if not __asyncio.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = loop_factory()
try:
__asyncio.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(wrapper())
finally:
if _typing.TYPE_CHECKING:
def run(
main: _typing.Coroutine[_typing.Any, _typing.Any, _T],
*,
loop_factory: _typing.Optional[
_typing.Callable[[], Loop]
] = new_event_loop,
debug: _typing.Optional[bool]=None,
) -> _T:
"""The preferred way of running a coroutine with uvloop."""
else:
def run(main, *, loop_factory=new_event_loop, debug=None, **run_kwargs):
"""The preferred way of running a coroutine with uvloop."""

async def wrapper():
# If `loop_factory` is provided we want it to return
# either uvloop.Loop or a subtype of it, assuming the user
# is using `uvloop.run()` intentionally.
loop = __asyncio._get_running_loop()
if not isinstance(loop, Loop):
raise TypeError('uvloop.run() uses a non-uvloop event loop')
return await main

vi = _sys.version_info[:2]

if vi <= (3, 10):
# Copied from python/cpython

if __asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")

if not __asyncio.iscoroutine(main):
raise ValueError(
"a coroutine was expected, got {!r}".format(main)
)

loop = loop_factory()
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
if hasattr(loop, 'shutdown_default_executor'):
loop.run_until_complete(loop.shutdown_default_executor())
__asyncio.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(wrapper())
finally:
__asyncio.set_event_loop(None)
loop.close()

elif vi == (3, 11):
if __asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")

with __asyncio.Runner(
loop_factory=loop_factory,
debug=debug,
**run_kwargs
) as runner:
return runner.run(wrapper())

else:
assert vi >= (3, 12)
return __asyncio.run(
wrapper(),
loop_factory=loop_factory,
debug=debug,
**run_kwargs
)


def _cancel_all_tasks(loop):
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
if hasattr(loop, 'shutdown_default_executor'):
loop.run_until_complete(
loop.shutdown_default_executor()
)
finally:
__asyncio.set_event_loop(None)
loop.close()

elif vi == (3, 11):
if __asyncio._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")

with __asyncio.Runner(
loop_factory=loop_factory,
debug=debug,
**run_kwargs
) as runner:
return runner.run(wrapper())

else:
assert vi >= (3, 12)
return __asyncio.run(
wrapper(),
loop_factory=loop_factory,
debug=debug,
**run_kwargs
)


def _cancel_all_tasks(loop: __asyncio.AbstractEventLoop) -> None:
# Copied from python/cpython

to_cancel = __asyncio.all_tasks(loop)
Expand Down
17 changes: 0 additions & 17 deletions uvloop/__init__.pyi

This file was deleted.

0 comments on commit 5c500ee

Please sign in to comment.