Skip to content

Commit

Permalink
Topic Aliasing Support (#525)
Browse files Browse the repository at this point in the history
Co-authored-by: Bret Ambrose <[email protected]>
  • Loading branch information
bretambrose and Bret Ambrose authored Nov 17, 2023
1 parent c998447 commit e64cc82
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 12 deletions.
83 changes: 79 additions & 4 deletions awscrt/mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,72 @@ def _init_user_properties(user_properties_tuples):
return [UserProperty(name=name, value=value) for (name, value) in user_properties_tuples]


class OutboundTopicAliasBehaviorType(IntEnum):
"""An enumeration that controls how the client applies topic aliasing to outbound publish packets.
Topic alias behavior is described in `MQTT5 Topic Aliasing <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113>`_
"""

DEFAULT = 0,
"""Maps to Disabled. This keeps the client from being broken (by default) if the broker
topic aliasing implementation has a problem.
"""

MANUAL = 1,
"""
Outbound aliasing is the user's responsibility. Client will cache and use
previously-established aliases if they fall within the negotiated limits of the connection.
The user must still always submit a full topic in their publishes because disconnections disrupt
topic alias mappings unpredictably. The client will properly use a requested alias when the most-recently-seen
binding for a topic alias value matches the alias and topic in the publish packet.
"""

LRU = 2,
""" (Recommended) The client will ignore any user-specified topic aliasing and instead use an LRU cache to drive
alias usage.
"""

DISABLED = 3,
"""Completely disable outbound topic aliasing."""


class InboundTopicAliasBehaviorType(IntEnum):
"""An enumeration that controls whether or not the client allows the broker to send publishes that use topic
aliasing.
Topic alias behavior is described in `MQTT5 Topic Aliasing <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113>`_
"""

DEFAULT = 0,
"""Maps to Disabled. This keeps the client from being broken (by default) if the broker
topic aliasing implementation has a problem.
"""

ENABLED = 1,
"""Allow the server to send PUBLISH packets to the client that use topic aliasing"""

DISABLED = 2,
"""Forbid the server from sending PUBLISH packets to the client that use topic aliasing"""


@dataclass
class TopicAliasingOptions:
"""
Configuration for all client topic aliasing behavior.
Args:
outbound_behavior (OutboundTopicAliasBehaviorType): Controls what kind of outbound topic aliasing behavior the client should attempt to use. If topic aliasing is not supported by the server, this setting has no effect and any attempts to directly manipulate the topic alias id in outbound publishes will be ignored. If left undefined, then outbound topic aliasing is disabled.
outbound_cache_max_size (int): If outbound topic aliasing is set to LRU, this controls the maximum size of the cache. If outbound topic aliasing is set to LRU and this is zero or undefined, a sensible default is used (25). If outbound topic aliasing is not set to LRU, then this setting has no effect.
inbound_behavior (InboundTopicAliasBehaviorType): Controls whether or not the client allows the broker to use topic aliasing when sending publishes. Even if inbound topic aliasing is enabled, it is up to the server to choose whether or not to use it. If left undefined, then inbound topic aliasing is disabled.
inbound_cache_max_size (int): If inbound topic aliasing is enabled, this will control the size of the inbound alias cache. If inbound aliases are enabled and this is zero or undefined, then a sensible default will be used (25). If inbound aliases are disabled, this setting has no effect. Behaviorally, this value overrides anything present in the topic_alias_maximum field of the CONNECT packet options.
"""
outbound_behavior: OutboundTopicAliasBehaviorType = None
outbound_cache_max_size: int = None
inbound_behavior: InboundTopicAliasBehaviorType = None
inbound_cache_max_size: int = None


@dataclass
class NegotiatedSettings:
"""
Expand All @@ -925,9 +991,9 @@ class NegotiatedSettings:
session_expiry_interval_sec (int): The amount of time in seconds the server will retain the MQTT session after a disconnect.
receive_maximum_from_server (int): The number of in-flight QoS 1 and QoS 2 publications the server is willing to process concurrently.
maximum_packet_size_to_server (int): The maximum packet size the server is willing to accept.
topic_alias_maximum_to_server (int): Not Currently Supported
topic_alias_maximum_to_client (int): Not Currently Supported
server_keep_alive_sec (int): The maximum amount of time in seconds between client packets. The client should use PINGREQs to ensure this limit is not breached. The server will disconnect the client for inactivity if no MQTT packet is received in a time interval equal to 1.5 x this value.
topic_alias_maximum_to_server (int): the maximum allowed topic alias value on publishes sent from client to server
topic_alias_maximum_to_client (int): the maximum allowed topic alias value on publishes sent from server to client
server_keep_alive_sec (int): The maximum amount of time in seconds between client packets. The client will use PINGREQs to ensure this limit is not breached. The server will disconnect the client for inactivity if no MQTT packet is received in a time interval equal to 1.5 x this value.
retain_available (bool): Whether the server supports retained messages.
wildcard_subscriptions_available (bool): Whether the server supports wildcard subscriptions.
subscription_identifiers_available (bool): Whether the server supports subscription identifiers
Expand Down Expand Up @@ -960,6 +1026,7 @@ class ConnackPacket:
retain_available (bool): Indicates whether the server supports retained messages. If None, retained messages are supported.
maximum_packet_size (int): Specifies the maximum packet size, in bytes, that the server is willing to accept. If None, there is no limit beyond what is imposed by the MQTT spec itself.
assigned_client_identifier (str): Specifies a client identifier assigned to this connection by the server. Only valid when the client id of the preceding CONNECT packet was left empty.
topic_alias_maximum (int): The maximum allowed value for topic aliases in outbound publish packets. If 0 or None, then outbound topic aliasing is not allowed.
reason_string (str): Additional diagnostic information about the result of the connection attempt.
user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet.
wildcard_subscriptions_available (bool): Indicates whether the server supports wildcard subscriptions. If None, wildcard subscriptions are supported.
Expand All @@ -977,6 +1044,7 @@ class ConnackPacket:
retain_available: bool = None
maximum_packet_size: int = None
assigned_client_identifier: str = None
topic_alias_maximum: int = None
reason_string: str = None
user_properties: 'Sequence[UserProperty]' = None
wildcard_subscriptions_available: bool = None
Expand Down Expand Up @@ -1090,7 +1158,7 @@ class PublishPacket:
topic (str): The topic associated with this PUBLISH packet.
payload_format_indicator (PayloadFormatIndicator): Property specifying the format of the payload data. The mqtt5 client does not enforce or use this value in a meaningful way.
message_expiry_interval_sec (int): Sent publishes - indicates the maximum amount of time allowed to elapse for message delivery before the server should instead delete the message (relative to a recipient). Received publishes - indicates the remaining amount of time (from the server's perspective) before the message would have been deleted relative to the subscribing client. If left None, indicates no expiration timeout.
topic_alias (int): An integer value that is used to identify the Topic instead of using the Topic Name.
topic_alias (int): An integer value that is used to identify the Topic instead of using the Topic Name. On outbound publishes, this will only be used if the outbound topic aliasing behavior has been set to Manual.
response_topic (str): Opaque topic string intended to assist with request/response implementations. Not internally meaningful to MQTT5 or this client.
correlation_data (Any): Opaque binary data used to correlate between publish messages, as a potential method for request-response implementation. Not internally meaningful to MQTT5.
subscription_identifiers (Sequence[int]): The subscription identifiers of all the subscriptions this message matched.
Expand Down Expand Up @@ -1315,6 +1383,7 @@ class ClientOptions:
ping_timeout_ms (int): The time interval to wait after sending a PINGREQ for a PINGRESP to arrive. If one does not arrive, the client will close the current connection.
connack_timeout_ms (int): The time interval to wait after sending a CONNECT request for a CONNACK to arrive. If one does not arrive, the connection will be shut down.
ack_timeout_sec (int): The time interval to wait for an ack after sending a QoS 1+ PUBLISH, SUBSCRIBE, or UNSUBSCRIBE before failing the operation.
topic_aliasing_options (TopicAliasingOptions): All configurable options with respect to client topic aliasing behavior.
on_publish_callback_fn (Callable[[PublishReceivedData],]): Callback for all publish packets received by client.
on_lifecycle_event_stopped_fn (Callable[[LifecycleStoppedData],]): Callback for Lifecycle Event Stopped.
on_lifecycle_event_attempting_connect_fn (Callable[[LifecycleAttemptingConnectData],]): Callback for Lifecycle Event Attempting Connect.
Expand All @@ -1340,6 +1409,7 @@ class ClientOptions:
ping_timeout_ms: int = None
connack_timeout_ms: int = None
ack_timeout_sec: int = None
topic_aliasing_options: TopicAliasingOptions = None
on_publish_callback_fn: Callable[[PublishReceivedData], None] = None
on_lifecycle_event_stopped_fn: Callable[[LifecycleStoppedData], None] = None
on_lifecycle_event_attempting_connect_fn: Callable[[LifecycleAttemptingConnectData], None] = None
Expand Down Expand Up @@ -1463,6 +1533,8 @@ def _on_lifecycle_connection_success(
connack_maximum_packet_size_exists,
connack_maximum_packet_size,
connack_assigned_client_identifier,
connack_topic_alias_maximum_exists,
connack_topic_alias_maximum,
connack_reason_string,
connack_user_properties_tuples,
connack_wildcard_subscriptions_available_exist,
Expand Down Expand Up @@ -1504,6 +1576,8 @@ def _on_lifecycle_connection_success(
if connack_maximum_packet_size_exists:
connack_packet.maximum_packet_size = connack_maximum_packet_size
connack_packet.assigned_client_identifier = connack_assigned_client_identifier
if connack_topic_alias_maximum_exists:
connack_packet.topic_alias_maximum = connack_topic_alias_maximum
connack_packet.reason_string = connack_reason_string
connack_packet.user_properties = _init_user_properties(connack_user_properties_tuples)
if connack_wildcard_subscriptions_available_exist:
Expand Down Expand Up @@ -1756,6 +1830,7 @@ def __init__(self, client_options: ClientOptions):
client_options.min_connected_time_to_reset_reconnect_delay_ms,
client_options.ping_timeout_ms,
client_options.ack_timeout_sec,
client_options.topic_aliasing_options,
websocket_is_none,
core)

Expand Down
85 changes: 77 additions & 8 deletions source/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ static void s_lifecycle_event_connection_success(
result = PyObject_CallMethod(
client->client_core,
"_on_lifecycle_connection_success",
"(OiOIOHOiOOOIs#s#OOOOOOOOHs#s#iIHIHHHOOOOO)",
"(OiOIOHOiOOOIs#OIs#OOOOOOOOHs#s#iIHIHHHOOOOO)",
/* connack packet */
/* O */ connack->session_present ? Py_True : Py_False,
/* i */ (int)connack->reason_code,
Expand All @@ -379,6 +379,8 @@ static void s_lifecycle_event_connection_success(
/* I */ (unsigned int)(connack->maximum_packet_size ? *connack->maximum_packet_size : 0),
/* s */ connack->assigned_client_identifier ? connack->assigned_client_identifier->ptr : NULL,
/* # */ connack->assigned_client_identifier ? connack->assigned_client_identifier->len : 0,
/* O */ connack->topic_alias_maximum ? Py_True : Py_False,
/* I */ (unsigned int)(connack->topic_alias_maximum ? *connack->topic_alias_maximum : 0),
/* s */ connack->reason_string ? connack->reason_string->ptr : NULL,
/* # */ connack->reason_string ? connack->reason_string->len : 0,
/* O */ user_property_count > 0 ? user_properties_list : Py_None,
Expand Down Expand Up @@ -729,6 +731,69 @@ PyObject *aws_py_mqtt5_ws_handshake_transform_complete(PyObject *self, PyObject
Py_RETURN_NONE;
}

static bool s_py_topic_aliasing_options_init(
struct aws_mqtt5_client_topic_alias_options *topic_aliasing_options,
PyObject *py_topic_aliasing_options) {
AWS_ZERO_STRUCT(*topic_aliasing_options);

bool success = false;
PyObject *py_outbound_behavior = PyObject_GetAttrString(py_topic_aliasing_options, "outbound_behavior");
PyObject *py_outbound_cache_max_size = PyObject_GetAttrString(py_topic_aliasing_options, "outbound_cache_max_size");
PyObject *py_inbound_behavior = PyObject_GetAttrString(py_topic_aliasing_options, "inbound_behavior");
PyObject *py_inbound_cache_max_size = PyObject_GetAttrString(py_topic_aliasing_options, "inbound_cache_max_size");

if (py_outbound_behavior != NULL && !PyObject_GetAsOptionalIntEnum(
py_outbound_behavior,
"TopicAliasingOptions",
"outbound_behavior",
&topic_aliasing_options->outbound_topic_alias_behavior)) {
if (PyErr_Occurred()) {
goto done;
}
}

if (py_outbound_cache_max_size != NULL && !PyObject_GetAsOptionalUint16(
py_outbound_cache_max_size,
"TopicAliasingOptions",
"outbound_cache_max_size",
&topic_aliasing_options->outbound_alias_cache_max_size)) {
if (PyErr_Occurred()) {
goto done;
}
}

if (py_inbound_behavior != NULL && !PyObject_GetAsOptionalIntEnum(
py_inbound_behavior,
"TopicAliasingOptions",
"inbound_behavior",
&topic_aliasing_options->inbound_topic_alias_behavior)) {
if (PyErr_Occurred()) {
goto done;
}
}

if (py_inbound_cache_max_size != NULL && !PyObject_GetAsOptionalUint16(
py_inbound_cache_max_size,
"TopicAliasingOptions",
"inbound_cache_max_size",
&topic_aliasing_options->inbound_alias_cache_size)) {
if (PyErr_Occurred()) {
goto done;
}
}

success = true;

done:

Py_XDECREF(py_outbound_behavior);
Py_XDECREF(py_outbound_cache_max_size);
Py_XDECREF(py_inbound_behavior);
Py_XDECREF(py_inbound_cache_max_size);

return success;
}

/*******************************************************************************
* Client Init
******************************************************************************/
Expand Down Expand Up @@ -782,13 +847,14 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
PyObject *min_connected_time_to_reset_reconnect_delay_ms_py; /* optional uint64_t */
PyObject *ping_timeout_ms_py; /* optional uint32_t */
PyObject *ack_timeout_seconds_py; /* optional uint32_t */
PyObject *topic_aliasing_options_py; /* optional TopicAliasingOptions */
/* Callbacks */
PyObject *is_websocket_none_py;
PyObject *client_core_py;

if (!PyArg_ParseTuple(
args,
"Os#HOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOO",
"Os#HOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOOO",
/* O */ &self_py,
/* s */ &host_name.ptr,
/* # */ &host_name.len,
Expand Down Expand Up @@ -839,6 +905,7 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
/* O */ &min_connected_time_to_reset_reconnect_delay_ms_py,
/* O */ &ping_timeout_ms_py,
/* O */ &ack_timeout_seconds_py,
/* O */ &topic_aliasing_options_py,

/* O */ &is_websocket_none_py,
/* O */ &client_core_py)) {
Expand Down Expand Up @@ -871,6 +938,7 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
AWS_ZERO_STRUCT(will_options);

struct aws_http_proxy_options proxy_options;
struct aws_mqtt5_client_topic_alias_options topic_aliasing_options;

struct aws_tls_ctx *tls_ctx = NULL;

Expand Down Expand Up @@ -1021,6 +1089,13 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
goto done;
}

if (topic_aliasing_options_py != Py_None) {
if (!s_py_topic_aliasing_options_init(&topic_aliasing_options, topic_aliasing_options_py)) {
goto done;
}
client_options.topic_aliasing_options = &topic_aliasing_options;
}

/* CONNECT OPTIONS */

connect_options.client_id = client_id;
Expand Down Expand Up @@ -1160,12 +1235,6 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
goto done;
}

will.topic_alias = PyObject_GetAsOptionalUint16(
will_topic_alias_py, AWS_PYOBJECT_KEY_WILL_PACKET, AWS_PYOBJECT_KEY_TOPIC_ALIAS, &will_topic_alias_tmp);
if (PyErr_Occurred()) {
goto done;
}

if (will_response_topic.ptr) {
will.response_topic = &will_response_topic;
}
Expand Down

0 comments on commit e64cc82

Please sign in to comment.