MQTT Client: Use LWIP socket options instead of select()

This commit is contained in:
Angus Gratton 2016-05-28 17:31:39 +10:00
parent 42880fded5
commit b73b93c58c
2 changed files with 50 additions and 116 deletions

View file

@ -1,15 +1,8 @@
/** /**
****************************************************************************** * Paho Embedded MQTT client, esp-open-rtos support.
* @file MQTTESP8266.c
* @author Baoshi <mail(at)ba0sh1(dot)com>
* @version 0.1
* @date Sep 9, 2015
* @brief Eclipse Paho ported to ESP8266 RTOS
* *
****************************************************************************** * Copyright (c) 2015, Baoshi Zhu & 2016, Angus Gratton
* @copyright * All rights reserved.
*
* Copyright (c) 2015, Baoshi Zhu. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be * Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE.txt file. * found in the LICENSE.txt file.
* *
@ -28,7 +21,7 @@
#include "MQTTESP8266.h" #include "MQTTESP8266.h"
char expired(Timer* timer) char expired(Timer* timer)
{ {
portTickType now = xTaskGetTickCount(); portTickType now = xTaskGetTickCount();
int32_t left = timer->end_time - now; int32_t left = timer->end_time - now;
@ -36,20 +29,20 @@ char expired(Timer* timer)
} }
void countdown_ms(Timer* timer, unsigned int timeout) void countdown_ms(Timer* timer, unsigned int timeout)
{ {
portTickType now = xTaskGetTickCount(); portTickType now = xTaskGetTickCount();
timer->end_time = now + timeout / portTICK_RATE_MS; timer->end_time = now + timeout / portTICK_RATE_MS;
} }
void countdown(Timer* timer, unsigned int timeout) void countdown(Timer* timer, unsigned int timeout)
{ {
countdown_ms(timer, timeout * 1000); countdown_ms(timer, timeout * 1000);
} }
int left_ms(Timer* timer) int left_ms(Timer* timer)
{ {
portTickType now = xTaskGetTickCount(); portTickType now = xTaskGetTickCount();
int32_t left = timer->end_time - now; int32_t left = timer->end_time - now;
@ -57,119 +50,69 @@ int left_ms(Timer* timer)
} }
void InitTimer(Timer* timer) void InitTimer(Timer* timer)
{ {
timer->end_time = 0; timer->end_time = 0;
} }
static int mqtt_esp_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
int mqtt_esp_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
{ {
struct timeval tv; /* set SO_RCVTIMEO if timeout_ms > 0 otherwise O_NONBLOCK */
fd_set fdset; lwip_setsockopt(n->socket, SOL_SOCKET, SO_RCVTIMEO, &timeout_ms, sizeof(int));
int rc = 0; lwip_fcntl(n->socket, F_SETFL, (timeout_ms) > 0 ? 0 : O_NONBLOCK);
int rcvd = 0; int r = recv(n->socket, buffer, len, 0);
FD_ZERO(&fdset); if(r == 0) {
FD_SET(n->my_socket, &fdset); r = -1; /* 0 indicates timeout */
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
rc = select(n->my_socket + 1, &fdset, 0, 0, &tv);
if ((rc > 0) && (FD_ISSET(n->my_socket, &fdset)))
{
rcvd = recv(n->my_socket, buffer, len, 0);
} }
else return r;
{
// select fail
return -1;
}
return rcvd;
} }
int mqtt_esp_write(Network* n, unsigned char* buffer, int len, int timeout_ms) static int mqtt_esp_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
{ {
struct timeval tv; lwip_setsockopt(n->socket, SOL_SOCKET, SO_SNDTIMEO, &timeout_ms, sizeof(int));
fd_set fdset; lwip_fcntl(n->socket, F_SETFL, (timeout_ms > 0) ? 0 : O_NONBLOCK);
int rc = 0; int r = send(n->socket, buffer, len, 0);
if(r == 0) {
FD_ZERO(&fdset); r = -1; /* 0 indicates timeout */
FD_SET(n->my_socket, &fdset);
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
rc = select(n->my_socket + 1, 0, &fdset, 0, &tv);
if ((rc > 0) && (FD_ISSET(n->my_socket, &fdset)))
{
rc = send(n->my_socket, buffer, len, 0);
} }
else return r;
{
// select fail
return -1;
}
return rc;
} }
void NewNetwork(Network* n) void NewNetwork(Network* n)
{ {
n->my_socket = -1; n->socket = -1;
n->mqttread = mqtt_esp_read; n->mqttread = mqtt_esp_read;
n->mqttwrite = mqtt_esp_write; n->mqttwrite = mqtt_esp_write;
} }
static int host2addr(const char *hostname , struct in_addr *in)
{
struct addrinfo hints, *servinfo, *p;
struct sockaddr_in *h;
int rv;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
rv = getaddrinfo(hostname, 0 , &hints , &servinfo);
if (rv != 0)
{
return rv;
}
// loop through all the results and get the first resolve
for (p = servinfo; p != 0; p = p->ai_next)
{
h = (struct sockaddr_in *)p->ai_addr;
in->s_addr = h->sin_addr.s_addr;
}
freeaddrinfo(servinfo); // all done with this structure
return 0;
}
int ConnectNetwork(Network* n, const char* host, int port) int ConnectNetwork(Network* n, const char* host, int port)
{ {
struct sockaddr_in addr; struct sockaddr_in addr;
ip_addr_t ipaddr;
int ret; int ret;
if (host2addr(host, &(addr.sin_addr)) != 0) err_t e = netconn_gethostbyname(host, &ipaddr);
{ if(e) {
return -1; return e;
} }
inet_addr_from_ipaddr(&(addr.sin_addr), &ipaddr);
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_port = htons(port); addr.sin_port = htons(port);
n->my_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); ret = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if( n->my_socket < 0 ) if (ret < 0)
{ {
// error return ret;
return -1;
} }
ret = connect(n->my_socket, ( struct sockaddr *)&addr, sizeof(struct sockaddr_in)); n->socket = ret;
if( ret < 0 )
ret = connect(n->socket, ( struct sockaddr *)&addr, sizeof(struct sockaddr_in));
if (ret < 0)
{ {
// error close(n->socket);
close(n->my_socket); n->socket = -1;
return ret; return ret;
} }
@ -179,7 +122,7 @@ int ConnectNetwork(Network* n, const char* host, int port)
int DisconnectNetwork(Network* n) int DisconnectNetwork(Network* n)
{ {
close(n->my_socket); close(n->socket);
n->my_socket = -1; n->socket = -1;
return 0; return 0;
} }

View file

@ -1,15 +1,8 @@
/** /**
****************************************************************************** * Paho Embedded MQTT client, esp-open-rtos support.
* @file MQTTESP8266.h
* @author Baoshi <mail(at)ba0sh1(dot)com>
* @version 0.1
* @date Sep 9, 2015
* @brief Eclipse Paho ported to ESP8266 RTOS
* *
****************************************************************************** * Copyright (c) 2015, Baoshi Zhu & 2016, Angus Gratton.
* @copyright * All rights reserved.
*
* Copyright (c) 2015, Baoshi Zhu. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be * Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE.txt file. * found in the LICENSE.txt file.
* *
@ -24,20 +17,22 @@
#include <FreeRTOS.h> #include <FreeRTOS.h>
#include <portmacro.h> #include <portmacro.h>
#include <lwip/api.h>
typedef struct Timer Timer; typedef struct Timer Timer;
typedef struct Network Network;
struct Timer struct Timer
{ {
portTickType end_time; portTickType end_time;
}; };
typedef struct Network Network;
struct Network struct Network
{ {
int my_socket; int socket;
int (*mqttread) (Network*, unsigned char*, int, int); int (*mqttread) (Network*, unsigned char*, int, int);
int (*mqttwrite) (Network*, unsigned char*, int, int); int (*mqttwrite) (Network*, unsigned char*, int, int);
}; };
char expired(Timer*); char expired(Timer*);
@ -47,10 +42,6 @@ int left_ms(Timer*);
void InitTimer(Timer*); void InitTimer(Timer*);
int mqtt_esp_read(Network*, unsigned char*, int, int);
int mqtt_esp_write(Network*, unsigned char*, int, int);
void mqtt_esp_disconnect(Network*);
void NewNetwork(Network* n); void NewNetwork(Network* n);
int ConnectNetwork(Network* n, const char* host, int port); int ConnectNetwork(Network* n, const char* host, int port);
int DisconnectNetwork(Network* n); int DisconnectNetwork(Network* n);