Skip to content

Commit

Permalink
Simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Aug 13, 2022
1 parent 45ff15a commit d542989
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 129 deletions.
14 changes: 10 additions & 4 deletions Doc/library/asyncio-runner.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ to simplify async code usage for common wide-spread scenarios.
Running an asyncio Program
==========================

.. function:: run(coro, *, debug=None)
.. function:: run(coro, *, debug=None, running_ok=False)

Execute the :term:`coroutine` *coro* and return the result.

Expand All @@ -37,9 +37,15 @@ Running an asyncio Program
debug mode explicitly. ``None`` is used to respect the global
:ref:`asyncio-debug-mode` settings.

This function always creates a new event loop and closes it at
the end. It should be used as a main entry point for asyncio
programs, and should ideally only be called once.
If *running_ok* is ``False``, this function always creates a new event loop and closes
it at the end. It should be used as a main entry point for asyncio programs, and should
ideally only be called once.

If *running_ok* is ``True``, this function allows running the passed coroutine even if
this code is already running in an event loop. In other words, it allows re-entering
the event loop, while an exception would be raised if *running_ok* were ``False``. If
this function is called inside an already running event loop, the same loop is used,
but it is not closed at the end.

Example::

Expand Down
212 changes: 127 additions & 85 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import sys
import warnings
import weakref
from contextlib import contextmanager
from heapq import heappop

try:
import ssl
Expand Down Expand Up @@ -388,10 +386,7 @@ async def wait_closed(self):

class BaseEventLoop(events.AbstractEventLoop):

_is_proactorloop = False

def __init__(self):
self._num_runs_pending = 0
self._timer_cancelled_count = 0
self._closed = False
self._stopping = False
Expand Down Expand Up @@ -586,75 +581,75 @@ def _do_shutdown(self, future):
except Exception as ex:
self.call_soon_threadsafe(future.set_exception, ex)

def _check_running(self):
pass

@contextmanager
def manage_run(self):
"""Set up the loop for running."""
def _check_running(self, running_ok=False):
if self.is_running():
raise RuntimeError('This event loop is already running')
if not running_ok and events._get_running_loop() is not None:
raise RuntimeError(
'Cannot run the event loop while another loop is running')

def run_forever(self, running_ok=False):
"""Run until stop() is called."""
self._check_closed()
self._check_running(running_ok=running_ok)
self._set_coroutine_origin_tracking(self._debug)
old_thread_id = self._thread_id
old_running_loop = events._get_running_loop()
self._thread_id = threading.get_ident()

old_agen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
try:
self._thread_id = threading.get_ident()
events._set_running_loop(self)
self._num_runs_pending += 1
if self._is_proactorloop:
if self._self_reading_future is None:
self.call_soon(self._loop_self_reading)
yield
while True:
self._run_once()
if self._stopping:
break
finally:
self._stopping = False
self._thread_id = old_thread_id
events._set_running_loop(old_running_loop)
self._num_runs_pending -= 1
if self._is_proactorloop:
if (self._num_runs_pending == 0
and self._self_reading_future is not None):
ov = self._self_reading_future._ov
self._self_reading_future.cancel()
if ov is not None:
self._proactor._unregister(ov)
self._self_reading_future = None

@contextmanager
def manage_asyncgens(self):
if not hasattr(sys, 'get_asyncgen_hooks'):
# Python version is too old.
return
old_agen_hooks = sys.get_asyncgen_hooks()
try:
self._set_coroutine_origin_tracking(self._debug)
if self._asyncgens is not None:
sys.set_asyncgen_hooks(
firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
yield
finally:
self._set_coroutine_origin_tracking(False)
if self._asyncgens is not None:
sys.set_asyncgen_hooks(*old_agen_hooks)
sys.set_asyncgen_hooks(*old_agen_hooks)

def run_forever(self):
with self.manage_run(), self.manage_asyncgens():
while True:
self._run_once()
if self._stopping:
break
self._stopping = False
def run_until_complete(self, future, running_ok=False):
"""Run until the Future is done.
def run_until_complete(self, future):
with self.manage_run():
f = tasks.ensure_future(future, loop=self)
if f is not future:
f._log_destroy_pending = False
while not f.done():
self._run_once()
if self._stopping:
break
if not f.done():
raise RuntimeError(
'Event loop stopped before Future completed.')
return f.result()
If the argument is a coroutine, it is wrapped in a Task.
WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
Return the Future's result, or raise its exception.
"""
self._check_closed()
self._check_running(running_ok=running_ok)

new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
# is no need to log the "destroy pending task" message
future._log_destroy_pending = False

future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever(running_ok=running_ok)
except:
if new_task and future.done() and not future.cancelled():
# The coroutine raised a BaseException. Consume the exception
# to not log a warning, the caller doesn't have access to the
# local task.
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')

return future.result()

def stop(self):
"""Stop running the event loop.
Expand Down Expand Up @@ -1839,35 +1834,82 @@ def _timer_handle_cancelled(self, handle):
self._timer_cancelled_count += 1

def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
Simplified re-implementation of asyncio's _run_once that
runs handles as they become ready.
"""
ready = self._ready
scheduled = self._scheduled
while scheduled and scheduled[0]._cancelled:
heappop(scheduled)

timeout = (
0 if ready or self._stopping
else min(max(
scheduled[0]._when - self.time(), 0), 86400) if scheduled
else None)

sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)

heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False

timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

event_list = self._selector.select(timeout)
self._process_events(event_list)

# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while scheduled and scheduled[0]._when < end_time:
handle = heappop(scheduled)
ready.append(handle)

for _ in range(len(ready)):
if not ready:
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = ready.popleft()
if not handle._cancelled:
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)

# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None
handle = None # Needed to break cycles when an exception occurs.

def _set_coroutine_origin_tracking(self, enabled):
if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
Expand Down
31 changes: 22 additions & 9 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,10 +786,13 @@ def get_event_loop():


def _get_event_loop(stacklevel=3):
loop = _get_running_loop()
if loop is None:
loop = get_event_loop_policy().get_event_loop()
return loop
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
import warnings
warnings.warn('There is no current event loop',
DeprecationWarning, stacklevel=stacklevel)
return get_event_loop_policy().get_event_loop()


def set_event_loop(loop):
Expand Down Expand Up @@ -821,8 +824,18 @@ def set_child_watcher(watcher):
_py__get_event_loop = _get_event_loop


_c__get_running_loop = _get_running_loop
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
_c__get_event_loop = _get_event_loop
try:
# get_event_loop() is one of the most frequently called
# functions in asyncio. Pure Python implementation is
# about 4 times slower than C-accelerated.
from _asyncio import (_get_running_loop, _set_running_loop,
get_running_loop, get_event_loop, _get_event_loop)
except ImportError:
pass
else:
# Alias C implementations for testing purposes.
_c__get_running_loop = _get_running_loop
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
_c__get_event_loop = _get_event_loop
Loading

0 comments on commit d542989

Please sign in to comment.