
[Websockets] RuntimeError: Cannot call "receive" once a disconnect message has been received. #2617

Closed
laxuscullen opened this issue Jun 10, 2024 · 15 comments

@laxuscullen

laxuscullen commented Jun 10, 2024

As Alex recommended, and since this appears to be an issue with Starlette rather than broadcaster, I am opening this issue.

The RuntimeError below is raised when the client disconnects.

Sample code that reproduces the error is available at https://github.com/laxuscullen/WS/tree/wo_broadcaster

  File "venv/lib/python3.12/site-packages/starlette/endpoints.py", line 79, in dispatch
    message = await websocket.receive()
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib/python3.12/site-packages/starlette/websockets.py", line 60, in receive
    raise RuntimeError(
RuntimeError: Cannot call "receive" once a disconnect message has been received.

@laxuscullen
Author

@alex-oleshkevich is there any update on this?

@Kludex
Sponsor Member

Kludex commented Jun 21, 2024

What's the expected behavior here? Raise WebSocketDisconnect instead?

@laxuscullen
Author

What's the expected behavior here? Raise WebSocketDisconnect instead?

From what I understand, the websocket is not actually being disconnected, but the websocket disconnect event is fired anyway. Won't raising cause a runtime error?

@alex-oleshkevich
Member

alex-oleshkevich commented Jun 21, 2024

I haven't had a chance to look at it yet.
As a workaround, use function-style endpoints.

@alex-oleshkevich
Member

Steps to reproduce

# examples/test.py
import asyncio
import anyio


from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.routing import WebSocketRoute, Route
from starlette.websockets import WebSocket
from starlette.responses import HTMLResponse


class RoomChatWebsocket(WebSocketEndpoint):
    encoding = "json"

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
        await websocket.send_json({})

        async def sender():
            while True:
                await asyncio.sleep(1)
            print("sender exit")

        async def on_message_received(websocket: WebSocket):
            async for data in websocket.iter_json():
                print("------------ got message", data)

        async with anyio.create_task_group() as task_group:

            async def receiver():
                await on_message_received(websocket)
                task_group.cancel_scope.cancel()
                print("cancelled")

            task_group.start_soon(receiver)
            task_group.start_soon(sender)

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        print(f"Disconnected: {websocket}")


def index_route(request):
    # Use the ws:// scheme, which every browser accepts for WebSocket URLs.
    return HTMLResponse('<script>ws = new WebSocket("ws://localhost:8000/ws")</script>')


routes = [
    Route("/", index_route),
    WebSocketRoute("/ws", RoomChatWebsocket),
]


app = Starlette(
    debug=True,
    routes=routes,
)
  1. Run the server: uvicorn example.test:app
  2. Open the page http://localhost:8000
  3. Refresh the page.
  4. The browser closes the connection, causing the following stack trace:
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/alex/projects/starlette/venv/lib/python3.12/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 244, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alex/projects/starlette/venv/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 70, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alex/projects/starlette/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/alex/projects/starlette/starlette/middleware/errors.py", line 151, in __call__
    await self.app(scope, receive, send)
  File "/home/alex/projects/starlette/starlette/middleware/exceptions.py", line 65, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/home/alex/projects/starlette/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/home/alex/projects/starlette/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/home/alex/projects/starlette/starlette/routing.py", line 754, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/alex/projects/starlette/starlette/routing.py", line 774, in app
    await route.handle(scope, receive, send)
  File "/home/alex/projects/starlette/starlette/routing.py", line 371, in handle
    await self.app(scope, receive, send)
  File "/home/alex/projects/starlette/starlette/endpoints.py", line 93, in dispatch
    raise exc
  File "/home/alex/projects/starlette/starlette/endpoints.py", line 80, in dispatch
    message = await websocket.receive()
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alex/projects/starlette/starlette/websockets.py", line 61, in receive
    raise RuntimeError(
RuntimeError: Cannot call "receive" once a disconnect message has been received.

The problem disappears when I remove task_group.cancel_scope.cancel(). I think this is somehow connected to task cancellation.

I also observe some weird behavior. On websocket.disconnect, WebSocket.receive does return a message, but this message is not available in WebSocketEndpoint.dispatch. Another loop iteration then happens immediately, calls receive again, and fails because the WS state is already disconnected.
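One plausible reading of the behavior described above is that the reader running inside on_connect consumes the disconnect message, so the endpoint's own receive loop starts afterwards with nothing left to read. The toy model below only illustrates that ordering. ToyWebSocket, inner_reader and outer_dispatch are hypothetical names invented for this sketch, not Starlette code, and the real implementation has more states than this.

```python
import asyncio


class ToyWebSocket:
    """Hypothetical stand-in that mimics only the receive-after-disconnect guard."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._disconnected = False

    async def receive(self):
        if self._disconnected:
            raise RuntimeError(
                'Cannot call "receive" once a disconnect message has been received.'
            )
        message = self._messages.pop(0)
        if message["type"] == "websocket.disconnect":
            self._disconnected = True
        return message


async def inner_reader(ws):
    # Plays the role of iter_json() running inside on_connect: it consumes
    # every message, including the disconnect, and then simply stops.
    seen = []
    while True:
        message = await ws.receive()
        if message["type"] == "websocket.disconnect":
            return seen
        seen.append(message["text"])


async def outer_dispatch(ws):
    # Plays the role of the endpoint's own receive loop, which starts only
    # after on_connect returns and therefore never sees the disconnect.
    await inner_reader(ws)
    try:
        await ws.receive()
    except RuntimeError as exc:
        return str(exc)
    return None


async def main():
    ws = ToyWebSocket(
        [
            {"type": "websocket.receive", "text": "hello"},
            {"type": "websocket.disconnect", "code": 1001},
        ]
    )
    return await outer_dispatch(ws)


error = asyncio.run(main())
print(error)
```

If this model matches what happens in the real endpoint, it would also explain why checking the state before iterating inside on_connect does not help: the failing call is made by the outer loop, not by user code.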

@laxuscullen
Author

@alex-oleshkevich what's even weirder is that no amount of if checks on the websocket state before the loop iteration helps.

Kludex added the "websocket" (WebSocket-related) label on Aug 6, 2024
@Kludex
Sponsor Member

Kludex commented Aug 6, 2024

This is not an issue.

Use the on_receive method, instead of calling websocket.iter_json or any other method that calls websocket.receive().

Kludex closed this as completed on Aug 6, 2024
@laxuscullen
Author

This is not an issue.

Use the on_receive method, instead of calling websocket.iter_json or any other method that calls websocket.receive().

Hi, kindly check the guide at https://github.com/encode/broadcaster
It says to use iter_text. If I do not use iter_json, the websocket stops listening for incoming messages after receiving the first one. Kindly test the code without iter_json.

Could you please edit the code from the guide on the broadcaster page and tell me what the code should look like?

@laxuscullen
Author

@Kludex ?

@Kludex
Sponsor Member

Kludex commented Aug 15, 2024

No. I cannot. I don't maintain that project.

@laxuscullen
Author

No. I cannot. I don't maintain that project.

@Kludex you closed this issue after giving a vague reply that only Alex could understand, with no further help?

@alex-oleshkevich could you please tell me what changes to make in the code, since your guide for broadcaster contains code with issues?

@alex-oleshkevich
Member

alex-oleshkevich commented Aug 15, 2024

@laxuscullen, Kludex is referring to the WebSocketEndpoint.on_receive method.
See https://www.starlette.io/endpoints/#websocketendpoint

The broadcaster documentation is correct: it demonstrates usage with function-style code, whereas you use class-based endpoints.
So in your case the solution could look like this (I didn't test it myself, but I hope you get the idea).

class RoomChatWebsocket(WebSocketEndpoint):
    encoding = "json"

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()

    async def on_receive(self, websocket: WebSocket, data):
        # Do something with the received data; the endpoint calls this
        # method for every incoming message.
        pass

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        print('disconnected')

or use function-style code as broadcaster suggests.
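For reference, a function-style endpoint has no on_disconnect hook, so cleanup would normally go after the receive loop or in a finally block. The sketch below is an assumption-laden illustration rather than code from the broadcaster guide: chatroom_ws, log and StubWebSocket are hypothetical names, and the stub only exists so the handler can be run without a server. In a real app the argument would be a starlette.websockets.WebSocket, whose iter_text() stops iterating when the client disconnects.

```python
import asyncio

# Hypothetical event log, used only to make the control flow visible.
log = []


async def chatroom_ws(websocket):
    # In a real app `websocket` would be a starlette.websockets.WebSocket.
    await websocket.accept()
    log.append("connected")
    try:
        # The loop ends on its own once the client disconnects.
        async for text in websocket.iter_text():
            await websocket.send_text(f"echo: {text}")
    finally:
        # Cleanup that a class-based endpoint would put in on_disconnect.
        log.append("disconnected")


class StubWebSocket:
    """Hypothetical test double; not part of Starlette."""

    def __init__(self, incoming):
        self._incoming = list(incoming)
        self.sent = []

    async def accept(self):
        pass

    async def iter_text(self):
        # Yield the queued messages, then stop, as if the client left.
        for text in self._incoming:
            yield text

    async def send_text(self, text):
        self.sent.append(text)


stub = StubWebSocket(["hi", "bye"])
asyncio.run(chatroom_ws(stub))
print(stub.sent, log)
```

With Starlette, such a handler would be registered as WebSocketRoute("/ws", chatroom_ws) instead of a WebSocketEndpoint subclass.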

@laxuscullen
Author

laxuscullen commented Aug 15, 2024

@alex-oleshkevich I am not sure how to use the subscriber or sender methods with this, and I have not been able to figure it out. Your help with the class-based approach is appreciated. I prefer class-based endpoints because my method decorators are written for classes.

I refactored my code and I am able to use the function-based approach.

Also, in the function-based code there is no on_disconnect?

@alex-oleshkevich
Member

@laxuscullen here it is

import asyncio
import os

from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.routing import Route, WebSocketRoute
from starlette.templating import Jinja2Templates
from starlette.websockets import WebSocket

from broadcaster import Broadcast

BROADCAST_URL = os.environ.get("BROADCAST_URL", "memory://")

broadcast = Broadcast(BROADCAST_URL)
templates = Jinja2Templates("example/templates")


async def homepage(request):
    template = "index.html"
    context = {"request": request}
    return templates.TemplateResponse(template, context)


class ChatRoomWebsocket(WebSocketEndpoint):
    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
        # Forward broadcast events to this client in a background task.
        self._listener_task = asyncio.create_task(self.chatroom_ws_sender(websocket))

    async def on_receive(self, websocket: WebSocket, data):
        # Called by the endpoint for every incoming message.
        await broadcast.publish(channel="chatroom", message=data)

    async def chatroom_ws_sender(self, websocket) -> None:
        async with broadcast.subscribe(channel="chatroom") as subscriber:
            async for event in subscriber:
                await websocket.send_text(event.message)

    async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
        # Stop the background sender once the client is gone.
        if self._listener_task:
            self._listener_task.cancel()


routes = [
    Route("/", homepage),
    WebSocketRoute("/", ChatRoomWebsocket, name="chatroom_ws"),
]

app = Starlette(
    routes=routes,
    on_startup=[broadcast.connect],
    on_shutdown=[broadcast.disconnect],
)
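As a side note on the on_disconnect step above, cancelling a task only requests cancellation; awaiting the task afterwards lets its cleanup finish before the handler returns. The standalone sketch below shows that pattern with plain asyncio. The listener function and outbox list are hypothetical stand-ins for chatroom_ws_sender and the websocket, and whether the extra await is needed in a given app is a judgment call.

```python
import asyncio
import contextlib


async def listener(outbox):
    # Hypothetical stand-in for chatroom_ws_sender: runs until cancelled.
    try:
        while True:
            await asyncio.sleep(0.01)
            outbox.append("tick")
    finally:
        # Runs when the task is cancelled, like leaving `async with subscribe()`.
        outbox.append("listener stopped")


async def main():
    outbox = []
    task = asyncio.create_task(listener(outbox))
    await asyncio.sleep(0.05)  # let the listener run briefly

    # What on_disconnect could do: request cancellation, then wait for it.
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return task.cancelled(), outbox[-1]


cancelled, last_entry = asyncio.run(main())
print(cancelled, last_entry)
```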

@laxuscullen
Author

laxuscullen commented Aug 17, 2024

@alex-oleshkevich first of all, I am very thankful to you for your time and help. Really. The code works. Tested using gunicorn (tested with 10 w and 10 t) and uvicorn.
