mirror of
https://github.com/FreeRTOS/FreeRTOS.git
synced 2025-12-17 10:36:22 +08:00
Small MQTT Plaintext hygiene updates (#324)
- Fix the config file header include gaurd - Fixes comments only in the main prvMQTTDemoTask( void * pvParameters ) - Deletes unused headers. - Check the return value of MQTT_Disconnect(). Co-authored-by: Oscar Michael Abrina <abrinao@amazon.com>
This commit is contained in:
@@ -21,7 +21,6 @@
|
||||
*
|
||||
* http://www.FreeRTOS.org
|
||||
* http://aws.amazon.com/freertos
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
@@ -46,10 +45,6 @@
|
||||
#include "FreeRTOS.h"
|
||||
#include "task.h"
|
||||
|
||||
/* FreeRTOS+TCP includes. */
|
||||
#include "FreeRTOS_IP.h"
|
||||
#include "FreeRTOS_Sockets.h"
|
||||
|
||||
/* Demo Specific configs. */
|
||||
#include "demo_config.h"
|
||||
|
||||
@@ -310,8 +305,9 @@ static topicFilterContext_t xTopicFilterContext[ mqttexampleTOPIC_COUNT ] =
|
||||
{ mqttexampleTOPIC, MQTTSubAckFailure }
|
||||
};
|
||||
|
||||
|
||||
/** @brief Static buffer used to hold MQTT messages being sent and received. */
|
||||
/**
|
||||
* @brief Static buffer used to hold MQTT messages being sent and received.
|
||||
*/
|
||||
static MQTTFixedBuffer_t xBuffer =
|
||||
{
|
||||
.pBuffer = ucSharedBuffer,
|
||||
@@ -321,7 +317,8 @@ static MQTTFixedBuffer_t xBuffer =
|
||||
/*-----------------------------------------------------------*/
|
||||
|
||||
/**
|
||||
* @brief Create the task that demonstrates the Plain text MQTT API Demo.
|
||||
* @brief Create the task that demonstrates the MQTT API over a plaintext TCP
|
||||
* connection.
|
||||
*/
|
||||
void vStartSimpleMQTTDemo( void )
|
||||
{
|
||||
@@ -356,74 +353,81 @@ static void prvMQTTDemoTask( void * pvParameters )
|
||||
/****************************** Connect. ******************************/
|
||||
|
||||
/* Attempt to connect to the MQTT broker. If connection fails, retry after
|
||||
* a timeout. Timeout value will be exponentially increased until the maximum
|
||||
* number of attempts are reached or the maximum timeout value is reached.
|
||||
* The function returns a failure status if the TCP connection cannot be established
|
||||
* to the broker after the configured number of attempts. */
|
||||
* a timeout. The timeout value will exponentially increase until the
|
||||
* maximum number of attempts are reached or the maximum timeout value is
|
||||
* reached. The function below returns a failure status if the TCP connection
|
||||
* cannot be established to the broker after the configured number of attempts. */
|
||||
xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext );
|
||||
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );
|
||||
|
||||
/* Sends an MQTT Connect packet over the already connected TCP socket,
|
||||
* and waits for connection acknowledgment (CONNACK) packet. */
|
||||
LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) );
|
||||
* and waits for a connection acknowledgment (CONNACK) packet. */
|
||||
LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) );
|
||||
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );
|
||||
|
||||
/**************************** Subscribe. ******************************/
|
||||
|
||||
/* If server rejected the subscription request, attempt to resubscribe to topic.
|
||||
* Attempts are made according to the exponential backoff retry strategy
|
||||
* implemented in retryUtils. */
|
||||
/* If server rejected the subscription request, attempt to resubscribe to
|
||||
* the topic. Attempts are made according to the exponential backoff retry
|
||||
* strategy declared in retry_utils.h. */
|
||||
prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );
|
||||
|
||||
/**************************** Publish and Keep Alive Loop. ******************************/
|
||||
/* Publish messages with QoS0, send and process Keep alive messages. */
|
||||
/******************* Publish and Keep Alive Loop. *********************/
|
||||
/* Publish messages with QoS0, then send and process Keep Alive messages. */
|
||||
for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
|
||||
{
|
||||
LogInfo( ( "Publish to the MQTT topic %s.\r\n", mqttexampleTOPIC ) );
|
||||
LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) );
|
||||
prvMQTTPublishToTopic( &xMQTTContext );
|
||||
|
||||
/* Process incoming publish echo, since application subscribed to the same
|
||||
* topic the broker will send publish message back to the application. */
|
||||
LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) );
|
||||
xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
|
||||
/* Process the incoming publish echo. Since the application subscribed
|
||||
* to the same topic, the broker will send the same publish message
|
||||
* back to the application. */
|
||||
LogInfo( ( "Attempt to receive publish message from broker." ) );
|
||||
xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext,
|
||||
mqttexamplePROCESS_LOOP_TIMEOUT_MS );
|
||||
configASSERT( xMQTTStatus == MQTTSuccess );
|
||||
|
||||
/* Leave Connection Idle for some time. */
|
||||
LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) );
|
||||
/* Leave the connection idle for some time. */
|
||||
LogInfo( ( "Keeping Connection Idle...\r\n" ) );
|
||||
vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES );
|
||||
}
|
||||
|
||||
/************************ Unsubscribe from the topic. **************************/
|
||||
LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", mqttexampleTOPIC ) );
|
||||
/******************** Unsubscribe from the topic. *********************/
|
||||
LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) );
|
||||
prvMQTTUnsubscribeFromTopic( &xMQTTContext );
|
||||
|
||||
/* Process Incoming packet from the broker. */
|
||||
xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
|
||||
/* Process the incoming packet from the broker. */
|
||||
xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext,
|
||||
mqttexamplePROCESS_LOOP_TIMEOUT_MS );
|
||||
configASSERT( xMQTTStatus == MQTTSuccess );
|
||||
|
||||
/**************************** Disconnect. ******************************/
|
||||
/**************************** Disconnect. *****************************/
|
||||
|
||||
/* Send an MQTT Disconnect packet over the already connected TCP socket.
|
||||
* There is no corresponding response for the disconnect packet. After sending
|
||||
* disconnect, client must close the network connection. */
|
||||
LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) );
|
||||
MQTT_Disconnect( &xMQTTContext );
|
||||
/* Send an MQTT Disconnect packet over the connected TCP socket.
|
||||
* There is no corresponding response for a disconnect packet. After
|
||||
* sending the disconnect, the client must close the network connection. */
|
||||
LogInfo( ( "Disconnecting the MQTT connection with %s.",
|
||||
democonfigMQTT_BROKER_ENDPOINT ) );
|
||||
xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
|
||||
configASSERT( xMQTTStatus == MQTTSuccess );
|
||||
|
||||
/* Close the network connection. */
|
||||
/* Close the network connection. */
|
||||
xNetworkStatus = Plaintext_FreeRTOS_Disconnect( &xNetworkContext );
|
||||
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );
|
||||
|
||||
/* Reset SUBACK status for each topic filter after completion of subscription request cycle. */
|
||||
/* Reset SUBACK status for each topic filter after completion of
|
||||
* subscription request cycle. */
|
||||
for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
|
||||
{
|
||||
xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure;
|
||||
}
|
||||
|
||||
/* Wait for some time between two iterations to ensure that we do not
|
||||
* bombard the public test mosquitto broker. */
|
||||
LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) );
|
||||
LogInfo( ( "Demo completed successfully.\r\n" ) );
|
||||
LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) );
|
||||
* bombard the MQTT broker. */
|
||||
LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. "
|
||||
"Total free heap is %u.", xPortGetFreeHeapSize() ) );
|
||||
LogInfo( ( "Demo completed successfully." ) );
|
||||
LogInfo( ( "Short delay before starting the next iteration.... \r\n" ) );
|
||||
vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS );
|
||||
}
|
||||
}
|
||||
@@ -581,7 +585,7 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
|
||||
* will expect all the messages it sends to the broker to be sent back to it
|
||||
* from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish
|
||||
* messages received from the broker will have QOS0. */
|
||||
LogInfo( ( "Attempt to subscribe to the MQTT topic %s.\r\n", mqttexampleTOPIC ) );
|
||||
LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) );
|
||||
xResult = MQTT_Subscribe( pxMQTTContext,
|
||||
xMQTTSubscription,
|
||||
sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
|
||||
@@ -698,7 +702,7 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
|
||||
{
|
||||
if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus != MQTTSubAckFailure )
|
||||
{
|
||||
LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.\r\n",
|
||||
LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.",
|
||||
xTopicFilterContext[ ulTopicCount ].pcTopicFilter,
|
||||
xTopicFilterContext[ ulTopicCount ].xSubAckStatus ) );
|
||||
}
|
||||
@@ -709,18 +713,18 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
|
||||
break;
|
||||
|
||||
case MQTT_PACKET_TYPE_UNSUBACK:
|
||||
LogInfo( ( "Unsubscribed from the topic %s.\r\n", mqttexampleTOPIC ) );
|
||||
LogInfo( ( "Unsubscribed from the topic %s.", mqttexampleTOPIC ) );
|
||||
/* Make sure ACK packet identifier matches with Request packet identifier. */
|
||||
configASSERT( usUnsubscribePacketIdentifier == usPacketId );
|
||||
break;
|
||||
|
||||
case MQTT_PACKET_TYPE_PINGRESP:
|
||||
LogInfo( ( "Ping Response successfully received.\r\n" ) );
|
||||
LogInfo( ( "Ping Response successfully received." ) );
|
||||
break;
|
||||
|
||||
/* Any other packet type is invalid. */
|
||||
default:
|
||||
LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).\r\n",
|
||||
LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).",
|
||||
pxIncomingPacket->type ) );
|
||||
}
|
||||
}
|
||||
@@ -738,8 +742,8 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
|
||||
if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) &&
|
||||
( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) )
|
||||
{
|
||||
LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n"
|
||||
"Incoming Publish Message : %.*s\r\n",
|
||||
LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\r\n"
|
||||
"Incoming Publish Message : %.*s",
|
||||
pxPublishInfo->topicNameLength,
|
||||
pxPublishInfo->pTopicName,
|
||||
pxPublishInfo->payloadLength,
|
||||
@@ -747,7 +751,7 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
|
||||
}
|
||||
else
|
||||
{
|
||||
LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n",
|
||||
LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.",
|
||||
pxPublishInfo->topicNameLength,
|
||||
pxPublishInfo->pTopicName ) );
|
||||
}
|
||||
|
||||
@@ -24,8 +24,8 @@
|
||||
*
|
||||
* 1 tab == 4 spaces!
|
||||
*/
|
||||
#ifndef MQTT_CONFIG_H_
|
||||
#define MQTT_CONFIG_H_
|
||||
#ifndef CORE_MQTT_CONFIG_H_
|
||||
#define CORE_MQTT_CONFIG_H_
|
||||
|
||||
/**************************************************/
|
||||
/******* DO NOT CHANGE the following order ********/
|
||||
@@ -64,4 +64,4 @@
|
||||
*/
|
||||
#define MQTT_STATE_ARRAY_MAX_COUNT 10U
|
||||
|
||||
#endif /* ifndef MQTT_CONFIG_H_ */
|
||||
#endif /* ifndef CORE_MQTT_CONFIG_H_ */
|
||||
|
||||
@@ -47,6 +47,9 @@
|
||||
|
||||
/************ End of logging configuration ****************/
|
||||
|
||||
/* FreeRTOS+TCP include. */
|
||||
#include "FreeRTOS_Sockets.h"
|
||||
|
||||
/* Transport interface include. */
|
||||
#include "transport_interface.h"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user