Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unclosed 'MemoryObjectReceiveStream' upon exception in 'BaseHTTPMiddleware' children #2778

Open
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

nikitagashkov
Copy link

@nikitagashkov nikitagashkov commented Nov 30, 2024

Fixes #2772 and, probably, #2603.

Summary

This PR addresses the proper cleanup of memory object streams within TestClient and BaseHTTPMiddleware.

Checklist

  • I understand that this PR may be closed in case there was no previous discussion. (This doesn't apply to typos!)
  • I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
  • I've updated the documentation accordingly.

↑ Looks like no changes in docs needed, this is just a bugfix.

Comment on lines 214 to 216
# BaseHTTPMiddleware creates a TaskGroup which copies the context
# and erases any changes to it made within the TaskGroup
assert ctxvar.get() == "set by middleware"
Copy link
Member

@Kludex Kludex Nov 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the pytest.mark.xfail to avoid this comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the comment on #2777 (comment), can we change the AssertionError for another error? ValueError maybe?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite, any error within a middleware would cause a dangling stream, but since this is addressed in this PR as well, I've reverted the change, so xfail in rightfully back now

Comment on lines 214 to 216
# BaseHTTPMiddleware creates a TaskGroup which copies the context
# and erases any changes to it made within the TaskGroup
assert ctxvar.get() == "set by middleware"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the comment on #2777 (comment), can we change the AssertionError for another error? ValueError maybe?

tests/middleware/test_base.py Outdated Show resolved Hide resolved
@Kludex
Copy link
Member

Kludex commented Dec 3, 2024

@nikitagashkov Sorry about my previous comment. I think this PR looks great already. I'll close the other one, and we can focus here.

@Kludex Kludex marked this pull request as ready for review December 3, 2024 07:16
@@ -144,7 +144,7 @@ async def send_no_error(message: Message) -> None:
async def coro() -> None:
nonlocal app_exc

async with send_stream:
async with send_stream, recv_stream:
Copy link
Member

@graingert graingert Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi - you can use a synchronous with with memory object streams

Copy link
Member

@graingert graingert Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably you want the with on line 112, and then pass a clone into coro - which would have its own stream and with.

send_stream, recv_stream = anyio.create_memory_object_stream()
with send_stream, recv_stream:
    ...
    async def coro(send_stream):
        with send_stream:
            ...
    ...
    task_group.start_soon(coro, send_stream.clone())
    send_stream.close()

the approach I use is:

  • use a with as soon as you create a memory object stream
  • use a with as soon as you receive a memory object stream from another task
  • pass streams between tasks using .clone()
  • pass streams between async functions in the same task without using .clone()

If you've done any Rust programming you might get annoyed you have to do all this manually

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the current approach on this PR wrong, or can I merge it?

Copy link
Member

@graingert graingert Dec 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not understanding why you have to catch ClosedResourceError here, I think you might be able to loose data if you close the receive stream with an item in? Really the receive end should close it's end

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, proposal from @graingert makes more sense than initial one from me: it's better to ensure cleanup with a context manager right away + coro doesn't really work with recv_stream, so it's weird to handle it where I originally put it

One thing that stops me from finalizing is a deadlock in some tests (e.g., test_fully_evaluated_response) that I don't quite see sources of 🤨

@kurtmckee
Copy link

@nikitagashkov, I submitted #2792 to address the ResourceWarning that affects downstream projects.

I was able to address the problem I was seeing in starlette/middleware/base.py by modifying just one location, around line 134:

              async def close_recv_stream_on_response_sent() -> None:
-                 await response_sent.wait()
-                 recv_stream.close()
+                 # The stream must always be closed, even if an exception occurs.
+                 try:
+                     await response_sent.wait()
+                 finally:
+                     recv_stream.close()

This was sufficient to resolve the problem in the starlette test suite, as well as what I was seeing in the downstream project.

(I can't leave a comment directly on the changes in the PR because that line isn't actually touched in this PR.)

I'm commenting to recommend against the more significant changes to starlette/middleware/base.py, at least not without additional tests that tease out some other issue that isn't covered by the diff above.

@Kludex
Copy link
Member

Kludex commented Dec 13, 2024

This was sufficient to resolve the problem in the starlette test suite, as well as what I was seeing in the downstream project.

Well... You need to remove the ignored warning on the pyproject.toml on that PR to prove the point.

[...] at least not without additional tests that tease out some other issue that isn't covered by the diff above.

There's 100% coverage here, and the point is that this PR removes the warning on the pyproject.toml.

@kurtmckee
Copy link

100% correct. Nevertheless, I'm voicing concern that the changes in base.py might could be more targeted.

@kurtmckee
Copy link

Confirmed it locally on this PR's branch. The diff to close_recv_stream_on_response_sent() I posted above is the only change needed in starlette/middleware/base.py.

gh co 2778
git checkout master -- starlette/middleware/base.py

# Apply the diff above

pytest

@Kludex
Copy link
Member

Kludex commented Dec 14, 2024

100% correct. Nevertheless, I'm voicing concern that the changes in base.py might could be more targeted.

Ah ok. Yeah, valid point. Thanks! 👍

I checked locally, it passes the tests with this small diff. Can we have this less invasive change instead?

recv_stream: MemoryObjectReceiveStream[Message]
send_stream, recv_stream = anyio.create_memory_object_stream()
with recv_stream, send_stream:
async with anyio.create_task_group() as task_group:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can move collapse_excgroups down here, and save a level of indentation:

with recv_stream, send_stream, collapse_excgroups():
    async with anyio.create_task_group() as task_group:
        ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants