Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure HTTP/2 connections are gracefully closed on async cancellation #757

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## Unreleased

- Ensure that the cancellation of HTTP/2 connections does not break the state. (#757)
- Add support for Python 3.12. (#807)

## 0.18.0 (September 8th, 2023)
Expand Down
40 changes: 27 additions & 13 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,21 @@ async def handle_async_request(self, request: Request) -> Response:
)
self._max_streams_semaphore = AsyncSemaphore(local_settings_max_streams)

for _ in range(local_settings_max_streams - self._max_streams):
await self._max_streams_semaphore.acquire()

await self._max_streams_semaphore.acquire()
try:
for _ in range(local_settings_max_streams - self._max_streams):
await self._max_streams_semaphore.acquire()
except BaseException:
with AsyncShieldCancellation():
async with self._state_lock:
await self._state_housekeeping()
raise
try:
await self._max_streams_semaphore.acquire()
except BaseException: # pragma: no cover
with AsyncShieldCancellation():
async with self._state_lock:
await self._state_housekeeping()
raise
karpetrosyan marked this conversation as resolved.
Show resolved Hide resolved

try:
stream_id = self._h2_state.get_next_available_stream_id()
Expand Down Expand Up @@ -405,17 +416,20 @@ async def _response_closed(self, stream_id: int) -> None:
await self._max_streams_semaphore.release()
del self._events[stream_id]
async with self._state_lock:
if self._connection_terminated and not self._events:
await self._state_housekeeping()

async def _state_housekeeping(self) -> None:
if self._connection_terminated and not self._events:
await self.aclose()

elif self._state == HTTPConnectionState.ACTIVE and not self._events:
self._state = HTTPConnectionState.IDLE
if self._keepalive_expiry is not None:
now = time.monotonic()
self._expire_at = now + self._keepalive_expiry
if self._used_all_stream_ids: # pragma: nocover
await self.aclose()

elif self._state == HTTPConnectionState.ACTIVE and not self._events:
self._state = HTTPConnectionState.IDLE
if self._keepalive_expiry is not None:
now = time.monotonic()
self._expire_at = now + self._keepalive_expiry
if self._used_all_stream_ids: # pragma: nocover
await self.aclose()

async def aclose(self) -> None:
# Note that this method unilaterally closes the connection, and does
# not have any kind of locking in place around it.
Expand Down
40 changes: 27 additions & 13 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,21 @@ def handle_request(self, request: Request) -> Response:
)
self._max_streams_semaphore = Semaphore(local_settings_max_streams)

for _ in range(local_settings_max_streams - self._max_streams):
self._max_streams_semaphore.acquire()

self._max_streams_semaphore.acquire()
try:
for _ in range(local_settings_max_streams - self._max_streams):
self._max_streams_semaphore.acquire()
except BaseException:
with ShieldCancellation():
with self._state_lock:
self._state_housekeeping()
raise
try:
self._max_streams_semaphore.acquire()
except BaseException: # pragma: no cover
with ShieldCancellation():
with self._state_lock:
self._state_housekeeping()
raise

try:
stream_id = self._h2_state.get_next_available_stream_id()
Expand Down Expand Up @@ -405,17 +416,20 @@ def _response_closed(self, stream_id: int) -> None:
self._max_streams_semaphore.release()
del self._events[stream_id]
with self._state_lock:
if self._connection_terminated and not self._events:
self._state_housekeeping()

def _state_housekeeping(self) -> None:
if self._connection_terminated and not self._events:
self.close()

elif self._state == HTTPConnectionState.ACTIVE and not self._events:
self._state = HTTPConnectionState.IDLE
if self._keepalive_expiry is not None:
now = time.monotonic()
self._expire_at = now + self._keepalive_expiry
if self._used_all_stream_ids: # pragma: nocover
self.close()

elif self._state == HTTPConnectionState.ACTIVE and not self._events:
self._state = HTTPConnectionState.IDLE
if self._keepalive_expiry is not None:
now = time.monotonic()
self._expire_at = now + self._keepalive_expiry
if self._used_all_stream_ids: # pragma: nocover
self.close()

def close(self) -> None:
# Note that this method unilaterally closes the connection, and does
# not have any kind of locking in place around it.
Expand Down
19 changes: 19 additions & 0 deletions tests/test_cancellations.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,22 @@ async def test_h2_timeout_during_response():

assert not conn.is_closed()
assert conn.is_idle()


@pytest.mark.anyio
async def test_h2_unstarted_request_cancellation(monkeypatch):
async def slow_acquire(self: httpcore._synchronization.AsyncSemaphore) -> None:
await anyio.sleep(999)

monkeypatch.setattr(
httpcore._synchronization.AsyncSemaphore, "acquire", slow_acquire
)

origin = httpcore.Origin(b"http", b"example.com", 80)
stream = HandshakeThenSlowWriteStream()
async with httpcore.AsyncHTTP2Connection(origin, stream) as conn:
with anyio.move_on_after(0.01):
await conn.request("GET", "http://example.com")

assert not conn.is_closed()
assert conn.is_idle()