Skip to content

Commit

Permalink
Mqtt5 to Mqtt3 Adapter (#496)
Browse files Browse the repository at this point in the history
* add mqtt5to3 adapter

* fix import path

* try fix syntax

* fix syntax

* remove will

* fix circular imports

* update import name

* fix argument name

* update ack_timeout_secs

* set default value for adapter options

* update comment

* update callbacks in tests

* add tls_ctx on mqtt5client

* clean up codes

* more comments

* update doc comments

* add a util function to convert mqtt5.qos to mqtt3.qos

* update cr

* remove connection from mqtt5 client

* add a util function to convert Mqtt.QoS to Mqtt5.QoS
  • Loading branch information
xiazhvera authored Aug 14, 2023
1 parent 8515c7b commit 4ba26fa
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 5 deletions.
19 changes: 18 additions & 1 deletion awscrt/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from awscrt.http import HttpProxyOptions, HttpRequest
from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions
from dataclasses import dataclass
from awscrt.mqtt5 import Client as Mqtt5Client


class QoS(IntEnum):
Expand Down Expand Up @@ -48,6 +49,13 @@ class QoS(IntEnum):
does not support QoS 2 at time of writing (May 2020).
"""

def to_mqtt5(self):
from awscrt.mqtt5 import QoS as Mqtt5QoS
"""Convert a Mqtt3 QoS to Mqtt5 QoS
"""
return Mqtt5QoS(self.value)


def _try_qos(qos_value):
"""Return None if the value cannot be converted to Qos (ex: 0x80 subscribe failure)"""
Expand Down Expand Up @@ -350,7 +358,7 @@ def __init__(self,
on_connection_closed=None
):

assert isinstance(client, Client)
assert isinstance(client, Client) or isinstance(client, Mqtt5Client)
assert callable(on_connection_interrupted) or on_connection_interrupted is None
assert callable(on_connection_resumed) or on_connection_resumed is None
assert isinstance(will, Will) or will is None
Expand All @@ -376,6 +384,7 @@ def __init__(self,

# init-only
self.client = client
self._client_version = 5 if isinstance(client, Mqtt5Client) else 3
self._on_connection_interrupted_cb = on_connection_interrupted
self._on_connection_resumed_cb = on_connection_resumed
self._use_websockets = use_websockets
Expand Down Expand Up @@ -404,6 +413,7 @@ def __init__(self,
self,
client,
use_websockets,
self._client_version
)

def _check_uses_old_message_callback_signature(self, callback):
Expand Down Expand Up @@ -646,6 +656,9 @@ def suback(packet_id, topic, qos, error_code):

try:
assert callable(callback) or callback is None
from awscrt.mqtt5 import QoS as Mqtt5QoS
if (isinstance(qos, Mqtt5QoS)):
qos = qos.to_mqtt3()
assert isinstance(qos, QoS)
packet_id = _awscrt.mqtt_client_connection_subscribe(
self._binding, topic, qos.value, callback_wrapper, suback)
Expand Down Expand Up @@ -810,6 +823,10 @@ def puback(packet_id, error_code):
future.set_result(dict(packet_id=packet_id))

try:
from awscrt.mqtt5 import QoS as Mqtt5QoS
if (isinstance(qos, Mqtt5QoS)):
qos = qos.to_mqtt3()
assert isinstance(qos, QoS)
packet_id = _awscrt.mqtt_client_connection_publish(self._binding, topic, payload, qos.value, retain, puback)
except Exception as e:
future.set_exception(e)
Expand Down
154 changes: 153 additions & 1 deletion awscrt/mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class QoS(IntEnum):
Note that this client does not currently support QoS 2 as of (August 2022)
"""

def to_mqtt3(self):
from awscrt.mqtt import QoS as Mqtt3QoS
"""Convert a Mqtt5 QoS to Mqtt3
"""
return Mqtt3QoS(self.value)


def _try_qos(value):
try:
Expand Down Expand Up @@ -1630,6 +1637,48 @@ def _on_lifecycle_disconnection(
exception=exceptions.from_code(error_code)))


@dataclass
class _Mqtt5to3AdapterOptions:
"""This internal class stores the options that required for creating a new Mqtt3 connection from the mqtt5 client
Args:
host_name (str): Host name of the MQTT server to connect to.
port (int): Network port of the MQTT server to connect to.
client_id (str): A unique string identifying the client to the server. Used to restore session state between connections. If left empty, the broker will auto-assign a unique client id. When reconnecting, the mqtt5 client will always use the auto-assigned client id.
socket_options (SocketOptions): The socket properties of the underlying MQTT connections made by the client or None if defaults are used.
min_reconnect_delay_ms (int): The minimum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed with jitter after each connection failure.
max_reconnect_delay_ms (int): The maximum amount of time to wait to reconnect after a disconnect. Exponential backoff is performed with jitter after each connection failure.
ping_timeout_ms (int): The time interval to wait after sending a PINGREQ for a PINGRESP to arrive. If one does not arrive, the client will close the current connection.
keep_alive_secs (int): The keep alive value, in seconds, A PING will automatically be sent at this interval.
ack_timeout_secs (int): The time interval to wait for an ack after sending a QoS 1+ PUBLISH, SUBSCRIBE, or UNSUBSCRIBE before failing the operation.
clean_session (bool): Whether or not to start a clean session with each reconnect.
The default values are referred from awscrt.mqtt.Connection
"""

def __init__(
self,
host_name: str,
port: int,
client_id: str,
socket_options: SocketOptions,
min_reconnect_delay_ms: int,
max_reconnect_delay_ms: int,
ping_timeout_ms: int,
keep_alive_secs: int,
ack_timeout_secs: int,
clean_session: int):
self.host_name = host_name
self.port = port
self.client_id = "" if client_id is None else client_id
self.socket_options = socket_options
self.min_reconnect_delay_ms = 5 if min_reconnect_delay_ms is None else min_reconnect_delay_ms
self.max_reconnect_delay_ms: int = 60 if max_reconnect_delay_ms is None else max_reconnect_delay_ms
self.ping_timeout_ms: int = 3000 if ping_timeout_ms is None else ping_timeout_ms
self.keep_alive_secs: int = 1200 if keep_alive_secs is None else keep_alive_secs
self.ack_timeout_secs: int = 0 if ack_timeout_secs is None else ack_timeout_secs
self.clean_session: bool = True if clean_session is None else clean_session


class Client(NativeResource):
"""This class wraps the aws-c-mqtt MQTT5 client to provide the basic MQTT5 pub/sub functionalities via the AWS Common Runtime
Expand Down Expand Up @@ -1667,7 +1716,7 @@ def __init__(self, client_options: ClientOptions):
will = connect_options.will

websocket_is_none = client_options.websocket_handshake_transform is None

self.tls_ctx = client_options.tls_ctx
self._binding = _awscrt.mqtt5_client_new(self,
client_options.host_name,
client_options.port,
Expand Down Expand Up @@ -1710,6 +1759,20 @@ def __init__(self, client_options: ClientOptions):
websocket_is_none,
core)

# Store the options for adapter
self.adapter_options = _Mqtt5to3AdapterOptions(
host_name=client_options.host_name,
port=client_options.port,
client_id=connect_options.client_id,
socket_options=socket_options,
min_reconnect_delay_ms=client_options.min_reconnect_delay_ms,
max_reconnect_delay_ms=client_options.max_reconnect_delay_ms,
ping_timeout_ms=client_options.ping_timeout_ms,
keep_alive_secs=connect_options.keep_alive_interval_sec,
ack_timeout_secs=client_options.ack_timeout_sec,
clean_session=(
client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True))

def start(self):
"""Notifies the MQTT5 client that you want it maintain connectivity to the configured endpoint.
The client will attempt to stay connected using the properties of the reconnect-related parameters in the mqtt5 client configuration.
Expand Down Expand Up @@ -1853,3 +1916,92 @@ def get_stats(self):

result = _awscrt.mqtt5_client_get_stats(self._binding)
return OperationStatisticsData(result[0], result[1], result[2], result[3])

def new_connection(self, on_connection_interrupted=None, on_connection_resumed=None,
on_connection_success=None, on_connection_failure=None, on_connection_closed=None):
from awscrt.mqtt import Connection
""" Returns a new Mqtt3 Connection Object wraps the Mqtt5 client.
Args:
on_connection_interrupted: Optional callback invoked whenever the MQTT connection is lost.
The MQTT client will automatically attempt to reconnect.
The function should take the following arguments return nothing:
* `connection` (:class:`Connection`): This MQTT Connection.
* `error` (:class:`awscrt.exceptions.AwsCrtError`): Exception which caused connection loss.
* `**kwargs` (dict): Forward-compatibility kwargs.
on_connection_resumed: Optional callback invoked whenever the MQTT connection
is automatically resumed. Function should take the following arguments and return nothing:
* `connection` (:class:`Connection`): This MQTT Connection
* `return_code` (:class:`ConnectReturnCode`): Connect return
code received from the server.
* `session_present` (bool): True if resuming existing session. False if new session.
Note that the server has forgotten all previous subscriptions if this is False.
Subscriptions can be re-established via resubscribe_existing_topics().
* `**kwargs` (dict): Forward-compatibility kwargs.
on_connection_success: Optional callback invoked whenever the connection successfully connects.
This callback is invoked for every successful connect and every successful reconnect.
Function should take the following arguments and return nothing:
* `connection` (:class:`Connection`): This MQTT Connection
* `callback_data` (:class:`OnConnectionSuccessData`): The data returned from the connection success.
on_connection_failure: Optional callback invoked whenever the connection fails to connect.
This callback is invoked for every failed connect and every failed reconnect.
Function should take the following arguments and return nothing:
* `connection` (:class:`Connection`): This MQTT Connection
* `callback_data` (:class:`OnConnectionFailureData`): The data returned from the connection failure.
on_connection_closed: Optional callback invoked whenever the connection has been disconnected and shutdown successfully.
Function should take the following arguments and return nothing:
* `connection` (:class:`Connection`): This MQTT Connection
* `callback_data` (:class:`OnConnectionClosedData`): The data returned from the connection close.
Returns:
The (:class:`Connection`) wrapper for the mqtt5 client
"""
return Connection(
self,
self.adapter_options.host_name,
self.adapter_options.port,
self.adapter_options.client_id,
clean_session=self.adapter_options.clean_session,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed,
reconnect_min_timeout_secs=self.adapter_options.min_reconnect_delay_ms,
reconnect_max_timeout_secs=self.adapter_options.max_reconnect_delay_ms,
keep_alive_secs=self.adapter_options.keep_alive_secs,
ping_timeout_ms=self.adapter_options.ping_timeout_ms,
protocol_operation_timeout_ms=self.adapter_options.ack_timeout_secs * 1000,
socket_options=self.adapter_options.socket_options,

# For the arugments below, set it to `None` will directly use the options from mqtt5 client underlying.
will=None,
username=None,
password=None,
# Similar to previous options, set it False will use mqtt5 setup for
# websockets. It is not necessary means the websocket is disabled.
use_websockets=False,
websocket_proxy_options=None,
websocket_handshake_transform=None,
proxy_options=None
)
21 changes: 18 additions & 3 deletions source/mqtt_client_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0.
*/
#include "mqtt_client_connection.h"
#include "mqtt5_client.h"

#include "http.h"
#include "io.h"
Expand Down Expand Up @@ -274,11 +275,21 @@ PyObject *aws_py_mqtt_client_connection_new(PyObject *self, PyObject *args) {
PyObject *self_py;
PyObject *client_py;
PyObject *use_websocket_py;
if (!PyArg_ParseTuple(args, "OOO", &self_py, &client_py, &use_websocket_py)) {
unsigned char client_version;
if (!PyArg_ParseTuple(args, "OOOb", &self_py, &client_py, &use_websocket_py, &client_version)) {
return NULL;
}

void *client = NULL;
if (client_version == 3) {
client = aws_py_get_mqtt_client(client_py);
} else if (client_version == 5) {
client = aws_py_get_mqtt5_client(client_py);
} else {
PyErr_SetString(PyExc_TypeError, "Mqtt Client version not supported. Failed to create connection.");
return NULL;
}

struct aws_mqtt_client *client = aws_py_get_mqtt_client(client_py);
if (!client) {
return NULL;
}
Expand All @@ -292,7 +303,11 @@ PyObject *aws_py_mqtt_client_connection_new(PyObject *self, PyObject *args) {

/* From hereon, we need to clean up if errors occur */

py_connection->native = aws_mqtt_client_connection_new(client);
if (client_version == 3) {
py_connection->native = aws_mqtt_client_connection_new(client);
} else if (client_version == 5) {
py_connection->native = aws_mqtt_client_connection_new_from_mqtt5_client(client);
}
if (!py_connection->native) {
PyErr_SetAwsLastError();
goto connection_new_failed;
Expand Down
Loading

0 comments on commit 4ba26fa

Please sign in to comment.