diff --git a/extras/paho_mqtt_c/MQTTClient.c b/extras/paho_mqtt_c/MQTTClient.c index b964250..4df3d8e 100644 --- a/extras/paho_mqtt_c/MQTTClient.c +++ b/extras/paho_mqtt_c/MQTTClient.c @@ -35,7 +35,7 @@ int sendPacket(MQTTClient* c, int length, Timer* timer) while (sent < length && !expired(timer)) { - rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, left_ms(timer)); + rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length - sent, left_ms(timer)); if (rc < 0) // there was an error writing the data break; sent += rc; @@ -70,7 +70,9 @@ int decodePacket(MQTTClient* c, int* value, int timeout) } rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout); if (rc != 1) - goto exit; + { + goto exit; + } *value += (i & 127) * multiplier; multiplier *= 128; } while ((i & 128) != 0); @@ -79,6 +81,7 @@ exit: } +// Return packet type. If no packet avilable, return FAILURE, or READ_ERROR if timeout int readPacket(MQTTClient* c, Timer* timer) { int rc = FAILURE; @@ -89,20 +92,19 @@ int readPacket(MQTTClient* c, Timer* timer) /* 1. read the header byte. This has the packet type in it */ if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, left_ms(timer)) != 1) goto exit; - len = 1; /* 2. read the remaining length. This is variable in itself */ decodePacket(c, &rem_len, left_ms(timer)); len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ - /* 3. read the rest of the buffer using a callback to supply the rest of the data */ if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, left_ms(timer)) != rem_len)) + { + rc = READ_ERROR; goto exit; - + } header.byte = c->readbuf[0]; rc = header.bits.type; exit: - //dmsg_printf("readPacket=%d\r\n", rc); return rc; } @@ -212,10 +214,10 @@ exit: } -int cycle(MQTTClient* c, Timer* timer) +int cycle(MQTTClient* c, Timer* timer) { // read the socket, see what work is due - unsigned short packet_type = readPacket(c, timer); + int packet_type = readPacket(c, timer); int len = 0, rc = SUCCESS; @@ -266,11 +268,17 @@ int cycle(MQTTClient* c, Timer* timer) case PUBCOMP: break; case PINGRESP: - { - c->ping_outstanding = 0; - c->fail_count = 0; - } + { + c->ping_outstanding = 0; + c->fail_count = 0; break; + } + case READ_ERROR: + { + c->isconnected = 0; // we simulate a disconnect if reading error + rc = DISCONNECTED; // so that the outer layer will reconnect and recover + break; + } } if (c->isconnected) rc = keepalive(c); diff --git a/extras/paho_mqtt_c/MQTTClient.h b/extras/paho_mqtt_c/MQTTClient.h index f7ea424..876197c 100644 --- a/extras/paho_mqtt_c/MQTTClient.h +++ b/extras/paho_mqtt_c/MQTTClient.h @@ -27,7 +27,7 @@ enum QoS { QOS0, QOS1, QOS2 }; // all failure return codes must be negative -enum returnCode {DISCONNECTED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; +enum returnCode {READ_ERROR = -4, DISCONNECTED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; void NewTimer(Timer*);