-
The auto ping mechanism should work. I have already prepared two clients: https://github.com/Tapanhaz/Shoonya-Websocket, with manual ping management in Python, and https://github.com/Tapanhaz/Kite-Ticker, with auto ping in Cython. Almost all broker websockets require a ping during idle time.
-
When it comes to picows, it relies fully on the original transport/protocol behavior of asyncio. Something like this:
I'm thinking about adding a new parameter to ws_connect: auto_ping_strategy with 2 possible values PING_PERIODICALLY and PING_WHEN_IDLE.
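To make the difference between the two proposed values concrete, here is a minimal sketch of when the next ping would be due under each one. It only illustrates the idea as described here and is not picows code: `AutoPingStrategy`, `next_ping_due`, and its parameters are hypothetical names, and the real parameter may end up behaving differently.

```python
from enum import Enum


class AutoPingStrategy(Enum):
    # Names taken from the proposal above; the enum itself is hypothetical.
    PING_WHEN_IDLE = 1
    PING_PERIODICALLY = 2


def next_ping_due(
    strategy: AutoPingStrategy,
    interval: float,
    last_ping_sent: float,
    last_data_received: float,
) -> float:
    """Return the timestamp at which the next ping should be sent."""
    if strategy is AutoPingStrategy.PING_PERIODICALLY:
        # Incoming data is ignored: ping every `interval` seconds.
        return last_ping_sent + interval
    # PING_WHEN_IDLE: any incoming data pushes the next ping further away,
    # so a busy connection may never send a ping at all.
    return last_data_received + interval
```

With a 20 second interval, a ping sent at t=100 and data received at t=115, the periodic strategy pings again at t=120, while the idle strategy waits until t=135.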
picows auto_ping solves a) and c) by sending either websocket PING/PONG frames or user-defined messages (which could be application-level messages). To achieve a) and c), it doesn't really matter what is sent and received and, IMHO, it is overkill to send both websocket PING/PONG and user-level heartbeats. You can just choose one (preferably application-level), and it will work for all practical cases. I don't think any remote peer would ever have logic to check whether you actually send a websocket PING or heartbeat every N seconds. They all just reply immediately and maybe log it, but nobody would set a timer to check that you're being regular with your pings or heartbeats.
What I really don't recommend is using async when handling data. I would keep the whole data path non-async, or at least do async only at the very last step, and only if really necessary. So this whole asyncio.Queue() thing, I would definitely not do it! :) I would pass all market events to the autotrader directly from on_ws_frame. After ws_connect has succeeded, it is only necessary to await wait_disconnected().
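A minimal sketch of what passing events directly from on_ws_frame could look like, with no queue and no await in the data path. To keep it runnable without picows installed, the class below is a plain stand-in: in real code it would subclass `WSListener`. `DirectDispatchListener` and `on_event` are hypothetical names, and the standard `json` module stands in for whatever parser is used.

```python
import json
from typing import Any, Callable


class DirectDispatchListener:
    """Stand-in for a WSListener subclass (hypothetical name)."""

    def __init__(self, on_event: Callable[[dict], None]):
        # Plain synchronous callback, e.g. a method of the autotrader.
        self._on_event = on_event

    def on_ws_frame(self, transport: Any, frame: Any) -> None:
        # Parse the payload and hand it over in the same call.
        # Nothing is queued and nothing is awaited.
        event = json.loads(frame.get_payload_as_bytes())
        self._on_event(event)
```

The callback runs inside the frame handler, so it should return quickly. Anything slow would have to be deferred at the very last step, as suggested above.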
I will clarify this in the docs. I personally just do ws_connect and wait_disconnected in the loop. The data is passed to a non-async callback directly from on_ws_frame. But again, for performance, I would strongly advise not to use async callbacks and queues in the data path.
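The connect-and-wait loop described above might be sketched as follows. This is an illustration under assumptions, not the library's recommended code: `connect_loop`, `connect`, `errors`, and `max_attempts` are hypothetical names, and `connect` is any coroutine function that returns an object with an awaitable `wait_disconnected()` method. Which exception types to catch is also an assumption; the default is `OSError`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Tuple, Type


async def connect_loop(
    connect: Callable[[], Awaitable[Any]],
    retry_delay: float = 1.0,
    errors: Tuple[Type[BaseException], ...] = (OSError,),
    max_attempts: Optional[int] = None,
) -> int:
    """Connect, wait for the disconnect, then try again.

    Returns the number of connection attempts made. With the default
    max_attempts=None the loop never ends on its own.
    """
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        try:
            transport = await connect()
            # All data handling happens in the listener callbacks;
            # this coroutine only waits for the connection to end.
            await transport.wait_disconnected()
        except errors as exc:
            print(f"Connection error: {exc}")
        # Pause before reconnecting, after a failure or a normal disconnect.
        await asyncio.sleep(retry_delay)
    return attempts
```

With picows, `connect` would wrap the `ws_connect(...)` call and return the transport, and `errors` would probably include `WSError`, as the code later in this thread does.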
-
@tarasko @Tapanhaz Thanks for the reply.
    async def _subscribe(
        self, payload: Dict[str, Any], callback: Callable[..., Any], *args, **kwargs
    ):
        WSClientFactory = lambda: WSClient("Binance")  # noqa: E731
        while True:
            if not self._listener and self._transport:
                transport, listener = await ws_connect(
                    WSClientFactory,
                    self._url,
                    enable_auto_ping=True,
                    auto_ping_idle_timeout=self._ping_idle_timeout,
                    auto_ping_reply_timeout=self._ping_reply_timeout,
                )
            try:
                transport.send(WSMsgType.TEXT, json.dumps(payload).encode("utf-8"))
                while True:
                    msg = await listener.msg_queue.get()
                    if asyncio.iscoroutinefunction(callback):
                        await callback(msg, *args, **kwargs)
                    else:
                        callback(msg, *args, **kwargs)
                    listener.msg_queue.task_done()
            except WSError as e:
                print(f"Connection error: {e}")
            await asyncio.sleep(1)

    while True:
        msg = await listener.msg_queue.get()
        if asyncio.iscoroutinefunction(callback):
            await callback(msg, *args, **kwargs)
        else:
            callback(msg, *args, **kwargs)
        listener.msg_queue.task_done()
-
I rewrote my code a bit, and I'm not sure if this is the best practice. Since I needed to use the producer-consumer pattern, I still went with asyncio.Queue. I also have several questions:
    async def _handle_connection(self):
        reconnect = False
        while True:
            try:
                await self._connect(reconnect)
                # TODO: when reconnecting, need to resubscribe to the channels
                await self._transport.wait_disconnected()
            except WSError as e:
                print(f"Connection error: {e}")
            reconnect = True
            await asyncio.sleep(1)

Here’s my code:

    from picows import WSListener, WSTransport, WSFrame, WSMsgType, ws_connect, WSError
    import asyncio
    import json
    import orjson
    import time
    from asynciolimiter import Limiter


    class WSClient(WSListener):
        def __init__(self, exchange_id: str = ""):
            self._exchange_id = exchange_id
            self.msg_queue = asyncio.Queue()

        def on_ws_connected(self, transport: WSTransport):
            print(f"Connected to {self._exchange_id} Websocket.")

        def on_ws_disconnected(self, transport: WSTransport):
            print(f"Disconnected from {self._exchange_id} Websocket.")

        def on_ws_frame(self, transport: WSTransport, frame: WSFrame):
            if frame.msg_type == WSMsgType.PING:
                transport.send_pong(frame.get_payload_as_bytes())
                return
            try:
                msg = orjson.loads(frame.get_payload_as_bytes())
                self.msg_queue.put_nowait(msg)
            except Exception as e:
                print(frame.get_payload_as_bytes())
                print(f"Error parsing message: {e}")


    class BinanceWsManager:
        def __init__(self, url: str):
            self._url = url
            self._ping_idle_timeout = 2
            self._ping_reply_timeout = 1
            self._listener = None
            self._transport = None
            self._tasks = []
            self._limiter = Limiter(5/1)  # 5 requests per second

        async def _connect(self, reconnect: bool = False):
            if not self._transport and not self._listener or reconnect:
                WSClientFactory = lambda: WSClient("Binance")  # noqa: E731
                self._transport, self._listener = await ws_connect(
                    WSClientFactory,
                    self._url,
                    enable_auto_ping=True,
                    auto_ping_idle_timeout=self._ping_idle_timeout,
                    auto_ping_reply_timeout=self._ping_reply_timeout,
                )

        async def _handle_connection(self):
            reconnect = False
            while True:
                try:
                    await self._connect(reconnect)
                    # TODO: when reconnecting, need to resubscribe to the channels
                    await self._transport.wait_disconnected()
                except WSError as e:
                    print(f"Connection error: {e}")
                reconnect = True
                await asyncio.sleep(1)

        async def subscribe_book_ticker(self, symbol):
            await self._connect()
            await self._limiter.wait()
            id = int(time.time() * 1000)
            payload = {
                "method": "SUBSCRIBE",
                "params": [f"{symbol.lower()}@bookTicker"],
                "id": id,
            }
            self._transport.send(WSMsgType.TEXT, json.dumps(payload).encode("utf-8"))

        async def subscribe_trade(self, symbol):
            await self._connect()
            await self._limiter.wait()
            id = int(time.time() * 1000)
            payload = {
                "method": "SUBSCRIBE",
                "params": [f"{symbol.lower()}@trade"],
                "id": id,
            }
            self._transport.send(WSMsgType.TEXT, json.dumps(payload).encode("utf-8"))

        async def _msg_handler(self):
            while True:
                msg = await self._listener.msg_queue.get()
                # TODO: handle different event types of messages
                print(msg)
                self._listener.msg_queue.task_done()

        async def start(self):
            asyncio.create_task(self._msg_handler())
            await self._handle_connection()


    async def main():
        try:
            url = "wss://stream.binance.com:9443/ws"
            ws_manager = BinanceWsManager(url)
            await ws_manager.subscribe_book_ticker("BTCUSDT")
            await ws_manager.subscribe_book_ticker("ETHUSDT")
            await ws_manager.subscribe_book_ticker("SOLUSDT")
            await ws_manager.subscribe_book_ticker("BNBUSDT")
            await ws_manager.subscribe_trade("BTCUSDT")
            await ws_manager.subscribe_trade("ETHUSDT")
            await ws_manager.subscribe_trade("BNBUSDT")
            await ws_manager.subscribe_trade("SOLUSDT")
            await ws_manager.start()
        except asyncio.CancelledError:
            print("Websocket closed.")


    if __name__ == "__main__":
        asyncio.run(main())
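For the TODO about resubscribing after a reconnect, one possible approach is to remember the stream names and rebuild the SUBSCRIBE messages each time a connection is established. The sketch below is only that, a sketch: `build_subscribe_payloads`, `request_id`, and `batch_size` are hypothetical names, the batch size of 50 is arbitrary, and it assumes the server accepts several stream names in one `params` list, which the payload shape used above suggests but does not prove.

```python
import json
from typing import Iterable, List


def build_subscribe_payloads(
    streams: Iterable[str], request_id: int, batch_size: int = 50
) -> List[bytes]:
    """Build encoded SUBSCRIBE messages covering all remembered streams."""
    names = sorted(set(streams))  # drop duplicates, keep a stable order
    payloads = []
    for offset in range(0, len(names), batch_size):
        payload = {
            "method": "SUBSCRIBE",
            "params": names[offset:offset + batch_size],
            # One distinct id per message so replies can be matched.
            "id": request_id + offset // batch_size,
        }
        payloads.append(json.dumps(payload).encode("utf-8"))
    return payloads
```

After each successful connect, every returned payload could be passed to `transport.send(WSMsgType.TEXT, payload)`, subject to the same rate limit as the individual subscribe calls.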
-
Here is my code for connecting to the Binance websocket. First of all, picows is an awesome package, and it's faster than `websockets 13.0.1`. I have a few questions about the code below:

1. `on_ws_connected` is called when it's running, but when I end it with `ctrl+C`, only `print("Websocket closed.")` runs and I don't see `Disconnected from {self._exchange_id} Websocket.`. Why?

2. `auto_ping_idle_timeout`: What does this parameter mean? I looked at the documentation, and it says it's how long to wait before sending a ping request when there's no incoming data. So if there's always data coming in, it won't automatically ping, right?

   Based on my understanding, there are 2 different types of ping/pong. One ping/pong is part of the websocket protocol and tests whether the websocket connection is still open and whether the other participant is still responding. I am not sure whether this part is handled automatically if `enable_auto_ping` is set to `True`. The other type is more like a keep-alive on the security level and should be implemented separately! For example, some exchanges like Bybit require the client to send a heartbeat packet; they recommend sending the `ping` heartbeat packet every 20 seconds to maintain the connection. So if I set `auto_ping_idle_timeout = 20`, will I send a `ping` every 20 seconds?

3. I find that the `binance` websocket server sends a ping to me even if I have not enabled `enable_auto_ping`. Since every ws push runs `if frame.msg_type == WSMsgType.PING:` once, is there any way to prevent `ping` from entering `on_ws_frame`, so that I can remove the if statement?

4. For subscribing to a large number of symbols, like 50-100, I can use two methods:

   - `send subscribe` every time I subscribe to one.
   - A `transport` for each symbol, running them concurrently through a coroutine.

   Since the data arrives in order, if I put all of it into an async queue, I think there might be data congestion. The second method means a new WSListener is created for each symbol, but I'm concerned that it's going to use too many resources. I'm wondering which one is the best practice. Do you have any suggestions?

5. If the `transport` disconnects, will it raise a `WSError`? Could you please provide best practices for handling reconnections?

6. For `ping`, do I need to handle it in `on_ws_frame` like the code below:

Here is my entire code: