Skip to content

Commit

Permalink
Merge pull request #8 from tarasko/feature/autoping
Browse files Browse the repository at this point in the history
Implement auto-ping feature
  • Loading branch information
tarasko authored Oct 6, 2024
2 parents 55a5de4 + 25ca108 commit 2e48c7d
Show file tree
Hide file tree
Showing 7 changed files with 533 additions and 87 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@
html_static_path = ['_static']

html_theme_options = {
'page_width': '1200px', # Set this to your desired width
'sidebar_width': '400px', # Adjust the sidebar width as well
'page_width': '1300px', # Set this to your desired width
'sidebar_width': '450px', # Adjust the sidebar width as well
}
7 changes: 7 additions & 0 deletions picows/picows.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ cdef class WSTransport:
readonly bint is_client_side
readonly bint is_secure

bint auto_ping_expect_pong

object _logger #: Logger
bint _log_debug_enabled
object _disconnected_future #: asyncio.Future
Expand All @@ -95,6 +97,7 @@ cdef class WSTransport:
cpdef send_pong(self, message=*)
cpdef send_close(self, WSCloseCode close_code=*, close_message=*)
cpdef disconnect(self)
cpdef notify_user_specific_pong_received(self)

cdef inline _send_http_handshake(self, bytes ws_path, bytes host_port, bytes websocket_key_b64)
cdef inline _send_http_handshake_response(self, bytes accept_val)
Expand All @@ -109,6 +112,10 @@ cdef class WSListener:
cpdef on_ws_connected(self, WSTransport transport)
cpdef on_ws_frame(self, WSTransport transport, WSFrame frame)
cpdef on_ws_disconnected(self, WSTransport transport)

cpdef send_user_specific_ping(self, WSTransport transport)
cpdef is_user_specific_pong(self, WSFrame frame)

cpdef pause_writing(self)
cpdef resume_writing(self)

175 changes: 167 additions & 8 deletions picows/picows.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import struct
import urllib.parse
from ssl import SSLContext
from typing import cast, Tuple, Optional, Callable

from multidict import CIMultiDict

cimport cython

from cpython.bytes cimport PyBytes_GET_SIZE, PyBytes_AS_STRING, PyBytes_FromStringAndSize, PyBytes_CheckExact
from cpython.bytearray cimport PyByteArray_AS_STRING, PyByteArray_GET_SIZE, PyByteArray_CheckExact
from cpython.memoryview cimport PyMemoryView_FromMemory
from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free
from cpython.buffer cimport PyBUF_WRITE, PyBUF_READ, PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
from cpython.unicode cimport PyUnicode_FromStringAndSize, PyUnicode_DecodeASCII
from libc cimport errno

from libc cimport errno
from libc.string cimport memmove, memcpy, strerror
from libc.stdlib cimport rand

Expand Down Expand Up @@ -49,7 +49,8 @@ cdef extern from "picows_compat.h" nogil:

cdef ssize_t PICOWS_SOCKET_ERROR
int picows_get_errno()
ssize_t send(int sockfd, const void* buf, size_t len, int flags);
double picows_get_monotonic_time()
ssize_t send(int sockfd, const void* buf, size_t len, int flags)


class WSError(RuntimeError):
Expand Down Expand Up @@ -160,7 +161,6 @@ cdef class WSFrame:
"""
Received websocket frame.
Internally WSFrame just points to a chunk of memory in the receiving buffer without copying or owning memory.\n
.. DANGER::
Only use WSFrame object during :any:`WSListener.on_ws_frame` callback. WSFrame objects are essentially just
pointers to the underlying receiving buffer. After :any:`WSListener.on_ws_frame` has completed the buffer
Expand Down Expand Up @@ -322,7 +322,7 @@ cdef class WSListener:
WSFrame is essentially just a pointer to a chunk of memory in the receiving buffer. It does not own
the memory. Do NOT cache or store WSFrame object for later processing because the data may be invalidated
after :any:`WSListener.on_ws_frame` is complete.
Process the payload immediatelly or just copy it with one of `WSFrame.get_*` methods.
Process the payload immediately or just copy it with one of `WSFrame.get_*` methods.
:param transport: :any:`WSTransport` object
:param frame: :any:`WSFrame` object
Expand All @@ -337,6 +337,42 @@ cdef class WSListener:
"""
pass

cpdef send_user_specific_ping(self, WSTransport transport):
"""
Called when the auto-ping logic wants to send a ping to a remote peer.
User can override this method to send something else instead of
the standard PING frame.
Default implementation:
.. code:: python
def send_user_specific_ping(self, transport: picows.WSTransport)
return transport.send_ping()
:param transport: :any:`WSTransport`
"""
transport.send_ping()

cpdef is_user_specific_pong(self, WSFrame frame):
"""
Called before :any:`WSListener.on_ws_frame` if auto ping is enabled and pong is expected.
User can override this method to indicate that the received frame is a
valid response to a previously sent user specific ping message.
The default implementation just do:
.. code:: python
def is_user_specific_pong(self, frame: picows.WSFrame)
return frame.msg_type == WSMsgType.PONG
:return: Returns True if the frame is a response to a previously send ping. In such case the frame will be *consumed* by the protocol, i.e :any:`WSListener.on_ws_frame` will not be called for this frame.
"""
return frame.msg_type == WSMsgType.PONG

cpdef pause_writing(self):
"""
Called when the underlying transport’s buffer goes over the high watermark.
Expand All @@ -355,6 +391,7 @@ cdef class WSTransport:
self.underlying_transport = underlying_transport
self.is_client_side = is_client_side
self.is_secure = underlying_transport.get_extra_info('ssl_object') is not None
self.auto_ping_expect_pong = False
self._logger = logger
self._log_debug_enabled = self._logger.isEnabledFor(PICOWS_DEBUG_LL)
self._disconnected_future = loop.create_future()
Expand Down Expand Up @@ -532,6 +569,24 @@ cdef class WSTransport:
"""
await asyncio.shield(self._disconnected_future)

cpdef notify_user_specific_pong_received(self):
"""
Notify the auto-ping loop that a user-specific pong message
has been received.
This method is useful when determining whether a frame contains a
user-specific pong is too expensive for is_user_specific_pong
(for example, it may require full JSON parsing).
In such cases, :any:`WSListener.is_user_specific_pong` should always
return `False`, and the logic in :any:`WSListener.on_ws_frame` should
call :any:`WSTransport.notify_user_specific_pong_received`.
It is safe to call this method even if auto-ping is disabled or
the auto-ping loop doesn’t expect pong messages.
In such cases, the method simply does nothing.
"""
self.auto_ping_expect_pong = False

cdef _send_http_handshake(self, bytes ws_path, bytes host_port, bytes websocket_key_b64):
initial_handshake = (b"GET %b HTTP/1.1\r\n"
b"Host: %b\r\n"
Expand Down Expand Up @@ -645,6 +700,12 @@ cdef class WSProtocol:
bytes _websocket_key_b64
size_t _max_frame_size

bint _enable_auto_ping
double _auto_ping_idle_timeout
double _auto_ping_reply_timeout
object _auto_ping_loop_task
double _last_data_time

# The following are the parts of an unfinished frame
# Once the frame is finished WSFrame is created and returned
WSParserState _state
Expand All @@ -662,7 +723,8 @@ cdef class WSProtocol:
uint8_t _f_payload_length_flag

def __init__(self, str host_port, str ws_path, bint is_client_side, ws_listener_factory, str logger_name,
bint disconnect_on_exception, websocket_handshake_timeout):
bint disconnect_on_exception, websocket_handshake_timeout,
enable_auto_ping, auto_ping_idle_timeout, auto_ping_reply_timeout):
self.transport = None
self.listener = None

Expand All @@ -684,6 +746,16 @@ cdef class WSProtocol:
self._websocket_key_b64 = b64encode(os.urandom(16))
self._max_frame_size = 1024 * 1024

self._enable_auto_ping = enable_auto_ping
self._auto_ping_idle_timeout = auto_ping_idle_timeout
self._auto_ping_reply_timeout = auto_ping_reply_timeout
self._auto_ping_loop_task = None
self._last_data_time = 0

if self._enable_auto_ping:
assert self._auto_ping_reply_timeout <= self._auto_ping_idle_timeout, \
"auto_ping_reply_timeout can't be bigger than auto_ping_idle_timeout"

self._state = WSParserState.WAIT_UPGRADE_RESPONSE
self._buffer = MemoryBuffer()
self._f_new_data_start_pos = 0
Expand Down Expand Up @@ -749,6 +821,9 @@ cdef class WSProtocol:
if self._handshake_timeout_handle is not None:
self._handshake_timeout_handle.cancel()

if self._auto_ping_loop_task is not None and not self._auto_ping_loop_task.done():
self._auto_ping_loop_task.cancel()

self.transport._mark_disconnected()

def eof_received(self) -> bool:
Expand Down Expand Up @@ -827,6 +902,8 @@ cdef class WSProtocol:
if not self._negotiate():
return

self._last_data_time = picows_get_monotonic_time()

cdef WSFrame frame = self._get_next_frame()
if frame is None:
return
Expand Down Expand Up @@ -894,8 +971,51 @@ cdef class WSProtocol:
self._handshake_timeout_handle = None
self._handshake_complete_future.set_result(None)
self._invoke_on_ws_connected()
self._last_data_time = picows_get_monotonic_time()
if self._enable_auto_ping:
self._auto_ping_loop_task = self._loop.create_task(self._auto_ping_loop())
return True

async def _auto_ping_loop(self):
cdef double now
cdef double prev_last_data_time
cdef double idle_delay
cdef object sleep = asyncio.sleep
try:
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL, "Auto-ping loop started with idle_timeout=%s, reply_timeout=%s",
self._auto_ping_idle_timeout, self._auto_ping_reply_timeout)

while True:
now = picows_get_monotonic_time()
idle_delay = self._last_data_time + self._auto_ping_idle_timeout - now
prev_last_data_time = self._last_data_time
await sleep(idle_delay)

if self._last_data_time > prev_last_data_time:
continue

self.listener.send_user_specific_ping(self.transport)

self.transport.auto_ping_expect_pong = True
await sleep(self._auto_ping_reply_timeout)
if self.transport.auto_ping_expect_pong:
# Pong hasn't arrived withing specified interval
self.transport.send_close(WSCloseCode.GOING_AWAY, f"peer has not replied to ping/heartbeat request within {self._auto_ping_reply_timeout} second(s)".encode())
# Give a chance for the transport to send close message
# But don't wait for any tcp confirmation, use abort()
# because normal disconnect may hang until OS TCP/IP timeout
# for ACK is fired.
await sleep(0.01)
self.transport.underlying_transport.abort()
except asyncio.CancelledError:
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL, "Auto-ping loop cancelled")
except:
self._logger.exception("Auto-ping loop failed, disconnect websocket")
self.transport.send_close(WSCloseCode.INTERNAL_ERROR, b"an exception occurred in auto-ping loop")
self.transport.disconnect()

cdef inline tuple _try_read_upgrade_request(self):
cdef bytes data = PyBytes_FromStringAndSize(self._buffer.data, self._f_new_data_start_pos)
cdef list request = <list>data.split(b"\r\n\r\n", 1)
Expand Down Expand Up @@ -1153,6 +1273,13 @@ cdef class WSProtocol:

cdef inline _invoke_on_ws_frame(self, WSFrame frame):
try:
if self._enable_auto_ping and self.transport.auto_ping_expect_pong:
if self.listener.is_user_specific_pong(frame):
self.transport.auto_ping_expect_pong = False
if self._log_debug_enabled:
self._logger.log(PICOWS_DEBUG_LL, "Received pong for the previously sent ping, reset expect_pong flag")
return

self.listener.on_ws_frame(self.transport, frame)
except Exception as e:
if self._disconnect_on_exception:
Expand Down Expand Up @@ -1193,6 +1320,9 @@ async def ws_connect(ws_listener_factory: Callable[[], WSListener],
bint disconnect_on_exception: bool=True,
websocket_handshake_timeout=5,
logger_name: str="client",
enable_auto_ping: bool = False,
auto_ping_idle_timeout: float = 10,
auto_ping_reply_timeout: float = 10,
**kwargs
) -> Tuple[WSTransport, WSListener]:
"""
Expand All @@ -1212,6 +1342,18 @@ async def ws_connect(ws_listener_factory: Callable[[], WSListener],
is the time in seconds to wait for the websocket client to receive websocket handshake response before aborting the connection.
:param logger_name:
picows will use `picows.<logger_name>` logger to do all the logging.
:param enable_auto_ping:
Enable detection of a stale connection by periodically pinging remote peer.
.. note::
This does NOT enable automatic replies to incoming `ping` requests.
Library user is always supposed to explicitly implement replies
to incoming `ping` requests in `WSListener.on_ws_frame`
:param auto_ping_idle_timeout:
how long to wait before sending `ping` request when there is no
incoming data.
:param auto_ping_reply_timeout:
how long to wait for a `pong` reply before shutting down connection.
:return: :any:`WSTransport` object and a user handler returned by `ws_listener_factory()`
"""

Expand All @@ -1234,7 +1376,8 @@ async def ws_connect(ws_listener_factory: Callable[[], WSListener],
if url_parts.query:
path_plus_query += "?" + url_parts.query
ws_protocol_factory = lambda: WSProtocol(url_parts.netloc, path_plus_query, True, ws_listener_factory,
logger_name, disconnect_on_exception, websocket_handshake_timeout)
logger_name, disconnect_on_exception, websocket_handshake_timeout,
enable_auto_ping, auto_ping_idle_timeout, auto_ping_reply_timeout)

cdef WSProtocol ws_protocol

Expand All @@ -1253,6 +1396,9 @@ async def ws_create_server(ws_listener_factory: Callable[[WSUpgradeRequest], Opt
bint disconnect_on_exception: bool=True,
websocket_handshake_timeout=5,
str logger_name: str="server",
enable_auto_ping: bool = False,
auto_ping_idle_timeout: float = 20,
auto_ping_reply_timeout: float = 20,
**kwargs
) -> asyncio.Server:
"""
Expand Down Expand Up @@ -1291,10 +1437,23 @@ async def ws_create_server(ws_listener_factory: Callable[[WSUpgradeRequest], Opt
is the time in seconds to wait for the websocket server to receive websocket handshake request before aborting the connection.
:param logger_name:
picows will use `picows.<logger_name>` logger to do all the logging.
:param enable_auto_ping:
Enable detection of a stale connection by periodically pinging remote peer.
.. note::
This does NOT enable automatic replies to incoming `ping` requests.
Library user is always supposed to explicitly implement replies
to incoming `ping` requests in `WSListener.on_ws_frame`
:param auto_ping_idle_timeout:
how long to wait before sending `ping` request when there is no
incoming data.
:param auto_ping_reply_timeout:
how long to wait for a `pong` reply before shutting down connection.
:return: `asyncio.Server <https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server>`_ object
"""
ws_protocol_factory = lambda: WSProtocol(None, None, False, ws_listener_factory, logger_name,
disconnect_on_exception, websocket_handshake_timeout)
disconnect_on_exception, websocket_handshake_timeout,
enable_auto_ping, auto_ping_idle_timeout, auto_ping_reply_timeout)

return await asyncio.get_running_loop().create_server(
ws_protocol_factory,
Expand Down
Loading

0 comments on commit 2e48c7d

Please sign in to comment.