Import required parts of ameba sdk 4.0b

This commit is contained in:
David Goodlad 2019-04-23 21:10:24 +10:00
parent 2d21e45bba
commit 7319ca1482
737 changed files with 304718 additions and 0 deletions

View file

@ -0,0 +1,852 @@
/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTClient.h"
const char * const msg_types_str[]=
{
"Reserved",
"CONNECT",
"CONNACK",
"PUBLISH",
"PUBACK",
"PUBREC",
"PUBREL",
"PUBCOMP",
"SUBSCRIBE",
"SUBACK",
"UNSUBSCRIBE",
"UNSUBACK",
"PINGREQ",
"PINGRESP",
"DISCONNECT",
"Reserved"
};
const char * const mqtt_status_str[]=
{
"MQTT_START",
"MQTT_CONNECT",
"MQTT_SUBTOPIC",
"MQTT_RUNNING"
};
static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
md->topicName = aTopicName;
md->message = aMessage;
}
static int getNextPacketId(MQTTClient *c) {
return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
}
static int sendPacket(MQTTClient* c, int length, Timer* timer)
{
int rc = FAILURE,
sent = 0;
while (sent < length && !TimerIsExpired(timer))
{
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
if (rc < 0) // there was an error writing the data
break;
sent += rc;
}
if (sent == length)
{
TimerCountdown(&c->ping_timer, c->keepAliveInterval); // record the fact that we have successfully sent the packet
rc = SUCCESS;
}
else{
rc = FAILURE;
mqtt_printf(MQTT_DEBUG, "Send packet failed");
}
if (c->ipstack->my_socket < 0) {
c->isconnected = 0;
}
return rc;
}
void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
{
int i;
c->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = 0;
c->command_timeout_ms = command_timeout_ms;
c->buf = sendbuf;
c->buf_size = sendbuf_size;
c->readbuf = readbuf;
c->readbuf_size = readbuf_size;
c->isconnected = 0;
c->ping_outstanding = 0;
c->defaultMessageHandler = NULL;
c->next_packetid = 1;
c->ipstack->m2m_rxevent = 0;
c->mqttstatus = MQTT_START;
TimerInit(&c->cmd_timer);
TimerInit(&c->ping_timer);
}
static int decodePacket(MQTTClient* c, int* value, int timeout)
{
unsigned char i;
int multiplier = 1;
int len = 0;
const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
*value = 0;
do
{
int rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
if (rc != 1)
goto exit;
*value += (i & 127) * multiplier;
multiplier *= 128;
} while ((i & 128) != 0);
exit:
return len;
}
static int readPacket(MQTTClient* c, Timer* timer)
{
int rc = FAILURE;
MQTTHeader header = {0};
int len = 0;
int rem_len = 0;
/* 1. read the header byte. This has the packet type in it */
if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer)) != 1){
mqtt_printf(MQTT_MSGDUMP, "read packet header failed");
goto exit;
}
len = 1;
/* 2. read the remaining length. This is variable in itself */
decodePacket(c, &rem_len, TimerLeftMS(timer));
len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
if(len + rem_len > c->readbuf_size){
mqtt_printf(MQTT_WARNING, "rem_len = %d, read buffer will overflow", rem_len);
rc = BUFFER_OVERFLOW;
goto exit;
}
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)){
mqtt_printf(MQTT_MSGDUMP, "read the rest of the data failed");
goto exit;
}
header.byte = c->readbuf[0];
rc = header.bits.type;
exit:
if (c->ipstack->my_socket < 0) {
c->isconnected = 0;
}
return rc;
}
// assume topic filter and name is in correct format
// # can only be at end
// + and # can only be next to separator
static char isTopicMatched(char* topicFilter, MQTTString* topicName)
{
char* curf = topicFilter;
char* curn = topicName->lenstring.data;
char* curn_end = curn + topicName->lenstring.len;
while (*curf && curn < curn_end)
{
if (*curn == '/' && *curf != '/')
break;
if (*curf != '+' && *curf != '#' && *curf != *curn)
break;
if (*curf == '+')
{ // skip until we meet the next separator, or end of string
char* nextpos = curn + 1;
while (nextpos < curn_end && *nextpos != '/')
nextpos = ++curn + 1;
}
else if (*curf == '#')
curn = curn_end - 1; // skip until end of string
curf++;
curn++;
};
return (curn == curn_end) && (*curf == '\0');
}
int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
{
int i;
int rc = FAILURE;
// we have to find the right message handler - indexed by topic
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
{
if (c->messageHandlers[i].fp != NULL)
{
MessageData md;
NewMessageData(&md, topicName, message);
c->messageHandlers[i].fp(&md);
rc = SUCCESS;
}
}
}
if (rc == FAILURE && c->defaultMessageHandler != NULL)
{
MessageData md;
NewMessageData(&md, topicName, message);
c->defaultMessageHandler(&md);
rc = SUCCESS;
}
return rc;
}
int keepalive(MQTTClient* c)
{
int rc = FAILURE;
if (c->keepAliveInterval == 0)
{
rc = SUCCESS;
goto exit;
}
if (TimerIsExpired(&c->ping_timer))
{
if (!c->ping_outstanding)
{
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 1000);
int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
c->ping_outstanding = 1;
}
}
exit:
return rc;
}
int cycle(MQTTClient* c, Timer* timer)
{
// read the socket, see what work is due
unsigned short packet_type = readPacket(c, timer);
int len = 0, rc = SUCCESS;
if (packet_type == (unsigned short)BUFFER_OVERFLOW || packet_type == (unsigned short)FAILURE) {
rc = FAILURE;
goto exit;
}
mqtt_printf(MQTT_DEBUG, "Read packet type: %d", packet_type);
switch (packet_type)
{
case CONNACK:
case PUBACK:
case SUBACK:
break;
case PUBLISH:
{
MQTTString topicName;
MQTTMessage msg;
int intQoS;
if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
goto exit;
msg.qos = (enum QoS)intQoS;
deliverMessage(c, &topicName, &msg);
if (msg.qos != QOS0)
{
if (msg.qos == QOS1)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
else if (msg.qos == QOS2)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
if (len <= 0)
rc = FAILURE;
else {
#if 1
sendPacket(c, len, timer);
#else
// it's odd that ACK PUB also need success
rc = sendPacket(c, len, timer);
#endif
}
if (rc == FAILURE)
goto exit; // there was a problem
}
break;
}
case PUBREC:
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREL, 0, mypacketid)) <= 0)
rc = FAILURE;
else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
rc = FAILURE; // there was a problem
if (rc == FAILURE)
goto exit; // there was a problem
break;
}
case PUBREL:
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBCOMP, 0, mypacketid)) <= 0)
rc = FAILURE;
else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
rc = FAILURE; // there was a problem
if (rc == FAILURE)
goto exit; // there was a problem
break;
}
case PUBCOMP:
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
}
exit:
keepalive(c);
if (rc == SUCCESS)
rc = packet_type;
return rc;
}
int MQTTYield(MQTTClient* c, int timeout_ms)
{
int rc = SUCCESS;
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, timeout_ms);
do
{
if (cycle(c, &timer) == FAILURE)
{
rc = FAILURE;
break;
}
} while (!TimerIsExpired(&timer));
return rc;
}
int waitfor(MQTTClient* c, int packet_type, Timer* timer)
{
int rc = FAILURE;
do
{
if (TimerIsExpired(timer))
break; // we timed out
}
while ((rc = cycle(c, timer)) != packet_type);
return rc;
}
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
{
Timer connect_timer;
int rc = FAILURE;
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
int len = 0;
if (c->isconnected) /* don't send connect packet again if we are already connected */
goto exit;
TimerInit(&connect_timer);
TimerCountdownMS(&connect_timer, c->command_timeout_ms);
if (options == 0)
options = &default_options; /* set default options if none were supplied */
c->keepAliveInterval = options->keepAliveInterval;
TimerCountdown(&c->ping_timer, c->keepAliveInterval);
if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet
goto exit; // there was a problem
#if defined(WAIT_FOR_ACK)
// this will be a blocking call, wait for the connack
if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
{
unsigned char connack_rc = 255;
unsigned char sessionPresent = 0;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, c->readbuf, c->readbuf_size) == 1)
rc = connack_rc;
else
rc = FAILURE;
}
else{
mqtt_printf(MQTT_DEBUG, "Not received CONNACK");
rc = FAILURE;
}
#endif
exit:
if (rc == SUCCESS)
c->isconnected = 1;
return rc;
}
int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
int rc = FAILURE;
Timer timer;
int len = 0;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
#if defined(WAIT_FOR_ACK)
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
{
int count = 0, grantedQoS = -1;
unsigned short mypacketid;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1)
rc = grantedQoS; // 0, 1, 2 or 0x80
if (rc != 0x80)
{
int i;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == topicFilter)
{
rc = 0;
goto exit; //already subscribed
}
}
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == 0)
{
c->messageHandlers[i].topicFilter = topicFilter;
c->messageHandlers[i].fp = messageHandler;
rc = 0;
break;
}
}
}
}
else
rc = FAILURE;
#endif
exit:
return rc;
}
int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
int len = 0;
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
#if defined(WAIT_FOR_ACK)
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
{
unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
rc = 0;
int i;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == topicFilter)
{
c->messageHandlers[i].topicFilter = 0;
c->messageHandlers[i].fp = NULL;
}
}
}
else
rc = FAILURE;
#endif
exit:
return rc;
}
int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicName;
int len = 0;
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
if (message->qos == QOS1 || message->qos == QOS2)
message->id = getNextPacketId(c);
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char*)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
#if defined(WAIT_FOR_ACK)
if (message->qos == QOS1)
{
if (waitfor(c, PUBACK, &timer) == PUBACK)
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else{
rc = FAILURE;
mqtt_printf(MQTT_DEBUG, "Not received PUBACK");
}
}
else if (message->qos == QOS2)
{
if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else{
rc = FAILURE;
mqtt_printf(MQTT_DEBUG, "Not received PUBCOMP");
}
}
#endif
exit:
return rc;
}
int MQTTDisconnect(MQTTClient* c)
{
int rc = FAILURE;
Timer timer; // we might wait for incomplete incoming publishes to complete
int len = 0;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_disconnect(c->buf, c->buf_size);
if (len > 0)
rc = sendPacket(c, len, &timer); // send the disconnect packet
c->isconnected = 0;
return rc;
}
#if defined(MQTT_TASK)
void MQTTSetStatus(MQTTClient* c, int mqttstatus)
{
c->mqttstatus = mqttstatus;
mqtt_printf(MQTT_INFO, "Set mqtt status to %s", mqtt_status_str[mqttstatus]);
}
int MQTTDataHandle(MQTTClient* c, fd_set *readfd, MQTTPacket_connectData *connectData, messageHandler messageHandler, char* address, char* topic)
{
short packet_type = 0;
int rc = 0;
int mqttstatus = c->mqttstatus;
int mqtt_rxevent = 0;
int mqtt_fd = c->ipstack->my_socket;
mqtt_rxevent = (mqtt_fd >= 0) ? FD_ISSET( mqtt_fd, readfd) : 0;
if(mqttstatus == MQTT_START) {
mqtt_printf(MQTT_INFO, "MQTT start");
if(c->isconnected){
c->isconnected = 0;
}
mqtt_printf(MQTT_INFO, "Connect Network \"%s\"", address);
if((rc = NetworkConnect(c->ipstack, address, 1883)) != 0){
mqtt_printf(MQTT_INFO, "Return code from network connect is %d\n", rc);
goto exit;
}
mqtt_printf(MQTT_INFO, "\"%s\" Connected", address);
mqtt_printf(MQTT_INFO, "Start MQTT connection");
TimerInit(&c->cmd_timer);
TimerCountdownMS(&c->cmd_timer, c->command_timeout_ms);
if ((rc = MQTTConnect(c, connectData)) != 0){
mqtt_printf(MQTT_INFO, "Return code from MQTT connect is %d\n", rc);
goto exit;
}
MQTTSetStatus(c, MQTT_CONNECT);
goto exit;
}
if(mqtt_rxevent){
c->ipstack->m2m_rxevent = 0;
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 1000);
packet_type = readPacket(c, &timer);
if(packet_type > 0 && packet_type < 15)
mqtt_printf(MQTT_DEBUG, "Read packet type is %s", msg_types_str[packet_type]);
else{
mqtt_printf(MQTT_DEBUG, "Read packet type is %d", packet_type);
MQTTSetStatus(c, MQTT_START);
c->ipstack->disconnect(c->ipstack);
rc = FAILURE;
goto exit;
}
}
switch(mqttstatus){
case MQTT_CONNECT:
if (packet_type == CONNACK)
{
unsigned char connack_rc = 255;
unsigned char sessionPresent = 0;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, c->readbuf, c->readbuf_size) == 1){
rc = connack_rc;
mqtt_printf(MQTT_INFO, "MQTT Connected");
TimerInit(&c->cmd_timer);
TimerCountdownMS(&c->cmd_timer, c->command_timeout_ms);
if ((rc = MQTTSubscribe(c, topic, QOS2, messageHandler)) != 0){
mqtt_printf(MQTT_INFO, "Return code from MQTT subscribe is %d\n", rc);
}else{
mqtt_printf(MQTT_INFO, "Subscribe to Topic: %s", topic);
MQTTSetStatus(c, MQTT_SUBTOPIC);
}
}else{
mqtt_printf(MQTT_DEBUG, "Deserialize CONNACK failed");
rc = FAILURE;
}
}else if(TimerIsExpired(&c->cmd_timer)){
mqtt_printf(MQTT_DEBUG, "Not received CONNACK");
rc = FAILURE;
}
if(rc == FAILURE){
MQTTSetStatus(c, MQTT_START);
}
break;
case MQTT_SUBTOPIC:
if(packet_type == SUBACK){
int count = 0, grantedQoS = -1;
unsigned short mypacketid;
int isSubscribed = 0;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1){
rc = grantedQoS; // 0, 1, 2 or 0x80
mqtt_printf(MQTT_DEBUG, "grantedQoS: %d", grantedQoS);
}
if (rc != 0x80)
{
int i;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == topic)
{
isSubscribed = 1;
break;
}
}
if(!isSubscribed)
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == 0)
{
c->messageHandlers[i].topicFilter = topic;
c->messageHandlers[i].fp = messageHandler;
break;
}
}
rc = 0;
MQTTSetStatus(c, MQTT_RUNNING);
}
}else if(TimerIsExpired(&c->cmd_timer)){
mqtt_printf(MQTT_DEBUG, "Not received SUBACK");
rc = FAILURE;
}
if(rc == FAILURE){
MQTTSetStatus(c, MQTT_START);
}
break;
case MQTT_RUNNING:
if(packet_type>0){
int len = 0;
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 10000);
switch(packet_type){
case CONNACK:
break;
case PUBACK:
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
break;
}
case SUBACK:
break;
case UNSUBACK:
break;
case PUBLISH:
{
MQTTString topicName;
MQTTMessage msg;
int intQoS;
if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
{
rc = FAILURE;
mqtt_printf(MQTT_DEBUG, "Deserialize PUBLISH failed");
goto exit;
}
msg.qos = (enum QoS)intQoS;
deliverMessage(c, &topicName, &msg);
if (msg.qos != QOS0)
{
if (msg.qos == QOS1){
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
mqtt_printf(MQTT_DEBUG, "send PUBACK");
}else if (msg.qos == QOS2){
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
mqtt_printf(MQTT_DEBUG, "send PUBREC");
}else{
mqtt_printf(MQTT_DEBUG, "invalid QoS: %d", msg.qos);
}
if (len <= 0){
rc = FAILURE;
mqtt_printf(MQTT_DEBUG, "Serialize_ack failed");
goto exit;
}else{
if((rc = sendPacket(c, len, &timer)) == FAILURE){
MQTTSetStatus(c, MQTT_START);
goto exit; // there was a problem
}
}
}
break;
}
case PUBREC:
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1){
mqtt_printf(MQTT_DEBUG, "Deserialize PUBREC failed");
rc = FAILURE;
}else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREL, 0, mypacketid)) <= 0){
mqtt_printf(MQTT_DEBUG, "Serialize PUBREL failed");
rc = FAILURE;
}else if ((rc = sendPacket(c, len, &timer)) != SUCCESS){ // send the PUBREL packet
rc = FAILURE; // there was a problem
MQTTSetStatus(c, MQTT_START);
}
break;
}
case PUBREL:
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1){
mqtt_printf(MQTT_DEBUG, "Deserialize PUBREL failed");
rc = FAILURE;
}else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBCOMP, 0, mypacketid)) <= 0){
mqtt_printf(MQTT_DEBUG, "Serialize PUBCOMP failed");
rc = FAILURE;
}else if ((rc = sendPacket(c, len, &timer)) != SUCCESS){ // send the PUBCOMP packet
rc = FAILURE; // there was a problem
MQTTSetStatus(c, MQTT_START);
}
break;
}
case PUBCOMP:
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
}
}
keepalive(c);
break;
default:
break;
}
exit:
return rc;
}
#endif

View file

@ -0,0 +1,202 @@
/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - documentation and platform specific header
*******************************************************************************/
#if !defined(__MQTT_CLIENT_C_)
#define __MQTT_CLIENT_C_
#if defined(__cplusplus)
extern "C" {
#endif
#if defined(WIN32_DLL) || defined(WIN64_DLL)
#define DLLImport __declspec(dllimport)
#define DLLExport __declspec(dllexport)
#elif defined(LINUX_SO)
#define DLLImport extern
#define DLLExport __attribute__ ((visibility ("default")))
#else
#define DLLImport
#define DLLExport
#endif
#include "../MQTTPacket/MQTTPacket.h"
#include "stdio.h"
#include "MQTTFreertos.h"
#define MQTT_TASK
#if !defined(MQTT_TASK)
#define WAIT_FOR_ACK
#endif
#define MQTT_SENDBUF_LEN 1024
#define MQTT_READBUF_LEN 1024
enum mqtt_status{
MQTT_START = 0,
MQTT_CONNECT = 1,
MQTT_SUBTOPIC = 2,
MQTT_RUNNING = 3
};
#if defined(MQTTCLIENT_PLATFORM_HEADER)
/* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value
* into a string constant suitable for use with include.
*/
#define xstr(s) str(s)
#define str(s) #s
#include xstr(MQTTCLIENT_PLATFORM_HEADER)
#endif
#define MAX_PACKET_ID 65535 /* according to the MQTT specification - do not change! */
#if !defined(MAX_MESSAGE_HANDLERS)
#define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */
#endif
enum QoS { QOS0, QOS1, QOS2 };
/* all failure return codes must be negative */
enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1 };//, SUCCESS = 0
/* The Platform specific header must define the Network and Timer structures and functions
* which operate on them.
*
typedef struct Network
{
int (*mqttread)(Network*, unsigned char* read_buffer, int, int);
int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int);
} Network;*/
/* The Timer structure must be defined in the platform specific header,
* and have the following functions to operate on it. */
extern void TimerInit(Timer*);
extern char TimerIsExpired(Timer*);
extern void TimerCountdownMS(Timer*, unsigned int);
extern void TimerCountdown(Timer*, unsigned int);
extern int TimerLeftMS(Timer*);
typedef struct MQTTMessage
{
enum QoS qos;
unsigned char retained;
unsigned char dup;
unsigned short id;
void *payload;
size_t payloadlen;
} MQTTMessage;
typedef struct MessageData
{
MQTTMessage* message;
MQTTString* topicName;
} MessageData;
typedef void (*messageHandler)(MessageData*);
typedef struct MQTTClient
{
unsigned int next_packetid,
command_timeout_ms;
size_t buf_size,
readbuf_size;
unsigned char *buf,
*readbuf;
unsigned int keepAliveInterval;
char ping_outstanding;
int isconnected;
struct MessageHandlers
{
const char* topicFilter;
void (*fp) (MessageData*);
} messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */
void (*defaultMessageHandler) (MessageData*);
Network* ipstack;
Timer ping_timer;
Timer cmd_timer;
int mqttstatus;
} MQTTClient;
#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0}
/**
* Create an MQTT client object
* @param client
* @param network
* @param command_timeout_ms
* @param
*/
DLLExport void MQTTClientInit(MQTTClient* client, Network* network, unsigned int command_timeout_ms,
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size);
/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
* The nework object must be connected to the network endpoint before calling this
* @param options - connect options
* @return success code
*/
DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options);
/** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
* @param client - the client object to use
* @param topic - the topic to publish to
* @param message - the message to send
* @return success code
*/
DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*);
/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning.
* @param client - the client object to use
* @param topicFilter - the topic filter to subscribe to
* @param message - the message to send
* @return success code
*/
DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);
/** MQTT Subscribe - send an MQTT unsubscribe packet and wait for unsuback before returning.
* @param client - the client object to use
* @param topicFilter - the topic filter to unsubscribe from
* @return success code
*/
DLLExport int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter);
/** MQTT Disconnect - send an MQTT disconnect packet and close the connection
* @param client - the client object to use
* @return success code
*/
DLLExport int MQTTDisconnect(MQTTClient* client);
/** MQTT Yield - MQTT background
* @param client - the client object to use
* @param time - the time, in milliseconds, to yield for
* @return success code
*/
DLLExport int MQTTYield(MQTTClient* client, int time);
#if defined(MQTT_TASK)
void MQTTSetStatus(MQTTClient* c, int mqttstatus);
int MQTTDataHandle(MQTTClient* c, fd_set *readfd, MQTTPacket_connectData *connectData, messageHandler messageHandler, char* address, char* topic);
#endif
#if defined(__cplusplus)
}
#endif
#endif

View file

@ -0,0 +1,881 @@
/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
* Ian Craggs - convert to FreeRTOS
*******************************************************************************/
#include "MQTTFreertos.h"
#include "netdb.h"
#define LWIP_IPV6 0
#if LWIP_IPV6
#define inet_ntop(af,src,dst,size) \
(((af) == AF_INET6) ? ip6addr_ntoa_r((src),(dst),(size)) \
: (((af) == AF_INET) ? ipaddr_ntoa_r((src),(dst),(size)) : NULL))
#define inet_pton(af,src,dst) \
(((af) == AF_INET6) ? inet6_aton((src),(dst)) \
: (((af) == AF_INET) ? inet_aton((src),(dst)) : 0))
#else /* LWIP_IPV6 */
#define inet_ntop(af,src,dst,size) \
(((af) == AF_INET) ? ipaddr_ntoa_r((src),(dst),(size)) : NULL)
#define inet_pton(af,src,dst) \
(((af) == AF_INET) ? inet_aton((src),(dst)) : 0)
#endif /* LWIP_IPV6 */
int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)
{
int rc = 0;
uint16_t usTaskStackSize = (configMINIMAL_STACK_SIZE * 5);
UBaseType_t uxTaskPriority = uxTaskPriorityGet(NULL); /* set the priority as the same as the calling task*/
rc = xTaskCreate(fn, /* The function that implements the task. */
"MQTTTask", /* Just a text name for the task to aid debugging. */
usTaskStackSize, /* The stack size is defined in FreeRTOSIPConfig.h. */
arg, /* The task parameter, not used in this case. */
uxTaskPriority, /* The priority assigned to the task is defined in FreeRTOSConfig.h. */
&thread->task); /* The task handle is not used. */
return rc;
}
void MutexInit(Mutex* mutex)
{
mutex->sem = xSemaphoreCreateMutex();
}
int MutexLock(Mutex* mutex)
{
return xSemaphoreTake(mutex->sem, portMAX_DELAY);
}
int MutexUnlock(Mutex* mutex)
{
return xSemaphoreGive(mutex->sem);
}
void TimerCountdownMS(Timer* timer, unsigned int timeout_ms)
{
timer->xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
vTaskSetTimeOutState(&timer->xTimeOut); /* Record the time at which this function was entered. */
}
void TimerCountdown(Timer* timer, unsigned int timeout)
{
TimerCountdownMS(timer, timeout * 1000);
}
int TimerLeftMS(Timer* timer)
{
xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait); /* updates xTicksToWait to the number left */
return (timer->xTicksToWait * portTICK_PERIOD_MS);
}
char TimerIsExpired(Timer* timer)
{
return xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait) == pdTRUE;
}
void TimerInit(Timer* timer)
{
timer->xTicksToWait = 0;
memset(&timer->xTimeOut, '\0', sizeof(timer->xTimeOut));
}
#if CONFIG_USE_POLARSSL
int FreeRTOS_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
TimeOut_t xTimeOut;
int recvLen = 0;
int so_error;
socklen_t errlen = sizeof(so_error);
vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */
do
{
int rc = 0;
#if defined(LWIP_SO_SNDRCVTIMEO_NONSTANDARD) && (LWIP_SO_SNDRCVTIMEO_NONSTANDARD == 0)
// timeout format is changed in lwip 1.5.0
struct timeval timeout;
timeout.tv_sec = xTicksToWait / 1000;
timeout.tv_usec = ( xTicksToWait % 1000 ) * 1000;
setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(struct timeval));
#else
setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, &xTicksToWait, sizeof(xTicksToWait));
#endif
#if (MQTT_OVER_SSL)
if (n->use_ssl) {
rc = ssl_read(n->ssl, buffer + recvLen, len - recvLen);
getsockopt(n->my_socket, SOL_SOCKET, SO_ERROR, &so_error, &errlen);
if (so_error && so_error != EAGAIN) {
n->disconnect(n);
}
} else
#endif
rc = recv(n->my_socket, buffer + recvLen, len - recvLen, 0);
if (rc > 0)
recvLen += rc;
else if (rc < 0)
{
getsockopt(n->my_socket, SOL_SOCKET, SO_ERROR, &so_error, &errlen);
if (so_error != EAGAIN) {
n->disconnect(n);
}
recvLen = rc;
break;
}
} while (recvLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE);
return recvLen;
}
int FreeRTOS_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
TimeOut_t xTimeOut;
int sentLen = 0;
int so_error;
socklen_t errlen = sizeof(so_error);
vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */
do
{
int rc = 0;
#if defined(LWIP_SO_SNDRCVTIMEO_NONSTANDARD) && (LWIP_SO_SNDRCVTIMEO_NONSTANDARD == 0)
// timeout format is changed in lwip 1.5.0
struct timeval timeout;
timeout.tv_sec = xTicksToWait / 1000;
timeout.tv_usec = ( xTicksToWait % 1000 ) * 1000;
setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(struct timeval));
#else
setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, &xTicksToWait, sizeof(xTicksToWait));
#endif
#if (MQTT_OVER_SSL)
if (n->use_ssl) {
rc = ssl_write(n->ssl, buffer + sentLen, len - sentLen);
getsockopt(n->my_socket, SOL_SOCKET, SO_ERROR, &so_error, &errlen);
if (so_error && so_error != EAGAIN) {
n->disconnect(n);
}
} else
#endif
rc = send(n->my_socket, buffer + sentLen, len - sentLen, 0);
if (rc > 0)
sentLen += rc;
else if (rc < 0)
{
getsockopt(n->my_socket, SOL_SOCKET, SO_ERROR, &so_error, &len);
if (so_error != EAGAIN) {
n->disconnect(n);
}
sentLen = rc;
break;
}
} while (sentLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE);
return sentLen;
}
void FreeRTOS_disconnect(Network* n)
{
shutdown(n->my_socket, SHUT_RDWR);
close(n->my_socket);
n->my_socket = -1;
#if (MQTT_OVER_SSL)
if (n->use_ssl) {
ssl_free(n->ssl);
free(n->ssl);
n->ssl = NULL;
}
#endif
}
void NetworkInit(Network* n)
{
n->my_socket = -1;
n->mqttread = FreeRTOS_read;
n->mqttwrite = FreeRTOS_write;
n->disconnect = FreeRTOS_disconnect;
#if (MQTT_OVER_SSL)
n->use_ssl = 0;
n->ssl = NULL;
n->rootCA = NULL;
n->clientCA = NULL;
n->private_key = NULL;
#endif
}
#if (MQTT_OVER_SSL)
static int mqtt_tls_verify( void *data, x509_crt *crt, int depth, int *flags )
{
char buf[1024];
mqtt_printf(MQTT_DEBUG, "\nVerify requested for (Depth %d):\n", depth );
x509_crt_info( buf, sizeof( buf ) - 1, "", crt );
mqtt_printf(MQTT_DEBUG, "%s", buf );
if( ( (*flags) & BADCERT_EXPIRED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! server certificate has expired\n" );
if( ( (*flags) & BADCERT_REVOKED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! server certificate has been revoked\n" );
if( ( (*flags) & BADCERT_CN_MISMATCH ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! CN mismatch\n" );
if( ( (*flags) & BADCERT_NOT_TRUSTED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! self-signed or not signed by a trusted CA\n" );
if( ( (*flags) & BADCRL_NOT_TRUSTED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! CRL not trusted\n" );
if( ( (*flags) & BADCRL_EXPIRED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! CRL expired\n" );
if( ( (*flags) & BADCERT_OTHER ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! other (unknown) flag\n" );
if ( ( *flags ) == 0 )
mqtt_printf(MQTT_DEBUG, " This certificate has no flags\n" );
return( 0 );
}
static int my_random(void *p_rng, unsigned char *output, size_t output_len)
{
rtw_get_random_bytes(output, output_len);
return 0;
}
#endif // #if (MQTT_OVER_SSL)
int NetworkConnect(Network* n, char* addr, int port)
{
struct sockaddr_in sAddr;
int retVal = -1;
struct hostent *hptr;
char **pptr;
char str[32];
int keepalive_enable = 1;
int keep_idle = 30;
if(n->my_socket >= 0){
n->disconnect(n);
}
if ((hptr = gethostbyname(addr)) == 0)
{
mqtt_printf(MQTT_DEBUG, "gethostbyname failed!");
goto exit;
}
pptr = hptr->h_addr_list;
for(; *pptr!=NULL; pptr++)
{
inet_ntop(hptr->h_addrtype, (const ip_addr_t *)*pptr, str, sizeof(str));
}
inet_ntop(hptr->h_addrtype, (const ip_addr_t *)hptr->h_addr, str, sizeof(str));
sAddr.sin_family = AF_INET;
sAddr.sin_port = htons(port);
sAddr.sin_addr.s_addr = inet_addr(str);
mqtt_printf(MQTT_DEBUG, "addr = %s", str);
if ((n->my_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
{
goto exit;
}
setsockopt( n->my_socket, SOL_SOCKET, SO_KEEPALIVE,
(const char *) &keepalive_enable, sizeof( keepalive_enable ) );
setsockopt( n->my_socket, IPPROTO_TCP, TCP_KEEPIDLE,
(const char *) &keep_idle, sizeof( keep_idle ) );
if ((retVal = connect(n->my_socket, (struct sockaddr*)&sAddr, sizeof(sAddr))) < 0)
{
close(n->my_socket);
mqtt_printf(MQTT_DEBUG, "Connect failed!!");
goto exit;
}
#if (MQTT_OVER_SSL)
x509_crt *root_crt;
x509_crt *client_crt;
pk_context *client_rsa;
root_crt = NULL;
client_crt = NULL;
client_rsa = NULL;
if ( n->use_ssl != 0 ) {
memory_set_own(pvPortMalloc, vPortFree);
n->ssl = (ssl_context *) malloc( sizeof(ssl_context) );
if ( n->ssl == NULL ) {
mqtt_printf(MQTT_DEBUG, "malloc ssl failed!");
goto err;
}
memset(n->ssl, 0, sizeof(ssl_context));
if ( ssl_init(n->ssl) != 0 ) {
mqtt_printf(MQTT_DEBUG, "init ssl failed!");
goto err;
}
ssl_set_endpoint(n->ssl, SSL_IS_CLIENT);
if (n->rootCA != NULL) {
root_crt = (x509_crt *) polarssl_malloc( sizeof(x509_crt) );
if ( root_crt == NULL ) {
mqtt_printf(MQTT_DEBUG, "malloc root_crt failed!");
goto err;
}
memset(root_crt, 0, sizeof(x509_crt));
ssl_set_authmode( n->ssl, SSL_VERIFY_REQUIRED );
if (x509_crt_parse( root_crt, n->rootCA, strlen(n->rootCA) ) != 0) {
mqtt_printf(MQTT_DEBUG, "parse root_crt failed!");
goto err;
}
ssl_set_ca_chain( n->ssl, root_crt, NULL, NULL );
ssl_set_verify( n->ssl, mqtt_tls_verify, NULL );
mqtt_printf(MQTT_DEBUG, "root_crt parse done");
} else {
ssl_set_authmode(n->ssl, SSL_VERIFY_NONE);
}
if (n->clientCA != NULL && n->private_key != NULL) {
client_crt = (x509_crt *) polarssl_malloc( sizeof(x509_crt) );
if ( client_crt == NULL ) {
mqtt_printf(MQTT_DEBUG, "malloc client_crt failed!");
goto err;
}
memset(client_crt, 0, sizeof(x509_crt));
x509_crt_init(client_crt);
client_rsa = (pk_context *) polarssl_malloc( sizeof(pk_context) );
if ( client_rsa == NULL ) {
mqtt_printf(MQTT_DEBUG, "malloc client_rsa failed!");
goto err;
}
memset(client_rsa, 0, sizeof(pk_context));
pk_init(client_rsa);
if ( x509_crt_parse(client_crt, n->clientCA, strlen(n->clientCA)) != 0 ) {
mqtt_printf(MQTT_DEBUG, "parse client_crt failed!");
goto err;
}
if ( pk_parse_key(client_rsa, n->private_key, strlen(n->private_key), NULL, 0) != 0 ) {
mqtt_printf(MQTT_DEBUG, "parse client_rsa failed!");
goto err;
}
ssl_set_own_cert(n->ssl, client_crt, client_rsa);
mqtt_printf(MQTT_DEBUG, "client_crt parse done");
}
ssl_set_rng(n->ssl, my_random, NULL);
ssl_set_bio(n->ssl, net_recv, &n->my_socket, net_send, &n->my_socket);
retVal = ssl_handshake(n->ssl);
if (retVal < 0) {
mqtt_printf(MQTT_DEBUG, "ssl handshake failed err:-0x%04X", -retVal);
goto err;
} else {
mqtt_printf(MQTT_DEBUG, "ssl handshake success");
}
}
if (client_rsa) {
pk_free(client_rsa);
polarssl_free(client_rsa);
}
if (client_crt) {
x509_crt_free(client_crt);
polarssl_free(client_crt);
}
if (root_crt) {
x509_crt_free(root_crt);
polarssl_free(root_crt);
}
goto exit;
err:
if (client_rsa) {
pk_free(client_rsa);
polarssl_free(client_rsa);
}
if (client_crt) {
x509_crt_free(client_crt);
polarssl_free(client_crt);
}
if (root_crt) {
x509_crt_free(root_crt);
polarssl_free(root_crt);
}
net_close(n->my_socket);
ssl_free(n->ssl);
free(n->ssl);
retVal = -1;
#endif // #if (MQTT_OVER_SSL)
exit:
return retVal;
}
#elif CONFIG_USE_MBEDTLS /* CONFIG_USE_POLARSSL */
int FreeRTOS_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
TimeOut_t xTimeOut;
int recvLen = 0;
int so_error;
socklen_t errlen = sizeof(so_error);
vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */
do
{
int rc = 0;
#if defined(LWIP_SO_SNDRCVTIMEO_NONSTANDARD) && (LWIP_SO_SNDRCVTIMEO_NONSTANDARD == 0)
// timeout format is changed in lwip 1.5.0
struct timeval timeout;
timeout.tv_sec = xTicksToWait / 1000;
timeout.tv_usec = ( xTicksToWait % 1000 ) * 1000;
setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(struct timeval));
#else
setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, &xTicksToWait, sizeof(xTicksToWait));
#endif
#if (MQTT_OVER_SSL)
if (n->use_ssl) {
rc = mbedtls_ssl_read(n->ssl, buffer + recvLen, len - recvLen);
getsockopt(n->my_socket, SOL_SOCKET, SO_ERROR, &so_error, &errlen);
if (so_error && so_error != EAGAIN) {
n->disconnect(n);
}
} else
#endif
rc = recv(n->my_socket, buffer + recvLen, len - recvLen, 0);
if (rc > 0)
recvLen += rc;
else if (rc < 0)
{
getsockopt(n->my_socket, SOL_SOCKET, SO_ERROR, &so_error, &errlen);
if (so_error != EAGAIN) {
n->disconnect(n);
}
recvLen = rc;
break;
}
} while (recvLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE);
return recvLen;
}
int FreeRTOS_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
TimeOut_t xTimeOut;
int sentLen = 0;
int so_error;
socklen_t errlen = sizeof(so_error);
vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */
do
{
int rc = 0;
#if defined(LWIP_SO_SNDRCVTIMEO_NONSTANDARD) && (LWIP_SO_SNDRCVTIMEO_NONSTANDARD == 0)
// timeout format is changed in lwip 1.5.0
struct timeval timeout;
timeout.tv_sec = xTicksToWait / 1000;
timeout.tv_usec = ( xTicksToWait % 1000 ) * 1000;
setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(struct timeval));
#else
setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, &xTicksToWait, sizeof(xTicksToWait));
#endif
#if (MQTT_OVER_SSL)
if (n->use_ssl) {
rc = mbedtls_ssl_write(n->ssl, buffer + sentLen, len - sentLen);
getsockopt(n->my_socket, SOL_SOCKET, SO_ERROR, &so_error, &errlen);
if (so_error && so_error != EAGAIN) {
n->disconnect(n);
}
} else
#endif
rc = send(n->my_socket, buffer + sentLen, len - sentLen, 0);
if (rc > 0)
sentLen += rc;
else if (rc < 0)
{
getsockopt(n->my_socket, SOL_SOCKET, SO_ERROR, &so_error, &len);
if (so_error != EAGAIN) {
n->disconnect(n);
}
sentLen = rc;
break;
}
} while (sentLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE);
return sentLen;
}
void FreeRTOS_disconnect(Network* n)
{
shutdown(n->my_socket, SHUT_RDWR);
close(n->my_socket);
n->my_socket = -1;
#if (MQTT_OVER_SSL)
if (n->use_ssl) {
mbedtls_ssl_free(n->ssl);
mbedtls_ssl_config_free(n->conf);
free(n->ssl);
free(n->conf);
n->ssl = NULL;
n->conf = NULL;
}
#endif
}
void NetworkInit(Network* n)
{
n->my_socket = -1;
n->mqttread = FreeRTOS_read;
n->mqttwrite = FreeRTOS_write;
n->disconnect = FreeRTOS_disconnect;
#if (MQTT_OVER_SSL)
n->use_ssl = 0;
n->ssl = NULL;
n->conf = NULL;
n->rootCA = NULL;
n->clientCA = NULL;
n->private_key = NULL;
#endif
}
#if (MQTT_OVER_SSL)
static int mqtt_tls_verify( void *data, mbedtls_x509_crt *crt, int depth, int *flags )
{
char buf[1024];
mqtt_printf(MQTT_DEBUG, "\nVerify requested for (Depth %d):\n", depth );
mbedtls_x509_crt_info( buf, sizeof( buf ) - 1, "", crt );
mqtt_printf(MQTT_DEBUG, "%s", buf );
if( ( (*flags) & MBEDTLS_X509_BADCERT_EXPIRED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! server certificate has expired\n" );
if( ( (*flags) & MBEDTLS_X509_BADCERT_REVOKED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! server certificate has been revoked\n" );
if( ( (*flags) & MBEDTLS_X509_BADCERT_CN_MISMATCH ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! CN mismatch\n" );
if( ( (*flags) & MBEDTLS_X509_BADCERT_NOT_TRUSTED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! self-signed or not signed by a trusted CA\n" );
if( ( (*flags) & MBEDTLS_X509_BADCRL_NOT_TRUSTED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! CRL not trusted\n" );
if( ( (*flags) & MBEDTLS_X509_BADCRL_EXPIRED ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! CRL expired\n" );
if( ( (*flags) & MBEDTLS_X509_BADCERT_OTHER ) != 0 )
mqtt_printf(MQTT_DEBUG, " ! other (unknown) flag\n" );
if ( ( *flags ) == 0 )
mqtt_printf(MQTT_DEBUG, " This certificate has no flags\n" );
return( 0 );
}
static void* my_calloc(size_t nelements, size_t elementSize)
{
size_t size;
void *ptr = NULL;
size = nelements * elementSize;
ptr = pvPortMalloc(size);
if(ptr)
memset(ptr, 0, size);
return ptr;
}
static int my_random(void *p_rng, unsigned char *output, size_t output_len)
{
rtw_get_random_bytes(output, output_len);
return 0;
}
#endif // #if (MQTT_OVER_SSL)
int NetworkConnect(Network* n, char* addr, int port)
{
struct sockaddr_in sAddr;
int retVal = -1;
struct hostent *hptr;
char **pptr;
char str[32];
int keepalive_enable = 1;
int keep_idle = 30;
if(n->my_socket >= 0){
n->disconnect(n);
}
if ((hptr = gethostbyname(addr)) == 0)
{
mqtt_printf(MQTT_DEBUG, "gethostbyname failed!");
goto exit;
}
pptr = hptr->h_addr_list;
for(; *pptr!=NULL; pptr++)
{
inet_ntop(hptr->h_addrtype, (const ip_addr_t *)*pptr, str, sizeof(str));
}
inet_ntop(hptr->h_addrtype, (const ip_addr_t *)hptr->h_addr, str, sizeof(str));
sAddr.sin_family = AF_INET;
sAddr.sin_port = htons(port);
sAddr.sin_addr.s_addr = inet_addr(str);
mqtt_printf(MQTT_DEBUG, "addr = %s", str);
if ((n->my_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
{
goto exit;
}
setsockopt( n->my_socket, SOL_SOCKET, SO_KEEPALIVE,
(const char *) &keepalive_enable, sizeof( keepalive_enable ) );
setsockopt( n->my_socket, IPPROTO_TCP, TCP_KEEPIDLE,
(const char *) &keep_idle, sizeof( keep_idle ) );
if ((retVal = connect(n->my_socket, (struct sockaddr*)&sAddr, sizeof(sAddr))) < 0)
{
close(n->my_socket);
mqtt_printf(MQTT_DEBUG, "Connect failed!!");
goto exit;
}
#if (MQTT_OVER_SSL)
mbedtls_x509_crt *root_crt;
mbedtls_x509_crt *client_crt;
mbedtls_pk_context *client_rsa;
root_crt = NULL;
client_crt = NULL;
client_rsa = NULL;
if ( n->use_ssl != 0 ) {
mbedtls_platform_set_calloc_free(my_calloc, vPortFree);
n->ssl = (mbedtls_ssl_context *) malloc( sizeof(mbedtls_ssl_context) );
n->conf = (mbedtls_ssl_config *) malloc( sizeof(mbedtls_ssl_config) );
if (( n->ssl == NULL )||( n->conf == NULL )) {
mqtt_printf(MQTT_DEBUG, "malloc ssl failed!");
goto err;
}
mbedtls_ssl_init(n->ssl);
mbedtls_ssl_config_init(n->conf);
if (n->rootCA != NULL) {
root_crt = (mbedtls_x509_crt *) mbedtls_calloc( sizeof(mbedtls_x509_crt), 1);
if ( root_crt == NULL ) {
mqtt_printf(MQTT_DEBUG, "malloc root_crt failed!");
goto err;
}
mbedtls_x509_crt_init(root_crt);
if (mbedtls_x509_crt_parse( root_crt, n->rootCA, strlen(n->rootCA)+1 ) != 0) {
mqtt_printf(MQTT_DEBUG, "parse root_crt failed!");
goto err;
}
mbedtls_ssl_conf_ca_chain( n->conf, root_crt, NULL);
mbedtls_ssl_conf_authmode(n->conf, MBEDTLS_SSL_VERIFY_REQUIRED);
mbedtls_ssl_conf_verify( n->conf, mqtt_tls_verify, NULL );
mqtt_printf(MQTT_DEBUG, "root_crt parse done");
} else {
mbedtls_ssl_conf_authmode(n->conf, MBEDTLS_SSL_VERIFY_NONE);
}
if (n->clientCA != NULL && n->private_key != NULL) {
client_crt = (mbedtls_x509_crt *) mbedtls_calloc( sizeof(mbedtls_x509_crt), 1);
if ( client_crt == NULL ) {
mqtt_printf(MQTT_DEBUG, "malloc client_crt failed!");
goto err;
}
mbedtls_x509_crt_init(client_crt);
client_rsa = (mbedtls_pk_context *) mbedtls_calloc( sizeof(mbedtls_pk_context), 1);
if ( client_rsa == NULL ) {
mqtt_printf(MQTT_DEBUG, "malloc client_rsa failed!");
goto err;
}
mbedtls_pk_init(client_rsa);
if ( mbedtls_x509_crt_parse(client_crt, n->clientCA, strlen(n->clientCA)+1) != 0 ) {
mqtt_printf(MQTT_DEBUG, "parse client_crt failed!");
goto err;
}
if ( mbedtls_pk_parse_key(client_rsa, n->private_key, strlen(n->private_key)+1, NULL, 0) != 0 ) {
mqtt_printf(MQTT_DEBUG, "parse client_rsa failed!");
goto err;
}
}
mbedtls_ssl_conf_own_cert(n->conf, client_crt, client_rsa);
mbedtls_ssl_set_bio(n->ssl, &n->my_socket, mbedtls_net_send, mbedtls_net_recv, NULL);
mbedtls_ssl_conf_rng(n->conf, my_random, NULL);
if((mbedtls_ssl_config_defaults(n->conf,
MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
mqtt_printf(MQTT_DEBUG, "ssl config defaults failed!");
goto err;
}
if((mbedtls_ssl_setup(n->ssl, n->conf)) != 0) {
mqtt_printf(MQTT_DEBUG,"mbedtls_ssl_setup failed!");
goto err;
}
retVal = mbedtls_ssl_handshake(n->ssl);
if (retVal < 0) {
mqtt_printf(MQTT_DEBUG, "ssl handshake failed err:-0x%04X", -retVal);
goto err;
} else {
mqtt_printf(MQTT_DEBUG, "ssl handshake success");
}
}
if (client_rsa) {
mbedtls_pk_free(client_rsa);
mbedtls_free(client_rsa);
}
if (client_crt) {
mbedtls_x509_crt_free(client_crt);
mbedtls_free(client_crt);
}
if (root_crt) {
mbedtls_x509_crt_free(root_crt);
mbedtls_free(root_crt);
}
goto exit;
err:
if (client_rsa) {
mbedtls_pk_free(client_rsa);
mbedtls_free(client_rsa);
}
if (client_crt) {
mbedtls_x509_crt_free(client_crt);
mbedtls_free(client_crt);
}
if (root_crt) {
mbedtls_x509_crt_free(root_crt);
mbedtls_free(root_crt);
}
mbedtls_net_free(&n->my_socket);
mbedtls_ssl_free(n->ssl);
mbedtls_ssl_config_free(n->conf);
free(n->ssl);
free(n->conf);
retVal = -1;
#endif // #if (MQTT_OVER_SSL)
exit:
return retVal;
}
#endif /* CONFIG_USE_POLARSSL */
#if 0
int NetworkConnectTLS(Network *n, char* addr, int port, SlSockSecureFiles_t* certificates, unsigned char sec_method, unsigned int cipher, char server_verify)
{
SlSockAddrIn_t sAddr;
int addrSize;
int retVal;
unsigned long ipAddress;
retVal = sl_NetAppDnsGetHostByName(addr, strlen(addr), &ipAddress, AF_INET);
if (retVal < 0) {
return -1;
}
sAddr.sin_family = AF_INET;
sAddr.sin_port = sl_Htons((unsigned short)port);
sAddr.sin_addr.s_addr = sl_Htonl(ipAddress);
addrSize = sizeof(SlSockAddrIn_t);
n->my_socket = sl_Socket(SL_AF_INET, SL_SOCK_STREAM, SL_SEC_SOCKET);
if (n->my_socket < 0) {
return -1;
}
SlSockSecureMethod method;
method.secureMethod = sec_method;
retVal = sl_SetSockOpt(n->my_socket, SL_SOL_SOCKET, SL_SO_SECMETHOD, &method, sizeof(method));
if (retVal < 0) {
return retVal;
}
SlSockSecureMask mask;
mask.secureMask = cipher;
retVal = sl_SetSockOpt(n->my_socket, SL_SOL_SOCKET, SL_SO_SECURE_MASK, &mask, sizeof(mask));
if (retVal < 0) {
return retVal;
}
if (certificates != NULL) {
retVal = sl_SetSockOpt(n->my_socket, SL_SOL_SOCKET, SL_SO_SECURE_FILES, certificates->secureFiles, sizeof(SlSockSecureFiles_t));
if (retVal < 0)
{
return retVal;
}
}
retVal = sl_Connect(n->my_socket, (SlSockAddr_t *)&sAddr, addrSize);
if (retVal < 0) {
if (server_verify || retVal != -453) {
sl_Close(n->my_socket);
return retVal;
}
}
SysTickIntRegister(SysTickIntHandler);
SysTickPeriodSet(80000);
SysTickEnable();
return retVal;
}
#endif

View file

@ -0,0 +1,122 @@
/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(MQTTFreeRTOS_H)
#define MQTTFreeRTOS_H
#include "FreeRTOS.h"
#include "semphr.h"
#include "task.h"
#include "lwip/sockets.h"
#include "osdep_service.h"
#define MQTT_OVER_SSL (1)
#if (MQTT_OVER_SSL)
#if CONFIG_USE_POLARSSL
#include "polarssl/config.h"
#include "polarssl/net.h"
#include "polarssl/ssl.h"
#include "polarssl/error.h"
#include "polarssl/memory.h"
#elif CONFIG_USE_MBEDTLS
#include "mbedtls/config.h"
#include "mbedtls/platform.h"
#include "mbedtls/net_sockets.h"
#include "mbedtls/ssl.h"
#include "mbedtls/error.h"
#include "mbedtls/debug.h"
#endif
#endif
enum {
MQTT_EXCESSIVE, MQTT_MSGDUMP, MQTT_DEBUG, MQTT_INFO, MQTT_ALWAYS, MQTT_WARNING, MQTT_ERROR
};
#define FreeRTOS_Select select
#define mqtt_printf(level, fmt, arg...) \
do {\
if (level >= MQTT_DEBUG) {\
{\
printf("\r\n[%d]mqtt:", rtw_get_current_time());\
printf(fmt, ##arg);\
printf("\n\r");\
} \
}\
}while(0)
typedef struct Timer
{
TickType_t xTicksToWait;
TimeOut_t xTimeOut;
} Timer;
typedef struct Network Network;
struct Network
{
int my_socket;
int (*mqttread) (Network*, unsigned char*, int, int);
int (*mqttwrite) (Network*, unsigned char*, int, int);
void (*disconnect) (Network*);
int m2m_rxevent;
#if (MQTT_OVER_SSL)
unsigned char use_ssl;
#if CONFIG_USE_POLARSSL
ssl_context *ssl;
#elif CONFIG_USE_MBEDTLS
mbedtls_ssl_context *ssl;
mbedtls_ssl_config *conf;
#endif
char *rootCA;
char *clientCA;
char *private_key;
#endif
};
void TimerInit(Timer*);
char TimerIsExpired(Timer*);
void TimerCountdownMS(Timer*, unsigned int);
void TimerCountdown(Timer*, unsigned int);
int TimerLeftMS(Timer*);
typedef struct Mutex
{
SemaphoreHandle_t sem;
} Mutex;
void MutexInit(Mutex*);
int MutexLock(Mutex*);
int MutexUnlock(Mutex*);
typedef struct Thread
{
TaskHandle_t task;
} Thread;
int ThreadStart(Thread*, void (*fn)(void*), void* arg);
int FreeRTOS_read(Network*, unsigned char*, int, int);
int FreeRTOS_write(Network*, unsigned char*, int, int);
void FreeRTOS_disconnect(Network*);
void NetworkInit(Network*);
int NetworkConnect(Network*, char*, int);
/*int NetworkConnectTLS(Network*, char*, int, SlSockSecureFiles_t*, unsigned char, unsigned int, char);*/
#endif