mirror of
synced 2025-03-19 11:32:58 +00:00
194 lines
6.2 KiB
194 lines
6.2 KiB
/* Standard includes. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
/* FreeRTOS includes. */
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include "MQTTClient.h"
#include "wifi_conf.h"
void messageArrived(MessageData* data)
mqtt_printf(MQTT_INFO, "Message arrived on topic %.*s: %.*s\n", data->topicName->lenstring.len, data->topicName->lenstring.data,
data->message->payloadlen, (char*)data->message->payload);
//This example is original and cannot restart if failed. To use this example, define WAIT_FOR_ACK and not define MQTT_TASK in MQTTClient.h
static void prvMQTTEchoTask(void *pvParameters)
/* connect to gpssensor.ddns.net, subscribe to a topic, send and receive messages regularly every 5 sec */
MQTTClient client;
Network network;
unsigned char sendbuf[512], readbuf[80];
int rc = 0, count = 0;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
char* address = "gpssensor.ddns.net";
char* sub_topic = "LASS/Test/Pm25Ameba/#";
char* pub_topic = "LASS/Test/Pm25Ameba/FT1_018";
MQTTClientInit(&client, &network, 30000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));
mqtt_printf(MQTT_INFO, "Wait Wi-Fi to be connected.");
while(wifi_is_ready_to_transceive(RTW_STA_INTERFACE) != RTW_SUCCESS) {
vTaskDelay(5000 / portTICK_PERIOD_MS);
mqtt_printf(MQTT_INFO, "Wi-Fi connected.");
mqtt_printf(MQTT_INFO, "Connect Network \"%s\"", address);
while ((rc = NetworkConnect(&network, address, 1883)) != 0){
mqtt_printf(MQTT_INFO, "Return code from network connect is %d\n", rc);
vTaskDelay(1000 / portTICK_PERIOD_MS);
mqtt_printf(MQTT_INFO, "\"%s\" Connected", address);
connectData.MQTTVersion = 3;
connectData.clientID.cstring = "FT1_018";
mqtt_printf(MQTT_INFO, "Start MQTT connection");
while ((rc = MQTTConnect(&client, &connectData)) != 0){
mqtt_printf(MQTT_INFO, "Return code from MQTT connect is %d\n", rc);
vTaskDelay(1000 / portTICK_PERIOD_MS);
mqtt_printf(MQTT_INFO, "MQTT Connected");
mqtt_printf(MQTT_INFO, "Subscribe to Topic: %s", sub_topic);
if ((rc = MQTTSubscribe(&client, sub_topic, QOS2, messageArrived)) != 0)
mqtt_printf(MQTT_INFO, "Return code from MQTT subscribe is %d\n", rc);
mqtt_printf(MQTT_INFO, "Publish Topics: %s", pub_topic);
while (++count)
MQTTMessage message;
char payload[300];
message.qos = QOS1;
message.retained = 0;
message.payload = payload;
sprintf(payload, "hello from AMEBA %d", count);
message.payloadlen = strlen(payload);
if ((rc = MQTTPublish(&client, pub_topic, &message)) != 0)
mqtt_printf(MQTT_INFO,"Return code from MQTT publish is %d\n", rc);
if ((rc = MQTTYield(&client, 1000)) != 0)
mqtt_printf(MQTT_INFO,"Return code from yield is %d\n", rc);
/* do not return */
#if defined(MQTT_TASK)
void MQTTPublishMessage(MQTTClient* c, char *topic)
int rc = 0;
static int count = 0;
MQTTMessage message;
char payload[300];
message.qos = QOS1;
message.retained = 0;
message.payload = payload;
if(c->mqttstatus == MQTT_RUNNING){
sprintf(payload, "hello from AMEBA %d", count);
message.payloadlen = strlen(payload);
mqtt_printf(MQTT_INFO, "Publish Topic %s : %d", topic, count);
if ((rc = MQTTPublish(c, topic, &message)) != 0){
mqtt_printf(MQTT_INFO, "Return code from MQTT publish is %d\n", rc);
static void prvMQTTTask(void *pvParameters)
MQTTClient client;
Network network;
static unsigned char sendbuf[MQTT_SENDBUF_LEN], readbuf[MQTT_READBUF_LEN];
int rc = 0, mqtt_pub_count = 0;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
connectData.MQTTVersion = 3;
connectData.clientID.cstring = "FT1_018";
char* address = "gpssensor.ddns.net";
char* sub_topic = "LASS/Test/Pm25Ameba/#";
char* pub_topic = "LASS/Test/Pm25Ameba/FT1_018";
MQTTClientInit(&client, &network, 30000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));
while (1)
while(wifi_is_ready_to_transceive(RTW_STA_INTERFACE) != RTW_SUCCESS) {
mqtt_printf(MQTT_INFO, "Wait Wi-Fi to be connected.");
vTaskDelay(5000 / portTICK_PERIOD_MS);
fd_set read_fds;
fd_set except_fds;
struct timeval timeout;
timeout.tv_sec = MQTT_SELECT_TIMEOUT;
timeout.tv_usec = 0;
if(network.my_socket >= 0){
FD_SET(network.my_socket, &read_fds);
FD_SET(network.my_socket, &except_fds);
rc = FreeRTOS_Select(network.my_socket + 1, &read_fds, NULL, &except_fds, &timeout);
if(FD_ISSET(network.my_socket, &except_fds))
mqtt_printf(MQTT_INFO, "except_fds is set");
MQTTSetStatus(&client, MQTT_START); //my_socket will be close and reopen in MQTTDataHandle if STATUS set to MQTT_START
else if(rc == 0) //select timeout
if(++mqtt_pub_count == 5) //Send MQTT publish message every 5 seconds
MQTTPublishMessage(&client, pub_topic);
mqtt_pub_count = 0;
MQTTDataHandle(&client, &read_fds, &connectData, messageArrived, address, sub_topic);
void vStartMQTTTasks(uint16_t usTaskStackSize, UBaseType_t uxTaskPriority)
BaseType_t x = 0L;
#if defined(MQTT_TASK)
xTaskCreate(prvMQTTTask, /* The function that implements the task. */
"MQTTTask", /* Just a text name for the task to aid debugging. */
usTaskStackSize, /* The stack size is defined in FreeRTOSIPConfig.h. */
(void *)x, /* The task parameter, not used in this case. */
uxTaskPriority, /* The priority assigned to the task is defined in FreeRTOSConfig.h. */
NULL); /* The task handle is not used. */
xTaskCreate(prvMQTTEchoTask, /* The function that implements the task. */
"MQTTEcho0", /* Just a text name for the task to aid debugging. */
usTaskStackSize + 128, /* The stack size is defined in FreeRTOSIPConfig.h. */
(void *)x, /* The task parameter, not used in this case. */
uxTaskPriority, /* The priority assigned to the task is defined in FreeRTOSConfig.h. */
NULL); /* The task handle is not used. */
void example_mqtt(void)
vStartMQTTTasks(4096, tskIDLE_PRIORITY + 4);