Merge pull request #158 from baoshi/mqtt_fix

Handling MQTT read failure and send buffer length
This commit is contained in:
Angus Gratton 2016-07-14 07:30:35 +10:00 committed by GitHub
commit eac9504d8a
2 changed files with 21 additions and 13 deletions

View file

@ -35,7 +35,7 @@ int sendPacket(MQTTClient* c, int length, Timer* timer)
while (sent < length && !expired(timer))
{
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, left_ms(timer));
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length - sent, left_ms(timer));
if (rc < 0) // there was an error writing the data
break;
sent += rc;
@ -70,7 +70,9 @@ int decodePacket(MQTTClient* c, int* value, int timeout)
}
rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
if (rc != 1)
goto exit;
{
goto exit;
}
*value += (i & 127) * multiplier;
multiplier *= 128;
} while ((i & 128) != 0);
@ -79,6 +81,7 @@ exit:
}
// Return packet type. If no packet avilable, return FAILURE, or READ_ERROR if timeout
int readPacket(MQTTClient* c, Timer* timer)
{
int rc = FAILURE;
@ -89,20 +92,19 @@ int readPacket(MQTTClient* c, Timer* timer)
/* 1. read the header byte. This has the packet type in it */
if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, left_ms(timer)) != 1)
goto exit;
len = 1;
/* 2. read the remaining length. This is variable in itself */
decodePacket(c, &rem_len, left_ms(timer));
len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, left_ms(timer)) != rem_len))
{
rc = READ_ERROR;
goto exit;
}
header.byte = c->readbuf[0];
rc = header.bits.type;
exit:
//dmsg_printf("readPacket=%d\r\n", rc);
return rc;
}
@ -212,10 +214,10 @@ exit:
}
int cycle(MQTTClient* c, Timer* timer)
int cycle(MQTTClient* c, Timer* timer)
{
// read the socket, see what work is due
unsigned short packet_type = readPacket(c, timer);
int packet_type = readPacket(c, timer);
int len = 0,
rc = SUCCESS;
@ -266,11 +268,17 @@ int cycle(MQTTClient* c, Timer* timer)
case PUBCOMP:
break;
case PINGRESP:
{
c->ping_outstanding = 0;
c->fail_count = 0;
}
{
c->ping_outstanding = 0;
c->fail_count = 0;
break;
}
case READ_ERROR:
{
c->isconnected = 0; // we simulate a disconnect if reading error
rc = DISCONNECTED; // so that the outer layer will reconnect and recover
break;
}
}
if (c->isconnected)
rc = keepalive(c);

View file

@ -27,7 +27,7 @@
enum QoS { QOS0, QOS1, QOS2 };
// all failure return codes must be negative
enum returnCode {DISCONNECTED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
enum returnCode {READ_ERROR = -4, DISCONNECTED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
void NewTimer(Timer*);