diff --git a/MQTTClient-C/src/CMakeLists.txt b/MQTTClient-C/src/CMakeLists.txt index a2327ea9..632ca4d7 100644 --- a/MQTTClient-C/src/CMakeLists.txt +++ b/MQTTClient-C/src/CMakeLists.txt @@ -19,12 +19,17 @@ file(GLOB SOURCES "*.c" "linux/*.c") -add_library( - paho-embed-mqtt3cc SHARED - ${SOURCES} -) +add_library(paho-embed-mqtt3cc SHARED ${SOURCES}) install(TARGETS paho-embed-mqtt3cc DESTINATION /usr/lib) target_include_directories(paho-embed-mqtt3cc PRIVATE "linux") target_link_libraries(paho-embed-mqtt3cc paho-embed-mqtt3c) target_compile_definitions(paho-embed-mqtt3cc PRIVATE MQTTCLIENT_PLATFORM_HEADER=MQTTLinux.h MQTTCLIENT_QOS2=1) + +file(GLOB SOURCES "*.c" "V5/*.c" "linux/*.c") +add_library(paho-embed-mqtt5cc SHARED ${SOURCES}) +install(TARGETS paho-embed-mqtt5cc DESTINATION /usr/lib) +target_include_directories(paho-embed-mqtt5cc PRIVATE "linux") +target_link_libraries(paho-embed-mqtt5cc paho-embed-mqtt5c) +target_compile_definitions(paho-embed-mqtt5cc PRIVATE + MQTTCLIENT_PLATFORM_HEADER=MQTTLinux.h MQTTCLIENT_QOS2=1 MQTTV5) diff --git a/MQTTClient-C/src/V5/MQTTV5Client.c b/MQTTClient-C/src/V5/MQTTV5Client.c new file mode 100755 index 00000000..dac86edf --- /dev/null +++ b/MQTTClient-C/src/V5/MQTTV5Client.c @@ -0,0 +1,716 @@ +/******************************************************************************* + * Copyright (c) 2023 Microsoft Corporation. All rights reserved. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + *******************************************************************************/ + +#include "MQTTClient.h" + +#include +#include + +static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) { + md->topicName = aTopicName; + md->message = aMessage; +} + + +static int getNextPacketId(MQTTClient *c) { + return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1; +} + + +static int sendPacket(MQTTClient* c, int32_t length, Timer* timer) +{ + int rc = FAILURE, + sent = 0; + + while (sent < length && !TimerIsExpired(timer)) + { + rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length - sent, TimerLeftMS(timer)); + if (rc < 0) // there was an error writing the data + break; + sent += rc; + } + if (sent == length) + { + TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet + rc = SUCCESS; + } + else + rc = FAILURE; + return rc; +} + + +void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms, + unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size) +{ + int i; + c->ipstack = network; + + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + c->messageHandlers[i].topicFilter = 0; + c->command_timeout_ms = command_timeout_ms; + c->buf = sendbuf; + c->buf_size = sendbuf_size; + c->readbuf = readbuf; + c->readbuf_size = readbuf_size; + c->isconnected = 0; + c->cleansession = 0; + c->ping_outstanding = 0; + c->defaultMessageHandler = NULL; + c->next_packetid = 1; + TimerInit(&c->last_sent); + TimerInit(&c->last_received); + TimerInit(&c->pingresp_timer); +#if defined(MQTT_TASK) + MutexInit(&c->mutex); +#endif +} + + +static int decodePacket(MQTTClient* c, int* value, int timeout) +{ + unsigned char i; + int multiplier = 1; + int32_t len = 0; + const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; + + *value = 0; + do + { + int rc = MQTTPACKET_READ_ERROR; + + if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) + { + rc = MQTTPACKET_READ_ERROR; /* bad data */ + goto exit; + } + rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout); + if (rc != 1) + goto exit; + *value += (i & 127) * multiplier; + multiplier *= 128; + } while ((i & 128) != 0); +exit: + return len; +} + + +static int readPacket(MQTTClient* c, Timer* timer) +{ + MQTTHeader header = {0}; + int32_t len = 0; + int rem_len = 0; + + /* 1. read the header byte. This has the packet type in it */ + int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer)); + if (rc != 1) + goto exit; + + len = 1; + /* 2. read the remaining length. This is variable in itself */ + decodePacket(c, &rem_len, TimerLeftMS(timer)); + len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ + + if (rem_len > (c->readbuf_size - len)) + { + rc = BUFFER_OVERFLOW; + goto exit; + } + + /* 3. read the rest of the buffer using a callback to supply the rest of the data */ + if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) { + rc = 0; + goto exit; + } + + header.byte = c->readbuf[0]; + rc = header.bits.type; + if (c->keepAliveInterval > 0) + TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet +exit: + return rc; +} + + +// assume topic filter and name is in correct format +// # can only be at end +// + and # can only be next to separator +static char isTopicMatched(char* topicFilter, MQTTString* topicName) +{ + char* curf = topicFilter; + char* curn = topicName->lenstring.data; + char* curn_end = curn + topicName->lenstring.len; + + while (*curf && curn < curn_end) + { + if (*curn == '/' && *curf != '/') + break; + if (*curf != '+' && *curf != '#' && *curf != *curn) + break; + if (*curf == '+') + { // skip until we meet the next separator, or end of string + char* nextpos = curn + 1; + while (nextpos < curn_end && *nextpos != '/') + nextpos = ++curn + 1; + } + else if (*curf == '#') + curn = curn_end - 1; // skip until end of string + curf++; + curn++; + }; + + return (curn == curn_end) && (*curf == '\0' || *curf == '#'); +} + + +int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) +{ + int i; + int rc = FAILURE; + + // we have to find the right message handler - indexed by topic + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + { + if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || + isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) + { + if (c->messageHandlers[i].fp != NULL) + { + MessageData md; + NewMessageData(&md, topicName, message); + c->messageHandlers[i].fp(&md); + rc = SUCCESS; + } + } + } + + if (rc == FAILURE && c->defaultMessageHandler != NULL) + { + MessageData md; + NewMessageData(&md, topicName, message); + c->defaultMessageHandler(&md); + rc = SUCCESS; + } + + return rc; +} + + +int keepalive(MQTTClient* c) +{ + int rc = SUCCESS; + + if (c->keepAliveInterval == 0) + goto exit; + + // If we are waiting for a ping response, check if it has been too long + if (c->ping_outstanding) + { + if (TimerIsExpired(&c->pingresp_timer)) + { + rc = FAILURE; /* PINGRESP not received in keepalive interval */ + goto exit; + } + } else + { + // If we have not sent or received anything in the timeout period, + // send out a ping request + if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received)) + { + Timer timer; + + TimerInit(&timer); + TimerCountdownMS(&timer, 1000); + int32_t len = MQTTSerialize_pingreq(c->buf, c->buf_size); + if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) + { + // send the ping packet + // Expect the PINGRESP within 2 seconds of the PINGREQ + // being sent + TimerCountdownMS(&c->pingresp_timer, 2000 ); + c->ping_outstanding = 1; + } + } + } + +exit: + return rc; +} + + +void MQTTCleanSession(MQTTClient* c) +{ + int i = 0; + + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + c->messageHandlers[i].topicFilter = NULL; +} + + +void MQTTCloseSession(MQTTClient* c) +{ + c->ping_outstanding = 0; + c->isconnected = 0; + if (c->cleansession) + MQTTCleanSession(c); +} + + +int cycle(MQTTClient* c, Timer* timer) +{ + int32_t len = 0, + rc = SUCCESS; + + int packet_type = readPacket(c, timer); /* read the socket, see what work is due */ + + switch (packet_type) + { + default: + /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */ + rc = packet_type; + goto exit; + case 0: /* timed out reading packet */ + break; + case CONNACK: + case PUBACK: + case SUBACK: + case UNSUBACK: + break; + case PUBLISH: + { + MQTTString topicName; + MQTTMessage msg; + unsigned char intQoS; + msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ + if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName, + (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) + goto exit; + msg.qos = (enum QoS)intQoS; + deliverMessage(c, &topicName, &msg); + if (msg.qos != QOS0) + { + if (msg.qos == QOS1) + len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); + else if (msg.qos == QOS2) + len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); + if (len <= 0) + rc = FAILURE; + else + rc = sendPacket(c, len, timer); + if (rc == FAILURE) + goto exit; // there was a problem + } + break; + } + case PUBREC: + case PUBREL: + { + unsigned short mypacketid; + unsigned char dup, type; + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) + rc = FAILURE; + else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, + (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) + rc = FAILURE; + else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet + rc = FAILURE; // there was a problem + if (rc == FAILURE) + goto exit; // there was a problem + break; + } + + case PUBCOMP: + break; + case PINGRESP: + c->ping_outstanding = 0; + break; + } + + if (keepalive(c) != SUCCESS) { + //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT + rc = FAILURE; + } + +exit: + if (rc == SUCCESS) + rc = packet_type; + else if (c->isconnected) + MQTTCloseSession(c); + return rc; +} + + +int MQTTYield(MQTTClient* c, int timeout_ms) +{ + int rc = SUCCESS; + Timer timer; + + TimerInit(&timer); + TimerCountdownMS(&timer, timeout_ms); + + do + { + if (cycle(c, &timer) < 0) + { + rc = FAILURE; + break; + } + } while (!TimerIsExpired(&timer)); + + return rc; +} + +int MQTTIsConnected(MQTTClient* client) +{ + return client->isconnected; +} + +void MQTTRun(void* parm) +{ + Timer timer; + MQTTClient* c = (MQTTClient*)parm; + + TimerInit(&timer); + + while (1) + { +#if defined(MQTT_TASK) + MutexLock(&c->mutex); +#endif + TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */ + cycle(c, &timer); +#if defined(MQTT_TASK) + MutexUnlock(&c->mutex); +#endif + } +} + + +#if defined(MQTT_TASK) +int MQTTStartTask(MQTTClient* client) +{ + return ThreadStart(&client->thread, &MQTTRun, client); +} +#endif + + +int waitfor(MQTTClient* c, int packet_type, Timer* timer) +{ + int rc = FAILURE; + + do + { + if (TimerIsExpired(timer)) + break; // we timed out + rc = cycle(c, timer); + } + while (rc != packet_type && rc >= 0); + + return rc; +} + + + + +int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data) +{ + Timer connect_timer; + int rc = FAILURE; + MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; + int32_t len = 0; + +#if defined(MQTT_TASK) + MutexLock(&c->mutex); +#endif + if (c->isconnected) /* don't send connect packet again if we are already connected */ + goto exit; + + TimerInit(&connect_timer); + TimerCountdownMS(&connect_timer, c->command_timeout_ms); + + if (options == 0) + options = &default_options; /* set default options if none were supplied */ + + c->keepAliveInterval = options->keepAliveInterval; + c->cleansession = options->cleansession; + TimerCountdown(&c->last_received, c->keepAliveInterval); + if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0) + goto exit; + if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet + goto exit; // there was a problem + + // this will be a blocking call, wait for the connack + if (waitfor(c, CONNACK, &connect_timer) == CONNACK) + { + data->rc = 0; + data->sessionPresent = 0; + if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1) + rc = data->rc; + else + rc = FAILURE; + } + else + rc = FAILURE; + +exit: + if (rc == SUCCESS) + { + c->isconnected = 1; + c->ping_outstanding = 0; + } + +#if defined(MQTT_TASK) + MutexUnlock(&c->mutex); +#endif + + return rc; +} + + +int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options) +{ + MQTTConnackData data; + return MQTTConnectWithResults(c, options, &data); +} + + +int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler) +{ + int rc = FAILURE; + int i = -1; + + /* first check for an existing matching slot */ + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + { + if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0) + { + if (messageHandler == NULL) /* remove existing */ + { + c->messageHandlers[i].topicFilter = NULL; + c->messageHandlers[i].fp = NULL; + } + rc = SUCCESS; /* return i when adding new subscription */ + break; + } + } + /* if no existing, look for empty slot (unless we are removing) */ + if (messageHandler != NULL) { + if (rc == FAILURE) + { + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + { + if (c->messageHandlers[i].topicFilter == NULL) + { + rc = SUCCESS; + break; + } + } + } + if (i < MAX_MESSAGE_HANDLERS) + { + c->messageHandlers[i].topicFilter = topicFilter; + c->messageHandlers[i].fp = messageHandler; + } + } + return rc; +} + + +int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos, + messageHandler messageHandler, MQTTSubackData* data) +{ + int rc = FAILURE; + Timer timer; + int32_t len = 0; + MQTTString topic = MQTTString_initializer; + topic.cstring = (char *)topicFilter; + +#if defined(MQTT_TASK) + MutexLock(&c->mutex); +#endif + if (!c->isconnected) + goto exit; + + TimerInit(&timer); + TimerCountdownMS(&timer, c->command_timeout_ms); + + unsigned char _qos = qos; + len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, &_qos); + if (len <= 0) + goto exit; + if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet + goto exit; // there was a problem + + if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback + { + int count = 0; + unsigned short mypacketid; + unsigned char grantedQoS = QOS0; + int retval = MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size); + data->grantedQoS = grantedQoS; + if (retval == 1) + { + if (data->grantedQoS != 0x80) + rc = MQTTSetMessageHandler(c, topicFilter, messageHandler); + } + } + else + rc = FAILURE; + +exit: + if (rc == FAILURE) + MQTTCloseSession(c); +#if defined(MQTT_TASK) + MutexUnlock(&c->mutex); +#endif + return rc; +} + + +int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, + messageHandler messageHandler) +{ + MQTTSubackData data; + return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data); +} + + +int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) +{ + int rc = FAILURE; + Timer timer; + MQTTString topic = MQTTString_initializer; + topic.cstring = (char *)topicFilter; + int32_t len = 0; + +#if defined(MQTT_TASK) + MutexLock(&c->mutex); +#endif + if (!c->isconnected) + goto exit; + + TimerInit(&timer); + TimerCountdownMS(&timer, c->command_timeout_ms); + + if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0) + goto exit; + if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet + goto exit; // there was a problem + + if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) + { + unsigned short mypacketid; // should be the same as the packetid above + if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) + { + /* remove the subscription message handler associated with this topic, if there is one */ + MQTTSetMessageHandler(c, topicFilter, NULL); + } + } + else + rc = FAILURE; + +exit: + if (rc == FAILURE) + MQTTCloseSession(c); +#if defined(MQTT_TASK) + MutexUnlock(&c->mutex); +#endif + return rc; +} + + +int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message) +{ + int rc = FAILURE; + Timer timer; + MQTTString topic = MQTTString_initializer; + topic.cstring = (char *)topicName; + int32_t len = 0; + +#if defined(MQTT_TASK) + MutexLock(&c->mutex); +#endif + if (!c->isconnected) + goto exit; + + TimerInit(&timer); + TimerCountdownMS(&timer, c->command_timeout_ms); + + if (message->qos == QOS1 || message->qos == QOS2) + message->id = getNextPacketId(c); + + len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, + topic, (unsigned char*)message->payload, message->payloadlen); + if (len <= 0) + goto exit; + if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet + goto exit; // there was a problem + + if (message->qos == QOS1) + { + if (waitfor(c, PUBACK, &timer) == PUBACK) + { + unsigned short mypacketid; + unsigned char dup, type; + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) + rc = FAILURE; + } + else + rc = FAILURE; + } + else if (message->qos == QOS2) + { + if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) + { + unsigned short mypacketid; + unsigned char dup, type; + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) + rc = FAILURE; + } + else + rc = FAILURE; + } + +exit: + if (rc == FAILURE) + MQTTCloseSession(c); +#if defined(MQTT_TASK) + MutexUnlock(&c->mutex); +#endif + return rc; +} + + +int MQTTDisconnect(MQTTClient* c) +{ + int rc = FAILURE; + Timer timer; // we might wait for incomplete incoming publishes to complete + int32_t len = 0; + +#if defined(MQTT_TASK) + MutexLock(&c->mutex); +#endif + TimerInit(&timer); + TimerCountdownMS(&timer, c->command_timeout_ms); + + len = MQTTSerialize_disconnect(c->buf, c->buf_size); + if (len > 0) + rc = sendPacket(c, len, &timer); // send the disconnect packet + MQTTCloseSession(c); + +#if defined(MQTT_TASK) + MutexUnlock(&c->mutex); +#endif + return rc; +} diff --git a/MQTTClient-C/src/V5/MQTTV5Client.h b/MQTTClient-C/src/V5/MQTTV5Client.h new file mode 100755 index 00000000..a64086f3 --- /dev/null +++ b/MQTTClient-C/src/V5/MQTTV5Client.h @@ -0,0 +1,229 @@ +/******************************************************************************* + * Copyright (c) 2023 Microsoft Corporation. All rights reserved. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + *******************************************************************************/ + +#if !defined(MQTT_CLIENT_H) +#define MQTT_CLIENT_H + +#if defined(__cplusplus) + extern "C" { +#endif + +#if defined(WIN32_DLL) || defined(WIN64_DLL) + #define DLLImport __declspec(dllimport) + #define DLLExport __declspec(dllexport) +#elif defined(LINUX_SO) + #define DLLImport extern + #define DLLExport __attribute__ ((visibility ("default"))) +#else + #define DLLImport + #define DLLExport +#endif + +#include "MQTTPacket.h" + +#if defined(MQTTCLIENT_PLATFORM_HEADER) +/* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value + * into a string constant suitable for use with include. + */ +#define xstr(s) str(s) +#define str(s) #s +#include xstr(MQTTCLIENT_PLATFORM_HEADER) +#endif + +#define MAX_PACKET_ID 65535 /* according to the MQTT specification - do not change! */ + +#if !defined(MAX_MESSAGE_HANDLERS) +#define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */ +#endif + +enum QoS { QOS0, QOS1, QOS2, SUBFAIL=0x80 }; + +/* all failure return codes must be negative */ +enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; + +/* The Platform specific header must define the Network and Timer structures and functions + * which operate on them. + * +typedef struct Network +{ + int (*mqttread)(Network*, unsigned char* read_buffer, int, int); + int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int); +} Network;*/ + +/* The Timer structure must be defined in the platform specific header, + * and have the following functions to operate on it. */ +extern void TimerInit(Timer*); +extern char TimerIsExpired(Timer*); +extern void TimerCountdownMS(Timer*, unsigned int); +extern void TimerCountdown(Timer*, unsigned int); +extern int TimerLeftMS(Timer*); + +typedef struct MQTTMessage +{ + enum QoS qos; + unsigned char retained; + unsigned char dup; + unsigned short id; + void *payload; + size_t payloadlen; +} MQTTMessage; + +typedef struct MessageData +{ + MQTTMessage* message; + MQTTString* topicName; +} MessageData; + +typedef struct MQTTConnackData +{ + unsigned char rc; + unsigned char sessionPresent; +} MQTTConnackData; + +typedef struct MQTTSubackData +{ + enum QoS grantedQoS; +} MQTTSubackData; + +typedef void (*messageHandler)(MessageData*); + +typedef struct MQTTClient +{ + unsigned int next_packetid, + command_timeout_ms; + size_t buf_size, + readbuf_size; + unsigned char *buf, + *readbuf; + unsigned int keepAliveInterval; + char ping_outstanding; + int isconnected; + int cleansession; + + struct MessageHandlers + { + const char* topicFilter; + void (*fp) (MessageData*); + } messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */ + + void (*defaultMessageHandler) (MessageData*); + + Network* ipstack; + Timer last_sent, last_received, pingresp_timer; +#if defined(MQTT_TASK) + Mutex mutex; + Thread thread; +#endif +} MQTTClient; + +#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0} + + +/** + * Create an MQTT client object + * @param client + * @param network + * @param command_timeout_ms + * @param + */ +DLLExport void MQTTClientInit(MQTTClient* client, Network* network, unsigned int command_timeout_ms, + unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size); + +/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack + * The nework object must be connected to the network endpoint before calling this + * @param options - connect options + * @return success code + */ +DLLExport int MQTTConnectWithResults(MQTTClient* client, MQTTPacket_connectData* options, + MQTTConnackData* data); + +/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack + * The nework object must be connected to the network endpoint before calling this + * @param options - connect options + * @return success code + */ +DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options); + +/** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs + * @param client - the client object to use + * @param topic - the topic to publish to + * @param message - the message to send + * @return success code + */ +DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*); + +/** MQTT SetMessageHandler - set or remove a per topic message handler + * @param client - the client object to use + * @param topicFilter - the topic filter set the message handler for + * @param messageHandler - pointer to the message handler function or NULL to remove + * @return success code + */ +DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler); + +/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning. + * @param client - the client object to use + * @param topicFilter - the topic filter to subscribe to + * @param message - the message to send + * @return success code + */ +DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler); + +/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning. + * @param client - the client object to use + * @param topicFilter - the topic filter to subscribe to + * @param message - the message to send + * @param data - suback granted QoS returned + * @return success code + */ +DLLExport int MQTTSubscribeWithResults(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler, MQTTSubackData* data); + +/** MQTT Subscribe - send an MQTT unsubscribe packet and wait for unsuback before returning. + * @param client - the client object to use + * @param topicFilter - the topic filter to unsubscribe from + * @return success code + */ +DLLExport int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter); + +/** MQTT Disconnect - send an MQTT disconnect packet and close the connection + * @param client - the client object to use + * @return success code + */ +DLLExport int MQTTDisconnect(MQTTClient* client); + +/** MQTT Yield - MQTT background + * @param client - the client object to use + * @param time - the time, in milliseconds, to yield for + * @return success code + */ +DLLExport int MQTTYield(MQTTClient* client, int time); + +/** MQTT isConnected + * @param client - the client object to use + * @return truth value indicating whether the client is connected to the server + */ +DLLExport int MQTTIsConnected(MQTTClient* client); + +#if defined(MQTT_TASK) +/** MQTT start background thread for a client. After this, MQTTYield should not be called. +* @param client - the client object to use +* @return success code +*/ +DLLExport int MQTTStartTask(MQTTClient* client); +#endif + +#if defined(__cplusplus) + } +#endif + +#endif