diff --git a/awscrt/mqtt.py b/awscrt/mqtt.py index 6990aa34f..8d92acc38 100644 --- a/awscrt/mqtt.py +++ b/awscrt/mqtt.py @@ -15,6 +15,7 @@ from awscrt.http import HttpProxyOptions, HttpRequest from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions from dataclasses import dataclass +from awscrt.mqtt5 import Client as Mqtt5Client class QoS(IntEnum): @@ -48,6 +49,13 @@ class QoS(IntEnum): does not support QoS 2 at time of writing (May 2020). """ + def to_mqtt5(self): + from awscrt.mqtt5 import QoS as Mqtt5QoS + """Convert a Mqtt3 QoS to Mqtt5 QoS + + """ + return Mqtt5QoS(self.value) + def _try_qos(qos_value): """Return None if the value cannot be converted to Qos (ex: 0x80 subscribe failure)""" @@ -350,7 +358,7 @@ def __init__(self, on_connection_closed=None ): - assert isinstance(client, Client) + assert isinstance(client, Client) or isinstance(client, Mqtt5Client) assert callable(on_connection_interrupted) or on_connection_interrupted is None assert callable(on_connection_resumed) or on_connection_resumed is None assert isinstance(will, Will) or will is None @@ -376,6 +384,7 @@ def __init__(self, # init-only self.client = client + self._client_version = 5 if isinstance(client, Mqtt5Client) else 3 self._on_connection_interrupted_cb = on_connection_interrupted self._on_connection_resumed_cb = on_connection_resumed self._use_websockets = use_websockets @@ -404,6 +413,7 @@ def __init__(self, self, client, use_websockets, + self._client_version ) def _check_uses_old_message_callback_signature(self, callback): @@ -646,6 +656,9 @@ def suback(packet_id, topic, qos, error_code): try: assert callable(callback) or callback is None + from awscrt.mqtt5 import QoS as Mqtt5QoS + if (isinstance(qos, Mqtt5QoS)): + qos = qos.to_mqtt3() assert isinstance(qos, QoS) packet_id = _awscrt.mqtt_client_connection_subscribe( self._binding, topic, qos.value, callback_wrapper, suback) @@ -810,6 +823,10 @@ def puback(packet_id, error_code): future.set_result(dict(packet_id=packet_id)) try: + from awscrt.mqtt5 import QoS as Mqtt5QoS + if (isinstance(qos, Mqtt5QoS)): + qos = qos.to_mqtt3() + assert isinstance(qos, QoS) packet_id = _awscrt.mqtt_client_connection_publish(self._binding, topic, payload, qos.value, retain, puback) except Exception as e: future.set_exception(e) diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 1de9d2ef7..856ff07bf 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -41,6 +41,13 @@ class QoS(IntEnum): Note that this client does not currently support QoS 2 as of (August 2022) """ + def to_mqtt3(self): + from awscrt.mqtt import QoS as Mqtt3QoS + """Convert a Mqtt5 QoS to Mqtt3 + + """ + return Mqtt3QoS(self.value) + def _try_qos(value): try: @@ -1630,6 +1637,48 @@ def _on_lifecycle_disconnection( exception=exceptions.from_code(error_code))) +@dataclass +class _Mqtt5to3AdapterOptions: + """This internal class stores the options that required for creating a new Mqtt3 connection from the mqtt5 client + Args: + host_name (str): Host name of the MQTT server to connect to. + port (int): Network port of the MQTT server to connect to. + client_id (str): A unique string identifying the client to the server. Used to restore session state between connections. If left empty, the broker will auto-assign a unique client id. When reconnecting, the mqtt5 client will always use the auto-assigned client id. + socket_options (SocketOptions): The socket properties of the underlying MQTT connections made by the client or None if defaults are used. + min_reconnect_delay_ms (int): The minimum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed with jitter after each connection failure. + max_reconnect_delay_ms (int): The maximum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed with jitter after each connection failure. + ping_timeout_ms (int): The time interval to wait after sending a PINGREQ for a PINGRESP to arrive. If one does not arrive, the client will close the current connection. + keep_alive_secs (int): The keep alive value, in seconds, A PING will automatically be sent at this interval. + ack_timeout_secs (int): The time interval to wait for an ack after sending a QoS 1+ PUBLISH, SUBSCRIBE, or UNSUBSCRIBE before failing the operation. + clean_session (bool): Whether or not to start a clean session with each reconnect. + + The default values are referred from awscrt.mqtt.Connection + """ + + def __init__( + self, + host_name: str, + port: int, + client_id: str, + socket_options: SocketOptions, + min_reconnect_delay_ms: int, + max_reconnect_delay_ms: int, + ping_timeout_ms: int, + keep_alive_secs: int, + ack_timeout_secs: int, + clean_session: int): + self.host_name = host_name + self.port = port + self.client_id = "" if client_id is None else client_id + self.socket_options = socket_options + self.min_reconnect_delay_ms = 5 if min_reconnect_delay_ms is None else min_reconnect_delay_ms + self.max_reconnect_delay_ms: int = 60 if max_reconnect_delay_ms is None else max_reconnect_delay_ms + self.ping_timeout_ms: int = 3000 if ping_timeout_ms is None else ping_timeout_ms + self.keep_alive_secs: int = 1200 if keep_alive_secs is None else keep_alive_secs + self.ack_timeout_secs: int = 0 if ack_timeout_secs is None else ack_timeout_secs + self.clean_session: bool = True if clean_session is None else clean_session + + class Client(NativeResource): """This class wraps the aws-c-mqtt MQTT5 client to provide the basic MQTT5 pub/sub functionalities via the AWS Common Runtime @@ -1667,7 +1716,7 @@ def __init__(self, client_options: ClientOptions): will = connect_options.will websocket_is_none = client_options.websocket_handshake_transform is None - + self.tls_ctx = client_options.tls_ctx self._binding = _awscrt.mqtt5_client_new(self, client_options.host_name, client_options.port, @@ -1710,6 +1759,20 @@ def __init__(self, client_options: ClientOptions): websocket_is_none, core) + # Store the options for adapter + self.adapter_options = _Mqtt5to3AdapterOptions( + host_name=client_options.host_name, + port=client_options.port, + client_id=connect_options.client_id, + socket_options=socket_options, + min_reconnect_delay_ms=client_options.min_reconnect_delay_ms, + max_reconnect_delay_ms=client_options.max_reconnect_delay_ms, + ping_timeout_ms=client_options.ping_timeout_ms, + keep_alive_secs=connect_options.keep_alive_interval_sec, + ack_timeout_secs=client_options.ack_timeout_sec, + clean_session=( + client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True)) + def start(self): """Notifies the MQTT5 client that you want it maintain connectivity to the configured endpoint. The client will attempt to stay connected using the properties of the reconnect-related parameters in the mqtt5 client configuration. @@ -1853,3 +1916,92 @@ def get_stats(self): result = _awscrt.mqtt5_client_get_stats(self._binding) return OperationStatisticsData(result[0], result[1], result[2], result[3]) + + def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None, + on_connection_success=None, on_connection_failure=None, on_connection_closed=None): + from awscrt.mqtt import Connection + """ Returns a new Mqtt3 Connection Object wraps the Mqtt5 client. + + Args: + on_connection_interrupted: Optional callback invoked whenever the MQTT connection is lost. + The MQTT client will automatically attempt to reconnect. + The function should take the following arguments return nothing: + + * `connection` (:class:`Connection`): This MQTT Connection. + + * `error` (:class:`awscrt.exceptions.AwsCrtError`): Exception which caused connection loss. + + * `**kwargs` (dict): Forward-compatibility kwargs. + + on_connection_resumed: Optional callback invoked whenever the MQTT connection + is automatically resumed. Function should take the following arguments and return nothing: + + * `connection` (:class:`Connection`): This MQTT Connection + + * `return_code` (:class:`ConnectReturnCode`): Connect return + code received from the server. + + * `session_present` (bool): True if resuming existing session. False if new session. + Note that the server has forgotten all previous subscriptions if this is False. + Subscriptions can be re-established via resubscribe_existing_topics(). + + * `**kwargs` (dict): Forward-compatibility kwargs. + + on_connection_success: Optional callback invoked whenever the connection successfully connects. + This callback is invoked for every successful connect and every successful reconnect. + + Function should take the following arguments and return nothing: + + * `connection` (:class:`Connection`): This MQTT Connection + + * `callback_data` (:class:`OnConnectionSuccessData`): The data returned from the connection success. + + on_connection_failure: Optional callback invoked whenever the connection fails to connect. + This callback is invoked for every failed connect and every failed reconnect. + + Function should take the following arguments and return nothing: + + * `connection` (:class:`Connection`): This MQTT Connection + + * `callback_data` (:class:`OnConnectionFailureData`): The data returned from the connection failure. + + on_connection_closed: Optional callback invoked whenever the connection has been disconnected and shutdown successfully. + Function should take the following arguments and return nothing: + + * `connection` (:class:`Connection`): This MQTT Connection + + * `callback_data` (:class:`OnConnectionClosedData`): The data returned from the connection close. + + + Returns: + The (:class:`Connection`) wrapper for the mqtt5 client + """ + return Connection( + self, + self.adapter_options.host_name, + self.adapter_options.port, + self.adapter_options.client_id, + clean_session=self.adapter_options.clean_session, + on_connection_interrupted=on_connection_interrupted, + on_connection_resumed=on_connection_resumed, + on_connection_success=on_connection_success, + on_connection_failure=on_connection_failure, + on_connection_closed=on_connection_closed, + reconnect_min_timeout_secs=self.adapter_options.min_reconnect_delay_ms, + reconnect_max_timeout_secs=self.adapter_options.max_reconnect_delay_ms, + keep_alive_secs=self.adapter_options.keep_alive_secs, + ping_timeout_ms=self.adapter_options.ping_timeout_ms, + protocol_operation_timeout_ms=self.adapter_options.ack_timeout_secs * 1000, + socket_options=self.adapter_options.socket_options, + + # For the arugments below, set it to `None` will directly use the options from mqtt5 client underlying. + will=None, + username=None, + password=None, + # Similar to previous options, set it False will use mqtt5 setup for + # websockets. It is not necessary means the websocket is disabled. + use_websockets=False, + websocket_proxy_options=None, + websocket_handshake_transform=None, + proxy_options=None + ) diff --git a/source/mqtt_client_connection.c b/source/mqtt_client_connection.c index 5cf5e6f96..1fbff0dac 100644 --- a/source/mqtt_client_connection.c +++ b/source/mqtt_client_connection.c @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ #include "mqtt_client_connection.h" +#include "mqtt5_client.h" #include "http.h" #include "io.h" @@ -274,11 +275,21 @@ PyObject *aws_py_mqtt_client_connection_new(PyObject *self, PyObject *args) { PyObject *self_py; PyObject *client_py; PyObject *use_websocket_py; - if (!PyArg_ParseTuple(args, "OOO", &self_py, &client_py, &use_websocket_py)) { + unsigned char client_version; + if (!PyArg_ParseTuple(args, "OOOb", &self_py, &client_py, &use_websocket_py, &client_version)) { + return NULL; + } + + void *client = NULL; + if (client_version == 3) { + client = aws_py_get_mqtt_client(client_py); + } else if (client_version == 5) { + client = aws_py_get_mqtt5_client(client_py); + } else { + PyErr_SetString(PyExc_TypeError, "Mqtt Client version not supported. Failed to create connection."); return NULL; } - struct aws_mqtt_client *client = aws_py_get_mqtt_client(client_py); if (!client) { return NULL; } @@ -292,7 +303,11 @@ PyObject *aws_py_mqtt_client_connection_new(PyObject *self, PyObject *args) { /* From hereon, we need to clean up if errors occur */ - py_connection->native = aws_mqtt_client_connection_new(client); + if (client_version == 3) { + py_connection->native = aws_mqtt_client_connection_new(client); + } else if (client_version == 5) { + py_connection->native = aws_mqtt_client_connection_new_from_mqtt5_client(client); + } if (!py_connection->native) { PyErr_SetAwsLastError(); goto connection_new_failed; diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index d225534cc..6607c31b9 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1549,6 +1549,117 @@ def test_operation_statistics_uc1(self): client.stop() callbacks.future_stopped.result(TIMEOUT) + # ============================================================== + # 5to3 ADAPTER TEST CASES + # ============================================================== + def test_5to3Adapter_connection_creation_minimum(self): + client5 = self._create_client() + connection = client5.new_connection() + + def test_5to3Adapter_connection_creation_maximum(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + + user_properties = [] + user_properties.append(mqtt5.UserProperty(name="name1", value="value1")) + user_properties.append(mqtt5.UserProperty(name="name2", value="value2")) + + publish_packet = mqtt5.PublishPacket( + payload="TEST_PAYLOAD", + qos=mqtt5.QoS.AT_LEAST_ONCE, + retain=False, + topic="TEST_TOPIC", + payload_format_indicator=mqtt5.PayloadFormatIndicator.AWS_MQTT5_PFI_UTF8, + message_expiry_interval_sec=10, + topic_alias=1, + response_topic="TEST_RESPONSE_TOPIC", + correlation_data="TEST_CORRELATION_DATA", + content_type="TEST_CONTENT_TYPE", + user_properties=user_properties + ) + + connect_options = mqtt5.ConnectPacket( + keep_alive_interval_sec=10, + client_id="TEST_CLIENT", + username="USERNAME", + password="PASSWORD", + session_expiry_interval_sec=100, + request_response_information=1, + request_problem_information=1, + receive_maximum=1000, + maximum_packet_size=10000, + will_delay_interval_sec=1000, + will=publish_packet, + user_properties=user_properties + ) + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883, + connect_options=connect_options, + session_behavior=mqtt5.ClientSessionBehaviorType.CLEAN, + extended_validation_and_flow_control_options=mqtt5.ExtendedValidationAndFlowControlOptions.AWS_IOT_CORE_DEFAULTS, + offline_queue_behavior=mqtt5.ClientOperationQueueBehaviorType.FAIL_ALL_ON_DISCONNECT, + retry_jitter_mode=mqtt5.ExponentialBackoffJitterMode.DECORRELATED, + min_reconnect_delay_ms=100, + max_reconnect_delay_ms=50000, + min_connected_time_to_reset_reconnect_delay_ms=1000, + ping_timeout_ms=1000, + connack_timeout_ms=1000, + ack_timeout_sec=100) + client = self._create_client(client_options=client_options) + connection = client.new_connection() + + def test_5to3Adapter_direct_connect_minimum(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST") + input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT")) + + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=input_port + ) + callbacks = Mqtt5TestCallbacks() + client = self._create_client(client_options=client_options, callbacks=callbacks) + + connection = client.new_connection() + connection.connect().result(TIMEOUT) + connection.disconnect().result(TIMEOUT) + + def test_5to3Adapter_websocket_connect_minimum(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_WS_MQTT_HOST") + input_port = int(_get_env_variable("AWS_TEST_MQTT5_WS_MQTT_PORT")) + + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=input_port + ) + callbacks = Mqtt5TestCallbacks() + client_options.websocket_handshake_transform = callbacks.ws_handshake_transform + + client = self._create_client(client_options=client_options, callbacks=callbacks) + connection = client.new_connection() + connection.connect().result(TIMEOUT) + callbacks.future_connection_success.result(TIMEOUT) + connection.disconnect().result(TIMEOUT) + + def test_5to3Adapter_direct_connect_mutual_tls(self): + input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") + input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") + input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") + + client_options = mqtt5.ClientOptions( + host_name=input_host_name, + port=8883 + ) + tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( + input_cert, + input_key + ) + client_options.tls_ctx = io.ClientTlsContext(tls_ctx_options) + callbacks = Mqtt5TestCallbacks() + client = self._create_client(client_options=client_options, callbacks=callbacks) + connection = client.new_connection() + connection.connect().result(TIMEOUT) + connection.disconnect().result(TIMEOUT) + if __name__ == 'main': unittest.main()