Skip to content

Commit

Permalink
Extend interfaces, add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
taras committed Aug 15, 2024
1 parent 4517dbe commit 7fc17ca
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 54 deletions.
23 changes: 23 additions & 0 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ API reference

.. automodule:: picows

Functions
---------

.. autofunction:: ws_connect
.. autofunction:: ws_create_server

Classes
-------

.. autoclass:: WSFrame
:members:

Expand Down Expand Up @@ -48,6 +54,23 @@ API reference
:members:

.. autoclass:: WSTransport
:members:

.. py:method:: send_reuse_external_buffer(WSMsgType msg_type, char* message, size_t message_size)
:param msg_type: Message type
:param message: Pointer to a message payload
:param message_size: Size of the message payload

**Available only from Cython.**

Send a frame over websocket with a message as its payload.
Don't copy message, reuse its memory and append websocket header in front of the message

**Message's buffer should have at least 10 bytes in front of the message pointer available for writing.**

Enums
-----

.. autoenum:: WSMsgType
.. autoenum:: WSCloseCode
169 changes: 115 additions & 54 deletions picows/picows.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import os
import socket
import struct
import urllib.parse
from typing import cast, Tuple, Optional
from ssl import SSLContext
from typing import cast, Tuple, Optional, Callable

cimport cython

Expand Down Expand Up @@ -637,8 +638,9 @@ cdef class WSListener:

cpdef on_ws_connected(self, WSTransport transport):
"""
Called after websocket handshake is complete and websocket is ready to send and receive frames.\n
:param transport: :any:`WSTransport` object
Called after websocket handshake is complete and websocket is ready to send and receive frames.
"""
pass

Expand All @@ -659,8 +661,9 @@ cdef class WSListener:

cpdef on_ws_disconnected(self, WSTransport transport):
"""
Called when websocket has been disconnected.\
`transport`: :any:`WSTransport`\n
:param transport: :any:`WSTransport`
Called when websocket has been disconnected.
"""
pass

Expand All @@ -685,47 +688,43 @@ cdef class WSTransport:
self._frame_builder = WSFrameBuilder(is_client_side)

cdef send_reuse_external_buffer(self, WSMsgType msg_type, char* message, size_t message_size):
"""
Send a frame over the websocket with a message as its payload.
The message is a bytes-like object.
Don't copy message, reuse its memory and append websocket header in front of the message
Message's buffer should have at least 10 bytes in front of the message pointer available for writing.
This API is only available from Cython.
"""
frame = self._frame_builder.prepare_frame_in_external_buffer(msg_type, <uint8_t*>message, message_size)
self._transport.write(frame)

cpdef send(self, WSMsgType msg_type, message):
"""
Send a frame over the websocket with a message as its payload.
"""
:param msg_type: :any:`WSMsgType` enum value\n
:param message: an optional bytes-like object
`msg_type`: one of :any:`WSMsgType` enum values\n
`message`: an optional bytes-like object
Send a frame over websocket with a message as its payload.
"""
frame = self._frame_builder.prepare_frame(msg_type, message)
self._transport.write(frame)

cpdef send_ping(self, message=None):
"""
Send a PING control frame with an optional message.\n
`message`: an optional bytes-like object
:param message: an optional bytes-like object
Send a PING control frame with an optional message.
"""
self.send(WSMsgType.PING, message)

cpdef send_pong(self, message=None):
"""
Send a PONG control frame with an optional message.\n
`message`: an optional bytes-like object
:param message: an optional bytes-like object
Send a PONG control frame with an optional message.
"""
self.send(WSMsgType.PONG, message)

cpdef send_close(self, WSCloseCode close_code=WSCloseCode.NO_INFO, close_message=None):
"""
:param close_code: :any:`WSCloseCode` value
:param close_message: an optional bytes-like object
Send a CLOSE control frame with an optional message.
This method doesn't disconnect the underlying transport.
Does nothing if the underlying transport is already disconnected.\n
`close_code`: :any:`WSCloseCode` value\n
`close_message`: an optional bytes-like object
Does nothing if the underlying transport is already disconnected.
"""
if self._transport.is_closing():
return
Expand Down Expand Up @@ -773,7 +772,6 @@ cdef class WSTransport:
self._logger.log(PICOWS_DEBUG_LL, "Send upgrade response: %s", handshake_response)
self._transport.write(handshake_response)


cdef mark_disconnected(self):
if not self._disconnected_future.done():
self._disconnected_future.set_result(None)
Expand All @@ -784,27 +782,32 @@ cdef class WSProtocol:
bytes _host_port
bytes _ws_path
object _logger #: Logger
WSFrameParser _frame_parser
object _loop
object _handshake_timeout_handle
bint _log_debug_enabled
bint _is_client_side
bint _disconnect_on_exception
bint _log_debug_enabled

object _websocket_handshake_timeout
object _handshake_timeout_handle

object _loop
WSFrameParser _frame_parser

WSTransport transport
WSListener listener

def __init__(self, str host_port, str ws_path, bint is_client_side, ws_listener_factory, str logger_name,
bint disconnect_on_exception):
bint disconnect_on_exception, websocket_handshake_timeout):
self._host_port = host_port.encode()
self._ws_path = ws_path.encode() if ws_path else b"/"
self._logger = logging.getLogger(f"pico_ws.{logger_name}")
self._frame_parser = None
self._loop = asyncio.get_running_loop()
self._handshake_timeout_handle = None
self._log_debug_enabled = self._logger.isEnabledFor(PICOWS_DEBUG_LL)
self._is_client_side = is_client_side
self._disconnect_on_exception = disconnect_on_exception
self._log_debug_enabled = self._logger.isEnabledFor(PICOWS_DEBUG_LL)
self._websocket_handshake_timeout = websocket_handshake_timeout

self._handshake_timeout_handle = None
self._loop = asyncio.get_running_loop()
self._frame_parser = None

self.transport = None
self.listener = ws_listener_factory()
Expand Down Expand Up @@ -844,7 +847,8 @@ cdef class WSProtocol:
if self._is_client_side:
self.transport.send_http_handshake(self._ws_path, self._host_port, self._frame_parser.websocket_key_b64)
else:
self._handshake_timeout_handle = self._loop.call_later(2, self._handshake_timeout)
self._handshake_timeout_handle = self._loop.call_later(
self._websocket_handshake_timeout, self._handshake_timeout)

def connection_lost(self, exc):
self._logger.info("Disconnected")
Expand Down Expand Up @@ -956,74 +960,131 @@ cdef class WSProtocol:


async def ws_connect(str url: str,
ws_listener_factory,
ws_listener_factory: Callable[[], WSListener],
str logger_name: str,
ssl=None, bint disconnect_on_exception=True) -> Tuple[WSTransport, WSListener]:
ssl: Optional[SSLContext]=None,
bint disconnect_on_exception: bool=True,
ssl_handshake_timeout: int=5,
ssl_shutdown_timeout: int=5,
websocket_handshake_timeout: int=5,
local_addr: Optional[Tuple[str, int]]=None,
) -> Tuple[WSTransport, WSListener]:
"""
:param url:
:param url: Destination URL
:param ws_listener_factory:
A parameterless factory function that returns a user handler. User handler has to derive from :any:`WSListener`.
:param logger_name:
:param ss:
picows will use `picows.<logger_name>` logger to do all the logging.
:param ssl: optional SSLContext to override default one when wss scheme is used
:param disconnect_on_exception:
:return:
Indicates whether the client should initiate disconnect on any exception
thrown from WSListener.on_ws* callbacks
:param ssl_handshake_timeout:
is (for a TLS connection) the time in seconds to wait for the TLS handshake to complete before aborting the connection.
:param ssl_shutdown_timeout:
is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
:param websocket_handshake_timeout:
is the time in seconds to wait for the websocket server to reply to websocket handshake request
:param local_addr:
if given, is a (local_host, local_port) tuple used to bind the socket locally. The local_host and local_port
are looked up using getaddrinfo(), similarly to host and port from url.
:return: :any:`WSTransport` object and a user handler returned by `ws_listener_factory()'
Open a websocket connection to a given URL.
"""

url_parts = urllib.parse.urlparse(url, allow_fragments=False)

if url_parts.scheme == "wss":
ssl = ssl or True
port = url_parts.port or 443
ssl_handshake_timeout = 2
ssl_shutdown_timeout = 2
elif url_parts.scheme == "ws":
ssl_context = None
port = url_parts.port or 80
ssl_handshake_timeout = None
ssl_shutdown_timeout = None
else:
raise ValueError(f"invalid url scheme: {url}")

ws_protocol_factory = lambda: WSProtocol(url_parts.netloc, url_parts.path, True, ws_listener_factory, logger_name, disconnect_on_exception)
ws_protocol_factory = lambda: WSProtocol(url_parts.netloc, url_parts.path, True, ws_listener_factory,
logger_name, disconnect_on_exception, websocket_handshake_timeout)

cdef WSProtocol ws_protocol

(_, ws_protocol) = await asyncio.get_running_loop().create_connection(
ws_protocol_factory, url_parts.hostname, port, ssl=ssl,
ssl_handshake_timeout=ssl_handshake_timeout, ssl_shutdown_timeout=ssl_shutdown_timeout)
ws_protocol_factory, url_parts.hostname, port,
local_addr=local_addr,
ssl=ssl,
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout)

await ws_protocol.wait_until_handshake_complete()
ws_protocol.listener.on_ws_connected(ws_protocol.transport)

return ws_protocol.transport, ws_protocol.listener


async def ws_create_server(str url, ws_listener_factory, str logger_name, ssl_context=None,
disconnect_on_exception=True) -> asyncio.Server:
async def ws_create_server(str url,
ws_listener_factory,
str logger_name, ssl_context=None,
disconnect_on_exception=True,
ssl_handshake_timeout: int=5,
ssl_shutdown_timeout: int=5,
websocket_handshake_timeout: int=5,
reuse_port: bool=None,
start_serving: bool=False
) -> asyncio.Server:
"""
:param url:
Defines which interface and port to bind on and what scheme ('ws' or 'wss') to use.
Currently, the path part of the URL is completely ignored.
:param ws_listener_factory:
A parameterless factory function that returns a user handler for a newly accepted connection.
User handler has to derive from :any:`WSListener`.
:param logger_name:
picows will use `picows.<logger_name>` logger to do all the logging.
:param ssl: optional SSLContext to override default one when wss scheme is used
:param disconnect_on_exception:
Indicates whether the client should initiate disconnect on any exception
thrown from WSListener.on_ws* callbacks
:param ssl_handshake_timeout:
is (for a TLS connection) the time in seconds to wait for the TLS handshake to complete before aborting the connection.
:param ssl_shutdown_timeout:
is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
:param websocket_handshake_timeout:
is the time in seconds to wait for the websocket server to receive to websocket handshake request before aborting the connection.
:param reuse_port:
tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to,
so long as they all set this flag when being created. This option is not supported on Windows
:param start_serving:
causes the created server to start accepting connections immediately. When set to False,
the user should await on `Server.start_serving()` or `Server.serve_forever()` to make the server to start
accepting connections.
:return: asyncio.Server object
Create a TCP server listening on interface and port specified by `url`.
"""
url_parts = urllib.parse.urlparse(url, allow_fragments=False)

if url_parts.scheme == "wss":
ssl_context = ssl_context or True
port = url_parts.port or 443
ssl_handshake_timeout = 2
ssl_shutdown_timeout = 2
elif url_parts.scheme == "ws":
ssl_context = None
port = url_parts.port or 80
ssl_handshake_timeout = None
ssl_shutdown_timeout = None
else:
raise ValueError(f"invalid url scheme: {url}")

ws_protocol_factory = lambda: WSProtocol(url_parts.netloc, url_parts.path, False, ws_listener_factory, logger_name,
disconnect_on_exception)
disconnect_on_exception, websocket_handshake_timeout)

cdef WSProtocol ws_protocol

server = await asyncio.get_running_loop().create_server(
ws_protocol_factory,
host=url_parts.hostname, port=port,
ssl=ssl_context,
ssl_handshake_timeout=ssl_handshake_timeout, ssl_shutdown_timeout=ssl_shutdown_timeout,
start_serving=False)
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout,
reuse_port=reuse_port,
start_serving=start_serving)

return server

0 comments on commit 7fc17ca

Please sign in to comment.