Skip to content

Commit

Permalink
Merge pull request #11 from tarasko/feature/measure_ping_pong_latency
Browse files Browse the repository at this point in the history
Added measure_roundtrip_time utility function
  • Loading branch information
tarasko authored Oct 10, 2024
2 parents 3fc400a + 1ad315d commit ba3b381
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 10 deletions.
5 changes: 3 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Features
* Provide Cython .pxd for efficient integration of user Cythonized code with picows
* Ability to check if a frame is the last one in the receiving buffer
* Auto ping-pong with an option to customize ping/pong messages.
* Convenient method to measure websocket roundtrip trip time using ping/pong messages.

Contributing / Building From Source
===================================
Expand Down Expand Up @@ -183,8 +184,8 @@ Contributing / Building From Source
$ python setup.py build_ext --inplace
$ pytest -s -v

# Run specific test
$ pytest -s -v -k test_client_handshake_timeout[uvloop-plain]
# Run specific test with picows debug logs enabled
$ pytest -s -v -k test_client_handshake_timeout[uvloop-plain] --log-cli-level 9

5. Run benchmark::

Expand Down
10 changes: 10 additions & 0 deletions docs/source/guides.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ it does not handle replying to incoming ``PING`` frames.
...
Measuring/checking round-trip time
----------------------------------
**picows** allows to conveniently measure round-trip time to a remote peer using
:any:`measure_roundtrip_time`. This is done by sending PING request multiple
times and measuring response delay.

Checkout an `example <https://raw.githubusercontent.com/tarasko/picows/master/examples/okx_roundtrip_time.py>`_
of how to measure RTT to a popular OKX crypto-currency exchange and initiate
reconnect if it doesn't satisfy a predefined threshold.

Message fragmentation
---------------------
In the WebSocket protocol, there is a distinction between messages and frames.
Expand Down
38 changes: 38 additions & 0 deletions examples/okx_roundtrip_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import logging

import picows
from picows import ws_connect, WSFrame, WSTransport, WSListener, WSMsgType, WSCloseCode

EXPECTED_OKX_ROUNDTRIP_TIME = 0.1

class ClientListener(WSListener):
async def check_okx_roundtrip_time(self, transport: picows.WSTransport):
rtts = await transport.measure_roundtrip_time(5)
if min(rtts) < EXPECTED_OKX_ROUNDTRIP_TIME:
print(f"Minimal rtt {min(rtts):.3f} satisfies required {EXPECTED_OKX_ROUNDTRIP_TIME:.3f}")
else:
print(f"Minimal rtt {min(rtts):.3f} DOES NOT satisfies required {EXPECTED_OKX_ROUNDTRIP_TIME:.3f}, disconnect",
min(rtts), EXPECTED_OKX_ROUNDTRIP_TIME)
transport.disconnect()

def send_user_specific_ping(self, transport: picows.WSTransport):
transport.send(picows.WSMsgType.TEXT, b"ping")

def is_user_specific_pong(self, frame: picows.WSFrame):
return frame.msg_type == picows.WSMsgType.TEXT and frame.get_payload_as_memoryview() == b"pong"

def on_ws_connected(self, transport: WSTransport):
asyncio.get_running_loop().create_task(self.check_okx_roundtrip_time(transport))


async def main(url):
while True:
(transport, client) = await ws_connect(ClientListener, url)
await transport.wait_disconnected()
await asyncio.sleep(5)


if __name__ == '__main__':
logging.basicConfig(level=9)
asyncio.run(main("wss://ws.okx.com:8443/ws/v5/public"))
4 changes: 4 additions & 0 deletions picows/picows.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ cdef class WSTransport:
readonly bint is_secure

bint auto_ping_expect_pong
object pong_received_at_future
object listener_proxy

object _logger #: Logger
bint _log_debug_enabled
Expand All @@ -109,6 +111,8 @@ cdef class WSTransport:


cdef class WSListener:
cdef object __weakref__

cpdef on_ws_connected(self, WSTransport transport)
cpdef on_ws_frame(self, WSTransport transport, WSFrame frame)
cpdef on_ws_disconnected(self, WSTransport transport)
Expand Down
77 changes: 70 additions & 7 deletions picows/picows.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import weakref
from base64 import b64encode, b64decode
import binascii
from hashlib import sha1
Expand All @@ -8,7 +9,7 @@ import socket
import struct
import urllib.parse
from ssl import SSLContext
from typing import cast, Tuple, Optional, Callable
from typing import cast, Tuple, Optional, Callable, List

from multidict import CIMultiDict

Expand Down Expand Up @@ -393,6 +394,8 @@ cdef class WSTransport:
self.is_client_side = is_client_side
self.is_secure = underlying_transport.get_extra_info('ssl_object') is not None
self.auto_ping_expect_pong = False
self.pong_received_at_future = None
self.listener_proxy = None
self._logger = logger
self._log_debug_enabled = self._logger.isEnabledFor(PICOWS_DEBUG_LL)
self._disconnected_future = loop.create_future()
Expand Down Expand Up @@ -571,6 +574,36 @@ cdef class WSTransport:
"""
await asyncio.shield(self._disconnected_future)

async def measure_roundtrip_time(self, int rounds) -> List[float]:
"""
Coroutine that measures roundtrip time by running ping-pong.
:param rounds: how many ping-pong rounds to do
:return: list of measured roundtrip times
"""

cdef double ping_at
cdef double pong_at
cdef int i
cdef list results = []
cdef object shield = asyncio.shield
cdef object create_future = asyncio.get_running_loop().create_future

# If auto-ping is enabled and currently waiting for pong then
# wait until we receive it and only then proceed with our own pings
if self.auto_ping_expect_pong:
self.pong_received_at_future = create_future()
await shield(self.pong_received_at_future)

for i in range(rounds):
self.listener_proxy.send_user_specific_ping(self)
self.pong_received_at_future = create_future()
ping_at = picows_get_monotonic_time()
pong_at = await shield(self.pong_received_at_future)
results.append(pong_at - ping_at)

return results

cpdef notify_user_specific_pong_received(self):
"""
Notify the auto-ping loop that a user-specific pong message
Expand All @@ -588,9 +621,19 @@ cdef class WSTransport:
In such cases, the method simply does nothing.
"""
self.auto_ping_expect_pong = False
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL,
"Reset expect_pong flag because notify_user_specific_pong_received() called")

if self.pong_received_at_future is not None:
self.pong_received_at_future.set_result(picows_get_monotonic_time())
self.pong_received_at_future = None

if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL,
"notify_user_specific_pong_received() for PONG(measure_roundtrip_time), reset expect_pong")
else:
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL,
"notify_user_specific_pong_received() for PONG(idle timeout), reset expect_pong")


cdef _send_http_handshake(self, bytes ws_path, bytes host_port, bytes websocket_key_b64):
initial_handshake = (b"GET %b HTTP/1.1\r\n"
Expand Down Expand Up @@ -829,6 +872,10 @@ cdef class WSProtocol:
if self._auto_ping_loop_task is not None and not self._auto_ping_loop_task.done():
self._auto_ping_loop_task.cancel()

if self.transport.pong_received_at_future is not None:
self.transport.pong_received_at_future.set_exception(ConnectionResetError())
self.transport.pong_received_at_future = None

self.transport._mark_disconnected()

def eof_received(self) -> bool:
Expand Down Expand Up @@ -939,6 +986,7 @@ cdef class WSProtocol:
# Upgrade response hasn't fully arrived yet
return False
self.listener = self._listener_factory()
self.transport.listener_proxy = weakref.proxy(self.listener)
self._listener_factory = None
except Exception as ex:
self.transport.disconnect()
Expand All @@ -960,6 +1008,8 @@ cdef class WSProtocol:
self._listener_factory = None
try:
self.listener = listener_factory(upgrade_request)
if self.listener is not None:
self.transport.listener_proxy = weakref.proxy(self.listener)
except Exception as ex:
self.transport._send_internal_server_error(str(ex))
self.transport.disconnect()
Expand Down Expand Up @@ -1003,6 +1053,12 @@ cdef class WSProtocol:
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL, "Send PING because no new data over the last %s seconds", self._auto_ping_idle_timeout)

if self.transport.pong_received_at_future is not None:
# measure_roundtrip_time is currently doing it's own ping-pongs
# set _last_data_time to now and sleep
self._last_data_time = picows_get_monotonic_time()
continue

self.listener.send_user_specific_ping(self.transport)

self.transport.auto_ping_expect_pong = True
Expand Down Expand Up @@ -1285,11 +1341,18 @@ cdef class WSProtocol:

cdef inline _invoke_on_ws_frame(self, WSFrame frame):
try:
if self._enable_auto_ping and self.transport.auto_ping_expect_pong:
if self._enable_auto_ping and self.transport.auto_ping_expect_pong or self.transport.pong_received_at_future is not None:
if self.listener.is_user_specific_pong(frame):
self.transport.auto_ping_expect_pong = False
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING, reset expect_pong flag")
if self.transport.pong_received_at_future is not None:
self.transport.pong_received_at_future.set_result(picows_get_monotonic_time())
self.transport.pong_received_at_future = None
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(measure_roundtrip_time), reset expect_pong flag")
else:
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL, "Received PONG for the previously sent PING(idle timeout), reset expect_pong flag")

return

self.listener.on_ws_frame(self.transport, frame)
Expand Down
74 changes: 74 additions & 0 deletions tests/test_autoping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from idlelib.pyparse import trans

import async_timeout
import pytest
from aiohttp import WSMsgType

import picows
from picows import WSFrame
from tests.utils import ServerAsyncContext, TIMEOUT, TextFrame, CloseFrame, \
BinaryFrame, materialize_frame

Expand Down Expand Up @@ -251,3 +253,75 @@ def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame):
assert listener.frames[0].msg_type == picows.WSMsgType.PING
assert listener.frames[1].msg_type == picows.WSMsgType.CLOSE
assert listener.frames[1].close_code == picows.WSCloseCode.INTERNAL_ERROR


@pytest.mark.parametrize("use_notify", [False, True], ids=["dont_use_notify", "use_notify"])
@pytest.mark.parametrize("with_auto_ping", [False, True], ids=["no_auto_ping", "with_auto_ping"])
async def test_roundtrip_time(use_notify, with_auto_ping):
class ServerClientListener(picows.WSListener):
def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame):
if frame.msg_type == picows.WSMsgType.PING:
transport.send_pong(frame.get_payload_as_bytes())

server = await picows.ws_create_server(lambda _: ServerClientListener(),
"127.0.0.1", 0)

class ClientListenerUseNotify(picows.WSListener):
def is_user_specific_pong(self, frame):
return False

def on_ws_frame(self, transport, frame):
if frame.msg_type == picows.WSMsgType.PONG:
transport.notify_user_specific_pong_received()

async with ServerAsyncContext(server):
url = f"ws://127.0.0.1:{server.sockets[0].getsockname()[1]}"
listener_factory = ClientListenerUseNotify if use_notify else picows.WSListener
(transport, listener) = await picows.ws_connect(listener_factory, url,
enable_auto_ping=with_auto_ping,
auto_ping_idle_timeout=0.5,
auto_ping_reply_timeout=0.5)
async with async_timeout.timeout(2):
results = await transport.measure_roundtrip_time(5)
assert len(results) == 5
for l in results:
assert l > 0 and l < 1.0

await asyncio.sleep(0.7)

async with async_timeout.timeout(2):
results = await transport.measure_roundtrip_time(5)
assert len(results) == 5
for l in results:
assert l > 0 and l < 1.0
transport.disconnect()

await transport.wait_disconnected()


@pytest.mark.parametrize("with_auto_ping", [False, True], ids=["no_auto_ping", "with_auto_ping"])
async def test_roundtrip_latency_disconnect(with_auto_ping):
class ServerClientListener(picows.WSListener):
def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame):
if frame.msg_type == picows.WSMsgType.PING:
transport.send_pong(frame.get_payload_as_bytes())

server = await picows.ws_create_server(lambda _: ServerClientListener(),
"127.0.0.1", 0)

class ClientListener(picows.WSListener):
def send_user_specific_ping(self, transport):
transport.send_ping()
# Disconnect immediately to test that the client will not hang up
# waiting indefinitely for PONG
transport.disconnect()

async with ServerAsyncContext(server):
url = f"ws://127.0.0.1:{server.sockets[0].getsockname()[1]}"
(transport, listener) = await picows.ws_connect(ClientListener, url,
enable_auto_ping=with_auto_ping,
auto_ping_idle_timeout=0.5,
auto_ping_reply_timeout=0.5)
async with async_timeout.timeout(TIMEOUT):
with pytest.raises(ConnectionResetError):
await transport.measure_roundtrip_time(5)
2 changes: 1 addition & 1 deletion tests/test_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def on_ws_connected(self, transport: picows.WSTransport):
await transport.wait_disconnected()


@pytest.mark.parametrize("disconnect_on_exception", [True, False])
@pytest.mark.parametrize("disconnect_on_exception", [True, False], ids=["disconnect_on_exception", "no_disconnect_on_exception"])
async def test_ws_on_frame_throw(disconnect_on_exception):
class ServerClientListener(picows.WSListener):
def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame):
Expand Down

0 comments on commit ba3b381

Please sign in to comment.