Paho MQTT Embedded C client: Use submodule pointing to upstream repo

Due to code organisation of upstream repo, MQTTClient still needs
copying into our repo.
This commit is contained in:
Angus Gratton 2016-05-28 16:23:13 +10:00
parent 9b21c54fc5
commit 42880fded5
23 changed files with 240 additions and 1828 deletions

4
.gitmodules vendored
View file

@ -10,4 +10,6 @@
[submodule "bootloader/rboot"] [submodule "bootloader/rboot"]
path = bootloader/rboot path = bootloader/rboot
url = https://github.com/raburton/rboot.git url = https://github.com/raburton/rboot.git
[submodule "extras/paho_mqtt_c/org.eclipse.paho.mqtt.embedded-c"]
path = extras/paho_mqtt_c/org.eclipse.paho.mqtt.embedded-c
url = https://git.eclipse.org/r/paho/org.eclipse.paho.mqtt.embedded-c

View file

@ -1,2 +1,6 @@
PROGRAM=blink PROGRAM=blink
EXTRA_COMPONENTS = extras/mbedtls
include ../../common.mk include ../../common.mk

View file

@ -8,6 +8,10 @@
#include "task.h" #include "task.h"
#include "esp8266.h" #include "esp8266.h"
#include "mbedtls/aes.h"
#include "xtensa_ops.h"
#include <string.h>
const int gpio = 2; const int gpio = 2;
/* This task uses the high level GPIO API (esp_gpio.h) to blink an LED. /* This task uses the high level GPIO API (esp_gpio.h) to blink an LED.
@ -53,6 +57,32 @@ void blinkenRegisterTask(void *pvParameters)
void user_init(void) void user_init(void)
{ {
uart_set_baud(0, 115200); uart_set_baud(0, 115200);
xTaskCreate(blinkenTask, (signed char *)"blinkenTask", 256, NULL, 2, NULL); static uint8_t data[1024];
//xTaskCreate(blinkenRegisterTask, (signed char *)"blinkenRegisterTask", 256, NULL, 2, NULL); static uint8_t output[1024];
static uint8_t iv[16];
static uint8_t key[256 / 8];
memset(data, 0, sizeof(data));
memset(iv, 0, sizeof(iv));
mbedtls_aes_context ctx;
uint32_t before, after;
RSR(before, CCOUNT)
mbedtls_aes_init(&ctx);
mbedtls_aes_setkey_enc(&ctx, key, 256);
for(int r = 0; r < 10; r++) {
mbedtls_aes_crypt_cbc(&ctx,
MBEDTLS_AES_ENCRYPT,
sizeof(data),
iv,
data,
output);
memcpy(data, output, 1024);
}
RSR(after, CCOUNT);
printf("cycle count %d\n", after - before);
vPortExitCritical();
while(1) {}
} }

View file

@ -14,6 +14,8 @@
#include <string.h> #include <string.h>
#include <xtensa_ops.h>
#include "FreeRTOS.h" #include "FreeRTOS.h"
#include "task.h" #include "task.h"
@ -92,6 +94,10 @@ void http_get_task(void *pvParameters)
unsigned char buf[1024]; unsigned char buf[1024];
const char *pers = "ssl_client1"; const char *pers = "ssl_client1";
uint64_t total_delta = 0;
uint32_t delta_count = 0;
uint32_t min_delta = UINT32_MAX;
mbedtls_entropy_context entropy; mbedtls_entropy_context entropy;
mbedtls_ctr_drbg_context ctr_drbg; mbedtls_ctr_drbg_context ctr_drbg;
mbedtls_ssl_context ssl; mbedtls_ssl_context ssl;
@ -210,6 +216,9 @@ void http_get_task(void *pvParameters)
*/ */
printf(" . Performing the SSL/TLS handshake..."); printf(" . Performing the SSL/TLS handshake...");
uint32_t before, after;
before = xTaskGetTickCount();
while((ret = mbedtls_ssl_handshake(&ssl)) != 0) while((ret = mbedtls_ssl_handshake(&ssl)) != 0)
{ {
if(ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) if(ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE)
@ -219,6 +228,15 @@ void http_get_task(void *pvParameters)
} }
} }
after = xTaskGetTickCount();
uint32_t delta = after - before;
total_delta += delta;
delta_count += 1;
if(delta < min_delta) {
min_delta = delta;
}
printf("\n\n**** %d tick handshake - average %d min %d ***\n\n", delta, (uint32_t)(total_delta/delta_count), min_delta);
printf(" ok\n"); printf(" ok\n");
/* /*

View file

@ -1,3 +1,18 @@
/*
MQTT Example Client
Connects to mosquitto test server, publishes to /beat and
subscribes to /esptopic
If using mosquitto, then commands to interact with this example are:
mosquitto_pub -h test.mosquitto.org -t /esptopic -m "Hello!"
mosquitto_sub -h test.mosquitto.org -t /beat
Sample code originally by @baoshi, adapted by Yudi Ludkevich & Angus
Gratton. BSD Licensed.
*/
#include "espressif/esp_common.h" #include "espressif/esp_common.h"
#include "esp/uart.h" #include "esp/uart.h"
@ -10,15 +25,13 @@
#include <espressif/esp_sta.h> #include <espressif/esp_sta.h>
#include <espressif/esp_wifi.h> #include <espressif/esp_wifi.h>
#include <paho_mqtt_c/MQTTESP8266.h> #include <MQTTClient.h>
#include <paho_mqtt_c/MQTTClient.h>
#include <semphr.h> #include <semphr.h>
/* You can use http://test.mosquitto.org/ to test mqtt_client instead /* You can use http://test.mosquitto.org/ to test mqtt_client instead
* of setting up your own MQTT server */ * of setting up your own MQTT server */
#define MQTT_HOST ("test.mosquitto.org") #define MQTT_HOST "test.mosquitto.org"
#define MQTT_PORT 1883 #define MQTT_PORT 1883
#define MQTT_USER NULL #define MQTT_USER NULL
@ -36,21 +49,21 @@ static void beat_task(void *pvParameters)
while (1) { while (1) {
vTaskDelayUntil(&xLastWakeTime, 10000 / portTICK_RATE_MS); vTaskDelayUntil(&xLastWakeTime, 10000 / portTICK_RATE_MS);
printf("beat\r\n");
snprintf(msg, PUB_MSG_LEN, "Beat %d\r\n", count++); snprintf(msg, PUB_MSG_LEN, "Beat %d\r\n", count++);
printf(msg);
if (xQueueSend(publish_queue, (void *)msg, 0) == pdFALSE) { if (xQueueSend(publish_queue, (void *)msg, 0) == pdFALSE) {
printf("Publish queue overflow.\r\n"); printf("Publish queue overflow.\r\n");
} }
} }
} }
static void topic_received(MessageData *md) static void topic_received(struct MessageData *md)
{ {
int i; int i;
MQTTMessage *message = md->message; MQTTMessage *message = md->message;
printf("Received: "); printf("Received: ");
for( i = 0; i < md->topic->lenstring.len; ++i) for( i = 0; i < md->topicName->lenstring.len; ++i)
printf("%c", md->topic->lenstring.data[ i ]); printf("%c", md->topicName->lenstring.data[ i ]);
printf(" = "); printf(" = ");
for( i = 0; i < (int)message->payloadlen; ++i) for( i = 0; i < (int)message->payloadlen; ++i)
@ -86,9 +99,9 @@ static const char * get_my_id(void)
static void mqtt_task(void *pvParameters) static void mqtt_task(void *pvParameters)
{ {
int ret = 0; int ret = 0;
struct Network network; Client client;
MQTTClient client = DefaultClient; Network network;
char mqtt_client_id[20]; char mqtt_client_id[20];
uint8_t mqtt_buf[100]; uint8_t mqtt_buf[100];
uint8_t mqtt_readbuf[100]; uint8_t mqtt_readbuf[100];
@ -111,7 +124,7 @@ static void mqtt_task(void *pvParameters)
continue; continue;
} }
printf("done\n\r"); printf("done\n\r");
NewMQTTClient(&client, &network, 5000, mqtt_buf, 100, MQTTClient(&client, &network, 5000, mqtt_buf, 100,
mqtt_readbuf, 100); mqtt_readbuf, 100);
data.willFlag = 0; data.willFlag = 0;
@ -153,7 +166,7 @@ static void mqtt_task(void *pvParameters)
} }
ret = MQTTYield(&client, 1000); ret = MQTTYield(&client, 1000);
if (ret == DISCONNECTED) if (ret == FAILURE)
break; break;
} }
printf("Connection dropped, request restart\n\r"); printf("Connection dropped, request restart\n\r");

View file

@ -13,26 +13,25 @@
* Contributors: * Contributors:
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/ *******************************************************************************/
#include <espressif/esp_common.h>
#include <lwip/arch.h>
#include "MQTTClient.h" #include "MQTTClient.h"
void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessgage) { void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessgage) {
md->topic = aTopicName; md->topicName = aTopicName;
md->message = aMessgage; md->message = aMessgage;
} }
int getNextPacketId(MQTTClient *c) { int getNextPacketId(Client *c) {
return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1; return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
} }
int sendPacket(MQTTClient* c, int length, Timer* timer) int sendPacket(Client* c, int length, Timer* timer)
{ {
int rc = FAILURE, int rc = FAILURE,
sent = 0; sent = 0;
while (sent < length && !expired(timer)) while (sent < length && !expired(timer))
{ {
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, left_ms(timer)); rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, left_ms(timer));
@ -42,7 +41,7 @@ int sendPacket(MQTTClient* c, int length, Timer* timer)
} }
if (sent == length) if (sent == length)
{ {
countdown(&(c->ping_timer), c->keepAliveInterval); // record the fact that we have successfully sent the packet countdown(&c->ping_timer, c->keepAliveInterval); // record the fact that we have successfully sent the packet
rc = SUCCESS; rc = SUCCESS;
} }
else else
@ -51,7 +50,26 @@ int sendPacket(MQTTClient* c, int length, Timer* timer)
} }
int decodePacket(MQTTClient* c, int* value, int timeout) void MQTTClient(Client* c, Network* network, unsigned int command_timeout_ms, unsigned char* buf, size_t buf_size, unsigned char* readbuf, size_t readbuf_size)
{
int i;
c->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = 0;
c->command_timeout_ms = command_timeout_ms;
c->buf = buf;
c->buf_size = buf_size;
c->readbuf = readbuf;
c->readbuf_size = readbuf_size;
c->isconnected = 0;
c->ping_outstanding = 0;
c->defaultMessageHandler = NULL;
InitTimer(&c->ping_timer);
}
int decodePacket(Client* c, int* value, int timeout)
{ {
unsigned char i; unsigned char i;
int multiplier = 1; int multiplier = 1;
@ -79,7 +97,7 @@ exit:
} }
int readPacket(MQTTClient* c, Timer* timer) int readPacket(Client* c, Timer* timer)
{ {
int rc = FAILURE; int rc = FAILURE;
MQTTHeader header = {0}; MQTTHeader header = {0};
@ -102,7 +120,6 @@ int readPacket(MQTTClient* c, Timer* timer)
header.byte = c->readbuf[0]; header.byte = c->readbuf[0];
rc = header.bits.type; rc = header.bits.type;
exit: exit:
//dmsg_printf("readPacket=%d\r\n", rc);
return rc; return rc;
} }
@ -110,12 +127,12 @@ exit:
// assume topic filter and name is in correct format // assume topic filter and name is in correct format
// # can only be at end // # can only be at end
// + and # can only be next to separator // + and # can only be next to separator
char isTopicMatched(char* topicFilter, MQTTString* topicName) char isTopicMatched(char* topicFilter, MQTTString* topicName)
{ {
char* curf = topicFilter; char* curf = topicFilter;
char* curn = topicName->lenstring.data; char* curn = topicName->lenstring.data;
char* curn_end = curn + topicName->lenstring.len; char* curn_end = curn + topicName->lenstring.len;
while (*curf && curn < curn_end) while (*curf && curn < curn_end)
{ {
if (*curn == '/' && *curf != '/') if (*curn == '/' && *curf != '/')
@ -133,12 +150,12 @@ char isTopicMatched(char* topicFilter, MQTTString* topicName)
curf++; curf++;
curn++; curn++;
}; };
return (curn == curn_end) && (*curf == '\0'); return (curn == curn_end) && (*curf == '\0');
} }
int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) int deliverMessage(Client* c, MQTTString* topicName, MQTTMessage* message)
{ {
int i; int i;
int rc = FAILURE; int rc = FAILURE;
@ -158,22 +175,22 @@ int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
} }
} }
} }
if (rc == FAILURE && c->defaultMessageHandler != NULL) if (rc == FAILURE && c->defaultMessageHandler != NULL)
{ {
MessageData md; MessageData md;
NewMessageData(&md, topicName, message); NewMessageData(&md, topicName, message);
c->defaultMessageHandler(&md); c->defaultMessageHandler(&md);
rc = SUCCESS; rc = SUCCESS;
} }
return rc; return rc;
} }
int keepalive(MQTTClient* c) int keepalive(Client* c)
{ {
int rc = SUCCESS; int rc = FAILURE;
if (c->keepAliveInterval == 0) if (c->keepAliveInterval == 0)
{ {
@ -181,30 +198,17 @@ int keepalive(MQTTClient* c)
goto exit; goto exit;
} }
if (expired(&(c->ping_timer))) if (expired(&c->ping_timer))
{ {
if (c->ping_outstanding) if (!c->ping_outstanding)
{
// if ping failure accumulated above MAX_FAIL_ALLOWED, the connection is broken
++(c->fail_count);
if (c->fail_count >= MAX_FAIL_ALLOWED)
{
rc = DISCONNECTED;
goto exit;
}
}
else
{ {
Timer timer; Timer timer;
InitTimer(&timer); InitTimer(&timer);
countdown_ms(&timer, 1000); countdown_ms(&timer, 1000);
c->ping_outstanding = 1;
int len = MQTTSerialize_pingreq(c->buf, c->buf_size); int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
if (len > 0) if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
sendPacket(c, len, &timer); c->ping_outstanding = 1;
} }
// re-arm ping counter
countdown(&(c->ping_timer), c->keepAliveInterval);
} }
exit: exit:
@ -212,11 +216,11 @@ exit:
} }
int cycle(MQTTClient* c, Timer* timer) int cycle(Client* c, Timer* timer)
{ {
// read the socket, see what work is due // read the socket, see what work is due
unsigned short packet_type = readPacket(c, timer); unsigned short packet_type = readPacket(c, timer);
int len = 0, int len = 0,
rc = SUCCESS; rc = SUCCESS;
@ -242,8 +246,8 @@ int cycle(MQTTClient* c, Timer* timer)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
if (len <= 0) if (len <= 0)
rc = FAILURE; rc = FAILURE;
else else
rc = sendPacket(c, len, timer); rc = sendPacket(c, len, timer);
if (rc == FAILURE) if (rc == FAILURE)
goto exit; // there was a problem goto exit; // there was a problem
} }
@ -266,14 +270,10 @@ int cycle(MQTTClient* c, Timer* timer)
case PUBCOMP: case PUBCOMP:
break; break;
case PINGRESP: case PINGRESP:
{ c->ping_outstanding = 0;
c->ping_outstanding = 0;
c->fail_count = 0;
}
break; break;
} }
if (c->isconnected) keepalive(c);
rc = keepalive(c);
exit: exit:
if (rc == SUCCESS) if (rc == SUCCESS)
rc = packet_type; rc = packet_type;
@ -281,69 +281,49 @@ exit:
} }
void NewMQTTClient(MQTTClient* c, Network* network, unsigned int command_timeout_ms, unsigned char* buf, size_t buf_size, unsigned char* readbuf, size_t readbuf_size) int MQTTYield(Client* c, int timeout_ms)
{
int i;
c->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = 0;
c->command_timeout_ms = command_timeout_ms;
c->buf = buf;
c->buf_size = buf_size;
c->readbuf = readbuf;
c->readbuf_size = readbuf_size;
c->isconnected = 0;
c->ping_outstanding = 0;
c->fail_count = 0;
c->defaultMessageHandler = NULL;
InitTimer(&(c->ping_timer));
}
int MQTTYield(MQTTClient* c, int timeout_ms)
{ {
int rc = SUCCESS; int rc = SUCCESS;
Timer timer; Timer timer;
InitTimer(&timer); InitTimer(&timer);
countdown_ms(&timer, timeout_ms); countdown_ms(&timer, timeout_ms);
while (!expired(&timer)) while (!expired(&timer))
{ {
rc = cycle(c, &timer); if (cycle(c, &timer) == FAILURE)
// cycle could return 0 or packet_type or 65535 if nothing is read {
// cycle returns DISCONNECTED only if keepalive() fails. rc = FAILURE;
if (rc == DISCONNECTED)
break; break;
rc = SUCCESS; }
} }
return rc; return rc;
} }
// only used in single-threaded mode where one command at a time is in process // only used in single-threaded mode where one command at a time is in process
int waitfor(MQTTClient* c, int packet_type, Timer* timer) int waitfor(Client* c, int packet_type, Timer* timer)
{ {
int rc = FAILURE; int rc = FAILURE;
do do
{ {
if (expired(timer)) if (expired(timer))
break; // we timed out break; // we timed out
} }
while ((rc = cycle(c, timer)) != packet_type); while ((rc = cycle(c, timer)) != packet_type);
return rc; return rc;
} }
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options) int MQTTConnect(Client* c, MQTTPacket_connectData* options)
{ {
Timer connect_timer; Timer connect_timer;
int rc = FAILURE; int rc = FAILURE;
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
int len = 0; int len = 0;
InitTimer(&connect_timer); InitTimer(&connect_timer);
countdown_ms(&connect_timer, c->command_timeout_ms); countdown_ms(&connect_timer, c->command_timeout_ms);
@ -352,15 +332,14 @@ int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
if (options == 0) if (options == 0)
options = &default_options; // set default options if none were supplied options = &default_options; // set default options if none were supplied
c->keepAliveInterval = options->keepAliveInterval; c->keepAliveInterval = options->keepAliveInterval;
countdown(&(c->ping_timer), c->keepAliveInterval); countdown(&c->ping_timer, c->keepAliveInterval);
if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0) if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
goto exit; goto exit;
if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet
goto exit; // there was a problem goto exit; // there was a problem
// this will be a blocking call, wait for the connack // this will be a blocking call, wait for the connack
if (waitfor(c, CONNACK, &connect_timer) == CONNACK) if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
{ {
@ -373,7 +352,7 @@ int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
} }
else else
rc = FAILURE; rc = FAILURE;
exit: exit:
if (rc == SUCCESS) if (rc == SUCCESS)
c->isconnected = 1; c->isconnected = 1;
@ -381,33 +360,32 @@ exit:
} }
int MQTTSubscribe(MQTTClient* c, const char* topic, enum QoS qos, messageHandler handler) int MQTTSubscribe(Client* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{ {
int rc = FAILURE; int rc = FAILURE;
Timer timer; Timer timer;
int len = 0; int len = 0;
MQTTString topicStr = MQTTString_initializer; MQTTString topic = MQTTString_initializer;
topicStr.cstring = (char *)topic; topic.cstring = (char *)topicFilter;
InitTimer(&timer); InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms); countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topicStr, (int*)&qos); len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
if (len <= 0) if (len <= 0)
goto exit; goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
{
goto exit; // there was a problem goto exit; // there was a problem
}
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
{ {
int count = 0, grantedQoS = -1; int count = 0, grantedQoS = -1;
unsigned short mypacketid; unsigned short mypacketid;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1) if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1)
rc = grantedQoS; // 0, 1, 2 or 0x80 rc = grantedQoS; // 0, 1, 2 or 0x80
if (rc != 0x80) if (rc != 0x80)
{ {
int i; int i;
@ -415,126 +393,111 @@ int MQTTSubscribe(MQTTClient* c, const char* topic, enum QoS qos, messageHandle
{ {
if (c->messageHandlers[i].topicFilter == 0) if (c->messageHandlers[i].topicFilter == 0)
{ {
c->messageHandlers[i].topicFilter = topic; c->messageHandlers[i].topicFilter = topicFilter;
c->messageHandlers[i].fp = handler; c->messageHandlers[i].fp = messageHandler;
rc = 0; rc = 0;
break; break;
} }
} }
} }
} }
else else
rc = FAILURE; rc = FAILURE;
exit: exit:
return rc; return rc;
} }
int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) int MQTTUnsubscribe(Client* c, const char* topicFilter)
{ {
int rc = FAILURE; int rc = FAILURE;
Timer timer; Timer timer;
MQTTString topic = MQTTString_initializer; MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter; topic.cstring = (char *)topicFilter;
int len = 0; int len = 0;
InitTimer(&timer); InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms); countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0) if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
goto exit; goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem goto exit; // there was a problem
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
{ {
unsigned short mypacketid; // should be the same as the packetid above unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
rc = 0; rc = 0;
} }
else else
rc = FAILURE; rc = FAILURE;
exit: exit:
return rc; return rc;
} }
int MQTTPublish(MQTTClient* c, const char* topic, MQTTMessage* message) int MQTTPublish(Client* c, const char* topicName, MQTTMessage* message)
{ {
int rc = FAILURE; int rc = FAILURE;
Timer timer; Timer timer;
MQTTString topicStr = MQTTString_initializer; MQTTString topic = MQTTString_initializer;
topicStr.cstring = (char *)topic; topic.cstring = (char *)topicName;
int len = 0; int len = 0;
InitTimer(&timer); InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms); countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
if (message->qos == QOS1 || message->qos == QOS2) if (message->qos == QOS1 || message->qos == QOS2)
message->id = getNextPacketId(c); message->id = getNextPacketId(c);
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topicStr, (unsigned char*)message->payload, message->payloadlen); topic, (unsigned char*)message->payload, message->payloadlen);
if (len <= 0) if (len <= 0)
goto exit; goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
{
goto exit; // there was a problem goto exit; // there was a problem
}
if (message->qos == QOS1) if (message->qos == QOS1)
{ {
if (waitfor(c, PUBACK, &timer) == PUBACK) if (waitfor(c, PUBACK, &timer) == PUBACK)
{ {
// We still can receive from broker, treat as recoverable
c->fail_count = 0;
unsigned short mypacketid; unsigned short mypacketid;
unsigned char dup, type; unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE; rc = FAILURE;
else
rc = SUCCESS;
} }
else else
{
rc = FAILURE; rc = FAILURE;
}
} }
else if (message->qos == QOS2) else if (message->qos == QOS2)
{ {
if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
{ {
// We still can receive from broker, treat as recoverable
c->fail_count = 0;
unsigned short mypacketid; unsigned short mypacketid;
unsigned char dup, type; unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE; rc = FAILURE;
else
rc = SUCCESS;
} }
else else
{
rc = FAILURE; rc = FAILURE;
}
} }
exit: exit:
return rc; return rc;
} }
int MQTTDisconnect(MQTTClient* c) int MQTTDisconnect(Client* c)
{ {
int rc = FAILURE; int rc = FAILURE;
Timer timer; // we might wait for incomplete incoming publishes to complete Timer timer; // we might wait for incomplete incoming publishes to complete
int len = MQTTSerialize_disconnect(c->buf, c->buf_size); int len = MQTTSerialize_disconnect(c->buf, c->buf_size);
@ -544,7 +507,7 @@ int MQTTDisconnect(MQTTClient* c)
if (len > 0) if (len > 0)
rc = sendPacket(c, len, &timer); // send the disconnect packet rc = sendPacket(c, len, &timer); // send the disconnect packet
c->isconnected = 0; c->isconnected = 0;
return rc; return rc;
} }

View file

@ -18,20 +18,24 @@
#define __MQTT_CLIENT_C_ #define __MQTT_CLIENT_C_
#include "MQTTPacket.h" #include "MQTTPacket.h"
#include "MQTTESP8266.h" #include "stdio.h"
#include "MQTTESP8266.h" //Platform specific implementation header file
#define MAX_PACKET_ID 65535 #define MAX_PACKET_ID 65535
#define MAX_MESSAGE_HANDLERS 5 #define MAX_MESSAGE_HANDLERS 5
#define MAX_FAIL_ALLOWED 2
enum QoS { QOS0, QOS1, QOS2 }; enum QoS { QOS0, QOS1, QOS2 };
// all failure return codes must be negative // all failure return codes must be negative
enum returnCode {DISCONNECTED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
void NewTimer(Timer*); void NewTimer(Timer*);
typedef struct _MQTTMessage typedef struct MQTTMessage MQTTMessage;
typedef struct MessageData MessageData;
struct MQTTMessage
{ {
enum QoS qos; enum QoS qos;
char retained; char retained;
@ -39,26 +43,37 @@ typedef struct _MQTTMessage
unsigned short id; unsigned short id;
void *payload; void *payload;
size_t payloadlen; size_t payloadlen;
} MQTTMessage; };
typedef struct _MessageData struct MessageData
{ {
MQTTString* topic;
MQTTMessage* message; MQTTMessage* message;
} MessageData; MQTTString* topicName;
};
typedef void (*messageHandler)(MessageData*); typedef void (*messageHandler)(MessageData*);
struct _MQTTClient typedef struct Client Client;
{
int MQTTConnect (Client*, MQTTPacket_connectData*);
int MQTTPublish (Client*, const char*, MQTTMessage*);
int MQTTSubscribe (Client*, const char*, enum QoS, messageHandler);
int MQTTUnsubscribe (Client*, const char*);
int MQTTDisconnect (Client*);
int MQTTYield (Client*, int);
void setDefaultMessageHandler(Client*, messageHandler);
void MQTTClient(Client*, Network*, unsigned int, unsigned char*, size_t, unsigned char*, size_t);
struct Client {
unsigned int next_packetid; unsigned int next_packetid;
unsigned int command_timeout_ms; unsigned int command_timeout_ms;
size_t buf_size, readbuf_size; size_t buf_size, readbuf_size;
unsigned char *buf; unsigned char *buf;
unsigned char *readbuf; unsigned char *readbuf;
unsigned int keepAliveInterval; unsigned int keepAliveInterval;
char ping_outstanding; char ping_outstanding;
int fail_count;
int isconnected; int isconnected;
struct MessageHandlers struct MessageHandlers
@ -66,26 +81,13 @@ struct _MQTTClient
const char* topicFilter; const char* topicFilter;
void (*fp) (MessageData*); void (*fp) (MessageData*);
} messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
void (*defaultMessageHandler) (MessageData*); void (*defaultMessageHandler) (MessageData*);
Network* ipstack; Network* ipstack;
Timer ping_timer; Timer ping_timer;
}; };
typedef struct _MQTTClient MQTTClient;
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options);
int MQTTPublish(MQTTClient* c, const char* topic, MQTTMessage* message);
int MQTTSubscribe(MQTTClient* c, const char* topic, enum QoS qos, messageHandler handler);
int MQTTUnsubscribe(MQTTClient* c, const char* topic);
int MQTTDisconnect(MQTTClient* c);
int MQTTYield(MQTTClient* c, int timeout_ms);
void NewMQTTClient(MQTTClient*, Network*, unsigned int, unsigned char*, size_t, unsigned char*, size_t);
#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0} #define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0}
#endif #endif

View file

@ -1,136 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTCONNECT_H_
#define MQTTCONNECT_H_
#if !defined(DLLImport)
#define DLLImport
#endif
#if !defined(DLLExport)
#define DLLExport
#endif
typedef union
{
unsigned char all; /**< all connect flags */
#if defined(REVERSED)
struct
{
unsigned int username : 1; /**< 3.1 user name */
unsigned int password : 1; /**< 3.1 password */
unsigned int willRetain : 1; /**< will retain setting */
unsigned int willQoS : 2; /**< will QoS value */
unsigned int will : 1; /**< will flag */
unsigned int cleansession : 1; /**< clean session flag */
unsigned int : 1; /**< unused */
} bits;
#else
struct
{
unsigned int : 1; /**< unused */
unsigned int cleansession : 1; /**< cleansession flag */
unsigned int will : 1; /**< will flag */
unsigned int willQoS : 2; /**< will QoS value */
unsigned int willRetain : 1; /**< will retain setting */
unsigned int password : 1; /**< 3.1 password */
unsigned int username : 1; /**< 3.1 user name */
} bits;
#endif
} MQTTConnectFlags; /**< connect flags byte */
/**
* Defines the MQTT "Last Will and Testament" (LWT) settings for
* the connect packet.
*/
typedef struct
{
/** The eyecatcher for this structure. must be MQTW. */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** The LWT topic to which the LWT message will be published. */
MQTTString topicName;
/** The LWT payload. */
MQTTString message;
/**
* The retained flag for the LWT message (see MQTTAsync_message.retained).
*/
unsigned char retained;
/**
* The quality of service setting for the LWT message (see
* MQTTAsync_message.qos and @ref qos).
*/
char qos;
} MQTTPacket_willOptions;
#define MQTTPacket_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 }
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1
*/
unsigned char MQTTVersion;
MQTTString clientID;
unsigned short keepAliveInterval;
unsigned char cleansession;
unsigned char willFlag;
MQTTPacket_willOptions will;
MQTTString username;
MQTTString password;
} MQTTPacket_connectData;
typedef union
{
unsigned char all; /**< all connack flags */
#if defined(REVERSED)
struct
{
unsigned int sessionpresent : 1; /**< session present flag */
unsigned int : 7; /**< unused */
} bits;
#else
struct
{
unsigned int : 7; /**< unused */
unsigned int sessionpresent : 1; /**< session present flag */
} bits;
#endif
} MQTTConnackFlags; /**< connack flags byte */
#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \
MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} }
DLLExport int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options);
DLLExport int MQTTDeserialize_connect(MQTTPacket_connectData* data, unsigned char* buf, int len);
DLLExport int MQTTSerialize_connack(unsigned char* buf, int buflen, unsigned char connack_rc, unsigned char sessionPresent);
DLLExport int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen);
DLLExport int MQTTSerialize_disconnect(unsigned char* buf, int buflen);
DLLExport int MQTTSerialize_pingreq(unsigned char* buf, int buflen);
#endif /* MQTTCONNECT_H_ */

View file

@ -1,214 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include <espressif/esp_common.h>
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Determines the length of the MQTT connect packet that would be produced using the supplied connect options.
* @param options the options to be used to build the connect packet
* @return the length of buffer needed to contain the serialized version of the packet
*/
int MQTTSerialize_connectLength(MQTTPacket_connectData* options)
{
int len = 0;
FUNC_ENTRY;
if (options->MQTTVersion == 3)
len = 12; /* variable depending on MQTT or MQIsdp */
else if (options->MQTTVersion == 4)
len = 10;
len += MQTTstrlen(options->clientID)+2;
if (options->willFlag)
len += MQTTstrlen(options->will.topicName)+2 + MQTTstrlen(options->will.message)+2;
if (options->username.cstring || options->username.lenstring.data)
len += MQTTstrlen(options->username)+2;
if (options->password.cstring || options->password.lenstring.data)
len += MQTTstrlen(options->password)+2;
FUNC_EXIT_RC(len);
return len;
}
/**
* Serializes the connect options into the buffer.
* @param buf the buffer into which the packet will be serialized
* @param len the length in bytes of the supplied buffer
* @param options the options to be used to build the connect packet
* @return serialized length, or error if 0
*/
int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options)
{
unsigned char *ptr = buf;
MQTTHeader header = {0};
MQTTConnectFlags flags = {0};
int len = 0;
int rc = -1;
FUNC_ENTRY;
if (MQTTPacket_len(len = MQTTSerialize_connectLength(options)) > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = CONNECT;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, len); /* write remaining length */
if (options->MQTTVersion == 4)
{
writeCString(&ptr, "MQTT");
writeChar(&ptr, (char) 4);
}
else
{
writeCString(&ptr, "MQIsdp");
writeChar(&ptr, (char) 3);
}
flags.all = 0;
flags.bits.cleansession = options->cleansession;
flags.bits.will = (options->willFlag) ? 1 : 0;
if (flags.bits.will)
{
flags.bits.willQoS = options->will.qos;
flags.bits.willRetain = options->will.retained;
}
if (options->username.cstring || options->username.lenstring.data)
flags.bits.username = 1;
if (options->password.cstring || options->password.lenstring.data)
flags.bits.password = 1;
writeChar(&ptr, flags.all);
writeInt(&ptr, options->keepAliveInterval);
writeMQTTString(&ptr, options->clientID);
if (options->willFlag)
{
writeMQTTString(&ptr, options->will.topicName);
writeMQTTString(&ptr, options->will.message);
}
if (flags.bits.username)
writeMQTTString(&ptr, options->username);
if (flags.bits.password)
writeMQTTString(&ptr, options->password);
rc = ptr - buf;
exit: FUNC_EXIT_RC(rc);
return rc;
}
/**
* Deserializes the supplied (wire) buffer into connack data - return code
* @param sessionPresent the session present flag returned (only for MQTT 3.1.1)
* @param connack_rc returned integer value of the connack return code
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param len the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen)
{
MQTTHeader header = {0};
unsigned char* curdata = buf;
unsigned char* enddata = NULL;
int rc = 0;
int mylen;
MQTTConnackFlags flags = {0};
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != CONNACK)
goto exit;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
if (enddata - curdata < 2)
goto exit;
flags.all = readChar(&curdata);
*sessionPresent = flags.bits.sessionpresent;
*connack_rc = readChar(&curdata);
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes a 0-length packet into the supplied buffer, ready for writing to a socket
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer, to avoid overruns
* @param packettype the message type
* @return serialized length, or error if 0
*/
int MQTTSerialize_zero(unsigned char* buf, int buflen, unsigned char packettype)
{
MQTTHeader header = {0};
int rc = -1;
unsigned char *ptr = buf;
FUNC_ENTRY;
if (buflen < 2)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = packettype;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 0); /* write remaining length */
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes a disconnect packet into the supplied buffer, ready for writing to a socket
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer, to avoid overruns
* @return serialized length, or error if 0
*/
int MQTTSerialize_disconnect(unsigned char* buf, int buflen)
{
return MQTTSerialize_zero(buf, buflen, DISCONNECT);
}
/**
* Serializes a disconnect packet into the supplied buffer, ready for writing to a socket
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer, to avoid overruns
* @return serialized length, or error if 0
*/
int MQTTSerialize_pingreq(unsigned char* buf, int buflen)
{
return MQTTSerialize_zero(buf, buflen, PINGREQ);
}

View file

@ -1,107 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include <espressif/esp_common.h>
#include "StackTrace.h"
#include "MQTTPacket.h"
#include <string.h>
#define min(a, b) ((a < b) ? 1 : 0)
/**
* Deserializes the supplied (wire) buffer into publish data
* @param dup returned integer - the MQTT dup flag
* @param qos returned integer - the MQTT QoS value
* @param retained returned integer - the MQTT retained flag
* @param packetid returned integer - the MQTT packet identifier
* @param topicName returned MQTTString - the MQTT topic in the publish
* @param payload returned byte buffer - the MQTT publish payload
* @param payloadlen returned integer - the length of the MQTT payload
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success
*/
int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName,
unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen)
{
MQTTHeader header = {0};
unsigned char* curdata = buf;
unsigned char* enddata = NULL;
int rc = 0;
int mylen = 0;
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != PUBLISH)
goto exit;
*dup = header.bits.dup;
*qos = header.bits.qos;
*retained = header.bits.retain;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
if (!readMQTTLenString(topicName, &curdata, enddata) ||
enddata - curdata < 0) /* do we have enough data to read the protocol version byte? */
goto exit;
if (*qos > 0)
*packetid = readInt(&curdata);
*payloadlen = enddata - curdata;
*payload = curdata;
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Deserializes the supplied (wire) buffer into an ack
* @param packettype returned integer - the MQTT packet type
* @param dup returned integer - the MQTT dup flag
* @param packetid returned integer - the MQTT packet identifier
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen)
{
MQTTHeader header = {0};
unsigned char* curdata = buf;
unsigned char* enddata = NULL;
int rc = 0;
int mylen;
FUNC_ENTRY;
header.byte = readChar(&curdata);
*dup = header.bits.dup;
*packettype = header.bits.type;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
if (enddata - curdata < 2)
goto exit;
*packetid = readInt(&curdata);
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}

View file

@ -1,37 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(MQTTFORMAT_H)
#define MQTTFORMAT_H
#include "StackTrace.h"
#include "MQTTPacket.h"
const char* MQTTPacket_getName(unsigned short packetid);
int MQTTStringFormat_connect(char* strbuf, int strbuflen, MQTTPacket_connectData* data);
int MQTTStringFormat_connack(char* strbuf, int strbuflen, unsigned char connack_rc, unsigned char sessionPresent);
int MQTTStringFormat_publish(char* strbuf, int strbuflen, unsigned char dup, int qos, unsigned char retained,
unsigned short packetid, MQTTString topicName, unsigned char* payload, int payloadlen);
int MQTTStringFormat_ack(char* strbuf, int strbuflen, unsigned char packettype, unsigned char dup, unsigned short packetid);
int MQTTStringFormat_subscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int count,
MQTTString topicFilters[], int requestedQoSs[]);
int MQTTStringFormat_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, int* grantedQoSs);
int MQTTStringFormat_unsubscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid,
int count, MQTTString topicFilters[]);
char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
char* MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
#endif

View file

@ -1,409 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Sergio R. Caprile - non-blocking packet read functions for stream transport
*******************************************************************************/
#include <espressif/esp_common.h>
#include "StackTrace.h"
#include "MQTTPacket.h"
#include <string.h>
/**
* Encodes the message length according to the MQTT algorithm
* @param buf the buffer into which the encoded data is written
* @param length the length to be encoded
* @return the number of bytes written to buffer
*/
int MQTTPacket_encode(unsigned char* buf, int length)
{
int rc = 0;
FUNC_ENTRY;
do
{
char d = length % 128;
length /= 128;
/* if there are more digits to encode, set the top bit of this digit */
if (length > 0)
d |= 0x80;
buf[rc++] = d;
} while (length > 0);
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Decodes the message length according to the MQTT algorithm
* @param getcharfn pointer to function to read the next character from the data source
* @param value the decoded length returned
* @return the number of bytes read from the socket
*/
int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value)
{
unsigned char c;
int multiplier = 1;
int len = 0;
#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
FUNC_ENTRY;
*value = 0;
do
{
int rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
rc = (*getcharfn)(&c, 1);
if (rc != 1)
goto exit;
*value += (c & 127) * multiplier;
multiplier *= 128;
} while ((c & 128) != 0);
exit:
FUNC_EXIT_RC(len);
return len;
}
int MQTTPacket_len(int rem_len)
{
rem_len += 1; /* header byte */
/* now remaining_length field */
if (rem_len < 128)
rem_len += 1;
else if (rem_len < 16384)
rem_len += 2;
else if (rem_len < 2097151)
rem_len += 3;
else
rem_len += 4;
return rem_len;
}
static unsigned char* bufptr;
int bufchar(unsigned char* c, int count)
{
int i;
for (i = 0; i < count; ++i)
*c = *bufptr++;
return count;
}
int MQTTPacket_decodeBuf(unsigned char* buf, int* value)
{
bufptr = buf;
return MQTTPacket_decode(bufchar, value);
}
/**
* Calculates an integer from two bytes read from the input buffer
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the integer value calculated
*/
int readInt(unsigned char** pptr)
{
unsigned char* ptr = *pptr;
int len = 256*(*ptr) + (*(ptr+1));
*pptr += 2;
return len;
}
/**
* Reads one character from the input buffer.
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the character read
*/
char readChar(unsigned char** pptr)
{
char c = **pptr;
(*pptr)++;
return c;
}
/**
* Writes one character to an output buffer.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param c the character to write
*/
void writeChar(unsigned char** pptr, char c)
{
**pptr = c;
(*pptr)++;
}
/**
* Writes an integer as 2 bytes to an output buffer.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param anInt the integer to write
*/
void writeInt(unsigned char** pptr, int anInt)
{
**pptr = (unsigned char)(anInt / 256);
(*pptr)++;
**pptr = (unsigned char)(anInt % 256);
(*pptr)++;
}
/**
* Writes a "UTF" string to an output buffer. Converts C string to length-delimited.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param string the C string to write
*/
void writeCString(unsigned char** pptr, const char* string)
{
int len = strlen(string);
writeInt(pptr, len);
memcpy(*pptr, string, len);
*pptr += len;
}
int getLenStringLen(char* ptr)
{
int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
return len;
}
void writeMQTTString(unsigned char** pptr, MQTTString mqttstring)
{
if (mqttstring.lenstring.len > 0)
{
writeInt(pptr, mqttstring.lenstring.len);
memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len);
*pptr += mqttstring.lenstring.len;
}
else if (mqttstring.cstring)
writeCString(pptr, mqttstring.cstring);
else
writeInt(pptr, 0);
}
/**
* @param mqttstring the MQTTString structure into which the data is to be read
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param enddata pointer to the end of the data: do not read beyond
* @return 1 if successful, 0 if not
*/
int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata)
{
int rc = 0;
FUNC_ENTRY;
/* the first two bytes are the length of the string */
if (enddata - (*pptr) > 1) /* enough length to read the integer? */
{
mqttstring->lenstring.len = readInt(pptr); /* increments pptr to point past length */
if (&(*pptr)[mqttstring->lenstring.len] <= enddata)
{
mqttstring->lenstring.data = (char*)*pptr;
*pptr += mqttstring->lenstring.len;
rc = 1;
}
}
mqttstring->cstring = NULL;
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Return the length of the MQTTstring - C string if there is one, otherwise the length delimited string
* @param mqttstring the string to return the length of
* @return the length of the string
*/
int MQTTstrlen(MQTTString mqttstring)
{
int rc = 0;
if (mqttstring.cstring)
rc = strlen(mqttstring.cstring);
else
rc = mqttstring.lenstring.len;
return rc;
}
/**
* Compares an MQTTString to a C string
* @param a the MQTTString to compare
* @param bptr the C string to compare
* @return boolean - equal or not
*/
int MQTTPacket_equals(MQTTString* a, char* bptr)
{
int alen = 0,
blen = 0;
char *aptr;
if (a->cstring)
{
aptr = a->cstring;
alen = strlen(a->cstring);
}
else
{
aptr = a->lenstring.data;
alen = a->lenstring.len;
}
blen = strlen(bptr);
return (alen == blen) && (strncmp(aptr, bptr, alen) == 0);
}
/**
* Helper function to read packet data from some source into a buffer
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param getfn pointer to a function which will read any number of bytes from the needed source
* @return integer MQTT packet type, or -1 on error
* @note the whole message must fit into the caller's buffer
*/
int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int))
{
int rc = -1;
MQTTHeader header = {0};
int len = 0;
int rem_len = 0;
/* 1. read the header byte. This has the packet type in it */
if ((*getfn)(buf, 1) != 1)
goto exit;
len = 1;
/* 2. read the remaining length. This is variable in itself */
MQTTPacket_decode(getfn, &rem_len);
len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if((rem_len + len) > buflen)
goto exit;
if ((*getfn)(buf + len, rem_len) != rem_len)
goto exit;
header.byte = buf[0];
rc = header.bits.type;
exit:
return rc;
}
/**
* Decodes the message length according to the MQTT algorithm, non-blocking
* @param trp pointer to a transport structure holding what is needed to solve getting data from it
* @param value the decoded length returned
* @return integer the number of bytes read from the socket, 0 for call again, or -1 on error
*/
static int MQTTPacket_decodenb(MQTTTransport *trp)
{
unsigned char c;
int rc = MQTTPACKET_READ_ERROR;
FUNC_ENTRY;
if(trp->len == 0){ /* initialize on first call */
trp->multiplier = 1;
trp->rem_len = 0;
}
do {
int frc;
if (++(trp->len) > MAX_NO_OF_REMAINING_LENGTH_BYTES)
goto exit;
if ((frc=(*trp->getfn)(trp->sck, &c, 1)) == -1)
goto exit;
if (frc == 0){
rc = 0;
goto exit;
}
trp->rem_len += (c & 127) * trp->multiplier;
trp->multiplier *= 128;
} while ((c & 128) != 0);
rc = trp->len;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Helper function to read packet data from some source into a buffer, non-blocking
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param trp pointer to a transport structure holding what is needed to solve getting data from it
* @return integer MQTT packet type, 0 for call again, or -1 on error
* @note the whole message must fit into the caller's buffer
*/
int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp)
{
int rc = -1, frc;
MQTTHeader header = {0};
switch(trp->state){
default:
trp->state = 0;
/*FALLTHROUGH*/
case 0:
/* read the header byte. This has the packet type in it */
if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1)
goto exit;
if (frc == 0)
return 0;
trp->len = 0;
++trp->state;
/*FALLTHROUGH*/
/* read the remaining length. This is variable in itself */
case 1:
if((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR)
goto exit;
if(frc == 0)
return 0;
trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */
if((trp->rem_len + trp->len) > buflen)
goto exit;
++trp->state;
/*FALLTHROUGH*/
case 2:
/* read the rest of the buffer using a callback to supply the rest of the data */
if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1)
goto exit;
if (frc == 0)
return 0;
trp->rem_len -= frc;
trp->len += frc;
if(trp->rem_len)
return 0;
header.byte = buf[0];
rc = header.bits.type;
break;
}
exit:
trp->state = 0;
return rc;
}

View file

@ -1,133 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTPACKET_H_
#define MQTTPACKET_H_
#if defined(__cplusplus) /* If this is a C++ compiler, use C linkage */
extern "C" {
#endif
#if defined(WIN32_DLL) || defined(WIN64_DLL)
#define DLLImport __declspec(dllimport)
#define DLLExport __declspec(dllexport)
#elif defined(LINUX_SO)
#define DLLImport extern
#define DLLExport __attribute__ ((visibility ("default")))
#else
#define DLLImport
#define DLLExport
#endif
enum errors
{
MQTTPACKET_BUFFER_TOO_SHORT = -2,
MQTTPACKET_READ_ERROR = -1,
MQTTPACKET_READ_COMPLETE
};
enum msgTypes
{
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
PINGREQ, PINGRESP, DISCONNECT
};
/**
* Bitfields for the MQTT header byte.
*/
typedef union
{
unsigned char byte; /**< the whole byte */
#if defined(REVERSED)
struct
{
unsigned int type : 4; /**< message type nibble */
unsigned int dup : 1; /**< DUP flag bit */
unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */
unsigned int retain : 1; /**< retained flag bit */
} bits;
#else
struct
{
unsigned int retain : 1; /**< retained flag bit */
unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */
unsigned int dup : 1; /**< DUP flag bit */
unsigned int type : 4; /**< message type nibble */
} bits;
#endif
} MQTTHeader;
typedef struct
{
int len;
char* data;
} MQTTLenString;
typedef struct
{
char* cstring;
MQTTLenString lenstring;
} MQTTString;
#define MQTTString_initializer {NULL, {0, NULL}}
int MQTTstrlen(MQTTString mqttstring);
#include "MQTTConnect.h"
#include "MQTTPublish.h"
#include "MQTTSubscribe.h"
#include "MQTTUnsubscribe.h"
#include "MQTTFormat.h"
int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char type, unsigned char dup, unsigned short packetid);
int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen);
int MQTTPacket_len(int rem_len);
int MQTTPacket_equals(MQTTString* a, char* b);
int MQTTPacket_encode(unsigned char* buf, int length);
int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value);
int MQTTPacket_decodeBuf(unsigned char* buf, int* value);
int readInt(unsigned char** pptr);
char readChar(unsigned char** pptr);
void writeChar(unsigned char** pptr, char c);
void writeInt(unsigned char** pptr, int anInt);
int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata);
void writeCString(unsigned char** pptr, const char* string);
void writeMQTTString(unsigned char** pptr, MQTTString mqttstring);
DLLExport int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int));
typedef struct {
int (*getfn)(void *, unsigned char*, int); /* must return -1 for error, 0 for call again, or the number of bytes read */
void *sck; /* pointer to whatever the system may use to identify the transport */
int multiplier;
int rem_len;
int len;
char state;
}MQTTTransport;
int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp);
#ifdef __cplusplus /* If this is a C++ compiler, use C linkage */
}
#endif
#endif /* MQTTPACKET_H_ */

View file

@ -1,38 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTPUBLISH_H_
#define MQTTPUBLISH_H_
#if !defined(DLLImport)
#define DLLImport
#endif
#if !defined(DLLExport)
#define DLLExport
#endif
DLLExport int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid,
MQTTString topicName, unsigned char* payload, int payloadlen);
DLLExport int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName,
unsigned char** payload, int* payloadlen, unsigned char* buf, int len);
DLLExport int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid);
DLLExport int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid);
DLLExport int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid);
#endif /* MQTTPUBLISH_H_ */

View file

@ -1,169 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - fix for https://bugs.eclipse.org/bugs/show_bug.cgi?id=453144
*******************************************************************************/
#include <espressif/esp_common.h>
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Determines the length of the MQTT publish packet that would be produced using the supplied parameters
* @param qos the MQTT QoS of the publish (packetid is omitted for QoS 0)
* @param topicName the topic name to be used in the publish
* @param payloadlen the length of the payload to be sent
* @return the length of buffer needed to contain the serialized version of the packet
*/
int MQTTSerialize_publishLength(int qos, MQTTString topicName, int payloadlen)
{
int len = 0;
len += 2 + MQTTstrlen(topicName) + payloadlen;
if (qos > 0)
len += 2; /* packetid */
return len;
}
/**
* Serializes the supplied publish data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param dup integer - the MQTT dup flag
* @param qos integer - the MQTT QoS value
* @param retained integer - the MQTT retained flag
* @param packetid integer - the MQTT packet identifier
* @param topicName MQTTString - the MQTT topic in the publish
* @param payload byte buffer - the MQTT publish payload
* @param payloadlen integer - the length of the MQTT payload
* @return the length of the serialized data. <= 0 indicates error
*/
int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid,
MQTTString topicName, unsigned char* payload, int payloadlen)
{
unsigned char *ptr = buf;
MQTTHeader header = {0};
int rem_len = 0;
int rc = 0;
FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_publishLength(qos, topicName, payloadlen)) > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.bits.type = PUBLISH;
header.bits.dup = dup;
header.bits.qos = qos;
header.bits.retain = retained;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */;
writeMQTTString(&ptr, topicName);
if (qos > 0)
writeInt(&ptr, packetid);
memcpy(ptr, payload, payloadlen);
ptr += payloadlen;
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes the ack packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param type the MQTT packet type
* @param dup the MQTT dup flag
* @param packetid the MQTT packet identifier
* @return serialized length, or error if 0
*/
int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char packettype, unsigned char dup, unsigned short packetid)
{
MQTTHeader header = {0};
int rc = 0;
unsigned char *ptr = buf;
FUNC_ENTRY;
if (buflen < 4)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.bits.type = packettype;
header.bits.dup = dup;
header.bits.qos = (packettype == PUBREL) ? 1 : 0;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */
writeInt(&ptr, packetid);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes a puback packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBACK, 0, packetid);
}
/**
* Serializes a pubrel packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param dup integer - the MQTT dup flag
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBREL, dup, packetid);
}
/**
* Serializes a pubrel packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBCOMP, 0, packetid);
}

View file

@ -1,39 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTSUBSCRIBE_H_
#define MQTTSUBSCRIBE_H_
#if !defined(DLLImport)
#define DLLImport
#endif
#if !defined(DLLExport)
#define DLLExport
#endif
DLLExport int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid,
int count, MQTTString topicFilters[], int requestedQoSs[]);
DLLExport int MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid,
int maxcount, int* count, MQTTString topicFilters[], int requestedQoSs[], unsigned char* buf, int len);
DLLExport int MQTTSerialize_suback(unsigned char* buf, int buflen, unsigned short packetid, int count, int* grantedQoSs);
DLLExport int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int len);
#endif /* MQTTSUBSCRIBE_H_ */

View file

@ -1,137 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include <espressif/esp_common.h>
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Determines the length of the MQTT subscribe packet that would be produced using the supplied parameters
* @param count the number of topic filter strings in topicFilters
* @param topicFilters the array of topic filter strings to be used in the publish
* @return the length of buffer needed to contain the serialized version of the packet
*/
int MQTTSerialize_subscribeLength(int count, MQTTString topicFilters[])
{
int i;
int len = 2; /* packetid */
for (i = 0; i < count; ++i)
len += 2 + MQTTstrlen(topicFilters[i]) + 1; /* length + topic + req_qos */
return len;
}
/**
* Serializes the supplied subscribe data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied bufferr
* @param dup integer - the MQTT dup flag
* @param packetid integer - the MQTT packet identifier
* @param count - number of members in the topicFilters and reqQos arrays
* @param topicFilters - array of topic filter names
* @param requestedQoSs - array of requested QoS
* @return the length of the serialized data. <= 0 indicates error
*/
int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count,
MQTTString topicFilters[], int requestedQoSs[])
{
unsigned char *ptr = buf;
MQTTHeader header = {0};
int rem_len = 0;
int rc = 0;
int i = 0;
FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_subscribeLength(count, topicFilters)) > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = SUBSCRIBE;
header.bits.dup = dup;
header.bits.qos = 1;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */;
writeInt(&ptr, packetid);
for (i = 0; i < count; ++i)
{
writeMQTTString(&ptr, topicFilters[i]);
writeChar(&ptr, requestedQoSs[i]);
}
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Deserializes the supplied (wire) buffer into suback data
* @param packetid returned integer - the MQTT packet identifier
* @param maxcount - the maximum number of members allowed in the grantedQoSs array
* @param count returned integer - number of members in the grantedQoSs array
* @param grantedQoSs returned array of integers - the granted qualities of service
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int buflen)
{
MQTTHeader header = {0};
unsigned char* curdata = buf;
unsigned char* enddata = NULL;
int rc = 0;
int mylen;
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != SUBACK)
goto exit;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
if (enddata - curdata < 2)
goto exit;
*packetid = readInt(&curdata);
*count = 0;
while (curdata < enddata)
{
if (*count > maxcount)
{
rc = -1;
goto exit;
}
grantedQoSs[(*count)++] = readChar(&curdata);
}
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}

View file

@ -1,38 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTUNSUBSCRIBE_H_
#define MQTTUNSUBSCRIBE_H_
#if !defined(DLLImport)
#define DLLImport
#endif
#if !defined(DLLExport)
#define DLLExport
#endif
DLLExport int MQTTSerialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid,
int count, MQTTString topicFilters[]);
DLLExport int MQTTDeserialize_unsubscribe(unsigned char* dup, unsigned short* packetid, int max_count, int* count, MQTTString topicFilters[],
unsigned char* buf, int len);
DLLExport int MQTTSerialize_unsuback(unsigned char* buf, int buflen, unsigned short packetid);
DLLExport int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int len);
#endif /* MQTTUNSUBSCRIBE_H_ */

View file

@ -1,106 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include <espressif/esp_common.h>
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Determines the length of the MQTT unsubscribe packet that would be produced using the supplied parameters
* @param count the number of topic filter strings in topicFilters
* @param topicFilters the array of topic filter strings to be used in the publish
* @return the length of buffer needed to contain the serialized version of the packet
*/
int MQTTSerialize_unsubscribeLength(int count, MQTTString topicFilters[])
{
int i;
int len = 2; /* packetid */
for (i = 0; i < count; ++i)
len += 2 + MQTTstrlen(topicFilters[i]); /* length + topic*/
return len;
}
/**
* Serializes the supplied unsubscribe data into the supplied buffer, ready for sending
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @param dup integer - the MQTT dup flag
* @param packetid integer - the MQTT packet identifier
* @param count - number of members in the topicFilters array
* @param topicFilters - array of topic filter names
* @return the length of the serialized data. <= 0 indicates error
*/
int MQTTSerialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid,
int count, MQTTString topicFilters[])
{
unsigned char *ptr = buf;
MQTTHeader header = {0};
int rem_len = 0;
int rc = -1;
int i = 0;
FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_unsubscribeLength(count, topicFilters)) > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = UNSUBSCRIBE;
header.bits.dup = dup;
header.bits.qos = 1;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */;
writeInt(&ptr, packetid);
for (i = 0; i < count; ++i)
writeMQTTString(&ptr, topicFilters[i]);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Deserializes the supplied (wire) buffer into unsuback data
* @param packetid returned integer - the MQTT packet identifier
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int buflen)
{
unsigned char type = 0;
unsigned char dup = 0;
int rc = 0;
FUNC_ENTRY;
rc = MQTTDeserialize_ack(&type, &dup, packetid, buf, buflen);
if (type == UNSUBACK)
rc = 1;
FUNC_EXIT_RC(rc);
return rc;
}

View file

@ -0,0 +1,16 @@
# Paho MQTT Embedded C Client
https://www.eclipse.org/paho/clients/c/embedded/
ESP8266 port based on the port done by @baoshi.
## Directory Organisation
* org.eclipse.paho.mqtt.embedded-c/ is the upstream project.
* MQTTClient.c is copied verbatim from org.eclipse.paho.mqtt.embedded-c/MQTTClient-C/src/ (as it needs to be in the same directory as MQTTClient.h)
* MQTTClient.h is copied from the same place, and has one line changed to include the upstream platform header file.
... any time the submodule is updated, those two source files should also be refreshed.

View file

@ -1,77 +0,0 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - fix for bug #434081
*******************************************************************************/
#ifndef STACKTRACE_H_
#define STACKTRACE_H_
#define NOSTACKTRACE 1
#if defined(NOSTACKTRACE)
#define FUNC_ENTRY
#define FUNC_ENTRY_NOLOG
#define FUNC_ENTRY_MED
#define FUNC_ENTRY_MAX
#define FUNC_EXIT
#define FUNC_EXIT_NOLOG
#define FUNC_EXIT_MED
#define FUNC_EXIT_MAX
#define FUNC_EXIT_RC(x)
#define FUNC_EXIT_MED_RC(x)
#define FUNC_EXIT_MAX_RC(x)
#else
#if defined(WIN32)
#define inline __inline
#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM)
#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1)
#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM)
#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM)
#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM)
#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, -1)
#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM)
#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM)
#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MINIMUM)
#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MEDIUM)
#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MAXIMUM)
#else
#define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM)
#define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1)
#define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM)
#define FUNC_ENTRY_MAX StackTrace_entry(__func__, __LINE__, TRACE_MAXIMUM)
#define FUNC_EXIT StackTrace_exit(__func__, __LINE__, NULL, TRACE_MINIMUM)
#define FUNC_EXIT_NOLOG StackTrace_exit(__func__, __LINE__, NULL, -1)
#define FUNC_EXIT_MED StackTrace_exit(__func__, __LINE__, NULL, TRACE_MEDIUM)
#define FUNC_EXIT_MAX StackTrace_exit(__func__, __LINE__, NULL, TRACE_MAXIMUM)
#define FUNC_EXIT_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MINIMUM)
#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MEDIUM)
#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MAXIMUM)
void StackTrace_entry(const char* name, int line, int trace);
void StackTrace_exit(const char* name, int line, void* return_value, int trace);
void StackTrace_printStack(FILE* dest);
char* StackTrace_get(unsigned long);
#endif
#endif
#endif /* STACKTRACE_H_ */

View file

@ -1,9 +1,12 @@
# Component makefile for extras/paho_mqtt_c # Component makefile for extras/paho_mqtt_c
# expected anyone using bmp driver includes it as 'paho_mqtt_c/MQTT*.h' MQTT_PACKET_DIR=$(paho_mqtt_c_ROOT)org.eclipse.paho.mqtt.embedded-c/MQTTPacket/src/
INC_DIRS += $(paho_mqtt_c_ROOT)..
# args for passing into compile rule generation paho_mqtt_c_SRC_DIR = $(paho_mqtt_c_ROOT) $(MQTT_PACKET_DIR)
paho_mqtt_c_SRC_DIR = $(paho_mqtt_c_ROOT)
# upstream MQTT code has some unused variables
paho_mqtt_c_CFLAGS = $(CFLAGS) -Wno-unused-but-set-variable
INC_DIRS += $(paho_mqtt_c_ROOT) $(MQTT_PACKET_DIR)
$(eval $(call component_compile_rules,paho_mqtt_c)) $(eval $(call component_compile_rules,paho_mqtt_c))

@ -0,0 +1 @@
Subproject commit 5714645c762177ff08086224a7a9ce0b9d541316