paho_mqtt_c: refactor: rename symbols so they all have same prefix (#204)

* paho_mqtt_c: refactor: rename symbols so they all have same prefix

* Update AWS IOT example after MQTT refactoring
This commit is contained in:
Vlad Ivanov 2016-09-15 20:52:57 +03:00 committed by Johan Kanflo
parent 12d0da0c58
commit 8368929a66
18 changed files with 488 additions and 470 deletions

View file

@ -54,8 +54,8 @@ static void beat_task(void *pvParameters) {
} }
} }
static void topic_received(MessageData *md) { static void topic_received(mqtt_message_data_t *md) {
MQTTMessage *message = md->message; mqtt_message_t *message = md->message;
int i; int i;
printf("Received: "); printf("Received: ");
@ -101,7 +101,7 @@ static const char *get_my_id(void) {
return my_id; return my_id;
} }
static int mqtt_ssl_read(Network* n, unsigned char* buffer, int len, static int mqtt_ssl_read(mqtt_network_t * n, unsigned char* buffer, int len,
int timeout_ms) { int timeout_ms) {
int r = ssl_read(ssl_conn, buffer, len, timeout_ms); int r = ssl_read(ssl_conn, buffer, len, timeout_ms);
if (r <= 0 if (r <= 0
@ -114,7 +114,7 @@ static int mqtt_ssl_read(Network* n, unsigned char* buffer, int len,
return r; return r;
} }
static int mqtt_ssl_write(Network* n, unsigned char* buffer, int len, static int mqtt_ssl_write(mqtt_network_t* n, unsigned char* buffer, int len,
int timeout_ms) { int timeout_ms) {
int r = ssl_write(ssl_conn, buffer, len, timeout_ms); int r = ssl_write(ssl_conn, buffer, len, timeout_ms);
if (r <= 0 if (r <= 0
@ -128,12 +128,12 @@ static int mqtt_ssl_write(Network* n, unsigned char* buffer, int len,
static void mqtt_task(void *pvParameters) { static void mqtt_task(void *pvParameters) {
int ret = 0; int ret = 0;
struct Network network; struct mqtt_network network;
MQTTClient client = DefaultClient; mqtt_client_t client = mqtt_client_default;
char mqtt_client_id[20]; char mqtt_client_id[20];
uint8_t mqtt_buf[100]; uint8_t mqtt_buf[100];
uint8_t mqtt_readbuf[100]; uint8_t mqtt_readbuf[100];
MQTTPacket_connectData data = MQTTPacket_connectData_initializer; mqtt_packet_connect_data_t data = mqtt_packet_connect_data_initializer;
memset(mqtt_client_id, 0, sizeof(mqtt_client_id)); memset(mqtt_client_id, 0, sizeof(mqtt_client_id));
strcpy(mqtt_client_id, "ESP-"); strcpy(mqtt_client_id, "ESP-");
@ -153,7 +153,7 @@ static void mqtt_task(void *pvParameters) {
ssl_conn->client_cert_str = client_cert; ssl_conn->client_cert_str = client_cert;
ssl_conn->client_key_str = client_key; ssl_conn->client_key_str = client_key;
NewNetwork(&network); mqtt_network_new(&network);
network.mqttread = mqtt_ssl_read; network.mqttread = mqtt_ssl_read;
network.mqttwrite = mqtt_ssl_write; network.mqttwrite = mqtt_ssl_write;
@ -167,7 +167,7 @@ static void mqtt_task(void *pvParameters) {
continue; continue;
} }
printf("done\n\r"); printf("done\n\r");
NewMQTTClient(&client, &network, 5000, mqtt_buf, 100, mqtt_readbuf, mqtt_client_new(&client, &network, 5000, mqtt_buf, 100, mqtt_readbuf,
100); 100);
data.willFlag = 0; data.willFlag = 0;
@ -178,14 +178,14 @@ static void mqtt_task(void *pvParameters) {
data.password.cstring = NULL; data.password.cstring = NULL;
data.keepAliveInterval = 1000; data.keepAliveInterval = 1000;
printf("Send MQTT connect ... "); printf("Send MQTT connect ... ");
ret = MQTTConnect(&client, &data); ret = mqtt_connect(&client, &data);
if (ret) { if (ret) {
printf("error: %d\n\r", ret); printf("error: %d\n\r", ret);
ssl_destroy(ssl_conn); ssl_destroy(ssl_conn);
continue; continue;
} }
printf("done\r\n"); printf("done\r\n");
MQTTSubscribe(&client, MQTT_SUB_TOPIC, QOS1, topic_received); mqtt_subscribe(&client, MQTT_SUB_TOPIC, MQTT_QOS1, topic_received);
xQueueReset(publish_queue); xQueueReset(publish_queue);
while (wifi_alive && !ssl_reset) { while (wifi_alive && !ssl_reset) {
@ -198,21 +198,21 @@ static void mqtt_task(void *pvParameters) {
task_tick, free_heap, free_stack * 4); task_tick, free_heap, free_stack * 4);
printf("Publishing: %s\r\n", msg); printf("Publishing: %s\r\n", msg);
MQTTMessage message; mqtt_message_t message;
message.payload = msg; message.payload = msg;
message.payloadlen = strlen(msg); message.payloadlen = strlen(msg);
message.dup = 0; message.dup = 0;
message.qos = QOS1; message.qos = MQTT_QOS1;
message.retained = 0; message.retained = 0;
ret = MQTTPublish(&client, MQTT_PUB_TOPIC, &message); ret = mqtt_publish(&client, MQTT_PUB_TOPIC, &message);
if (ret != SUCCESS) { if (ret != MQTT_SUCCESS) {
printf("error while publishing message: %d\n", ret); printf("error while publishing message: %d\n", ret);
break; break;
} }
} }
ret = MQTTYield(&client, 1000); ret = mqtt_yield(&client, 1000);
if (ret == DISCONNECTED) if (ret == MQTT_DISCONNECTED)
break; break;
} }
printf("Connection dropped, request restart\n\r"); printf("Connection dropped, request restart\n\r");

View file

@ -44,10 +44,10 @@ static void beat_task(void *pvParameters)
} }
} }
static void topic_received(MessageData *md) static void topic_received(mqtt_message_data_t *md)
{ {
int i; int i;
MQTTMessage *message = md->message; mqtt_message_t *message = md->message;
printf("Received: "); printf("Received: ");
for( i = 0; i < md->topic->lenstring.len; ++i) for( i = 0; i < md->topic->lenstring.len; ++i)
printf("%c", md->topic->lenstring.data[ i ]); printf("%c", md->topic->lenstring.data[ i ]);
@ -87,14 +87,14 @@ static const char * get_my_id(void)
static void mqtt_task(void *pvParameters) static void mqtt_task(void *pvParameters)
{ {
int ret = 0; int ret = 0;
struct Network network; struct mqtt_network network;
MQTTClient client = DefaultClient; mqtt_client_t client = mqtt_client_default;
char mqtt_client_id[20]; char mqtt_client_id[20];
uint8_t mqtt_buf[100]; uint8_t mqtt_buf[100];
uint8_t mqtt_readbuf[100]; uint8_t mqtt_readbuf[100];
MQTTPacket_connectData data = MQTTPacket_connectData_initializer; mqtt_packet_connect_data_t data = mqtt_packet_connect_data_initializer;
NewNetwork( &network ); mqtt_network_new( &network );
memset(mqtt_client_id, 0, sizeof(mqtt_client_id)); memset(mqtt_client_id, 0, sizeof(mqtt_client_id));
strcpy(mqtt_client_id, "ESP-"); strcpy(mqtt_client_id, "ESP-");
strcat(mqtt_client_id, get_my_id()); strcat(mqtt_client_id, get_my_id());
@ -104,14 +104,14 @@ static void mqtt_task(void *pvParameters)
printf("%s: started\n\r", __func__); printf("%s: started\n\r", __func__);
printf("%s: (Re)connecting to MQTT server %s ... ",__func__, printf("%s: (Re)connecting to MQTT server %s ... ",__func__,
MQTT_HOST); MQTT_HOST);
ret = ConnectNetwork(&network, MQTT_HOST, MQTT_PORT); ret = mqtt_network_connect(&network, MQTT_HOST, MQTT_PORT);
if( ret ){ if( ret ){
printf("error: %d\n\r", ret); printf("error: %d\n\r", ret);
taskYIELD(); taskYIELD();
continue; continue;
} }
printf("done\n\r"); printf("done\n\r");
NewMQTTClient(&client, &network, 5000, mqtt_buf, 100, mqtt_client_new(&client, &network, 5000, mqtt_buf, 100,
mqtt_readbuf, 100); mqtt_readbuf, 100);
data.willFlag = 0; data.willFlag = 0;
@ -122,15 +122,15 @@ static void mqtt_task(void *pvParameters)
data.keepAliveInterval = 10; data.keepAliveInterval = 10;
data.cleansession = 0; data.cleansession = 0;
printf("Send MQTT connect ... "); printf("Send MQTT connect ... ");
ret = MQTTConnect(&client, &data); ret = mqtt_connect(&client, &data);
if(ret){ if(ret){
printf("error: %d\n\r", ret); printf("error: %d\n\r", ret);
DisconnectNetwork(&network); mqtt_network_disconnect(&network);
taskYIELD(); taskYIELD();
continue; continue;
} }
printf("done\r\n"); printf("done\r\n");
MQTTSubscribe(&client, "/esptopic", QOS1, topic_received); mqtt_subscribe(&client, "/esptopic", MQTT_QOS1, topic_received);
xQueueReset(publish_queue); xQueueReset(publish_queue);
while(1){ while(1){
@ -139,25 +139,25 @@ static void mqtt_task(void *pvParameters)
while(xQueueReceive(publish_queue, (void *)msg, 0) == while(xQueueReceive(publish_queue, (void *)msg, 0) ==
pdTRUE){ pdTRUE){
printf("got message to publish\r\n"); printf("got message to publish\r\n");
MQTTMessage message; mqtt_message_t message;
message.payload = msg; message.payload = msg;
message.payloadlen = PUB_MSG_LEN; message.payloadlen = PUB_MSG_LEN;
message.dup = 0; message.dup = 0;
message.qos = QOS1; message.qos = MQTT_QOS1;
message.retained = 0; message.retained = 0;
ret = MQTTPublish(&client, "/beat", &message); ret = mqtt_publish(&client, "/beat", &message);
if (ret != SUCCESS ){ if (ret != MQTT_SUCCESS ){
printf("error while publishing message: %d\n", ret ); printf("error while publishing message: %d\n", ret );
break; break;
} }
} }
ret = MQTTYield(&client, 1000); ret = mqtt_yield(&client, 1000);
if (ret == DISCONNECTED) if (ret == MQTT_DISCONNECTED)
break; break;
} }
printf("Connection dropped, request restart\n\r"); printf("Connection dropped, request restart\n\r");
DisconnectNetwork(&network); mqtt_network_disconnect(&network);
taskYIELD(); taskYIELD();
} }
} }

View file

@ -17,41 +17,41 @@
#include <lwip/arch.h> #include <lwip/arch.h>
#include "MQTTClient.h" #include "MQTTClient.h"
void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessgage) { static void new_message_data(mqtt_message_data_t* md, mqtt_string_t* aTopicName, mqtt_message_t* aMessgage) {
md->topic = aTopicName; md->topic = aTopicName;
md->message = aMessgage; md->message = aMessgage;
} }
int getNextPacketId(MQTTClient *c) { static int get_next_packet_id(mqtt_client_t *c) {
return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1; return c->next_packetid = (c->next_packetid == MQTT_MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
} }
int sendPacket(MQTTClient* c, int length, Timer* timer) static int send_packet(mqtt_client_t* c, int length, mqtt_timer_t* timer)
{ {
int rc = FAILURE, int rc = MQTT_FAILURE,
sent = 0; sent = 0;
while (sent < length && !expired(timer)) while (sent < length && !mqtt_timer_expired(timer))
{ {
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length - sent, left_ms(timer)); rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length - sent, mqtt_timer_left_ms(timer));
if (rc < 0) // there was an error writing the data if (rc < 0) // there was an error writing the data
break; break;
sent += rc; sent += rc;
} }
if (sent == length) if (sent == length)
{ {
countdown(&(c->ping_timer), c->keepAliveInterval); // record the fact that we have successfully sent the packet mqtt_timer_countdown(&(c->ping_timer), c->keepAliveInterval); // record the fact that we have successfully sent the packet
rc = SUCCESS; rc = MQTT_SUCCESS;
} }
else else
rc = FAILURE; rc = MQTT_FAILURE;
return rc; return rc;
} }
int decodePacket(MQTTClient* c, int* value, int timeout) static int decode_packet(mqtt_client_t* c, int* value, int timeout)
{ {
unsigned char i; unsigned char i;
int multiplier = 1; int multiplier = 1;
@ -82,29 +82,29 @@ exit:
// Return packet type. If no packet avilable, return FAILURE, or READ_ERROR if timeout // Return packet type. If no packet avilable, return FAILURE, or READ_ERROR if timeout
int readPacket(MQTTClient* c, Timer* timer) static int read_packet(mqtt_client_t* c, mqtt_timer_t* timer)
{ {
int rc = FAILURE; int rc = MQTT_FAILURE;
MQTTHeader header = {0}; mqtt_header_t header = {0};
int len = 0; int len = 0;
int rem_len = 0; int rem_len = 0;
/* 1. read the header byte. This has the packet type in it */ /* 1. read the header byte. This has the packet type in it */
if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, left_ms(timer)) != 1) if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, mqtt_timer_left_ms(timer)) != 1)
goto exit; goto exit;
len = 1; len = 1;
/* 2. read the remaining length. This is variable in itself */ /* 2. read the remaining length. This is variable in itself */
len += decodePacket(c, &rem_len, left_ms(timer)); len += decode_packet(c, &rem_len, mqtt_timer_left_ms(timer));
if (len <= 1 || len + rem_len > c->readbuf_size) /* if packet is too big to fit in our readbuf, abort */ if (len <= 1 || len + rem_len > c->readbuf_size) /* if packet is too big to fit in our readbuf, abort */
{ {
rc = READ_ERROR; rc = MQTT_READ_ERROR;
goto exit; goto exit;
} }
MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ mqtt_packet_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
/* 3. read the rest of the buffer using a callback to supply the rest of the data */ /* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, left_ms(timer)) != rem_len)) if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, mqtt_timer_left_ms(timer)) != rem_len))
{ {
rc = READ_ERROR; rc = MQTT_READ_ERROR;
goto exit; goto exit;
} }
header.byte = c->readbuf[0]; header.byte = c->readbuf[0];
@ -117,7 +117,7 @@ exit:
// assume topic filter and name is in correct format // assume topic filter and name is in correct format
// # can only be at end // # can only be at end
// + and # can only be next to separator // + and # can only be next to separator
char isTopicMatched(char* topicFilter, MQTTString* topicName) static char is_topic_matched(char* topicFilter, mqtt_string_t* topicName)
{ {
char* curf = topicFilter; char* curf = topicFilter;
char* curn = topicName->lenstring.data; char* curn = topicName->lenstring.data;
@ -145,73 +145,73 @@ char isTopicMatched(char* topicFilter, MQTTString* topicName)
} }
int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) static int deliver_message(mqtt_client_t* c, mqtt_string_t* topicName, mqtt_message_t* message)
{ {
int i; int i;
int rc = FAILURE; int rc = MQTT_FAILURE;
// we have to find the right message handler - indexed by topic // we have to find the right message handler - indexed by topic
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) for (i = 0; i < MQTT_MAX_MESSAGE_HANDLERS; ++i)
{ {
if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || if (c->messageHandlers[i].topicFilter != 0 && (mqtt_packet_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) is_topic_matched((char*)c->messageHandlers[i].topicFilter, topicName)))
{ {
if (c->messageHandlers[i].fp != NULL) if (c->messageHandlers[i].fp != NULL)
{ {
MessageData md; mqtt_message_data_t md;
NewMessageData(&md, topicName, message); new_message_data(&md, topicName, message);
c->messageHandlers[i].fp(&md); c->messageHandlers[i].fp(&md);
rc = SUCCESS; rc = MQTT_SUCCESS;
} }
} }
} }
if (rc == FAILURE && c->defaultMessageHandler != NULL) if (rc == MQTT_FAILURE && c->defaultMessageHandler != NULL)
{ {
MessageData md; mqtt_message_data_t md;
NewMessageData(&md, topicName, message); new_message_data(&md, topicName, message);
c->defaultMessageHandler(&md); c->defaultMessageHandler(&md);
rc = SUCCESS; rc = MQTT_SUCCESS;
} }
return rc; return rc;
} }
int keepalive(MQTTClient* c) static int keepalive(mqtt_client_t* c)
{ {
int rc = SUCCESS; int rc = MQTT_SUCCESS;
if (c->keepAliveInterval == 0) if (c->keepAliveInterval == 0)
{ {
rc = SUCCESS; rc = MQTT_SUCCESS;
goto exit; goto exit;
} }
if (expired(&(c->ping_timer))) if (mqtt_timer_expired(&(c->ping_timer)))
{ {
if (c->ping_outstanding) if (c->ping_outstanding)
{ {
// if ping failure accumulated above MAX_FAIL_ALLOWED, the connection is broken // if ping failure accumulated above MAX_FAIL_ALLOWED, the connection is broken
++(c->fail_count); ++(c->fail_count);
if (c->fail_count >= MAX_FAIL_ALLOWED) if (c->fail_count >= MQTT_MAX_FAIL_ALLOWED)
{ {
rc = DISCONNECTED; rc = MQTT_DISCONNECTED;
goto exit; goto exit;
} }
} }
else else
{ {
Timer timer; mqtt_timer_t timer;
InitTimer(&timer); mqtt_timer_init(&timer);
countdown_ms(&timer, 1000); mqtt_timer_countdown_ms(&timer, 1000);
c->ping_outstanding = 1; c->ping_outstanding = 1;
int len = MQTTSerialize_pingreq(c->buf, c->buf_size); int len = mqtt_serialize_pingreq(c->buf, c->buf_size);
if (len > 0) if (len > 0)
sendPacket(c, len, &timer); send_packet(c, len, &timer);
} }
// re-arm ping counter // re-arm ping counter
countdown(&(c->ping_timer), c->keepAliveInterval); mqtt_timer_countdown(&(c->ping_timer), c->keepAliveInterval);
} }
exit: exit:
@ -219,87 +219,87 @@ exit:
} }
int cycle(MQTTClient* c, Timer* timer) static int cycle(mqtt_client_t* c, mqtt_timer_t* timer)
{ {
// read the socket, see what work is due // read the socket, see what work is due
int packet_type = readPacket(c, timer); int packet_type = read_packet(c, timer);
int len = 0, int len = 0,
rc = SUCCESS; rc = MQTT_SUCCESS;
switch (packet_type) switch (packet_type)
{ {
case CONNACK: case MQTTPACKET_CONNACK:
case PUBACK: case MQTTPACKET_PUBACK:
case SUBACK: case MQTTPACKET_SUBACK:
break; break;
case PUBLISH: case MQTTPACKET_PUBLISH:
{ {
MQTTString topicName; mqtt_string_t topicName;
MQTTMessage msg; mqtt_message_t msg;
if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, if (mqtt_deserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
goto exit; goto exit;
deliverMessage(c, &topicName, &msg); deliver_message(c, &topicName, &msg);
if (msg.qos != QOS0) if (msg.qos != MQTT_QOS0)
{ {
if (msg.qos == QOS1) if (msg.qos == MQTT_QOS1)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); len = mqtt_serialize_ack(c->buf, c->buf_size, MQTTPACKET_PUBACK, 0, msg.id);
else if (msg.qos == QOS2) else if (msg.qos == MQTT_QOS2)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); len = mqtt_serialize_ack(c->buf, c->buf_size, MQTTPACKET_PUBREC, 0, msg.id);
if (len <= 0) if (len <= 0)
rc = FAILURE; rc = MQTT_FAILURE;
else else
rc = sendPacket(c, len, timer); rc = send_packet(c, len, timer);
if (rc == FAILURE) if (rc == MQTT_FAILURE)
goto exit; // there was a problem goto exit; // there was a problem
} }
break; break;
} }
case PUBREC: case MQTTPACKET_PUBREC:
{ {
unsigned short mypacketid; unsigned short mypacketid;
unsigned char dup, type; unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) if (mqtt_deserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE; rc = MQTT_FAILURE;
else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREL, 0, mypacketid)) <= 0) else if ((len = mqtt_serialize_ack(c->buf, c->buf_size, MQTTPACKET_PUBREL, 0, mypacketid)) <= 0)
rc = FAILURE; rc = MQTT_FAILURE;
else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet else if ((rc = send_packet(c, len, timer)) != MQTT_SUCCESS) // send the PUBREL packet
rc = FAILURE; // there was a problem rc = MQTT_FAILURE; // there was a problem
if (rc == FAILURE) if (rc == MQTT_FAILURE)
goto exit; // there was a problem goto exit; // there was a problem
break; break;
} }
case PUBCOMP: case MQTTPACKET_PUBCOMP:
break; break;
case PINGRESP: case MQTTPACKET_PINGRESP:
{ {
c->ping_outstanding = 0; c->ping_outstanding = 0;
c->fail_count = 0; c->fail_count = 0;
break; break;
} }
case READ_ERROR: case MQTT_READ_ERROR:
{ {
c->isconnected = 0; // we simulate a disconnect if reading error c->isconnected = 0; // we simulate a disconnect if reading error
rc = DISCONNECTED; // so that the outer layer will reconnect and recover rc = MQTT_DISCONNECTED; // so that the outer layer will reconnect and recover
break; break;
} }
} }
if (c->isconnected) if (c->isconnected)
rc = keepalive(c); rc = keepalive(c);
exit: exit:
if (rc == SUCCESS) if (rc == MQTT_SUCCESS)
rc = packet_type; rc = packet_type;
return rc; return rc;
} }
void NewMQTTClient(MQTTClient* c, Network* network, unsigned int command_timeout_ms, unsigned char* buf, size_t buf_size, unsigned char* readbuf, size_t readbuf_size) void mqtt_client_new(mqtt_client_t* c, mqtt_network_t* network, unsigned int command_timeout_ms, unsigned char* buf, size_t buf_size, unsigned char* readbuf, size_t readbuf_size)
{ {
int i; int i;
c->ipstack = network; c->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) for (i = 0; i < MQTT_MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = 0; c->messageHandlers[i].topicFilter = 0;
c->command_timeout_ms = command_timeout_ms; c->command_timeout_ms = command_timeout_ms;
c->buf = buf; c->buf = buf;
@ -310,38 +310,38 @@ void NewMQTTClient(MQTTClient* c, Network* network, unsigned int command_timeou
c->ping_outstanding = 0; c->ping_outstanding = 0;
c->fail_count = 0; c->fail_count = 0;
c->defaultMessageHandler = NULL; c->defaultMessageHandler = NULL;
InitTimer(&(c->ping_timer)); mqtt_timer_init(&(c->ping_timer));
} }
int MQTTYield(MQTTClient* c, int timeout_ms) int mqtt_yield(mqtt_client_t* c, int timeout_ms)
{ {
int rc = SUCCESS; int rc = MQTT_SUCCESS;
Timer timer; mqtt_timer_t timer;
InitTimer(&timer); mqtt_timer_init(&timer);
countdown_ms(&timer, timeout_ms); mqtt_timer_countdown_ms(&timer, timeout_ms);
while (!expired(&timer)) while (!mqtt_timer_expired(&timer))
{ {
rc = cycle(c, &timer); rc = cycle(c, &timer);
// cycle could return 0 or packet_type or 65535 if nothing is read // cycle could return 0 or packet_type or 65535 if nothing is read
// cycle returns DISCONNECTED only if keepalive() fails. // cycle returns DISCONNECTED only if keepalive() fails.
if (rc == DISCONNECTED) if (rc == MQTT_DISCONNECTED)
break; break;
rc = SUCCESS; rc = MQTT_SUCCESS;
} }
return rc; return rc;
} }
// only used in single-threaded mode where one command at a time is in process // only used in single-threaded mode where one command at a time is in process
int waitfor(MQTTClient* c, int packet_type, Timer* timer) static int waitfor(mqtt_client_t* c, int packet_type, mqtt_timer_t* timer)
{ {
int rc = FAILURE; int rc = MQTT_FAILURE;
do do
{ {
if (expired(timer)) if (mqtt_timer_expired(timer))
break; // we timed out break; // we timed out
} }
while ((rc = cycle(c, timer)) != packet_type); while ((rc = cycle(c, timer)) != packet_type);
@ -350,15 +350,15 @@ int waitfor(MQTTClient* c, int packet_type, Timer* timer)
} }
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options) int mqtt_connect(mqtt_client_t* c, mqtt_packet_connect_data_t* options)
{ {
Timer connect_timer; mqtt_timer_t connect_timer;
int rc = FAILURE; int rc = MQTT_FAILURE;
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; mqtt_packet_connect_data_t default_options = mqtt_packet_connect_data_initializer;
int len = 0; int len = 0;
InitTimer(&connect_timer); mqtt_timer_init(&connect_timer);
countdown_ms(&connect_timer, c->command_timeout_ms); mqtt_timer_countdown_ms(&connect_timer, c->command_timeout_ms);
if (c->isconnected) // don't send connect packet again if we are already connected if (c->isconnected) // don't send connect packet again if we are already connected
goto exit; goto exit;
@ -367,64 +367,64 @@ int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
options = &default_options; // set default options if none were supplied options = &default_options; // set default options if none were supplied
c->keepAliveInterval = options->keepAliveInterval; c->keepAliveInterval = options->keepAliveInterval;
countdown(&(c->ping_timer), c->keepAliveInterval); mqtt_timer_countdown(&(c->ping_timer), c->keepAliveInterval);
if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0) if ((len = mqtt_serialize_connect(c->buf, c->buf_size, options)) <= 0)
goto exit; goto exit;
if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet if ((rc = send_packet(c, len, &connect_timer)) != MQTT_SUCCESS) // send the connect packet
goto exit; // there was a problem goto exit; // there was a problem
// this will be a blocking call, wait for the connack // this will be a blocking call, wait for the connack
if (waitfor(c, CONNACK, &connect_timer) == CONNACK) if (waitfor(c, MQTTPACKET_CONNACK, &connect_timer) == MQTTPACKET_CONNACK)
{ {
unsigned char connack_rc = 255; unsigned char connack_rc = 255;
char sessionPresent = 0; char sessionPresent = 0;
if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, c->readbuf, c->readbuf_size) == 1) if (mqtt_deserialize_connack((unsigned char*)&sessionPresent, &connack_rc, c->readbuf, c->readbuf_size) == 1)
rc = connack_rc; rc = connack_rc;
else else
rc = FAILURE; rc = MQTT_FAILURE;
} }
else else
rc = FAILURE; rc = MQTT_FAILURE;
exit: exit:
if (rc == SUCCESS) if (rc == MQTT_SUCCESS)
c->isconnected = 1; c->isconnected = 1;
return rc; return rc;
} }
int MQTTSubscribe(MQTTClient* c, const char* topic, enum QoS qos, messageHandler handler) int mqtt_subscribe(mqtt_client_t* c, const char* topic, enum mqtt_qos qos, mqtt_message_handler_t handler)
{ {
int rc = FAILURE; int rc = MQTT_FAILURE;
Timer timer; mqtt_timer_t timer;
int len = 0; int len = 0;
MQTTString topicStr = MQTTString_initializer; mqtt_string_t topicStr = mqtt_string_initializer;
topicStr.cstring = (char *)topic; topicStr.cstring = (char *)topic;
InitTimer(&timer); mqtt_timer_init(&timer);
countdown_ms(&timer, c->command_timeout_ms); mqtt_timer_countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topicStr, (int*)&qos); len = mqtt_serialize_subscribe(c->buf, c->buf_size, 0, get_next_packet_id(c), 1, &topicStr, (int*)&qos);
if (len <= 0) if (len <= 0)
goto exit; goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet if ((rc = send_packet(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
{ {
goto exit; // there was a problem goto exit; // there was a problem
} }
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback if (waitfor(c, MQTTPACKET_SUBACK, &timer) == MQTTPACKET_SUBACK) // wait for suback
{ {
int count = 0, grantedQoS = -1; int count = 0, grantedQoS = -1;
unsigned short mypacketid; unsigned short mypacketid;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1) if (mqtt_deserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1)
rc = grantedQoS; // 0, 1, 2 or 0x80 rc = grantedQoS; // 0, 1, 2 or 0x80
if (rc != 0x80) if (rc != 0x80)
{ {
int i; int i;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) for (i = 0; i < MQTT_MAX_MESSAGE_HANDLERS; ++i)
{ {
if (c->messageHandlers[i].topicFilter == 0) if (c->messageHandlers[i].topicFilter == 0)
{ {
@ -437,107 +437,107 @@ int MQTTSubscribe(MQTTClient* c, const char* topic, enum QoS qos, messageHandle
} }
} }
else else
rc = FAILURE; rc = MQTT_FAILURE;
exit: exit:
return rc; return rc;
} }
int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) int mqtt_unsubscribe(mqtt_client_t* c, const char* topicFilter)
{ {
int rc = FAILURE; int rc = MQTT_FAILURE;
Timer timer; mqtt_timer_t timer;
MQTTString topic = MQTTString_initializer; mqtt_string_t topic = mqtt_string_initializer;
topic.cstring = (char *)topicFilter; topic.cstring = (char *)topicFilter;
int len = 0; int len = 0;
InitTimer(&timer); mqtt_timer_init(&timer);
countdown_ms(&timer, c->command_timeout_ms); mqtt_timer_countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0) if ((len = mqtt_serialize_unsubscribe(c->buf, c->buf_size, 0, get_next_packet_id(c), 1, &topic)) <= 0)
goto exit; goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet if ((rc = send_packet(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
goto exit; // there was a problem goto exit; // there was a problem
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) if (waitfor(c, MQTTPACKET_UNSUBACK, &timer) == MQTTPACKET_UNSUBACK)
{ {
unsigned short mypacketid; // should be the same as the packetid above unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) if (mqtt_deserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
rc = 0; rc = 0;
} }
else else
rc = FAILURE; rc = MQTT_FAILURE;
exit: exit:
return rc; return rc;
} }
int MQTTPublish(MQTTClient* c, const char* topic, MQTTMessage* message) int mqtt_publish(mqtt_client_t* c, const char* topic, mqtt_message_t* message)
{ {
int rc = FAILURE; int rc = MQTT_FAILURE;
Timer timer; mqtt_timer_t timer;
MQTTString topicStr = MQTTString_initializer; mqtt_string_t topicStr = mqtt_string_initializer;
topicStr.cstring = (char *)topic; topicStr.cstring = (char *)topic;
int len = 0; int len = 0;
InitTimer(&timer); mqtt_timer_init(&timer);
countdown_ms(&timer, c->command_timeout_ms); mqtt_timer_countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
if (message->qos == QOS1 || message->qos == QOS2) if (message->qos == MQTT_QOS1 || message->qos == MQTT_QOS2)
message->id = getNextPacketId(c); message->id = get_next_packet_id(c);
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, len = mqtt_serialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topicStr, (unsigned char*)message->payload, message->payloadlen); topicStr, (unsigned char*)message->payload, message->payloadlen);
if (len <= 0) if (len <= 0)
goto exit; goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet if ((rc = send_packet(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet
{ {
goto exit; // there was a problem goto exit; // there was a problem
} }
if (message->qos == QOS1) if (message->qos == MQTT_QOS1)
{ {
if (waitfor(c, PUBACK, &timer) == PUBACK) if (waitfor(c, MQTTPACKET_PUBACK, &timer) == MQTTPACKET_PUBACK)
{ {
// We still can receive from broker, treat as recoverable // We still can receive from broker, treat as recoverable
c->fail_count = 0; c->fail_count = 0;
unsigned short mypacketid; unsigned short mypacketid;
unsigned char dup, type; unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) if (mqtt_deserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE; rc = MQTT_FAILURE;
else else
rc = SUCCESS; rc = MQTT_SUCCESS;
} }
else else
{ {
rc = FAILURE; rc = MQTT_FAILURE;
} }
} }
else if (message->qos == QOS2) else if (message->qos == MQTT_QOS2)
{ {
if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) if (waitfor(c, MQTTPACKET_PUBCOMP, &timer) == MQTTPACKET_PUBCOMP)
{ {
// We still can receive from broker, treat as recoverable // We still can receive from broker, treat as recoverable
c->fail_count = 0; c->fail_count = 0;
unsigned short mypacketid; unsigned short mypacketid;
unsigned char dup, type; unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) if (mqtt_deserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE; rc = MQTT_FAILURE;
else else
rc = SUCCESS; rc = MQTT_SUCCESS;
} }
else else
{ {
rc = FAILURE; rc = MQTT_FAILURE;
} }
} }
@ -546,17 +546,17 @@ exit:
} }
int MQTTDisconnect(MQTTClient* c) int mqtt_disconnect(mqtt_client_t* c)
{ {
int rc = FAILURE; int rc = MQTT_FAILURE;
Timer timer; // we might wait for incomplete incoming publishes to complete mqtt_timer_t timer; // we might wait for incomplete incoming publishes to complete
int len = MQTTSerialize_disconnect(c->buf, c->buf_size); int len = mqtt_serialize_disconnect(c->buf, c->buf_size);
InitTimer(&timer); mqtt_timer_init(&timer);
countdown_ms(&timer, c->command_timeout_ms); mqtt_timer_countdown_ms(&timer, c->command_timeout_ms);
if (len > 0) if (len > 0)
rc = sendPacket(c, len, &timer); // send the disconnect packet rc = send_packet(c, len, &timer); // send the disconnect packet
c->isconnected = 0; c->isconnected = 0;
return rc; return rc;

View file

@ -20,36 +20,44 @@
#include "MQTTPacket.h" #include "MQTTPacket.h"
#include "MQTTESP8266.h" #include "MQTTESP8266.h"
#define MAX_PACKET_ID 65535 #define MQTT_MAX_PACKET_ID 65535
#define MAX_MESSAGE_HANDLERS 5 #define MQTT_MAX_MESSAGE_HANDLERS 5
#define MAX_FAIL_ALLOWED 2 #define MQTT_MAX_FAIL_ALLOWED 2
enum QoS { QOS0, QOS1, QOS2 }; enum mqtt_qos {
MQTT_QOS0,
MQTT_QOS1,
MQTT_QOS2
};
// all failure return codes must be negative // all failure return codes must be negative
enum returnCode {READ_ERROR = -4, DISCONNECTED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; enum mqtt_return_code {
MQTT_READ_ERROR = -4,
MQTT_DISCONNECTED = -3,
MQTT_BUFFER_OVERFLOW = -2,
MQTT_FAILURE = -1,
MQTT_SUCCESS = 0
};
void NewTimer(Timer*); typedef struct mqtt_message
typedef struct _MQTTMessage
{ {
enum QoS qos; enum mqtt_qos qos;
char retained; char retained;
char dup; char dup;
unsigned short id; unsigned short id;
void *payload; void *payload;
size_t payloadlen; size_t payloadlen;
} MQTTMessage; } mqtt_message_t;
typedef struct _MessageData typedef struct mqtt_message_data
{ {
MQTTString* topic; mqtt_string_t* topic;
MQTTMessage* message; mqtt_message_t* message;
} MessageData; } mqtt_message_data_t;
typedef void (*messageHandler)(MessageData*); typedef void (*mqtt_message_handler_t)(mqtt_message_data_t*);
struct _MQTTClient struct mqtt_client
{ {
unsigned int next_packetid; unsigned int next_packetid;
unsigned int command_timeout_ms; unsigned int command_timeout_ms;
@ -64,28 +72,26 @@ struct _MQTTClient
struct MessageHandlers struct MessageHandlers
{ {
const char* topicFilter; const char* topicFilter;
void (*fp) (MessageData*); void (*fp) (mqtt_message_data_t*);
} messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic } messageHandlers[MQTT_MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
void (*defaultMessageHandler) (MessageData*); void (*defaultMessageHandler) (mqtt_message_data_t*);
Network* ipstack; mqtt_network_t* ipstack;
Timer ping_timer; mqtt_timer_t ping_timer;
}; };
typedef struct mqtt_client mqtt_client_t;
typedef struct _MQTTClient MQTTClient; int mqtt_connect(mqtt_client_t* c, mqtt_packet_connect_data_t* options);
int mqtt_publish(mqtt_client_t* c, const char* topic, mqtt_message_t* message);
int mqtt_subscribe(mqtt_client_t* c, const char* topic, enum mqtt_qos qos, mqtt_message_handler_t handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_yield(mqtt_client_t* c, int timeout_ms);
void mqtt_client_new(mqtt_client_t*, mqtt_network_t*, unsigned int, unsigned char*, size_t, unsigned char*, size_t);
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options); #define mqtt_client_default {0, 0, 0, 0, NULL, NULL, 0, 0, 0}
int MQTTPublish(MQTTClient* c, const char* topic, MQTTMessage* message);
int MQTTSubscribe(MQTTClient* c, const char* topic, enum QoS qos, messageHandler handler);
int MQTTUnsubscribe(MQTTClient* c, const char* topic);
int MQTTDisconnect(MQTTClient* c);
int MQTTYield(MQTTClient* c, int timeout_ms);
void NewMQTTClient(MQTTClient*, Network*, unsigned int, unsigned char*, size_t, unsigned char*, size_t);
#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0}
#endif #endif

View file

@ -52,7 +52,7 @@ typedef union
unsigned int username : 1; /**< 3.1 user name */ unsigned int username : 1; /**< 3.1 user name */
} bits; } bits;
#endif #endif
} MQTTConnectFlags; /**< connect flags byte */ } mqtt_connect_flags_t; /**< connect flags byte */
@ -67,9 +67,9 @@ typedef struct
/** The version number of this structure. Must be 0 */ /** The version number of this structure. Must be 0 */
int struct_version; int struct_version;
/** The LWT topic to which the LWT message will be published. */ /** The LWT topic to which the LWT message will be published. */
MQTTString topicName; mqtt_string_t topicName;
/** The LWT payload. */ /** The LWT payload. */
MQTTString message; mqtt_string_t message;
/** /**
* The retained flag for the LWT message (see MQTTAsync_message.retained). * The retained flag for the LWT message (see MQTTAsync_message.retained).
*/ */
@ -79,10 +79,10 @@ typedef struct
* MQTTAsync_message.qos and @ref qos). * MQTTAsync_message.qos and @ref qos).
*/ */
char qos; char qos;
} MQTTPacket_willOptions; } mqtt_packet_will_options_t;
#define MQTTPacket_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 } #define mqtt_packet_will_options_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 }
typedef struct typedef struct
@ -94,14 +94,14 @@ typedef struct
/** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1 /** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1
*/ */
unsigned char MQTTVersion; unsigned char MQTTVersion;
MQTTString clientID; mqtt_string_t clientID;
unsigned short keepAliveInterval; unsigned short keepAliveInterval;
unsigned char cleansession; unsigned char cleansession;
unsigned char willFlag; unsigned char willFlag;
MQTTPacket_willOptions will; mqtt_packet_will_options_t will;
MQTTString username; mqtt_string_t username;
MQTTString password; mqtt_string_t password;
} MQTTPacket_connectData; } mqtt_packet_connect_data_t;
typedef union typedef union
{ {
@ -119,18 +119,18 @@ typedef union
unsigned int sessionpresent : 1; /**< session present flag */ unsigned int sessionpresent : 1; /**< session present flag */
} bits; } bits;
#endif #endif
} MQTTConnackFlags; /**< connack flags byte */ } mqtt_conn_ack_flags_t; /**< connack flags byte */
#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \ #define mqtt_packet_connect_data_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \
MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} } mqtt_packet_will_options_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} }
DLLExport int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options); DLLExport int mqtt_serialize_connect(unsigned char* buf, int buflen, mqtt_packet_connect_data_t* options);
DLLExport int MQTTDeserialize_connect(MQTTPacket_connectData* data, unsigned char* buf, int len); DLLExport int mqtt_deserialize_connect(mqtt_packet_connect_data_t* data, unsigned char* buf, int len);
DLLExport int MQTTSerialize_connack(unsigned char* buf, int buflen, unsigned char connack_rc, unsigned char sessionPresent); DLLExport int mqtt_serialize_connack(unsigned char* buf, int buflen, unsigned char connack_rc, unsigned char sessionPresent);
DLLExport int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen); DLLExport int mqtt_deserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen);
DLLExport int MQTTSerialize_disconnect(unsigned char* buf, int buflen); DLLExport int mqtt_serialize_disconnect(unsigned char* buf, int buflen);
DLLExport int MQTTSerialize_pingreq(unsigned char* buf, int buflen); DLLExport int mqtt_serialize_pingreq(unsigned char* buf, int buflen);
#endif /* MQTTCONNECT_H_ */ #endif /* MQTTCONNECT_H_ */

View file

@ -24,7 +24,7 @@
* @param options the options to be used to build the connect packet * @param options the options to be used to build the connect packet
* @return the length of buffer needed to contain the serialized version of the packet * @return the length of buffer needed to contain the serialized version of the packet
*/ */
int MQTTSerialize_connectLength(MQTTPacket_connectData* options) static int connect_length(mqtt_packet_connect_data_t* options)
{ {
int len = 0; int len = 0;
@ -35,13 +35,13 @@ int MQTTSerialize_connectLength(MQTTPacket_connectData* options)
else if (options->MQTTVersion == 4) else if (options->MQTTVersion == 4)
len = 10; len = 10;
len += MQTTstrlen(options->clientID)+2; len += mqtt_strlen(options->clientID)+2;
if (options->willFlag) if (options->willFlag)
len += MQTTstrlen(options->will.topicName)+2 + MQTTstrlen(options->will.message)+2; len += mqtt_strlen(options->will.topicName)+2 + mqtt_strlen(options->will.message)+2;
if (options->username.cstring || options->username.lenstring.data) if (options->username.cstring || options->username.lenstring.data)
len += MQTTstrlen(options->username)+2; len += mqtt_strlen(options->username)+2;
if (options->password.cstring || options->password.lenstring.data) if (options->password.cstring || options->password.lenstring.data)
len += MQTTstrlen(options->password)+2; len += mqtt_strlen(options->password)+2;
FUNC_EXIT_RC(len); FUNC_EXIT_RC(len);
return len; return len;
@ -55,36 +55,36 @@ int MQTTSerialize_connectLength(MQTTPacket_connectData* options)
* @param options the options to be used to build the connect packet * @param options the options to be used to build the connect packet
* @return serialized length, or error if 0 * @return serialized length, or error if 0
*/ */
int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options) int mqtt_serialize_connect(unsigned char* buf, int buflen, mqtt_packet_connect_data_t* options)
{ {
unsigned char *ptr = buf; unsigned char *ptr = buf;
MQTTHeader header = {0}; mqtt_header_t header = {0};
MQTTConnectFlags flags = {0}; mqtt_connect_flags_t flags = {0};
int len = 0; int len = 0;
int rc = -1; int rc = -1;
FUNC_ENTRY; FUNC_ENTRY;
if (MQTTPacket_len(len = MQTTSerialize_connectLength(options)) > buflen) if (mqtt_packet_len(len = connect_length(options)) > buflen)
{ {
rc = MQTTPACKET_BUFFER_TOO_SHORT; rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit; goto exit;
} }
header.byte = 0; header.byte = 0;
header.bits.type = CONNECT; header.bits.type = MQTTPACKET_CONNECT;
writeChar(&ptr, header.byte); /* write header */ mqtt_write_char(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, len); /* write remaining length */ ptr += mqtt_packet_encode(ptr, len); /* write remaining length */
if (options->MQTTVersion == 4) if (options->MQTTVersion == 4)
{ {
writeCString(&ptr, "MQTT"); mqtt_write_cstr(&ptr, "MQTT");
writeChar(&ptr, (char) 4); mqtt_write_char(&ptr, (char) 4);
} }
else else
{ {
writeCString(&ptr, "MQIsdp"); mqtt_write_cstr(&ptr, "MQIsdp");
writeChar(&ptr, (char) 3); mqtt_write_char(&ptr, (char) 3);
} }
flags.all = 0; flags.all = 0;
@ -101,18 +101,18 @@ int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectDat
if (options->password.cstring || options->password.lenstring.data) if (options->password.cstring || options->password.lenstring.data)
flags.bits.password = 1; flags.bits.password = 1;
writeChar(&ptr, flags.all); mqtt_write_char(&ptr, flags.all);
writeInt(&ptr, options->keepAliveInterval); mqtt_write_int(&ptr, options->keepAliveInterval);
writeMQTTString(&ptr, options->clientID); mqtt_write_mqqt_str(&ptr, options->clientID);
if (options->willFlag) if (options->willFlag)
{ {
writeMQTTString(&ptr, options->will.topicName); mqtt_write_mqqt_str(&ptr, options->will.topicName);
writeMQTTString(&ptr, options->will.message); mqtt_write_mqqt_str(&ptr, options->will.message);
} }
if (flags.bits.username) if (flags.bits.username)
writeMQTTString(&ptr, options->username); mqtt_write_mqqt_str(&ptr, options->username);
if (flags.bits.password) if (flags.bits.password)
writeMQTTString(&ptr, options->password); mqtt_write_mqqt_str(&ptr, options->password);
rc = ptr - buf; rc = ptr - buf;
@ -129,28 +129,28 @@ int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectDat
* @param len the length in bytes of the data in the supplied buffer * @param len the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure * @return error code. 1 is success, 0 is failure
*/ */
int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen) int mqtt_deserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen)
{ {
MQTTHeader header = {0}; mqtt_header_t header = {0};
unsigned char* curdata = buf; unsigned char* curdata = buf;
unsigned char* enddata = NULL; unsigned char* enddata = NULL;
int rc = 0; int rc = 0;
int mylen; int mylen;
MQTTConnackFlags flags = {0}; mqtt_conn_ack_flags_t flags = {0};
FUNC_ENTRY; FUNC_ENTRY;
header.byte = readChar(&curdata); header.byte = mqtt_read_char(&curdata);
if (header.bits.type != CONNACK) if (header.bits.type != MQTTPACKET_CONNACK)
goto exit; goto exit;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ curdata += (rc = mqtt_packet_decode_buf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen; enddata = curdata + mylen;
if (enddata - curdata < 2) if (enddata - curdata < 2)
goto exit; goto exit;
flags.all = readChar(&curdata); flags.all = mqtt_read_char(&curdata);
*sessionPresent = flags.bits.sessionpresent; *sessionPresent = flags.bits.sessionpresent;
*connack_rc = readChar(&curdata); *connack_rc = mqtt_read_char(&curdata);
rc = 1; rc = 1;
exit: exit:
@ -166,9 +166,9 @@ exit:
* @param packettype the message type * @param packettype the message type
* @return serialized length, or error if 0 * @return serialized length, or error if 0
*/ */
int MQTTSerialize_zero(unsigned char* buf, int buflen, unsigned char packettype) int mqtt_serialize_zero(unsigned char* buf, int buflen, unsigned char packettype)
{ {
MQTTHeader header = {0}; mqtt_header_t header = {0};
int rc = -1; int rc = -1;
unsigned char *ptr = buf; unsigned char *ptr = buf;
@ -180,9 +180,9 @@ int MQTTSerialize_zero(unsigned char* buf, int buflen, unsigned char packettype
} }
header.byte = 0; header.byte = 0;
header.bits.type = packettype; header.bits.type = packettype;
writeChar(&ptr, header.byte); /* write header */ mqtt_write_char(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 0); /* write remaining length */ ptr += mqtt_packet_encode(ptr, 0); /* write remaining length */
rc = ptr - buf; rc = ptr - buf;
exit: exit:
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
@ -196,9 +196,9 @@ exit:
* @param buflen the length in bytes of the supplied buffer, to avoid overruns * @param buflen the length in bytes of the supplied buffer, to avoid overruns
* @return serialized length, or error if 0 * @return serialized length, or error if 0
*/ */
int MQTTSerialize_disconnect(unsigned char* buf, int buflen) int mqtt_serialize_disconnect(unsigned char* buf, int buflen)
{ {
return MQTTSerialize_zero(buf, buflen, DISCONNECT); return mqtt_serialize_zero(buf, buflen, MQTTPACKET_DISCONNECT);
} }
@ -208,7 +208,7 @@ int MQTTSerialize_disconnect(unsigned char* buf, int buflen)
* @param buflen the length in bytes of the supplied buffer, to avoid overruns * @param buflen the length in bytes of the supplied buffer, to avoid overruns
* @return serialized length, or error if 0 * @return serialized length, or error if 0
*/ */
int MQTTSerialize_pingreq(unsigned char* buf, int buflen) int mqtt_serialize_pingreq(unsigned char* buf, int buflen)
{ {
return MQTTSerialize_zero(buf, buflen, PINGREQ); return mqtt_serialize_zero(buf, buflen, MQTTPACKET_PINGREQ);
} }

View file

@ -33,32 +33,32 @@
* @param buflen the length in bytes of the data in the supplied buffer * @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success * @return error code. 1 is success
*/ */
int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName, int mqtt_deserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, mqtt_string_t* topicName,
unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen) unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen)
{ {
MQTTHeader header = {0}; mqtt_header_t header = {0};
unsigned char* curdata = buf; unsigned char* curdata = buf;
unsigned char* enddata = NULL; unsigned char* enddata = NULL;
int rc = 0; int rc = 0;
int mylen = 0; int mylen = 0;
FUNC_ENTRY; FUNC_ENTRY;
header.byte = readChar(&curdata); header.byte = mqtt_read_char(&curdata);
if (header.bits.type != PUBLISH) if (header.bits.type != MQTTPACKET_PUBLISH)
goto exit; goto exit;
*dup = header.bits.dup; *dup = header.bits.dup;
*qos = header.bits.qos; *qos = header.bits.qos;
*retained = header.bits.retain; *retained = header.bits.retain;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ curdata += (rc = mqtt_packet_decode_buf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen; enddata = curdata + mylen;
if (!readMQTTLenString(topicName, &curdata, enddata) || if (!mqtt_read_str_len(topicName, &curdata, enddata) ||
enddata - curdata < 0) /* do we have enough data to read the protocol version byte? */ enddata - curdata < 0) /* do we have enough data to read the protocol version byte? */
goto exit; goto exit;
if (*qos > 0) if (*qos > 0)
*packetid = readInt(&curdata); *packetid = mqtt_read_int(&curdata);
*payloadlen = enddata - curdata; *payloadlen = enddata - curdata;
*payload = curdata; *payload = curdata;
@ -79,25 +79,25 @@ exit:
* @param buflen the length in bytes of the data in the supplied buffer * @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure * @return error code. 1 is success, 0 is failure
*/ */
int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen) int mqtt_deserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen)
{ {
MQTTHeader header = {0}; mqtt_header_t header = {0};
unsigned char* curdata = buf; unsigned char* curdata = buf;
unsigned char* enddata = NULL; unsigned char* enddata = NULL;
int rc = 0; int rc = 0;
int mylen; int mylen;
FUNC_ENTRY; FUNC_ENTRY;
header.byte = readChar(&curdata); header.byte = mqtt_read_char(&curdata);
*dup = header.bits.dup; *dup = header.bits.dup;
*packettype = header.bits.type; *packettype = header.bits.type;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ curdata += (rc = mqtt_packet_decode_buf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen; enddata = curdata + mylen;
if (enddata - curdata < 2) if (enddata - curdata < 2)
goto exit; goto exit;
*packetid = readInt(&curdata); *packetid = mqtt_read_int(&curdata);
rc = 1; rc = 1;
exit: exit:

View file

@ -28,7 +28,7 @@
#include "MQTTESP8266.h" #include "MQTTESP8266.h"
char expired(Timer* timer) char mqtt_timer_expired(mqtt_timer_t* timer)
{ {
portTickType now = xTaskGetTickCount(); portTickType now = xTaskGetTickCount();
int32_t left = timer->end_time - now; int32_t left = timer->end_time - now;
@ -36,20 +36,20 @@ char expired(Timer* timer)
} }
void countdown_ms(Timer* timer, unsigned int timeout) void mqtt_timer_countdown_ms(mqtt_timer_t* timer, unsigned int timeout)
{ {
portTickType now = xTaskGetTickCount(); portTickType now = xTaskGetTickCount();
timer->end_time = now + timeout / portTICK_RATE_MS; timer->end_time = now + timeout / portTICK_RATE_MS;
} }
void countdown(Timer* timer, unsigned int timeout) void mqtt_timer_countdown(mqtt_timer_t* timer, unsigned int timeout)
{ {
countdown_ms(timer, timeout * 1000); mqtt_timer_countdown_ms(timer, timeout * 1000);
} }
int left_ms(Timer* timer) int mqtt_timer_left_ms(mqtt_timer_t* timer)
{ {
portTickType now = xTaskGetTickCount(); portTickType now = xTaskGetTickCount();
int32_t left = timer->end_time - now; int32_t left = timer->end_time - now;
@ -57,14 +57,14 @@ int left_ms(Timer* timer)
} }
void InitTimer(Timer* timer) void mqtt_timer_init(mqtt_timer_t* timer)
{ {
timer->end_time = 0; timer->end_time = 0;
} }
int mqtt_esp_read(Network* n, unsigned char* buffer, int len, int timeout_ms) int mqtt_esp_read(mqtt_network_t* n, unsigned char* buffer, int len, int timeout_ms)
{ {
struct timeval tv; struct timeval tv;
fd_set fdset; fd_set fdset;
@ -89,7 +89,7 @@ int mqtt_esp_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
} }
int mqtt_esp_write(Network* n, unsigned char* buffer, int len, int timeout_ms) int mqtt_esp_write(mqtt_network_t* n, unsigned char* buffer, int len, int timeout_ms)
{ {
struct timeval tv; struct timeval tv;
fd_set fdset; fd_set fdset;
@ -115,7 +115,7 @@ int mqtt_esp_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
void NewNetwork(Network* n) void mqtt_network_new(mqtt_network_t* n)
{ {
n->my_socket = -1; n->my_socket = -1;
n->mqttread = mqtt_esp_read; n->mqttread = mqtt_esp_read;
@ -148,7 +148,7 @@ static int host2addr(const char *hostname , struct in_addr *in)
} }
int ConnectNetwork(Network* n, const char* host, int port) int mqtt_network_connect(mqtt_network_t* n, const char* host, int port)
{ {
struct sockaddr_in addr; struct sockaddr_in addr;
int ret; int ret;
@ -179,7 +179,7 @@ int ConnectNetwork(Network* n, const char* host, int port)
} }
int DisconnectNetwork(Network* n) int mqtt_network_disconnect(mqtt_network_t* n)
{ {
close(n->my_socket); close(n->my_socket);
n->my_socket = -1; n->my_socket = -1;

View file

@ -24,36 +24,34 @@
#include <FreeRTOS.h> #include <FreeRTOS.h>
#include <portmacro.h> #include <portmacro.h>
typedef struct Timer Timer; typedef struct mqtt_timer mqtt_timer_t;
struct Timer struct mqtt_timer
{ {
portTickType end_time; portTickType end_time;
}; };
typedef struct Network Network; typedef struct mqtt_network mqtt_network_t;
struct Network struct mqtt_network
{ {
int my_socket; int my_socket;
int (*mqttread) (Network*, unsigned char*, int, int); int (*mqttread) (mqtt_network_t*, unsigned char*, int, int);
int (*mqttwrite) (Network*, unsigned char*, int, int); int (*mqttwrite) (mqtt_network_t*, unsigned char*, int, int);
}; };
char expired(Timer*); char mqtt_timer_expired(mqtt_timer_t*);
void countdown_ms(Timer*, unsigned int); void mqtt_timer_countdown_ms(mqtt_timer_t*, unsigned int);
void countdown(Timer*, unsigned int); void mqtt_timer_countdown(mqtt_timer_t*, unsigned int);
int left_ms(Timer*); int mqtt_timer_left_ms(mqtt_timer_t*);
void mqtt_timer_init(mqtt_timer_t*);
void InitTimer(Timer*); int mqtt_esp_read(mqtt_network_t*, unsigned char*, int, int);
int mqtt_esp_write(mqtt_network_t*, unsigned char*, int, int);
int mqtt_esp_read(Network*, unsigned char*, int, int); void mqtt_esp_disconnect(mqtt_network_t*);
int mqtt_esp_write(Network*, unsigned char*, int, int);
void mqtt_esp_disconnect(Network*);
void NewNetwork(Network* n);
int ConnectNetwork(Network* n, const char* host, int port);
int DisconnectNetwork(Network* n);
void mqtt_network_new(mqtt_network_t* n);
int mqtt_network_connect(mqtt_network_t* n, const char* host, int port);
int mqtt_network_disconnect(mqtt_network_t* n);
#endif /* _MQTT_ESP8266_H_ */ #endif /* _MQTT_ESP8266_H_ */

View file

@ -20,18 +20,18 @@
#include "StackTrace.h" #include "StackTrace.h"
#include "MQTTPacket.h" #include "MQTTPacket.h"
const char* MQTTPacket_getName(unsigned short packetid); const char* mqtt_packet_get_name(unsigned short packetid);
int MQTTStringFormat_connect(char* strbuf, int strbuflen, MQTTPacket_connectData* data); int mqtt_string_format_connect(char* strbuf, int strbuflen, mqtt_packet_connect_data_t* data);
int MQTTStringFormat_connack(char* strbuf, int strbuflen, unsigned char connack_rc, unsigned char sessionPresent); int mqtt_string_format_connack(char* strbuf, int strbuflen, unsigned char connack_rc, unsigned char sessionPresent);
int MQTTStringFormat_publish(char* strbuf, int strbuflen, unsigned char dup, int qos, unsigned char retained, int mqtt_string_format_publish(char* strbuf, int strbuflen, unsigned char dup, int qos, unsigned char retained,
unsigned short packetid, MQTTString topicName, unsigned char* payload, int payloadlen); unsigned short packetid, mqtt_string_t topicName, unsigned char* payload, int payloadlen);
int MQTTStringFormat_ack(char* strbuf, int strbuflen, unsigned char packettype, unsigned char dup, unsigned short packetid); int mqtt_string_format_ack(char* strbuf, int strbuflen, unsigned char packettype, unsigned char dup, unsigned short packetid);
int MQTTStringFormat_subscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int count, int mqtt_string_format_subscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int count,
MQTTString topicFilters[], int requestedQoSs[]); mqtt_string_t topicFilters[], int requestedQoSs[]);
int MQTTStringFormat_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, int* grantedQoSs); int mqtt_string_format_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, int* grantedQoSs);
int MQTTStringFormat_unsubscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid, int mqtt_string_format_unsubscribe(char* strbuf, int strbuflen, unsigned char dup, unsigned short packetid,
int count, MQTTString topicFilters[]); int count, mqtt_string_t topicFilters[]);
char* MQTTFormat_toClientString(char* strbuf, int strbuflen, unsigned char* buf, int buflen); char* mqtt_format_to_client_string(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
char* MQTTFormat_toServerString(char* strbuf, int strbuflen, unsigned char* buf, int buflen); char* mqtt_format_to_server_string(char* strbuf, int strbuflen, unsigned char* buf, int buflen);
#endif #endif

View file

@ -25,7 +25,7 @@
* @param length the length to be encoded * @param length the length to be encoded
* @return the number of bytes written to buffer * @return the number of bytes written to buffer
*/ */
int MQTTPacket_encode(unsigned char* buf, int length) int mqtt_packet_encode(unsigned char* buf, int length)
{ {
int rc = 0; int rc = 0;
@ -50,7 +50,7 @@ int MQTTPacket_encode(unsigned char* buf, int length)
* @param value the decoded length returned * @param value the decoded length returned
* @return the number of bytes read from the socket * @return the number of bytes read from the socket
*/ */
int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value) int mqtt_packet_decode(int (*getcharfn)(unsigned char*, int), int* value)
{ {
unsigned char c; unsigned char c;
int multiplier = 1; int multiplier = 1;
@ -80,7 +80,7 @@ exit:
} }
int MQTTPacket_len(int rem_len) int mqtt_packet_len(int rem_len)
{ {
rem_len += 1; /* header byte */ rem_len += 1; /* header byte */
@ -99,7 +99,7 @@ int MQTTPacket_len(int rem_len)
static unsigned char* bufptr; static unsigned char* bufptr;
int bufchar(unsigned char* c, int count) static int bufchar(unsigned char* c, int count)
{ {
int i; int i;
@ -109,10 +109,10 @@ int bufchar(unsigned char* c, int count)
} }
int MQTTPacket_decodeBuf(unsigned char* buf, int* value) int mqtt_packet_decode_buf(unsigned char* buf, int* value)
{ {
bufptr = buf; bufptr = buf;
return MQTTPacket_decode(bufchar, value); return mqtt_packet_decode(bufchar, value);
} }
@ -121,7 +121,7 @@ int MQTTPacket_decodeBuf(unsigned char* buf, int* value)
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the integer value calculated * @return the integer value calculated
*/ */
int readInt(unsigned char** pptr) int mqtt_read_int(unsigned char** pptr)
{ {
unsigned char* ptr = *pptr; unsigned char* ptr = *pptr;
int len = 256*(*ptr) + (*(ptr+1)); int len = 256*(*ptr) + (*(ptr+1));
@ -135,7 +135,7 @@ int readInt(unsigned char** pptr)
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the character read * @return the character read
*/ */
char readChar(unsigned char** pptr) char mqtt_read_char(unsigned char** pptr)
{ {
char c = **pptr; char c = **pptr;
(*pptr)++; (*pptr)++;
@ -148,7 +148,7 @@ char readChar(unsigned char** pptr)
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param c the character to write * @param c the character to write
*/ */
void writeChar(unsigned char** pptr, char c) void mqtt_write_char(unsigned char** pptr, char c)
{ {
**pptr = c; **pptr = c;
(*pptr)++; (*pptr)++;
@ -160,7 +160,7 @@ void writeChar(unsigned char** pptr, char c)
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param anInt the integer to write * @param anInt the integer to write
*/ */
void writeInt(unsigned char** pptr, int anInt) void mqtt_write_int(unsigned char** pptr, int anInt)
{ {
**pptr = (unsigned char)(anInt / 256); **pptr = (unsigned char)(anInt / 256);
(*pptr)++; (*pptr)++;
@ -174,34 +174,34 @@ void writeInt(unsigned char** pptr, int anInt)
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param string the C string to write * @param string the C string to write
*/ */
void writeCString(unsigned char** pptr, const char* string) void mqtt_write_cstr(unsigned char** pptr, const char* string)
{ {
int len = strlen(string); int len = strlen(string);
writeInt(pptr, len); mqtt_write_int(pptr, len);
memcpy(*pptr, string, len); memcpy(*pptr, string, len);
*pptr += len; *pptr += len;
} }
int getLenStringLen(char* ptr) //int getLenStringLen(char* ptr)
{ //{
int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1)); // int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
return len; // return len;
} //}
void writeMQTTString(unsigned char** pptr, MQTTString mqttstring) void mqtt_write_mqqt_str(unsigned char** pptr, mqtt_string_t mqttstring)
{ {
if (mqttstring.lenstring.len > 0) if (mqttstring.lenstring.len > 0)
{ {
writeInt(pptr, mqttstring.lenstring.len); mqtt_write_int(pptr, mqttstring.lenstring.len);
memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len); memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len);
*pptr += mqttstring.lenstring.len; *pptr += mqttstring.lenstring.len;
} }
else if (mqttstring.cstring) else if (mqttstring.cstring)
writeCString(pptr, mqttstring.cstring); mqtt_write_cstr(pptr, mqttstring.cstring);
else else
writeInt(pptr, 0); mqtt_write_int(pptr, 0);
} }
@ -211,7 +211,7 @@ void writeMQTTString(unsigned char** pptr, MQTTString mqttstring)
* @param enddata pointer to the end of the data: do not read beyond * @param enddata pointer to the end of the data: do not read beyond
* @return 1 if successful, 0 if not * @return 1 if successful, 0 if not
*/ */
int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata) int mqtt_read_str_len(mqtt_string_t* mqttstring, unsigned char** pptr, unsigned char* enddata)
{ {
int rc = 0; int rc = 0;
@ -219,7 +219,7 @@ int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned ch
/* the first two bytes are the length of the string */ /* the first two bytes are the length of the string */
if (enddata - (*pptr) > 1) /* enough length to read the integer? */ if (enddata - (*pptr) > 1) /* enough length to read the integer? */
{ {
mqttstring->lenstring.len = readInt(pptr); /* increments pptr to point past length */ mqttstring->lenstring.len = mqtt_read_int(pptr); /* increments pptr to point past length */
if (&(*pptr)[mqttstring->lenstring.len] <= enddata) if (&(*pptr)[mqttstring->lenstring.len] <= enddata)
{ {
mqttstring->lenstring.data = (char*)*pptr; mqttstring->lenstring.data = (char*)*pptr;
@ -238,7 +238,7 @@ int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned ch
* @param mqttstring the string to return the length of * @param mqttstring the string to return the length of
* @return the length of the string * @return the length of the string
*/ */
int MQTTstrlen(MQTTString mqttstring) int mqtt_strlen(mqtt_string_t mqttstring)
{ {
int rc = 0; int rc = 0;
@ -256,7 +256,7 @@ int MQTTstrlen(MQTTString mqttstring)
* @param bptr the C string to compare * @param bptr the C string to compare
* @return boolean - equal or not * @return boolean - equal or not
*/ */
int MQTTPacket_equals(MQTTString* a, char* bptr) int mqtt_packet_equals(mqtt_string_t* a, char* bptr)
{ {
int alen = 0, int alen = 0,
blen = 0; blen = 0;
@ -286,10 +286,10 @@ int MQTTPacket_equals(MQTTString* a, char* bptr)
* @return integer MQTT packet type, or -1 on error * @return integer MQTT packet type, or -1 on error
* @note the whole message must fit into the caller's buffer * @note the whole message must fit into the caller's buffer
*/ */
int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int)) int mqtt_packet_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int))
{ {
int rc = -1; int rc = -1;
MQTTHeader header = {0}; mqtt_header_t header = {0};
int len = 0; int len = 0;
int rem_len = 0; int rem_len = 0;
@ -299,8 +299,8 @@ int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*
len = 1; len = 1;
/* 2. read the remaining length. This is variable in itself */ /* 2. read the remaining length. This is variable in itself */
MQTTPacket_decode(getfn, &rem_len); mqtt_packet_decode(getfn, &rem_len);
len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */ len += mqtt_packet_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */
/* 3. read the rest of the buffer using a callback to supply the rest of the data */ /* 3. read the rest of the buffer using a callback to supply the rest of the data */
if((rem_len + len) > buflen) if((rem_len + len) > buflen)
@ -320,7 +320,7 @@ exit:
* @param value the decoded length returned * @param value the decoded length returned
* @return integer the number of bytes read from the socket, 0 for call again, or -1 on error * @return integer the number of bytes read from the socket, 0 for call again, or -1 on error
*/ */
static int MQTTPacket_decodenb(MQTTTransport *trp) static int decodenb(mqtt_transport_t *trp)
{ {
unsigned char c; unsigned char c;
int rc = MQTTPACKET_READ_ERROR; int rc = MQTTPACKET_READ_ERROR;
@ -357,10 +357,10 @@ exit:
* @return integer MQTT packet type, 0 for call again, or -1 on error * @return integer MQTT packet type, 0 for call again, or -1 on error
* @note the whole message must fit into the caller's buffer * @note the whole message must fit into the caller's buffer
*/ */
int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp) int mqtt_packet_readnb(unsigned char* buf, int buflen, mqtt_transport_t *trp)
{ {
int rc = -1, frc; int rc = -1, frc;
MQTTHeader header = {0}; mqtt_header_t header = {0};
switch(trp->state){ switch(trp->state){
default: default:
@ -377,11 +377,11 @@ int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp)
/*FALLTHROUGH*/ /*FALLTHROUGH*/
/* read the remaining length. This is variable in itself */ /* read the remaining length. This is variable in itself */
case 1: case 1:
if((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR) if((frc=decodenb(trp)) == MQTTPACKET_READ_ERROR)
goto exit; goto exit;
if(frc == 0) if(frc == 0)
return 0; return 0;
trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */ trp->len = 1 + mqtt_packet_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */
if((trp->rem_len + trp->len) > buflen) if((trp->rem_len + trp->len) > buflen)
goto exit; goto exit;
++trp->state; ++trp->state;

View file

@ -42,9 +42,20 @@ enum errors
enum msgTypes enum msgTypes
{ {
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, MQTTPACKET_CONNECT = 1,
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, MQTTPACKET_CONNACK,
PINGREQ, PINGRESP, DISCONNECT MQTTPACKET_PUBLISH,
MQTTPACKET_PUBACK,
MQTTPACKET_PUBREC,
MQTTPACKET_PUBREL,
MQTTPACKET_PUBCOMP,
MQTTPACKET_SUBSCRIBE,
MQTTPACKET_SUBACK,
MQTTPACKET_UNSUBSCRIBE,
MQTTPACKET_UNSUBACK,
MQTTPACKET_PINGREQ,
MQTTPACKET_PINGRESP,
MQTTPACKET_DISCONNECT
}; };
/** /**
@ -70,23 +81,23 @@ typedef union
unsigned int type : 4; /**< message type nibble */ unsigned int type : 4; /**< message type nibble */
} bits; } bits;
#endif #endif
} MQTTHeader; } mqtt_header_t;
typedef struct typedef struct
{ {
int len; int len;
char* data; char* data;
} MQTTLenString; } mqtt_string_len_t;
typedef struct typedef struct
{ {
char* cstring; char* cstring;
MQTTLenString lenstring; mqtt_string_len_t lenstring;
} MQTTString; } mqtt_string_t;
#define MQTTString_initializer {NULL, {0, NULL}} #define mqtt_string_initializer {NULL, {0, NULL}}
int MQTTstrlen(MQTTString mqttstring); int mqtt_strlen(mqtt_string_t mqttstring);
#include "MQTTConnect.h" #include "MQTTConnect.h"
#include "MQTTPublish.h" #include "MQTTPublish.h"
@ -94,25 +105,25 @@ int MQTTstrlen(MQTTString mqttstring);
#include "MQTTUnsubscribe.h" #include "MQTTUnsubscribe.h"
#include "MQTTFormat.h" #include "MQTTFormat.h"
int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char type, unsigned char dup, unsigned short packetid); int mqtt_serialize_ack(unsigned char* buf, int buflen, unsigned char type, unsigned char dup, unsigned short packetid);
int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen); int mqtt_deserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, unsigned char* buf, int buflen);
int MQTTPacket_len(int rem_len); int mqtt_packet_len(int rem_len);
int MQTTPacket_equals(MQTTString* a, char* b); int mqtt_packet_equals(mqtt_string_t* a, char* b);
int MQTTPacket_encode(unsigned char* buf, int length); int mqtt_packet_encode(unsigned char* buf, int length);
int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value); int mqtt_packet_decode(int (*getcharfn)(unsigned char*, int), int* value);
int MQTTPacket_decodeBuf(unsigned char* buf, int* value); int mqtt_packet_decode_buf(unsigned char* buf, int* value);
int readInt(unsigned char** pptr); int mqtt_read_int(unsigned char** pptr);
char readChar(unsigned char** pptr); char mqtt_read_char(unsigned char** pptr);
void writeChar(unsigned char** pptr, char c); void mqtt_write_char(unsigned char** pptr, char c);
void writeInt(unsigned char** pptr, int anInt); void mqtt_write_int(unsigned char** pptr, int anInt);
int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata); int mqtt_read_str_len(mqtt_string_t* mqttstring, unsigned char** pptr, unsigned char* enddata);
void writeCString(unsigned char** pptr, const char* string); void mqtt_write_cstr(unsigned char** pptr, const char* string);
void writeMQTTString(unsigned char** pptr, MQTTString mqttstring); void mqtt_write_mqqt_str(unsigned char** pptr, mqtt_string_t mqttstring);
DLLExport int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int)); DLLExport int mqtt_packet_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int));
typedef struct { typedef struct {
int (*getfn)(void *, unsigned char*, int); /* must return -1 for error, 0 for call again, or the number of bytes read */ int (*getfn)(void *, unsigned char*, int); /* must return -1 for error, 0 for call again, or the number of bytes read */
@ -121,9 +132,9 @@ typedef struct {
int rem_len; int rem_len;
int len; int len;
char state; char state;
}MQTTTransport; } mqtt_transport_t;
int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp); int mqtt_packet_readnb(unsigned char* buf, int buflen, mqtt_transport_t *trp);
#ifdef __cplusplus /* If this is a C++ compiler, use C linkage */ #ifdef __cplusplus /* If this is a C++ compiler, use C linkage */
} }

View file

@ -25,14 +25,14 @@
#define DLLExport #define DLLExport
#endif #endif
DLLExport int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid, DLLExport int mqtt_serialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid,
MQTTString topicName, unsigned char* payload, int payloadlen); mqtt_string_t topicName, unsigned char* payload, int payloadlen);
DLLExport int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName, DLLExport int mqtt_deserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, mqtt_string_t* topicName,
unsigned char** payload, int* payloadlen, unsigned char* buf, int len); unsigned char** payload, int* payloadlen, unsigned char* buf, int len);
DLLExport int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid); DLLExport int mqtt_serialize_puback(unsigned char* buf, int buflen, unsigned short packetid);
DLLExport int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid); DLLExport int mqtt_serialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid);
DLLExport int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid); DLLExport int mqtt_serialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid);
#endif /* MQTTPUBLISH_H_ */ #endif /* MQTTPUBLISH_H_ */

View file

@ -28,11 +28,11 @@
* @param payloadlen the length of the payload to be sent * @param payloadlen the length of the payload to be sent
* @return the length of buffer needed to contain the serialized version of the packet * @return the length of buffer needed to contain the serialized version of the packet
*/ */
int MQTTSerialize_publishLength(int qos, MQTTString topicName, int payloadlen) static int publish_length(int qos, mqtt_string_t topicName, int payloadlen)
{ {
int len = 0; int len = 0;
len += 2 + MQTTstrlen(topicName) + payloadlen; len += 2 + mqtt_strlen(topicName) + payloadlen;
if (qos > 0) if (qos > 0)
len += 2; /* packetid */ len += 2; /* packetid */
return len; return len;
@ -52,33 +52,33 @@ int MQTTSerialize_publishLength(int qos, MQTTString topicName, int payloadlen)
* @param payloadlen integer - the length of the MQTT payload * @param payloadlen integer - the length of the MQTT payload
* @return the length of the serialized data. <= 0 indicates error * @return the length of the serialized data. <= 0 indicates error
*/ */
int MQTTSerialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid, int mqtt_serialize_publish(unsigned char* buf, int buflen, unsigned char dup, int qos, unsigned char retained, unsigned short packetid,
MQTTString topicName, unsigned char* payload, int payloadlen) mqtt_string_t topicName, unsigned char* payload, int payloadlen)
{ {
unsigned char *ptr = buf; unsigned char *ptr = buf;
MQTTHeader header = {0}; mqtt_header_t header = {0};
int rem_len = 0; int rem_len = 0;
int rc = 0; int rc = 0;
FUNC_ENTRY; FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_publishLength(qos, topicName, payloadlen)) > buflen) if (mqtt_packet_len(rem_len = publish_length(qos, topicName, payloadlen)) > buflen)
{ {
rc = MQTTPACKET_BUFFER_TOO_SHORT; rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit; goto exit;
} }
header.bits.type = PUBLISH; header.bits.type = MQTTPACKET_PUBLISH;
header.bits.dup = dup; header.bits.dup = dup;
header.bits.qos = qos; header.bits.qos = qos;
header.bits.retain = retained; header.bits.retain = retained;
writeChar(&ptr, header.byte); /* write header */ mqtt_write_char(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; ptr += mqtt_packet_encode(ptr, rem_len); /* write remaining length */;
writeMQTTString(&ptr, topicName); mqtt_write_mqqt_str(&ptr, topicName);
if (qos > 0) if (qos > 0)
writeInt(&ptr, packetid); mqtt_write_int(&ptr, packetid);
memcpy(ptr, payload, payloadlen); memcpy(ptr, payload, payloadlen);
ptr += payloadlen; ptr += payloadlen;
@ -101,9 +101,9 @@ exit:
* @param packetid the MQTT packet identifier * @param packetid the MQTT packet identifier
* @return serialized length, or error if 0 * @return serialized length, or error if 0
*/ */
int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char packettype, unsigned char dup, unsigned short packetid) int mqtt_serialize_ack(unsigned char* buf, int buflen, unsigned char packettype, unsigned char dup, unsigned short packetid)
{ {
MQTTHeader header = {0}; mqtt_header_t header = {0};
int rc = 0; int rc = 0;
unsigned char *ptr = buf; unsigned char *ptr = buf;
@ -115,11 +115,11 @@ int MQTTSerialize_ack(unsigned char* buf, int buflen, unsigned char packettype,
} }
header.bits.type = packettype; header.bits.type = packettype;
header.bits.dup = dup; header.bits.dup = dup;
header.bits.qos = (packettype == PUBREL) ? 1 : 0; header.bits.qos = (packettype == MQTTPACKET_PUBREL) ? 1 : 0;
writeChar(&ptr, header.byte); /* write header */ mqtt_write_char(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ ptr += mqtt_packet_encode(ptr, 2); /* write remaining length */
writeInt(&ptr, packetid); mqtt_write_int(&ptr, packetid);
rc = ptr - buf; rc = ptr - buf;
exit: exit:
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
@ -134,9 +134,9 @@ exit:
* @param packetid integer - the MQTT packet identifier * @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0 * @return serialized length, or error if 0
*/ */
int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packetid) int mqtt_serialize_puback(unsigned char* buf, int buflen, unsigned short packetid)
{ {
return MQTTSerialize_ack(buf, buflen, PUBACK, 0, packetid); return mqtt_serialize_ack(buf, buflen, MQTTPACKET_PUBACK, 0, packetid);
} }
@ -148,9 +148,9 @@ int MQTTSerialize_puback(unsigned char* buf, int buflen, unsigned short packeti
* @param packetid integer - the MQTT packet identifier * @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0 * @return serialized length, or error if 0
*/ */
int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid) int mqtt_serialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid)
{ {
return MQTTSerialize_ack(buf, buflen, PUBREL, dup, packetid); return mqtt_serialize_ack(buf, buflen, MQTTPACKET_PUBREL, dup, packetid);
} }
@ -161,9 +161,9 @@ int MQTTSerialize_pubrel(unsigned char* buf, int buflen, unsigned char dup, uns
* @param packetid integer - the MQTT packet identifier * @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0 * @return serialized length, or error if 0
*/ */
int MQTTSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid) int mqtt_serialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid)
{ {
return MQTTSerialize_ack(buf, buflen, PUBCOMP, 0, packetid); return mqtt_serialize_ack(buf, buflen, MQTTPACKET_PUBCOMP, 0, packetid);
} }

View file

@ -25,15 +25,17 @@
#define DLLExport #define DLLExport
#endif #endif
DLLExport int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, #include "MQTTPacket.h"
int count, MQTTString topicFilters[], int requestedQoSs[]);
DLLExport int MQTTDeserialize_subscribe(unsigned char* dup, unsigned short* packetid, DLLExport int mqtt_serialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid,
int maxcount, int* count, MQTTString topicFilters[], int requestedQoSs[], unsigned char* buf, int len); int count, mqtt_string_t topicFilters[], int requestedQoSs[]);
DLLExport int MQTTSerialize_suback(unsigned char* buf, int buflen, unsigned short packetid, int count, int* grantedQoSs); DLLExport int mqtt_deserialize_subscribe(unsigned char* dup, unsigned short* packetid,
int maxcount, int* count, mqtt_string_t topicFilters[], int requestedQoSs[], unsigned char* buf, int len);
DLLExport int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int len); DLLExport int mqtt_serialize_suback(unsigned char* buf, int buflen, unsigned short packetid, int count, int* grantedQoSs);
DLLExport int mqtt_deserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int len);
#endif /* MQTTSUBSCRIBE_H_ */ #endif /* MQTTSUBSCRIBE_H_ */

View file

@ -14,6 +14,7 @@
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/ *******************************************************************************/
#include <espressif/esp_common.h> #include <espressif/esp_common.h>
#include "MQTTPacket.h" #include "MQTTPacket.h"
#include "StackTrace.h" #include "StackTrace.h"
@ -25,13 +26,13 @@
* @param topicFilters the array of topic filter strings to be used in the publish * @param topicFilters the array of topic filter strings to be used in the publish
* @return the length of buffer needed to contain the serialized version of the packet * @return the length of buffer needed to contain the serialized version of the packet
*/ */
int MQTTSerialize_subscribeLength(int count, MQTTString topicFilters[]) static int subscribe_length(int count, mqtt_string_t topicFilters[])
{ {
int i; int i;
int len = 2; /* packetid */ int len = 2; /* packetid */
for (i = 0; i < count; ++i) for (i = 0; i < count; ++i)
len += 2 + MQTTstrlen(topicFilters[i]) + 1; /* length + topic + req_qos */ len += 2 + mqtt_strlen(topicFilters[i]) + 1; /* length + topic + req_qos */
return len; return len;
} }
@ -47,36 +48,36 @@ int MQTTSerialize_subscribeLength(int count, MQTTString topicFilters[])
* @param requestedQoSs - array of requested QoS * @param requestedQoSs - array of requested QoS
* @return the length of the serialized data. <= 0 indicates error * @return the length of the serialized data. <= 0 indicates error
*/ */
int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count, int mqtt_serialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count,
MQTTString topicFilters[], int requestedQoSs[]) mqtt_string_t topicFilters[], int requestedQoSs[])
{ {
unsigned char *ptr = buf; unsigned char *ptr = buf;
MQTTHeader header = {0}; mqtt_header_t header = {0};
int rem_len = 0; int rem_len = 0;
int rc = 0; int rc = 0;
int i = 0; int i = 0;
FUNC_ENTRY; FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_subscribeLength(count, topicFilters)) > buflen) if (mqtt_packet_len(rem_len = subscribe_length(count, topicFilters)) > buflen)
{ {
rc = MQTTPACKET_BUFFER_TOO_SHORT; rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit; goto exit;
} }
header.byte = 0; header.byte = 0;
header.bits.type = SUBSCRIBE; header.bits.type = MQTTPACKET_SUBSCRIBE;
header.bits.dup = dup; header.bits.dup = dup;
header.bits.qos = 1; header.bits.qos = 1;
writeChar(&ptr, header.byte); /* write header */ mqtt_write_char(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; ptr += mqtt_packet_encode(ptr, rem_len); /* write remaining length */;
writeInt(&ptr, packetid); mqtt_write_int(&ptr, packetid);
for (i = 0; i < count; ++i) for (i = 0; i < count; ++i)
{ {
writeMQTTString(&ptr, topicFilters[i]); mqtt_write_mqqt_str(&ptr, topicFilters[i]);
writeChar(&ptr, requestedQoSs[i]); mqtt_write_char(&ptr, requestedQoSs[i]);
} }
rc = ptr - buf; rc = ptr - buf;
@ -97,25 +98,25 @@ exit:
* @param buflen the length in bytes of the data in the supplied buffer * @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure * @return error code. 1 is success, 0 is failure
*/ */
int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int buflen) int mqtt_deserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int buflen)
{ {
MQTTHeader header = {0}; mqtt_header_t header = {0};
unsigned char* curdata = buf; unsigned char* curdata = buf;
unsigned char* enddata = NULL; unsigned char* enddata = NULL;
int rc = 0; int rc = 0;
int mylen; int mylen;
FUNC_ENTRY; FUNC_ENTRY;
header.byte = readChar(&curdata); header.byte = mqtt_read_char(&curdata);
if (header.bits.type != SUBACK) if (header.bits.type != MQTTPACKET_SUBACK)
goto exit; goto exit;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ curdata += (rc = mqtt_packet_decode_buf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen; enddata = curdata + mylen;
if (enddata - curdata < 2) if (enddata - curdata < 2)
goto exit; goto exit;
*packetid = readInt(&curdata); *packetid = mqtt_read_int(&curdata);
*count = 0; *count = 0;
while (curdata < enddata) while (curdata < enddata)
@ -125,7 +126,7 @@ int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count,
rc = -1; rc = -1;
goto exit; goto exit;
} }
grantedQoSs[(*count)++] = readChar(&curdata); grantedQoSs[(*count)++] = mqtt_read_char(&curdata);
} }
rc = 1; rc = 1;

View file

@ -25,14 +25,14 @@
#define DLLExport #define DLLExport
#endif #endif
DLLExport int MQTTSerialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, DLLExport int mqtt_serialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid,
int count, MQTTString topicFilters[]); int count, mqtt_string_t topicFilters[]);
DLLExport int MQTTDeserialize_unsubscribe(unsigned char* dup, unsigned short* packetid, int max_count, int* count, MQTTString topicFilters[], DLLExport int mqtt_deserialize_unsubscribe(unsigned char* dup, unsigned short* packetid, int max_count, int* count, mqtt_string_t topicFilters[],
unsigned char* buf, int len); unsigned char* buf, int len);
DLLExport int MQTTSerialize_unsuback(unsigned char* buf, int buflen, unsigned short packetid); DLLExport int mqtt_serialize_unsuback(unsigned char* buf, int buflen, unsigned short packetid);
DLLExport int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int len); DLLExport int mqtt_deserialize_unsuback(unsigned short* packetid, unsigned char* buf, int len);
#endif /* MQTTUNSUBSCRIBE_H_ */ #endif /* MQTTUNSUBSCRIBE_H_ */

View file

@ -25,13 +25,13 @@
* @param topicFilters the array of topic filter strings to be used in the publish * @param topicFilters the array of topic filter strings to be used in the publish
* @return the length of buffer needed to contain the serialized version of the packet * @return the length of buffer needed to contain the serialized version of the packet
*/ */
int MQTTSerialize_unsubscribeLength(int count, MQTTString topicFilters[]) static int unsubscribe_length(int count, mqtt_string_t topicFilters[])
{ {
int i; int i;
int len = 2; /* packetid */ int len = 2; /* packetid */
for (i = 0; i < count; ++i) for (i = 0; i < count; ++i)
len += 2 + MQTTstrlen(topicFilters[i]); /* length + topic*/ len += 2 + mqtt_strlen(topicFilters[i]); /* length + topic*/
return len; return len;
} }
@ -46,34 +46,34 @@ int MQTTSerialize_unsubscribeLength(int count, MQTTString topicFilters[])
* @param topicFilters - array of topic filter names * @param topicFilters - array of topic filter names
* @return the length of the serialized data. <= 0 indicates error * @return the length of the serialized data. <= 0 indicates error
*/ */
int MQTTSerialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int mqtt_serialize_unsubscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid,
int count, MQTTString topicFilters[]) int count, mqtt_string_t topicFilters[])
{ {
unsigned char *ptr = buf; unsigned char *ptr = buf;
MQTTHeader header = {0}; mqtt_header_t header = {0};
int rem_len = 0; int rem_len = 0;
int rc = -1; int rc = -1;
int i = 0; int i = 0;
FUNC_ENTRY; FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_unsubscribeLength(count, topicFilters)) > buflen) if (mqtt_packet_len(rem_len = unsubscribe_length(count, topicFilters)) > buflen)
{ {
rc = MQTTPACKET_BUFFER_TOO_SHORT; rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit; goto exit;
} }
header.byte = 0; header.byte = 0;
header.bits.type = UNSUBSCRIBE; header.bits.type = MQTTPACKET_UNSUBSCRIBE;
header.bits.dup = dup; header.bits.dup = dup;
header.bits.qos = 1; header.bits.qos = 1;
writeChar(&ptr, header.byte); /* write header */ mqtt_write_char(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; ptr += mqtt_packet_encode(ptr, rem_len); /* write remaining length */;
writeInt(&ptr, packetid); mqtt_write_int(&ptr, packetid);
for (i = 0; i < count; ++i) for (i = 0; i < count; ++i)
writeMQTTString(&ptr, topicFilters[i]); mqtt_write_mqqt_str(&ptr, topicFilters[i]);
rc = ptr - buf; rc = ptr - buf;
exit: exit:
@ -89,15 +89,15 @@ exit:
* @param buflen the length in bytes of the data in the supplied buffer * @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure * @return error code. 1 is success, 0 is failure
*/ */
int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int buflen) int mqtt_deserialize_unsuback(unsigned short* packetid, unsigned char* buf, int buflen)
{ {
unsigned char type = 0; unsigned char type = 0;
unsigned char dup = 0; unsigned char dup = 0;
int rc = 0; int rc = 0;
FUNC_ENTRY; FUNC_ENTRY;
rc = MQTTDeserialize_ack(&type, &dup, packetid, buf, buflen); rc = mqtt_deserialize_ack(&type, &dup, packetid, buf, buflen);
if (type == UNSUBACK) if (type == MQTTPACKET_UNSUBACK)
rc = 1; rc = 1;
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;