diff --git a/awscrt/mqtt5.py b/awscrt/mqtt5.py index 856ff07bf..140f987bf 100644 --- a/awscrt/mqtt5.py +++ b/awscrt/mqtt5.py @@ -906,6 +906,72 @@ def _init_user_properties(user_properties_tuples): return [UserProperty(name=name, value=value) for (name, value) in user_properties_tuples] +class OutboundTopicAliasBehaviorType(IntEnum): + """An enumeration that controls how the client applies topic aliasing to outbound publish packets. + + Topic alias behavior is described in `MQTT5 Topic Aliasing `_ + """ + + DEFAULT = 0, + """Maps to Disabled. This keeps the client from being broken (by default) if the broker + topic aliasing implementation has a problem. + """ + + MANUAL = 1, + """ + Outbound aliasing is the user's responsibility. Client will cache and use + previously-established aliases if they fall within the negotiated limits of the connection. + + The user must still always submit a full topic in their publishes because disconnections disrupt + topic alias mappings unpredictably. The client will properly use a requested alias when the most-recently-seen + binding for a topic alias value matches the alias and topic in the publish packet. + """ + + LRU = 2, + """ (Recommended) The client will ignore any user-specified topic aliasing and instead use an LRU cache to drive + alias usage. + """ + + DISABLED = 3, + """Completely disable outbound topic aliasing.""" + + +class InboundTopicAliasBehaviorType(IntEnum): + """An enumeration that controls whether or not the client allows the broker to send publishes that use topic + aliasing. + + Topic alias behavior is described in `MQTT5 Topic Aliasing `_ + """ + + DEFAULT = 0, + """Maps to Disabled. This keeps the client from being broken (by default) if the broker + topic aliasing implementation has a problem. + """ + + ENABLED = 1, + """Allow the server to send PUBLISH packets to the client that use topic aliasing""" + + DISABLED = 2, + """Forbid the server from sending PUBLISH packets to the client that use topic aliasing""" + + +@dataclass +class TopicAliasingOptions: + """ + Configuration for all client topic aliasing behavior. + + Args: + outbound_behavior (OutboundTopicAliasBehaviorType): Controls what kind of outbound topic aliasing behavior the client should attempt to use. If topic aliasing is not supported by the server, this setting has no effect and any attempts to directly manipulate the topic alias id in outbound publishes will be ignored. If left undefined, then outbound topic aliasing is disabled. + outbound_cache_max_size (int): If outbound topic aliasing is set to LRU, this controls the maximum size of the cache. If outbound topic aliasing is set to LRU and this is zero or undefined, a sensible default is used (25). If outbound topic aliasing is not set to LRU, then this setting has no effect. + inbound_behavior (InboundTopicAliasBehaviorType): Controls whether or not the client allows the broker to use topic aliasing when sending publishes. Even if inbound topic aliasing is enabled, it is up to the server to choose whether or not to use it. If left undefined, then inbound topic aliasing is disabled. + inbound_cache_max_size (int): If inbound topic aliasing is enabled, this will control the size of the inbound alias cache. If inbound aliases are enabled and this is zero or undefined, then a sensible default will be used (25). If inbound aliases are disabled, this setting has no effect. Behaviorally, this value overrides anything present in the topic_alias_maximum field of the CONNECT packet options. + """ + outbound_behavior: OutboundTopicAliasBehaviorType = None + outbound_cache_max_size: int = None + inbound_behavior: InboundTopicAliasBehaviorType = None + inbound_cache_max_size: int = None + + @dataclass class NegotiatedSettings: """ @@ -925,9 +991,9 @@ class NegotiatedSettings: session_expiry_interval_sec (int): The amount of time in seconds the server will retain the MQTT session after a disconnect. receive_maximum_from_server (int): The number of in-flight QoS 1 and QoS 2 publications the server is willing to process concurrently. maximum_packet_size_to_server (int): The maximum packet size the server is willing to accept. - topic_alias_maximum_to_server (int): Not Currently Supported - topic_alias_maximum_to_client (int): Not Currently Supported - server_keep_alive_sec (int): The maximum amount of time in seconds between client packets. The client should use PINGREQs to ensure this limit is not breached. The server will disconnect the client for inactivity if no MQTT packet is received in a time interval equal to 1.5 x this value. + topic_alias_maximum_to_server (int): the maximum allowed topic alias value on publishes sent from client to server + topic_alias_maximum_to_client (int): the maximum allowed topic alias value on publishes sent from server to client + server_keep_alive_sec (int): The maximum amount of time in seconds between client packets. The client will use PINGREQs to ensure this limit is not breached. The server will disconnect the client for inactivity if no MQTT packet is received in a time interval equal to 1.5 x this value. retain_available (bool): Whether the server supports retained messages. wildcard_subscriptions_available (bool): Whether the server supports wildcard subscriptions. subscription_identifiers_available (bool): Whether the server supports subscription identifiers @@ -960,6 +1026,7 @@ class ConnackPacket: retain_available (bool): Indicates whether the server supports retained messages. If None, retained messages are supported. maximum_packet_size (int): Specifies the maximum packet size, in bytes, that the server is willing to accept. If None, there is no limit beyond what is imposed by the MQTT spec itself. assigned_client_identifier (str): Specifies a client identifier assigned to this connection by the server. Only valid when the client id of the preceding CONNECT packet was left empty. + topic_alias_maximum (int): The maximum allowed value for topic aliases in outbound publish packets. If 0 or None, then outbound topic aliasing is not allowed. reason_string (str): Additional diagnostic information about the result of the connection attempt. user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet. wildcard_subscriptions_available (bool): Indicates whether the server supports wildcard subscriptions. If None, wildcard subscriptions are supported. @@ -977,6 +1044,7 @@ class ConnackPacket: retain_available: bool = None maximum_packet_size: int = None assigned_client_identifier: str = None + topic_alias_maximum: int = None reason_string: str = None user_properties: 'Sequence[UserProperty]' = None wildcard_subscriptions_available: bool = None @@ -1090,7 +1158,7 @@ class PublishPacket: topic (str): The topic associated with this PUBLISH packet. payload_format_indicator (PayloadFormatIndicator): Property specifying the format of the payload data. The mqtt5 client does not enforce or use this value in a meaningful way. message_expiry_interval_sec (int): Sent publishes - indicates the maximum amount of time allowed to elapse for message delivery before the server should instead delete the message (relative to a recipient). Received publishes - indicates the remaining amount of time (from the server's perspective) before the message would have been deleted relative to the subscribing client. If left None, indicates no expiration timeout. - topic_alias (int): An integer value that is used to identify the Topic instead of using the Topic Name. + topic_alias (int): An integer value that is used to identify the Topic instead of using the Topic Name. On outbound publishes, this will only be used if the outbound topic aliasing behavior has been set to Manual. response_topic (str): Opaque topic string intended to assist with request/response implementations. Not internally meaningful to MQTT5 or this client. correlation_data (Any): Opaque binary data used to correlate between publish messages, as a potential method for request-response implementation. Not internally meaningful to MQTT5. subscription_identifiers (Sequence[int]): The subscription identifiers of all the subscriptions this message matched. @@ -1315,6 +1383,7 @@ class ClientOptions: ping_timeout_ms (int): The time interval to wait after sending a PINGREQ for a PINGRESP to arrive. If one does not arrive, the client will close the current connection. connack_timeout_ms (int): The time interval to wait after sending a CONNECT request for a CONNACK to arrive. If one does not arrive, the connection will be shut down. ack_timeout_sec (int): The time interval to wait for an ack after sending a QoS 1+ PUBLISH, SUBSCRIBE, or UNSUBSCRIBE before failing the operation. + topic_aliasing_options (TopicAliasingOptions): All configurable options with respect to client topic aliasing behavior. on_publish_callback_fn (Callable[[PublishReceivedData],]): Callback for all publish packets received by client. on_lifecycle_event_stopped_fn (Callable[[LifecycleStoppedData],]): Callback for Lifecycle Event Stopped. on_lifecycle_event_attempting_connect_fn (Callable[[LifecycleAttemptingConnectData],]): Callback for Lifecycle Event Attempting Connect. @@ -1340,6 +1409,7 @@ class ClientOptions: ping_timeout_ms: int = None connack_timeout_ms: int = None ack_timeout_sec: int = None + topic_aliasing_options: TopicAliasingOptions = None on_publish_callback_fn: Callable[[PublishReceivedData], None] = None on_lifecycle_event_stopped_fn: Callable[[LifecycleStoppedData], None] = None on_lifecycle_event_attempting_connect_fn: Callable[[LifecycleAttemptingConnectData], None] = None @@ -1463,6 +1533,8 @@ def _on_lifecycle_connection_success( connack_maximum_packet_size_exists, connack_maximum_packet_size, connack_assigned_client_identifier, + connack_topic_alias_maximum_exists, + connack_topic_alias_maximum, connack_reason_string, connack_user_properties_tuples, connack_wildcard_subscriptions_available_exist, @@ -1504,6 +1576,8 @@ def _on_lifecycle_connection_success( if connack_maximum_packet_size_exists: connack_packet.maximum_packet_size = connack_maximum_packet_size connack_packet.assigned_client_identifier = connack_assigned_client_identifier + if connack_topic_alias_maximum_exists: + connack_packet.topic_alias_maximum = connack_topic_alias_maximum connack_packet.reason_string = connack_reason_string connack_packet.user_properties = _init_user_properties(connack_user_properties_tuples) if connack_wildcard_subscriptions_available_exist: @@ -1756,6 +1830,7 @@ def __init__(self, client_options: ClientOptions): client_options.min_connected_time_to_reset_reconnect_delay_ms, client_options.ping_timeout_ms, client_options.ack_timeout_sec, + client_options.topic_aliasing_options, websocket_is_none, core) diff --git a/source/mqtt5_client.c b/source/mqtt5_client.c index d6f0f3ce0..a6d7270ae 100644 --- a/source/mqtt5_client.c +++ b/source/mqtt5_client.c @@ -363,7 +363,7 @@ static void s_lifecycle_event_connection_success( result = PyObject_CallMethod( client->client_core, "_on_lifecycle_connection_success", - "(OiOIOHOiOOOIs#s#OOOOOOOOHs#s#iIHIHHHOOOOO)", + "(OiOIOHOiOOOIs#OIs#OOOOOOOOHs#s#iIHIHHHOOOOO)", /* connack packet */ /* O */ connack->session_present ? Py_True : Py_False, /* i */ (int)connack->reason_code, @@ -379,6 +379,8 @@ static void s_lifecycle_event_connection_success( /* I */ (unsigned int)(connack->maximum_packet_size ? *connack->maximum_packet_size : 0), /* s */ connack->assigned_client_identifier ? connack->assigned_client_identifier->ptr : NULL, /* # */ connack->assigned_client_identifier ? connack->assigned_client_identifier->len : 0, + /* O */ connack->topic_alias_maximum ? Py_True : Py_False, + /* I */ (unsigned int)(connack->topic_alias_maximum ? *connack->topic_alias_maximum : 0), /* s */ connack->reason_string ? connack->reason_string->ptr : NULL, /* # */ connack->reason_string ? connack->reason_string->len : 0, /* O */ user_property_count > 0 ? user_properties_list : Py_None, @@ -729,6 +731,69 @@ PyObject *aws_py_mqtt5_ws_handshake_transform_complete(PyObject *self, PyObject Py_RETURN_NONE; } +static bool s_py_topic_aliasing_options_init( + struct aws_mqtt5_client_topic_alias_options *topic_aliasing_options, + PyObject *py_topic_aliasing_options) { + AWS_ZERO_STRUCT(*topic_aliasing_options); + + bool success = false; + PyObject *py_outbound_behavior = PyObject_GetAttrString(py_topic_aliasing_options, "outbound_behavior"); + PyObject *py_outbound_cache_max_size = PyObject_GetAttrString(py_topic_aliasing_options, "outbound_cache_max_size"); + PyObject *py_inbound_behavior = PyObject_GetAttrString(py_topic_aliasing_options, "inbound_behavior"); + PyObject *py_inbound_cache_max_size = PyObject_GetAttrString(py_topic_aliasing_options, "inbound_cache_max_size"); + + if (py_outbound_behavior != NULL && !PyObject_GetAsOptionalIntEnum( + py_outbound_behavior, + "TopicAliasingOptions", + "outbound_behavior", + &topic_aliasing_options->outbound_topic_alias_behavior)) { + if (PyErr_Occurred()) { + goto done; + } + } + + if (py_outbound_cache_max_size != NULL && !PyObject_GetAsOptionalUint16( + py_outbound_cache_max_size, + "TopicAliasingOptions", + "outbound_cache_max_size", + &topic_aliasing_options->outbound_alias_cache_max_size)) { + if (PyErr_Occurred()) { + goto done; + } + } + + if (py_inbound_behavior != NULL && !PyObject_GetAsOptionalIntEnum( + py_inbound_behavior, + "TopicAliasingOptions", + "inbound_behavior", + &topic_aliasing_options->inbound_topic_alias_behavior)) { + if (PyErr_Occurred()) { + goto done; + } + } + + if (py_inbound_cache_max_size != NULL && !PyObject_GetAsOptionalUint16( + py_inbound_cache_max_size, + "TopicAliasingOptions", + "inbound_cache_max_size", + &topic_aliasing_options->inbound_alias_cache_size)) { + if (PyErr_Occurred()) { + goto done; + } + } + + success = true; + +done: + + Py_XDECREF(py_outbound_behavior); + Py_XDECREF(py_outbound_cache_max_size); + Py_XDECREF(py_inbound_behavior); + Py_XDECREF(py_inbound_cache_max_size); + + return success; +} + /******************************************************************************* * Client Init ******************************************************************************/ @@ -782,13 +847,14 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { PyObject *min_connected_time_to_reset_reconnect_delay_ms_py; /* optional uint64_t */ PyObject *ping_timeout_ms_py; /* optional uint32_t */ PyObject *ack_timeout_seconds_py; /* optional uint32_t */ + PyObject *topic_aliasing_options_py; /* optional TopicAliasingOptions */ /* Callbacks */ PyObject *is_websocket_none_py; PyObject *client_core_py; if (!PyArg_ParseTuple( args, - "Os#HOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOO", + "Os#HOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOOO", /* O */ &self_py, /* s */ &host_name.ptr, /* # */ &host_name.len, @@ -839,6 +905,7 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { /* O */ &min_connected_time_to_reset_reconnect_delay_ms_py, /* O */ &ping_timeout_ms_py, /* O */ &ack_timeout_seconds_py, + /* O */ &topic_aliasing_options_py, /* O */ &is_websocket_none_py, /* O */ &client_core_py)) { @@ -871,6 +938,7 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { AWS_ZERO_STRUCT(will_options); struct aws_http_proxy_options proxy_options; + struct aws_mqtt5_client_topic_alias_options topic_aliasing_options; struct aws_tls_ctx *tls_ctx = NULL; @@ -1021,6 +1089,13 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { goto done; } + if (topic_aliasing_options_py != Py_None) { + if (!s_py_topic_aliasing_options_init(&topic_aliasing_options, topic_aliasing_options_py)) { + goto done; + } + client_options.topic_aliasing_options = &topic_aliasing_options; + } + /* CONNECT OPTIONS */ connect_options.client_id = client_id; @@ -1160,12 +1235,6 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) { goto done; } - will.topic_alias = PyObject_GetAsOptionalUint16( - will_topic_alias_py, AWS_PYOBJECT_KEY_WILL_PACKET, AWS_PYOBJECT_KEY_TOPIC_ALIAS, &will_topic_alias_tmp); - if (PyErr_Occurred()) { - goto done; - } - if (will_response_topic.ptr) { will.response_topic = &will_response_topic; }