Skip to content

Commit

Permalink
Adding ack information, renaming subscribe options.
Browse files Browse the repository at this point in the history
  • Loading branch information
CIPop committed Jul 26, 2023
1 parent 62ac1d7 commit 868706c
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 18 deletions.
5 changes: 5 additions & 0 deletions MQTTClient-C/src/MQTTClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -581,13 +581,18 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo
int count = 0;
unsigned short mypacketid;
unsigned char grantedQoS = QOS0;

#if defined(MQTTV5)
// TODO: V5 deserialization and QoS adapter.
#else
int retval = MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size);
data->grantedQoS = grantedQoS;
if (retval == 1)
{
if (data->grantedQoS != 0x80)
rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
}
#endif /* MQTTV5 */
}
else
rc = FAILURE;
Expand Down
43 changes: 38 additions & 5 deletions MQTTClient-C/src/MQTTClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,40 @@ typedef struct MQTTConnackData

typedef struct MQTTSubackData
{
enum QoS grantedQoS;
#if defined(MQTTV5)
enum ReasonCodes* reasonCodes;
MQTTProperties* properties;
#else
enum QoS grantedQoS;
#endif /* MQTTV5 */
} MQTTSubackData;

typedef void (*messageHandler)(MessageData*);
/**
* @brief Data structure containing information about a published message.
* @note This structure is used for both QoS1 (PUBACK) and QoS2 (PUBCOMP) messages.
*/
typedef struct MQTTPubDoneData
{
// id is omitted as it is already present in the MQTTMessage structure
unsigned char dup;
#if defined(MQTTV5)
enum ReasonCodes reasonCode;
MQTTProperties* properties;
#endif /* MQTTV5 */
} MQTTPubDoneData;

/**
* @brief Callback type for handling incoming messages.
* @note Separate callbacks can be used for each subscription filter.
*
*/
typedef void (*messageHandler)(MessageData* received);

// TODO: Getting properties, reason codes from all control messages:
#if defined(MQTTV5)
/**
* @brief Control handler used for asynchronous MQTTv5 messages such as DISCONNECT, AUTH, and ACKs.
* @brief Callback type used for asynchronous MQTTv5 DISCONNECT and AUTH.
* @note Separate callbacks should be used for each control message type.
*
*/
typedef void (*controlHandler)(MQTTProperties* properties, unsigned char reasonCode, unsigned short id);
Expand Down Expand Up @@ -202,6 +223,18 @@ DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options);
*/
DLLExport int MQTTPublish(MQTTClient* client, const char* topic, MQTTMessage* message);

/**
* @brief MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs.
* @note This function blocks until the QoS1 PUBACK or QoS2 PUBCOMP is received.
*
* @param client
* @param topic
* @param message
* @param ack Acknowledgement information (from either a PUBACK or PUBCOMP message).
* @return success code
*/
DLLExport int MQTTPublishWithResults(MQTTClient* client, const char* topic, MQTTMessage* message, MQTTPubDoneData* ack);

/** MQTT SetMessageHandler - set or remove a per topic message handler
* @param client - the client object to use
* @param topicFilter - the topic filter set the message handler for
Expand All @@ -216,7 +249,7 @@ DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, mess
* @param message - the message to send
* @return success code
*/
DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);
DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler messageHandler);

/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning.
* @param client - the client object to use
Expand All @@ -225,7 +258,7 @@ DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum Qo
* @param data - suback granted QoS returned
* @return success code
*/
DLLExport int MQTTSubscribeWithResults(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler, MQTTSubackData* data);
DLLExport int MQTTSubscribeWithResults(MQTTClient* client, const char* topicFilter, enum QoS qos, messageHandler messageHandler, MQTTSubackData* data);

/** MQTT Subscribe - send an MQTT unsubscribe packet and wait for unsuback before returning.
* @param client - the client object to use
Expand Down
10 changes: 7 additions & 3 deletions MQTTClient-C/src/V5/MQTTV5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ DLLExport int MQTTV5Connect(MQTTClient* client, MQTTPacket_connectData* options,
DLLExport int MQTTV5Publish(MQTTClient* client, const char* topic, MQTTMessage* message,
MQTTProperties* properties);


// TODO: separate mechanism for send-pipeline (maintaining multiple in-flight PUBs w/o acking)
DLLExport int MQTTV5PublishWithResults(MQTTClient* client, const char* topic, MQTTMessage* message,
MQTTProperties* properties, MQTTPubDoneData* ack);

// TODO: separate mechanism for recv batch ACKing (maintaining multiple recvd PUBs w/o acking)
/** MQTT SetMessageHandler - set or remove a per topic message handler
* @param client - the client object to use
* @param topicFilter - the topic filter set the message handler for
Expand All @@ -71,8 +77,6 @@ DLLExport int MQTTV5Publish(MQTTClient* client, const char* topic, MQTTMessage*
*/
DLLExport int MQTTV5SetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler);

DLLExport int MQTTV5SetAckHandler(MQTTClient* c, controlHandler ackHandler);

/**
* @brief MQTT Auth - send an MQTT AUTH packet
*
Expand All @@ -92,7 +96,7 @@ DLLExport int MQTTV5SetAuthHandler(MQTTClient* c, controlHandler authHandler);
* @return success code
*/
DLLExport int MQTTV5Subscribe(MQTTClient* client, const char* topicFilter, enum QoS qos,
messageHandler messageHandler, MQTTProperties* properties);
messageHandler messageHandler, MQTTProperties* properties, MQTTV5Packet_subscribeOptions options);

/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning.
* @param client - the client object to use
Expand Down
2 changes: 1 addition & 1 deletion MQTTPacket/samples/v5pub0sub1.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ int main(int argc, char *argv[])

/* subscribe */
MQTTProperties sub_properties = MQTTProperties_initializer;
struct subscribeOptions sub_options = { 0 };
MQTTV5Packet_subscribeOptions sub_options = { 0 };
topicString.cstring = "substopic";
len = MQTTV5Serialize_subscribe(buf, buflen, 0, msgid, &sub_properties, 1, &topicString, &req_qos, &sub_options);

Expand Down
2 changes: 1 addition & 1 deletion MQTTPacket/samples/v5pub0sub1_nb.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ int main(int argc, char *argv[])

/* subscribe */
MQTTProperties sub_properties = MQTTProperties_initializer;
struct subscribeOptions sub_options = { 0 };
MQTTV5Packet_subscribeOptions sub_options = { 0 };
topicString.cstring = "substopic";
len = MQTTV5Serialize_subscribe(buf, buflen, 0, msgid, &sub_properties, 1, &topicString, &req_qos, &sub_options);

Expand Down
1 change: 1 addition & 0 deletions MQTTPacket/src/MQTTPublish.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ DLLExport int32_t MQTTDeserialize_publish(unsigned char* dup, unsigned char* qos
unsigned char** payload, int32_t* payloadlen, unsigned char* buf, int32_t len);

DLLExport int32_t MQTTSerialize_puback(unsigned char* buf, int32_t buflen, unsigned short packetid);
DLLExport int32_t MQTTSerialize_pubrec(unsigned char* buf, int32_t buflen, unsigned short packetid);
DLLExport int32_t MQTTSerialize_pubrel(unsigned char* buf, int32_t buflen, unsigned char dup, unsigned short packetid);
DLLExport int32_t MQTTSerialize_pubcomp(unsigned char* buf, int32_t buflen, unsigned short packetid);

Expand Down
2 changes: 1 addition & 1 deletion MQTTPacket/src/MQTTSubscribeClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ int32_t MQTTSerialize_subscribe(unsigned char* buf, int32_t buflen, unsigned cha
}

int32_t MQTTV5Serialize_subscribe(unsigned char* buf, int32_t buflen, unsigned char dup, unsigned short packetid,
MQTTProperties* properties, int count, MQTTString topicFilters[], unsigned char requestedQoSs[], struct subscribeOptions options[])
MQTTProperties* properties, int count, MQTTString topicFilters[], unsigned char requestedQoSs[], MQTTV5Packet_subscribeOptions options[])
#else
int32_t MQTTSerialize_subscribe(unsigned char* buf, int32_t buflen, unsigned char dup, unsigned short packetid, int count,
MQTTString topicFilters[], unsigned char requestedQoSs[])
Expand Down
2 changes: 1 addition & 1 deletion MQTTPacket/src/MQTTSubscribeServer.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ int32_t MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid,
}

int32_t MQTTV5Deserialize_subscribe(unsigned char* dup, unsigned short* packetid, MQTTProperties* properties,
int maxcount, int* count, MQTTString topicFilters[], unsigned char requestedQoSs[], struct subscribeOptions options[],
int maxcount, int* count, MQTTString topicFilters[], unsigned char requestedQoSs[], MQTTV5Packet_subscribeOptions options[],
unsigned char* buf, int32_t buflen)
#else
int32_t MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid, int maxcount, int* count, MQTTString topicFilters[],
Expand Down
15 changes: 11 additions & 4 deletions MQTTPacket/src/V5/MQTTV5Subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,25 @@
#define DLLExport
#endif

struct subscribeOptions
enum subscribe_retain_handling
{
MQTTV5_RETAINED_SEND_ON_SUBSCRIBE = 0,
MQTTV5_RETAINED_SEND_IF_NO_SUBSCRIPTION = 1,
MQTTV5_RETAINED_DO_NOT_SEND = 2,
};

typedef struct
{
unsigned char noLocal; /* 0 or 1 */
unsigned char retainAsPublished; /* 0 or 1 */
unsigned char retainHandling; /* 0, 1 or 2 */
};
} MQTTV5Packet_subscribeOptions;

DLLExport int32_t MQTTV5Serialize_subscribe(unsigned char* buf, int32_t buflen, unsigned char dup, unsigned short packetid,
MQTTProperties* properties, int count, MQTTString topicFilters[], unsigned char requestedQoSs[], struct subscribeOptions options[]);
MQTTProperties* properties, int count, MQTTString topicFilters[], unsigned char requestedQoSs[], MQTTV5Packet_subscribeOptions options[]);

DLLExport int32_t MQTTV5Deserialize_subscribe(unsigned char* dup, unsigned short* packetid, MQTTProperties* properties,
int maxcount, int* count, MQTTString topicFilters[], unsigned char requestedQoSs[], struct subscribeOptions options[],
int maxcount, int* count, MQTTString topicFilters[], unsigned char requestedQoSs[], MQTTV5Packet_subscribeOptions options[],
unsigned char* buf, int len);

DLLExport int32_t MQTTV5Serialize_suback(unsigned char* buf, int32_t buflen, unsigned short packetid,
Expand Down
2 changes: 1 addition & 1 deletion MQTTPacket/test/test2.c
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ int test3(struct Options options)
MQTTString topicStrings2[TOPIC_COUNT] = { MQTTString_initializer, MQTTString_initializer };
unsigned char req_qoss2[TOPIC_COUNT] = {0, 0};

struct subscribeOptions subscribeOptions[2] = {{1, 1, 1}, {1, 0, 2}},
MQTTV5Packet_subscribeOptions subscribeOptions[2] = {{1, 1, 1}, {1, 0, 2}},
outSubscribeOptions[2] = {{0, 0, 0}, {0, 0, 0}};

MQTTProperties properties = MQTTProperties_initializer;
Expand Down
2 changes: 1 addition & 1 deletion MQTTPacket/test/test3.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ int test1(struct Options options)
/* subscribe */
properties.length = properties.count = 0; /* remove existing properties */
topicString.cstring = test_topic;
struct subscribeOptions opts = {0, 0, 0};
MQTTV5Packet_subscribeOptions opts = {0, 0, 0};
opts.noLocal = 0;
opts.retainAsPublished = 1;
opts.retainHandling = 2;
Expand Down

0 comments on commit 868706c

Please sign in to comment.