Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RuntimeError: Cannot call "receive" once a disconnect message has been received. #123

Closed
laxuscullen opened this issue Jun 5, 2024 · 16 comments

Comments

@laxuscullen
Copy link

laxuscullen commented Jun 5, 2024

My code:

class RoomChatWebsocket(WebSocketEndpoint):
    encoding = "json"

    @token.required
    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()

        user = websocket.aniko_user
        room_id = websocket.query_params['room_id']
        
        websocket.room_id = room_id

        messages = red.lrange(f"room_chat_{room_id}", 0, -1)

        if not messages:
            red.expire(f"room_chat_{room_id}", red_expiry)

        join_message = {
            "user": {
                "user_id": user.user_id,
                "username": user.username,
                "pfp": user.get_profile_pic(),
                "tier": user.tier,
                "room_id": room_id,
                "is_system": 1
            },
            "message_data": {
                "text": f"@{user.username} has joined the room!",
            }
        }
        red.lpush(f"room_chat_{room_id}", orjson.dumps(join_message).decode('utf-8'))

        messages = red.lrange(f"room_chat_{room_id}", 0, 100)
        data = [orjson.loads(message) for message in messages]
        
        # tell everyone someone joined
        await broadcast.publish(channel=f"room_{room_id}", message=orjson.dumps(join_message).decode('utf-8'))

        # send chat data to newly joined member
        await websocket.send_json(data)

        # handle subscriptions using anyio and broadcaster
        async with anyio.create_task_group() as task_group:
            # Receiver Task
            async def receiver():
                await self.on_message_received(websocket=websocket)
                task_group.cancel_scope.cancel()
            
            task_group.start_soon(receiver)
            
            # Sender Task
            await self.send_message_to_all(websocket)

    async def send_message_to_all(self, websocket):
        async with broadcast.subscribe(channel=f"room_{websocket.room_id}") as subscriber:
            async for event in subscriber:
                await websocket.send_json(orjson.loads(event.message))

    async def on_message_received(self, websocket: WebSocket):
        user = websocket.aniko_user
        room_id = websocket.room_id
        

        try:
            async for data in websocket.iter_json():

                if websocket.client_state != WebSocketState.CONNECTED:
                    return

                chat_data = {
                    "user": {
                        "user_id": user.user_id,
                        "username": user.username,
                        "pfp": user.get_profile_pic(),
                        "tier": user.tier,
                        "room_id": room_id,
                        "is_system": 0
                    },
                    "message_data": {
                        "text": data['text'],
                    }
                }

                chat_data = orjson.dumps(chat_data).decode('utf-8')
                red.lpush(f"room_chat_{room_id}", chat_data)

                # Publish the message to the broadcast channel
                await broadcast.publish(channel=f"room_{room_id}", message=chat_data)
        except WebSocketDisconnect:
            pass
        
    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        user = websocket.aniko_user
        room_id = websocket.room_id
        

        # Notify others that the user has left
        leave_message = {
            "user": {
                "user_id": user.user_id,
                "username": user.username,
                "pfp": user.get_profile_pic(),
                "tier": user.tier,
                "room_id": room_id,
                "is_system": 1
            },
            "message_data": {
                "text": f"@{user.username} has left the room :(",
            }
        }

        leave_message = orjson.dumps(leave_message).decode('utf-8')
        red.lpush(f"room_chat_{room_id}", leave_message)

        # Publish the leave message to the broadcast channel
        await broadcast.publish(channel=f"room_{room_id}", message=leave_message)

        print(f"Disconnected: {websocket}")

Stacktrace:

INFO:     connection closed
Disconnected: <starlette.websockets.WebSocket object at 0x10a907550>
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 254, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 149, in __call__
    await self.app(scope, receive, send)
  File "/venv/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 26, in __call__
    await self.app(scope, receive, send)
  File "/venv/lib/python3.11/site-packages/starlette/middleware/cors.py", line 76, in __call__
    await self.app(scope, receive, send)
  File "/venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/venv/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/venv/lib/python3.11/site-packages/starlette/routing.py", line 341, in handle
    await self.app(scope, receive, send)
  File "/venv/lib/python3.11/site-packages/starlette/endpoints.py", line 89, in dispatch
    raise exc
  File "/venv/lib/python3.11/site-packages/starlette/endpoints.py", line 78, in dispatch
    message = await websocket.receive()
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/starlette/websockets.py", line 56, in receive
    raise RuntimeError(
RuntimeError: Cannot call "receive" once a disconnect message has been received.

Using:
starlette==0.26.1
uvicorn==0.21.1 & gunicorn==20.1.0 (tested using both)

Error happens when client disconnects. I am testing using insomnia.

I tried try except, checking for client state but to no avail. I hope this project is maintained still because this is the best bet I have in implementing a room chat system.

@laxuscullen
Copy link
Author

@alex-oleshkevich since you seem to be the only one actively maintaining this repo and releasing 0.3 soon, is there any way you can help?

@alex-oleshkevich
Copy link
Member

alex-oleshkevich commented Jun 7, 2024

@alex-oleshkevich since you seem to be the only one actively maintaining this repo and releasing 0.3 soon, is there any way you can help?

  1. Please test if it happens on master branch
  2. Provide a failing test case so I can debug it

@alex-oleshkevich
Copy link
Member

This is a Starlette exception. Looks like your websocket had been disconnected, so it was no longer possible to read/receive messages from a closed socket.

@laxuscullen
Copy link
Author

This is a Starlette exception. Looks like your websocket had been disconnected, so it was no longer possible to read/receive messages from a closed socket.

@alex-oleshkevich That’s the problem. It doesn’t happen when I don’t use broadcaster. It only seems to be happening with broadcaster. Client disconnected and starlette handles it gracefully and fires on disconnect. But with broadcaster it’s still listening for new messages after disconnection. What’s so messed up is that try except doesn’t work anywhere in the code that receives or sends either.

@laxuscullen
Copy link
Author

laxuscullen commented Jun 7, 2024

@alex-oleshkevich since you seem to be the only one actively maintaining this repo and releasing 0.3 soon, is there any way you can help?

  1. Please test if it happens on master branch
  2. Provide a failing test case so I can debug it

Yes this is on the master branch. I am using the local copy of the branch instead of from the pip install.

with uvicorn this error can be seen but with gunicorn it crashes right away as mentioned in #127

@alex-oleshkevich
Copy link
Member

This is a Starlette exception. Looks like your websocket had been disconnected, so it was no longer possible to read/receive messages from a closed socket.

@alex-oleshkevich That’s the problem. It doesn’t happen when I don’t use broadcaster. It only seems to be happening with broadcaster. Client disconnected and starlette handles it gracefully and fires on disconnect. But with broadcaster it’s still listening for new messages after disconnection. What’s so messed up is that try except doesn’t work anywhere in the code that receives or sends either.

I don't see how you initialize broacast variable, assuming it is like async with Broadcaster() as broadcast. You don't terminate a listener in the task group, so even after you has called task_group.cancel() the send_messages_to_all is still running. So either make it as part of a task group or publish None to broadcast so it can cancel itself.

@laxuscullen
Copy link
Author

@alex-oleshkevich apolgies for not sharing the startup code, I initiated it with await broadcast.connect() as shown in the readme.
I am also using the sample code provided in the readme as a guide for my code:

async def chatroom_ws(websocket):
await websocket.accept()

async with anyio.create_task_group() as task_group:
    # run until first is complete
    async def run_chatroom_ws_receiver() -> None:
        await chatroom_ws_receiver(websocket=websocket)
        task_group.cancel_scope.cancel()

    task_group.start_soon(run_chatroom_ws_receiver)
    await chatroom_ws_sender(websocket)

Isn’t this sample code right?

@alex-oleshkevich
Copy link
Member

@alex-oleshkevich apolgies for not sharing the startup code, I initiated it with await broadcast.connect() as shown in the readme. I am also using the sample code provided in the readme as a guide for my code:

async def chatroom_ws(websocket): await websocket.accept()

async with anyio.create_task_group() as task_group:
    # run until first is complete
    async def run_chatroom_ws_receiver() -> None:
        await chatroom_ws_receiver(websocket=websocket)
        task_group.cancel_scope.cancel()

    task_group.start_soon(run_chatroom_ws_receiver)
    await chatroom_ws_sender(websocket)

Isn’t this sample code right?

task_group.start_soon(receiver)
task_group.start_soon(self.send_message_to_all, websocket)

@laxuscullen
Copy link
Author

laxuscullen commented Jun 8, 2024

@alex-oleshkevich
I tried your code (which is not same as the read me, does that need to be updated then?)

    @token.required
    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()

        user = websocket.user
        room_id = websocket.query_params['room_id']
        anime_id = websocket.query_params['anime_id']
        websocket.anime_id = anime_id
        websocket.room_id = room_id

        messages = red.lrange(f"room_chat_{room_id}", 0, -1)

        if not messages:
            red.expire(f"room_chat_{room_id}", red_expiry)

        join_message = {
            "user": {
                "user_id": user.user_id,
                "username": user.username,
                "pfp": user.get_profile_pic(),
                "tier": user.tier,
                "room_id": room_id,
                "anime_id": anime_id,
                "is_system": 1
            },
            "message_data": {
                "text": f"@{user.username} has joined the room!",
            }
        }
        
        red.lpush(f"room_chat_{room_id}", orjson.dumps(join_message).decode('utf-8'))

        messages = red.lrange(f"room_chat_{room_id}", 0, 100)
        data = [orjson.loads(message) for message in messages]
        
        # tell everyone someone joined
        await broadcast.publish(channel=f"room_{room_id}", message=orjson.dumps(join_message).decode('utf-8'))

        # send chat data to newly joined member
        await websocket.send_json(data)

        # handle subscriptions using anyio and broadcaster
        async with anyio.create_task_group() as task_group:
            # Receiver Task
            async def receiver():
                await self.on_message_received(websocket=websocket)
                task_group.cancel_scope.cancel()
            
            task_group.start_soon(receiver)
            task_group.start_soon(self.sender, websocket)

    async def sender(self, websocket):
        async with broadcast.subscribe(channel=f"room_{websocket.room_id}") as subscriber:
            async for event in subscriber:
                await websocket.send_json(orjson.loads(event.message))

    async def on_message_received(self, websocket: WebSocket):
        user = websocket.user
        room_id = websocket.room_id
        anime_id = websocket.anime_id

        async for data in websocket.iter_json():

            chat_data = {
                "user": {
                    "user_id": user.user_id,
                    "username": user.username,
                    "pfp": user.get_profile_pic(),
                    "tier": user.tier,
                    "room_id": room_id,
                    "anime_id": anime_id,
                    "is_system": 0
                },
                "message_data": {
                    "text": data['text'],
                }
            }

            chat_data = orjson.dumps(chat_data).decode('utf-8')
            red.lpush(f"room_chat_{room_id}", chat_data)

            # Publish the message to the broadcast channel
            await broadcast.publish(channel=f"room_{room_id}", message=chat_data)
        
    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        user = websocket.user
        room_id = websocket.room_id
        anime_id = websocket.anime_id

        # Notify others that the user has left
        leave_message = {
            "user": {
                "user_id": user.user_id,
                "username": user.username,
                "pfp": user.get_profile_pic(),
                "tier": user.tier,
                "room_id": room_id,
                "anime_id": anime_id,
                "is_system": 1
            },
            "message_data": {
                "text": f"@{user.username} has left the room :(",
            }
        }

        leave_message = orjson.dumps(leave_message).decode('utf-8')
        red.lpush(f"room_chat_{room_id}", leave_message)

        # Publish the leave message to the broadcast channel
        await broadcast.publish(channel=f"room_{room_id}", message=leave_message)
        print(f"Disconnected: {websocket}")

The same RuntimeError is still thrown.

@laxuscullen
Copy link
Author

@alex-oleshkevich i have provided the code and information. Are you not able to reproduce this? Am I the only one facing this lol

@alex-oleshkevich
Copy link
Member

Please provide me full minimal example with Starlette initialization, routes, gunicorn command which I can clone and run.

@laxuscullen
Copy link
Author

laxuscullen commented Jun 10, 2024

Here you go @alex-oleshkevich. I spent time and created a repo with the code i am using. It has the broadcaster master repo in it as well.
https://github.com/laxuscullen/WS

Issue: #127
gunicorn chat:app --name asgi --log-level debug -w 10 --threads 10 -k uvicorn.workers.UvicornH11Worker caused error on startup:

Versions/3.12/lib/python3.12/asyncio/tasks.py", line 417, in create_task
    loop = events.get_running_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: no running event loop
[2024-06-10 10:43:13 -0700] [22357] [INFO] Worker exiting (pid: 22357)
sys:1: RuntimeWarning: coroutine 'RedisBackend._pubsub_listener' was never awaited

Cause: self._listener = asyncio.create_task(self._pubsub_listener()) line 18 in redis.py

This issue:
uvicorn chat:app causes RuntimeError on client disconnect

venv/lib/python3.12/site-packages/starlette/websockets.py", line 60, in receive
    raise RuntimeError(
RuntimeError: Cannot call "receive" once a disconnect message has been received.

I hope this actually helps you.

@alex-oleshkevich
Copy link
Member

@laxuscullen , ok, thank you.

For #127 the fix is ready - #128

As for the second issue - I was able to repro it w/o a broadcaster. The problem may be with Starlette. Please submit a bug there.

As a workaround, you can rewrite your endpoint using function style:

import asyncio

import anyio
from starlette.applications import Starlette
from starlette.routing import WebSocketRoute
from starlette.websockets import WebSocket


async def ws(websocket):
    await websocket.accept()
    await websocket.send_json({})

    async def sender():
        while True:
            await asyncio.sleep(1)

    async def on_message_received(websocket: WebSocket):
        async for data in websocket.iter_json():
            print("------------ got message", data)

    async with anyio.create_task_group() as task_group:

        async def receiver():
            await on_message_received(websocket)
            task_group.cancel_scope.cancel()

        task_group.start_soon(receiver)
        task_group.start_soon(sender)


app = Starlette(
    debug=True,
    routes=[
        WebSocketRoute("/ws", was),
    ],
)

@laxuscullen
Copy link
Author

Thank you @alex-oleshkevich. I am glad I was able to help. Could you please share the code you tried to reproduce the issue w/o broadcaster? If you have it, you could create the issue or I can if you can send me the code. Once again, i thank you for your time and help with this project!

@alex-oleshkevich
Copy link
Member

Just put code of async def ws(websocket): into WebSocketEndpoint.on_connected.

@alex-oleshkevich
Copy link
Member

encode/starlette#2617

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants