Files as per 190725_FreeRTOS_IoT_Libs_Task_Pool_and_MQTT_Preview interim release.

This commit is contained in:
Richard Barry 2019-07-25 20:20:24 +00:00
parent b24ab46d39
commit b4c06085e1
17 changed files with 972 additions and 286 deletions

View File

@ -0,0 +1,5 @@
[{000214A0-0000-0000-C000-000000000046}]
Prop3=19,11
[InternetShortcut]
IDList=
URL=https://www.freertos.org/mqtt/

View File

@ -198,10 +198,6 @@
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\private\iot_taskpool_internal.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\types\iot_taskpool_types.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent_config_defaults.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_config_defaults.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_lib.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\types\iot_mqtt_types.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOSIPConfigDefaults.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOS_ARP.h" />

View File

@ -286,18 +286,6 @@
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent_config_defaults.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_config_defaults.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_lib.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\types\iot_mqtt_types.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include\types</Filter>
</ClInclude>

View File

@ -25,9 +25,9 @@
* 1 tab == 4 spaces!
*/
/*
* TBD
*/
/***
* See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions.
***/
/* Standard includes. */
#include <stdio.h>
@ -102,10 +102,9 @@ static UBaseType_t ulNextRand;
int main( void )
{
/*
* Instructions for using this project are provided on:
* TBD
*/
/***
* See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions.
***/
/* Miscellaneous initialisation including preparing the logging and seeding
the random number generator. */

View File

@ -25,7 +25,6 @@
* 1 tab == 4 spaces!
*/
//_RB_ Add link to docs here.
/* Kernel includes. */
#include "FreeRTOS.h"

View File

@ -0,0 +1,6 @@
[{000214A0-0000-0000-C000-000000000046}]
Prop3=19,11
[InternetShortcut]
IDList=
URL=https://www.freertos.org/task-pool/
HotKey=0

View File

@ -177,7 +177,7 @@
<ClInclude Include="..\..\..\..\FreeRTOS\Source\include\task.h" />
<ClInclude Include="..\..\..\..\FreeRTOS\Source\include\timers.h" />
<ClInclude Include="..\..\..\..\FreeRTOS\Source\portable\MSVC-MingW\portmacro.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_afr.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_freertos.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\include\types\iot_platform_types.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\iot_taskpool.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\private\iot_error.h" />

View File

@ -250,11 +250,11 @@
</ClInclude>
<ClInclude Include="iot_config.h" />
<ClInclude Include="iot_config_common.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_afr.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\include\types\iot_platform_types.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\include\types</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_freertos.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform\types</Filter>
</ClInclude>
</ItemGroup>
</Project>

View File

@ -25,6 +25,11 @@
* 1 tab == 4 spaces!
*/
/***
* See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions.
***/
/* Standard includes. */
#include <stdio.h>
#include <time.h>
@ -55,10 +60,9 @@ const uint8_t ucMACAddress[ 6 ] = { configMAC_ADDR0, configMAC_ADDR1, configMAC_
int main( void )
{
/*
* Instructions for using this project are provided on:
* TBD
*/
/***
* See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions.
***/
/* Create the example that demonstrates task pool functionality. Examples
that demonstrate networking connectivity will be added in future projects

View File

@ -0,0 +1,547 @@
/*
* FreeRTOS Kernel V10.2.0
* Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://www.FreeRTOS.org
* http://aws.amazon.com/freertos
*
* 1 tab == 4 spaces!
*/
/**
* @file atomic.h
* @brief FreeRTOS atomic operation support.
*
* Two implementations of atomic are given in this header file:
* 1. Disabling interrupt globally.
* 2. ISA native atomic support.
* The former is available to all ports (compiler-architecture combination),
* while the latter is only available to ports compiling with GCC (version at
* least 4.7.0), which also have ISA atomic support.
*
* User can select which implementation to use by:
* setting/clearing configUSE_ATOMIC_INSTRUCTION in FreeRTOSConfig.h.
* Define AND set configUSE_ATOMIC_INSTRUCTION to 1 for ISA native atomic support.
* Undefine OR clear configUSE_ATOMIC_INSTRUCTION for disabling global interrupt
* implementation.
*
* @see GCC Built-in Functions for Memory Model Aware Atomic Operations
* https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
*/
#ifndef ATOMIC_H
#define ATOMIC_H
#ifndef INC_FREERTOS_H
#error "include FreeRTOS.h must appear in source files before include atomic.h"
#endif
/* Standard includes. */
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
/* Needed for __atomic_compare_exchange() weak=false. */
#include <stdbool.h>
/* This branch is for GCC compiler and GCC compiler only. */
#ifndef portFORCE_INLINE
#define portFORCE_INLINE inline __attribute__((always_inline))
#endif
#else
/* Port specific definitions -- entering/exiting critical section.
* Refer template -- ./lib/FreeRTOS/portable/Compiler/Arch/portmacro.h
*
* Every call to ATOMIC_EXIT_CRITICAL() must be closely paired with
* ATOMIC_ENTER_CRITICAL().
*/
#if defined( portSET_INTERRUPT_MASK_FROM_ISR )
/* Nested interrupt scheme is supported in this port. */
#define ATOMIC_ENTER_CRITICAL() \
UBaseType_t uxCriticalSectionType = portSET_INTERRUPT_MASK_FROM_ISR()
#define ATOMIC_EXIT_CRITICAL() \
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxCriticalSectionType )
#else
/* Nested interrupt scheme is NOT supported in this port. */
#define ATOMIC_ENTER_CRITICAL() portENTER_CRITICAL()
#define ATOMIC_EXIT_CRITICAL() portEXIT_CRITICAL()
#endif /* portSET_INTERRUPT_MASK_FROM_ISR() */
/* Port specific definition -- "always inline".
* Inline is compiler specific, and may not always get inlined depending on your optimization level.
* Also, inline is considerred as performance optimization for atomic.
* Thus, if portFORCE_INLINE is not provided by portmacro.h, instead of resulting error,
* simply define it.
*/
#ifndef portFORCE_INLINE
#define portFORCE_INLINE
#endif
#endif /* configUSE_GCC_BUILTIN_ATOMICS */
#define ATOMIC_COMPARE_AND_SWAP_SUCCESS 0x1U /**< Compare and swap succeeded, swapped. */
#define ATOMIC_COMPARE_AND_SWAP_FAILURE 0x0U /**< Compare and swap failed, did not swap. */
/*----------------------------- Swap && CAS ------------------------------*/
/**
* Atomic compare-and-swap
*
* @brief Performs an atomic compare-and-swap operation on the specified values.
*
* @param[in, out] pDestination Pointer to memory location from where value is
* to be loaded and checked.
* @param[in] ulExchange If condition meets, write this value to memory.
* @param[in] ulComparand Swap condition.
*
* @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped.
*
* @note This function only swaps *pDestination with ulExchange, if previous
* *pDestination value equals ulComparand.
*/
static portFORCE_INLINE uint32_t Atomic_CompareAndSwap_u32(
uint32_t volatile * pDestination,
uint32_t ulExchange,
uint32_t ulComparand )
{
uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE;
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
if ( __atomic_compare_exchange( pDestination,
&ulComparand,
&ulExchange,
false,
__ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST ) )
{
ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;
}
#else
ATOMIC_ENTER_CRITICAL();
if ( *pDestination == ulComparand )
{
*pDestination = ulExchange;
ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;
}
ATOMIC_EXIT_CRITICAL();
#endif
return ulReturnValue;
}
/**
* Atomic swap (pointers)
*
* @brief Atomically sets the address pointed to by *ppDestination to the value
* of *pExchange.
*
* @param[in, out] ppDestination Pointer to memory location from where a pointer
* value is to be loaded and written back to.
* @param[in] pExchange Pointer value to be written to *ppDestination.
*
* @return The initial value of *ppDestination.
*/
static portFORCE_INLINE void * Atomic_SwapPointers_p32(
void * volatile * ppDestination,
void * pExchange )
{
void * pReturnValue;
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
__atomic_exchange( ppDestination, &pExchange, &pReturnValue, __ATOMIC_SEQ_CST );
#else
ATOMIC_ENTER_CRITICAL();
pReturnValue = *ppDestination;
*ppDestination = pExchange;
ATOMIC_EXIT_CRITICAL();
#endif
return pReturnValue;
}
/**
* Atomic compare-and-swap (pointers)
*
* @brief Performs an atomic compare-and-swap operation on the specified pointer
* values.
*
* @param[in, out] ppDestination Pointer to memory location from where a pointer
* value is to be loaded and checked.
* @param[in] pExchange If condition meets, write this value to memory.
* @param[in] pComparand Swap condition.
*
* @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped.
*
* @note This function only swaps *ppDestination with pExchange, if previous
* *ppDestination value equals pComparand.
*/
static portFORCE_INLINE uint32_t Atomic_CompareAndSwapPointers_p32(
void * volatile * ppDestination,
void * pExchange, void * pComparand )
{
uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE;
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
if ( __atomic_compare_exchange( ppDestination,
&pComparand,
&pExchange,
false,
__ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST ) )
{
ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;
}
#else
ATOMIC_ENTER_CRITICAL();
if ( *ppDestination == pComparand )
{
*ppDestination = pExchange;
ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;
}
ATOMIC_EXIT_CRITICAL();
#endif
return ulReturnValue;
}
/*----------------------------- Arithmetic ------------------------------*/
/**
* Atomic add
*
* @brief Atomically adds count to the value of the specified pointer points to.
*
* @param[in,out] pAddend Pointer to memory location from where value is to be
* loaded and written back to.
* @param[in] ulCount Value to be added to *pAddend.
*
* @return previous *pAddend value.
*/
static portFORCE_INLINE uint32_t Atomic_Add_u32(
uint32_t volatile * pAddend,
uint32_t ulCount )
{
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
return __atomic_fetch_add(pAddend, ulCount, __ATOMIC_SEQ_CST);
#else
uint32_t ulCurrent;
ATOMIC_ENTER_CRITICAL();
ulCurrent = *pAddend;
*pAddend += ulCount;
ATOMIC_EXIT_CRITICAL();
return ulCurrent;
#endif
}
/**
* Atomic subtract
*
* @brief Atomically subtracts count from the value of the specified pointer
* pointers to.
*
* @param[in,out] pAddend Pointer to memory location from where value is to be
* loaded and written back to.
* @param[in] ulCount Value to be subtract from *pAddend.
*
* @return previous *pAddend value.
*/
static portFORCE_INLINE uint32_t Atomic_Subtract_u32(
uint32_t volatile * pAddend,
uint32_t ulCount )
{
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
return __atomic_fetch_sub(pAddend, ulCount, __ATOMIC_SEQ_CST);
#else
uint32_t ulCurrent;
ATOMIC_ENTER_CRITICAL();
ulCurrent = *pAddend;
*pAddend -= ulCount;
ATOMIC_EXIT_CRITICAL();
return ulCurrent;
#endif
}
/**
* Atomic increment
*
* @brief Atomically increments the value of the specified pointer points to.
*
* @param[in,out] pAddend Pointer to memory location from where value is to be
* loaded and written back to.
*
* @return *pAddend value before increment.
*/
static portFORCE_INLINE uint32_t Atomic_Increment_u32( uint32_t volatile * pAddend )
{
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
return __atomic_fetch_add(pAddend, 1, __ATOMIC_SEQ_CST);
#else
uint32_t ulCurrent;
ATOMIC_ENTER_CRITICAL();
ulCurrent = *pAddend;
*pAddend += 1;
ATOMIC_EXIT_CRITICAL();
return ulCurrent;
#endif
}
/**
* Atomic decrement
*
* @brief Atomically decrements the value of the specified pointer points to
*
* @param[in,out] pAddend Pointer to memory location from where value is to be
* loaded and written back to.
*
* @return *pAddend value before decrement.
*/
static portFORCE_INLINE uint32_t Atomic_Decrement_u32( uint32_t volatile * pAddend )
{
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
return __atomic_fetch_sub(pAddend, 1, __ATOMIC_SEQ_CST);
#else
uint32_t ulCurrent;
ATOMIC_ENTER_CRITICAL();
ulCurrent = *pAddend;
*pAddend -= 1;
ATOMIC_EXIT_CRITICAL();
return ulCurrent;
#endif
}
/*----------------------------- Bitwise Logical ------------------------------*/
/**
* Atomic OR
*
* @brief Performs an atomic OR operation on the specified values.
*
* @param [in, out] pDestination Pointer to memory location from where value is
* to be loaded and written back to.
* @param [in] ulValue Value to be ORed with *pDestination.
*
* @return The original value of *pDestination.
*/
static portFORCE_INLINE uint32_t Atomic_OR_u32(
uint32_t volatile * pDestination,
uint32_t ulValue )
{
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
return __atomic_fetch_or(pDestination, ulValue, __ATOMIC_SEQ_CST);
#else
uint32_t ulCurrent;
ATOMIC_ENTER_CRITICAL();
ulCurrent = *pDestination;
*pDestination |= ulValue;
ATOMIC_EXIT_CRITICAL();
return ulCurrent;
#endif
}
/**
* Atomic AND
*
* @brief Performs an atomic AND operation on the specified values.
*
* @param [in, out] pDestination Pointer to memory location from where value is
* to be loaded and written back to.
* @param [in] ulValue Value to be ANDed with *pDestination.
*
* @return The original value of *pDestination.
*/
static portFORCE_INLINE uint32_t Atomic_AND_u32(
uint32_t volatile * pDestination,
uint32_t ulValue )
{
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
return __atomic_fetch_and(pDestination, ulValue, __ATOMIC_SEQ_CST);
#else
uint32_t ulCurrent;
ATOMIC_ENTER_CRITICAL();
ulCurrent = *pDestination;
*pDestination &= ulValue;
ATOMIC_EXIT_CRITICAL();
return ulCurrent;
#endif
}
/**
* Atomic NAND
*
* @brief Performs an atomic NAND operation on the specified values.
*
* @param [in, out] pDestination Pointer to memory location from where value is
* to be loaded and written back to.
* @param [in] ulValue Value to be NANDed with *pDestination.
*
* @return The original value of *pDestination.
*/
static portFORCE_INLINE uint32_t Atomic_NAND_u32(
uint32_t volatile * pDestination,
uint32_t ulValue )
{
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
return __atomic_fetch_nand(pDestination, ulValue, __ATOMIC_SEQ_CST);
#else
uint32_t ulCurrent;
ATOMIC_ENTER_CRITICAL();
ulCurrent = *pDestination;
*pDestination = ~(ulCurrent & ulValue);
ATOMIC_EXIT_CRITICAL();
return ulCurrent;
#endif
}
/**
* Atomic XOR
*
* @brief Performs an atomic XOR operation on the specified values.
*
* @param [in, out] pDestination Pointer to memory location from where value is
* to be loaded and written back to.
* @param [in] ulValue Value to be XORed with *pDestination.
*
* @return The original value of *pDestination.
*/
static portFORCE_INLINE uint32_t Atomic_XOR_u32(
uint32_t volatile * pDestination,
uint32_t ulValue )
{
#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
return __atomic_fetch_xor(pDestination, ulValue, __ATOMIC_SEQ_CST);
#else
uint32_t ulCurrent;
ATOMIC_ENTER_CRITICAL();
ulCurrent = *pDestination;
*pDestination ^= ulValue;
ATOMIC_EXIT_CRITICAL();
return ulCurrent;
#endif
}
#ifdef __cplusplus
}
#endif
#endif /* ATOMIC_H */

View File

@ -427,7 +427,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pSubscribeOperation );
IotMqttOperation_t * const pSubscribeOperation );
/* @[declare_mqtt_subscribe] */
/**
@ -512,7 +512,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pUnsubscribeOperation );
IotMqttOperation_t * const pUnsubscribeOperation );
/* @[declare_mqtt_unsubscribe] */
/**
@ -642,7 +642,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
const IotMqttPublishInfo_t * pPublishInfo,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pPublishOperation );
IotMqttOperation_t * const pPublishOperation );
/* @[declare_mqtt_publish] */
/**
@ -817,7 +817,7 @@ const char * IotMqtt_OperationType( IotMqttOperationType_t operation );
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
const char * pTopicFilter,
uint16_t topicFilterLength,
IotMqttSubscription_t * pCurrentSubscription );
IotMqttSubscription_t * const pCurrentSubscription );
/* @[declare_mqtt_issubscribed] */
#endif /* ifndef IOT_MQTT_H_ */

View File

@ -94,7 +94,7 @@ static void _mqttSubscription_tryDestroy( void * pData );
static void _mqttOperation_tryDestroy( void * pData );
/**
* @brief Create a keep-alive job for an MQTT connection.
* @brief Initialize the keep-alive operation for an MQTT connection.
*
* @param[in] pNetworkInfo User-provided network information for the new
* connection.
@ -103,9 +103,9 @@ static void _mqttOperation_tryDestroy( void * pData );
*
* @return `true` if the keep-alive job was successfully created; `false` otherwise.
*/
static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
uint16_t keepAliveSeconds,
_mqttConnection_t * pMqttConnection );
static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
uint16_t keepAliveSeconds,
_mqttConnection_t * pMqttConnection );
/**
* @brief Creates a new MQTT connection and initializes its members.
@ -141,7 +141,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pOperationReference );
IotMqttOperation_t * const pOperationReference );
/*-----------------------------------------------------------*/
@ -238,9 +238,9 @@ static void _mqttOperation_tryDestroy( void * pData )
/*-----------------------------------------------------------*/
static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
uint16_t keepAliveSeconds,
_mqttConnection_t * pMqttConnection )
static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
uint16_t keepAliveSeconds,
_mqttConnection_t * pMqttConnection )
{
bool status = true;
IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
@ -253,9 +253,12 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqttError_t ( * serializePingreq )( uint8_t **,
size_t * ) = _IotMqtt_SerializePingreq;
/* Set PINGREQ operation members. */
pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;
/* Convert the keep-alive interval to milliseconds. */
pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;
pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;
pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;
pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;
/* Choose a PINGREQ serializer function. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
@ -277,8 +280,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Generate a PINGREQ packet. */
serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),
&( pMqttConnection->pingreqPacketSize ) );
serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),
&( pMqttConnection->pingreq.u.operation.packetSize ) );
if( serializeStatus != IOT_MQTT_SUCCESS )
{
@ -291,8 +294,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
/* Create the task pool job that processes keep-alive. */
jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
pMqttConnection,
&( pMqttConnection->keepAliveJobStorage ),
&( pMqttConnection->keepAliveJob ) );
&( pMqttConnection->pingreq.jobStorage ),
&( pMqttConnection->pingreq.job ) );
/* Task pool job creation for a pre-allocated job should never fail.
* Abort the program if it does. */
@ -408,9 +411,9 @@ static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
/* Check if keep-alive is active for this connection. */
if( keepAliveSeconds != 0 )
{
if( _createKeepAliveJob( pNetworkInfo,
keepAliveSeconds,
pMqttConnection ) == false )
if( _createKeepAliveOperation( pNetworkInfo,
keepAliveSeconds,
pMqttConnection ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( false );
}
@ -471,17 +474,31 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
{
IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
/* Default free packet function. */
void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;
/* Clean up keep-alive if still allocated. */
if( pMqttConnection->keepAliveMs != 0 )
if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );
_IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
/* Choose a function to free the packet. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->freePacket != NULL )
{
freePacket = pMqttConnection->pSerializer->freePacket;
}
}
#endif
freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );
/* Clear data about the keep-alive. */
pMqttConnection->keepAliveMs = 0;
pMqttConnection->pPingreqPacket = NULL;
pMqttConnection->pingreqPacketSize = 0;
pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
pMqttConnection->pingreq.u.operation.packetSize = 0;
/* Decrement reference count. */
pMqttConnection->references--;
@ -494,9 +511,9 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
/* A connection to be destroyed should have no keep-alive and at most 1
* reference. */
IotMqtt_Assert( pMqttConnection->references <= 1 );
IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );
IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );
IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );
IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );
IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );
IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );
/* Remove all subscriptions. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
@ -546,7 +563,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pOperationReference )
IotMqttOperation_t * const pOperationReference )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
_mqttOperation_t * pSubscriptionOperation = NULL;
@ -666,7 +683,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
/* Check the subscription operation data and set the operation type. */
IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );
IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );
pSubscriptionOperation->u.operation.type = operation;
/* Generate a subscription packet from the subscription list. */
@ -853,7 +870,7 @@ IotMqttError_t IotMqtt_Init( void )
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Log initialization status. */
if( status != IOT_MQTT_SUCCESS ) //_RB_ This will generate compiler warnings if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0
if( status != IOT_MQTT_SUCCESS )
{
IotLogError( "Failed to initialize MQTT library serializer. " );
}
@ -896,7 +913,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
_mqttConnection_t * pNewMqttConnection = NULL;
/* Default CONNECT serializer function. */
IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, //_RB_ Needs to be a typedef to make it easier to rease and more maintainable should the prototype change.
IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,
uint8_t **,
size_t * ) = _IotMqtt_SerializeConnect;
@ -911,7 +928,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
}
/* Validate network interface and connect info. */
if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) //_RB_ A lot of code in here that could be replaced by asserts().
if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
@ -1002,7 +1019,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotLogInfo( "Establishing new MQTT connection." );
/* Initialize a new MQTT connection object. *///_RB_ Initialise, as per the comment, or create, as per the function name? I don't think this does create a connection as that happens below.
/* Initialize a new MQTT connection object. */
pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,
pNetworkInfo,
pConnectInfo->keepAliveSeconds );
@ -1059,7 +1076,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
== IOT_MQTT_FLAG_WAITABLE );
IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
/* Set the operation type. */
pOperation->u.operation.type = IOT_MQTT_CONNECT;
@ -1127,7 +1144,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
/* Add the CONNECT operation to the send queue for network transmission. */
status = _IotMqtt_ScheduleOperation( pOperation, // Why schedule a job if going to wait for comletion?
status = _IotMqtt_ScheduleOperation( pOperation,
_IotMqtt_ProcessSend,
0 );
@ -1150,13 +1167,13 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
if( status == IOT_MQTT_SUCCESS )
{
/* Check if a keep-alive job should be scheduled. */
if( pNewMqttConnection->keepAliveMs != 0 )
if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
IotLogDebug( "Scheduling first MQTT keep-alive job." );
taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
pNewMqttConnection->keepAliveJob,
pNewMqttConnection->nextKeepAliveMs );
pNewMqttConnection->pingreq.job,
pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );
if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
{
@ -1268,7 +1285,7 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
== IOT_MQTT_FLAG_WAITABLE );
IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
/* Set the operation type. */
pOperation->u.operation.type = IOT_MQTT_DISCONNECT;
@ -1389,7 +1406,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pSubscribeOperation )
IotMqttOperation_t * const pSubscribeOperation )
{
return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,
mqttConnection,
@ -1445,7 +1462,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pUnsubscribeOperation )
IotMqttOperation_t * const pUnsubscribeOperation )
{
return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,
mqttConnection,
@ -1500,7 +1517,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
const IotMqttPublishInfo_t * pPublishInfo,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pPublishOperation )
IotMqttOperation_t * const pPublishOperation )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
_mqttOperation_t * pOperation = NULL;
@ -1651,8 +1668,8 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
/* A QoS 0 PUBLISH may not be retried. */
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
{
pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;
pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;
pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;
pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;
}
else
{

View File

@ -43,6 +43,9 @@
/* Platform layer includes. */
#include "platform/iot_threads.h"
/* Atomics include. */
#include "iot_atomic.h"
/*-----------------------------------------------------------*/
/**
@ -89,6 +92,22 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier );
/**
* @brief Flush a packet from the stream of incoming data.
*
* This function is called when memory for a packet cannot be allocated. The
* packet is flushed from the stream of incoming data so that the next packet
* may be read.
*
* @param[in] pNetworkConnection Network connection to use for receive, which
* may be different from the network connection associated with the MQTT connection.
* @param[in] pMqttConnection The associated MQTT connection.
* @param[in] length The length of the packet to flush.
*/
static void _flushPacket( void * pNetworkConnection,
const _mqttConnection_t * pMqttConnection,
size_t length );
/*-----------------------------------------------------------*/
static bool _incomingPacketValid( uint8_t packetType )
@ -201,6 +220,14 @@ static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
if( pIncomingPacket->pRemainingData == NULL )
{
IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "
"%lu for incoming packet type %lu.",
pMqttConnection,
( unsigned long ) pIncomingPacket->remainingLength,
( unsigned long ) pIncomingPacket->type );
_flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
}
else
@ -593,22 +620,18 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
if( status == IOT_MQTT_SUCCESS )
{
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
if( pMqttConnection->keepAliveFailure == false )
if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),
0,
1 ) == 1 )
{
IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
pMqttConnection );
}
else
{
IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
pMqttConnection );
}
else
{
IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
pMqttConnection );
pMqttConnection->keepAliveFailure = false;
}
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
}
else
{
@ -637,15 +660,13 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier )
{
IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
uint8_t * pPuback = NULL;
size_t pubackSize = 0, bytesSent = 0;
IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
_mqttOperation_t * pPubackOperation = NULL;
/* Default PUBACK serializer and free packet functions. */
/* Default PUBACK serializer function. */
IotMqttError_t ( * serializePuback )( uint16_t,
uint8_t **,
size_t * ) = _IotMqtt_SerializePuback;
void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",
pMqttConnection,
@ -669,57 +690,82 @@ static void _sendPuback( _mqttConnection_t * pMqttConnection,
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
/* Create a PUBACK operation. */
status = _IotMqtt_CreateOperation( pMqttConnection,
0,
NULL,
&pPubackOperation );
if( status != IOT_MQTT_SUCCESS )
{
IOT_GOTO_CLEANUP();
}
/* Set the operation type. */
pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;
/* Generate a PUBACK packet from the packet identifier. */
status = serializePuback( packetIdentifier,
&( pPubackOperation->u.operation.pMqttPacket ),
&( pPubackOperation->u.operation.packetSize ) );
if( status != IOT_MQTT_SUCCESS )
{
IOT_GOTO_CLEANUP();
}
/* Add the PUBACK operation to the send queue for network transmission. */
status = _IotMqtt_ScheduleOperation( pPubackOperation,
_IotMqtt_ProcessSend,
0 );
if( status != IOT_MQTT_SUCCESS )
{
IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",
pMqttConnection );
IOT_GOTO_CLEANUP();
}
else
{
EMPTY_ELSE_MARKER;
}
/* Clean up on error. */
IOT_FUNCTION_CLEANUP_BEGIN();
if( status != IOT_MQTT_SUCCESS )
{
if( pPubackOperation != NULL )
{
if( pMqttConnection->pSerializer->freePacket != NULL )
{
freePacket = pMqttConnection->pSerializer->freePacket;
}
else
{
EMPTY_ELSE_MARKER;
}
_IotMqtt_DestroyOperation( pPubackOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Generate a PUBACK packet from the packet identifier. */
serializeStatus = serializePuback( packetIdentifier,
&pPuback,
&pubackSize );
if( serializeStatus != IOT_MQTT_SUCCESS )
{
IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "
"received PUBLISH %hu.",
pMqttConnection,
packetIdentifier );
}
else
{
bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
pPuback,
pubackSize );
EMPTY_ELSE_MARKER;
}
}
if( bytesSent != pubackSize )
{
IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"
" PUBLISH %hu.",
pMqttConnection,
packetIdentifier );
}
else
{
IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",
pMqttConnection,
packetIdentifier );
}
/*-----------------------------------------------------------*/
freePacket( pPuback );
static void _flushPacket( void * pNetworkConnection,
const _mqttConnection_t * pMqttConnection,
size_t length )
{
size_t bytesFlushed = 0;
uint8_t receivedByte = 0;
for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )
{
( void ) _IotMqtt_GetNextByte( pNetworkConnection,
pMqttConnection->pNetworkInterface,
&receivedByte );
}
}
@ -761,17 +807,27 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;
IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };
void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;
/* Disconnect callback function. */
void ( * disconnectCallback )( void *,
IotMqttCallbackParam_t * ) = NULL;
/* Network close function. */
IotNetworkError_t ( * closeConnection) ( void * ) = NULL;
/* Default free packet function. */
void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
/* Mark the MQTT connection as disconnected and the keep-alive as failed. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
pMqttConnection->disconnected = true;
pMqttConnection->keepAliveFailure = true;
if( pMqttConnection->keepAliveMs != 0 )
if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
/* Keep-alive must have a PINGREQ allocated. */
IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );
IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );
IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );
IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );
/* PINGREQ provides a reference to the connection, so reference count must
* be nonzero. */
@ -779,7 +835,7 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
/* Attempt to cancel the keep-alive job. */
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
pMqttConnection->keepAliveJob,
pMqttConnection->pingreq.job,
NULL );
/* If the keep-alive job was not canceled, it must be already executing.
@ -791,13 +847,23 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
* the executing keep-alive job will clean up itself. */
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
{
/* Clean up PINGREQ packet and job. */
_IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
/* Choose a function to free the packet. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->freePacket != NULL )
{
freePacket = pMqttConnection->pSerializer->freePacket;
}
}
#endif
freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );
/* Clear data about the keep-alive. */
pMqttConnection->keepAliveMs = 0;
pMqttConnection->pPingreqPacket = NULL;
pMqttConnection->pingreqPacketSize = 0;
pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
pMqttConnection->pingreq.u.operation.packetSize = 0;
/* Keep-alive is cleaned up; decrement reference count. Since this
* function must be followed with a call to DISCONNECT, a check to
@ -817,12 +883,20 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
EMPTY_ELSE_MARKER;
}
/* Copy the function pointers and contexts, as the MQTT connection may be
* modified after the mutex is released. */
disconnectCallback = pMqttConnection->disconnectCallback.function;
pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;
closeConnection = pMqttConnection->pNetworkInterface->close;
pNetworkConnection = pMqttConnection->pNetworkConnection;
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
/* Close the network connection. */
if( pMqttConnection->pNetworkInterface->close != NULL )
if( closeConnection != NULL )
{
closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );
closeStatus = closeConnection( pNetworkConnection );
if( closeStatus == IOT_NETWORK_SUCCESS )
{
@ -842,14 +916,14 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
}
/* Invoke the disconnect callback. */
if( pMqttConnection->disconnectCallback.function != NULL )
if( disconnectCallback != NULL )
{
/* Set the members of the callback parameter. */
callbackParam.mqttConnection = pMqttConnection;
callbackParam.u.disconnectReason = disconnectReason;
pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,
&callbackParam );
disconnectCallback( pDisconnectCallbackContext,
&callbackParam );
}
else
{

View File

@ -44,6 +44,9 @@
#include "platform/iot_clock.h"
#include "platform/iot_threads.h"
/* Atomics include. */
#include "iot_atomic.h"
/*-----------------------------------------------------------*/
/**
@ -133,7 +136,7 @@ static bool _mqttOperation_match( const IotLink_t * pOperationLink,
static bool _checkRetryLimit( _mqttOperation_t * pOperation )
{
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
bool status = true;
bool status = true, setDup = false;
/* Choose a set DUP function. */
void ( * publishSetDup )( uint8_t *,
@ -162,36 +165,65 @@ static bool _checkRetryLimit( _mqttOperation_t * pOperation )
IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );
/* Check if the retry limit is exhausted. */
if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )
if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit )
{
/* The retry count may be at most one more than the retry limit, which
* accounts for the final check for a PUBACK. */
IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 );
IotMqtt_Assert( pOperation->u.operation.periodic.retry.count ==
pOperation->u.operation.periodic.retry.limit + 1 );
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",
pMqttConnection,
pOperation,
pOperation->u.operation.retry.limit );
pOperation->u.operation.periodic.retry.limit );
status = false;
}
/* Check if this is the first retry. */
else if( pOperation->u.operation.retry.count == 1 )
{
/* Always set the DUP flag on the first retry. */
publishSetDup( pOperation->u.operation.pMqttPacket,
pOperation->u.operation.pPacketIdentifierHigh,
&( pOperation->u.operation.packetIdentifier ) );
}
else
{
/* In AWS IoT MQTT mode, the DUP flag (really a change to the packet
* identifier) must be reset on every retry. */
if( pMqttConnection->awsIotMqttMode == true )
if( pOperation->u.operation.periodic.retry.count == 1 )
{
/* The DUP flag should always be set on the first retry. */
setDup = true;
}
else if( pMqttConnection->awsIotMqttMode == true )
{
/* In AWS IoT MQTT mode, the DUP flag (really a change to the packet
* identifier) must be reset on every retry. */
setDup = true;
}
else
{
EMPTY_ELSE_MARKER;
}
if( setDup == true )
{
/* In AWS IoT MQTT mode, the references mutex must be locked to
* prevent the packet identifier from being read while it is being
* changed. */
if( pMqttConnection->awsIotMqttMode == true )
{
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Always set the DUP flag on the first retry. */
publishSetDup( pOperation->u.operation.pMqttPacket,
pOperation->u.operation.pPacketIdentifierHigh,
&( pOperation->u.operation.packetIdentifier ) );
if( pMqttConnection->awsIotMqttMode == true )
{
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
@ -213,14 +245,16 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
/* This function should never be called with retry count greater than
* retry limit. */
IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit );
IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <=
pOperation->u.operation.periodic.retry.limit );
/* Increment the retry count. */
( pOperation->u.operation.retry.count )++;
( pOperation->u.operation.periodic.retry.count )++;
/* Check for a response shortly for the final retry. Otherwise, calculate the
* next retry period. */
if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )
if( pOperation->u.operation.periodic.retry.count >
pOperation->u.operation.periodic.retry.limit )
{
scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;
@ -232,14 +266,14 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
}
else
{
scheduleDelay = pOperation->u.operation.retry.nextPeriod;
scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs;
/* Double the retry period, subject to a ceiling value. */
pOperation->u.operation.retry.nextPeriod *= 2;
pOperation->u.operation.periodic.retry.nextPeriodMs *= 2;
if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING )
if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING )
{
pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING;
pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING;
}
else
{
@ -249,12 +283,12 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",
pMqttConnection,
pOperation,
( unsigned long ) pOperation->u.operation.retry.count,
( unsigned long ) pOperation->u.operation.retry.limit,
( unsigned long ) pOperation->u.operation.periodic.retry.count,
( unsigned long ) pOperation->u.operation.periodic.retry.limit,
( unsigned long ) scheduleDelay );
/* Check if this is the first retry. */
firstRetry = ( pOperation->u.operation.retry.count == 1 );
firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 );
/* On the first retry, the PUBLISH will be moved from the pending processing
* list to the pending responses list. Lock the connection references mutex
@ -332,7 +366,7 @@ IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
{
IotLogError( "Callback should not be set for a waitable operation." );
return IOT_MQTT_BAD_PARAMETER;
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
else
{
@ -509,8 +543,8 @@ bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
pMqttConnection,
IotMqtt_OperationType( pOperation->u.operation.type ),
pOperation,
pOperation->u.operation.jobReference + 1,
pOperation->u.operation.jobReference );
( long ) ( pOperation->u.operation.jobReference + 1 ),
( long ) ( pOperation->u.operation.jobReference ) );
/* The job reference count must be 0 or 1 after the decrement. */
IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||
@ -649,23 +683,30 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
void * pContext )
{
bool status = true;
uint32_t swapStatus = 0;
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
size_t bytesSent = 0;
/* Swap status is not checked when asserts are disabled. */
( void ) swapStatus;
/* Retrieve the MQTT connection from the context. */
_mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;
_mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq );
/* Check parameters. */
/* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob );
IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job );
/* Check that keep-alive interval is valid. The MQTT spec states its maximum
* value is 65,535 seconds. */
IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 );
IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 );
/* Only two values are valid for the next keep alive job delay. */
IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) ||
( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) );
IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) ||
( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs
== IOT_MQTT_RESPONSE_WAIT_MS ) );
IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );
@ -676,10 +717,9 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
&pKeepAliveJob );
IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
/* Determine whether to send a PINGREQ or check for PINGRESP. */
if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs )
if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
pPingreqOperation->u.operation.periodic.ping.keepAliveMs )
{
IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );
@ -687,10 +727,10 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
* more important than other operations. Bypass the queue of jobs for
* operations by directly sending the PINGREQ in this job. */
bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
pMqttConnection->pPingreqPacket,
pMqttConnection->pingreqPacketSize );
pPingreqOperation->u.operation.pMqttPacket,
pPingreqOperation->u.operation.packetSize );
if( bytesSent != pMqttConnection->pingreqPacketSize )
if( bytesSent != pPingreqOperation->u.operation.packetSize )
{
IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );
status = false;
@ -699,10 +739,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
{
/* Assume the keep-alive will fail. The network receive callback will
* clear the failure flag upon receiving a PINGRESP. */
pMqttConnection->keepAliveFailure = true;
swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ),
1,
0 );
IotMqtt_Assert( swapStatus == 1 );
/* Schedule a check for PINGRESP. */
pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS;
pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS;
IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",
pMqttConnection,
@ -713,12 +756,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
{
IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );
if( pMqttConnection->keepAliveFailure == false )
if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 )
{
IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );
/* PINGRESP was received. Schedule the next PINGREQ transmission. */
pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;
pPingreqOperation->u.operation.periodic.ping.nextPeriodMs =
pPingreqOperation->u.operation.periodic.ping.keepAliveMs;
}
else
{
@ -737,13 +781,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
{
taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,
pKeepAliveJob,
pMqttConnection->nextKeepAliveMs );
pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
{
IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.",
IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.",
pMqttConnection,
IOT_MQTT_RESPONSE_WAIT_MS );
( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
}
else
{
@ -769,8 +813,6 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
{
EMPTY_ELSE_MARKER;
}
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
}
/*-----------------------------------------------------------*/
@ -799,6 +841,8 @@ void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
}
else
{
/* This operation may have already been removed by cleanup of pending
* operations (called from Disconnect). In that case, do nothing here. */
EMPTY_ELSE_MARKER;
}
@ -810,15 +854,9 @@ void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
_IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,
&callbackParam );
/* Free any buffers associated with the current PUBLISH message. */
if( pOperation->u.publish.pReceivedData != NULL )
{
IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Free buffers associated with the current PUBLISH message. */
IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL );
IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );
/* Free the incoming PUBLISH operation. */
IotMqtt_FreeOperation( pOperation );
@ -851,7 +889,7 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
/* Check PUBLISH retry counts and limits. */
if( pOperation->u.operation.retry.limit > 0 )
if( pOperation->u.operation.periodic.retry.limit > 0 )
{
if( _checkRetryLimit( pOperation ) == false )
{
@ -923,7 +961,7 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
{
/* Check if this operation should be scheduled for retransmission. */
if( pOperation->u.operation.retry.limit > 0 )
if( pOperation->u.operation.periodic.retry.limit > 0 )
{
if( _scheduleNextRetry( pOperation ) == false )
{
@ -1141,7 +1179,7 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
/* Check if the matched operation is a PUBLISH with retry. If it is, cancel
* the retry job. */
if( pResult->u.operation.retry.limit > 0 )
if( pResult->u.operation.periodic.retry.limit > 0 )
{
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
pResult->job,
@ -1188,7 +1226,7 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
if( pResult != NULL )
{
IotLogDebug( "(MQTT connection %p) Found operation %s." ,
IotLogDebug( "(MQTT connection %p) Found operation %s.",
pMqttConnection,
IotMqtt_OperationType( type ) );

View File

@ -1375,12 +1375,12 @@ IotMqttError_t _IotMqtt_DeserializePublish( _mqttPacket_t * pPublish )
* a packet identifer, but QoS 0 PUBLISH packets do not. */
if( pOutput->qos == IOT_MQTT_QOS_0 )
{
pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) );
pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) );
pOutput->pPayload = pPacketIdentifierHigh;
}
else
{
pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) );
pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) );
pOutput->pPayload = pPacketIdentifierHigh + sizeof( uint16_t );
}

View File

@ -586,7 +586,7 @@ void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnecti
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
const char * pTopicFilter,
uint16_t topicFilterLength,
IotMqttSubscription_t * pCurrentSubscription )
IotMqttSubscription_t * const pCurrentSubscription )
{
bool status = false;
_mqttSubscription_t * pSubscription = NULL;
@ -595,7 +595,7 @@ bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
topicMatchParams.pTopicName = pTopicFilter;
topicMatchParams.topicNameLength = topicFilterLength;
topicMatchParams.exactMatchOnly = false;
topicMatchParams.exactMatchOnly = true;
/* Prevent any other thread from modifying the subscription list while this
* function is running. */

View File

@ -247,6 +247,83 @@
/*---------------------- MQTT internal data structures ----------------------*/
/**
* @cond DOXYGEN_IGNORE
* Doxygen should ignore this section.
*
* Forward declaration of MQTT connection type.
*/
struct _mqttConnection;
/** @endcond */
/**
* @brief Internal structure representing a single MQTT operation, such as
* CONNECT, SUBSCRIBE, PUBLISH, etc.
*
* Queues of these structures keeps track of all in-progress MQTT operations.
*/
typedef struct _mqttOperation
{
/* Pointers to neighboring queue elements. */
IotLink_t link; /**< @brief List link member. */
bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */
struct _mqttConnection * pMqttConnection; /**< @brief MQTT connection associated with this operation. */
IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */
IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */
union
{
/* If incomingPublish is false, this struct is valid. */
struct
{
/* Basic operation information. */
int32_t jobReference; /**< @brief Tracks if a job is using this operation. Must always be 0, 1, or 2. */
IotMqttOperationType_t type; /**< @brief What operation this structure represents. */
uint32_t flags; /**< @brief Flags passed to the function that created this operation. */
uint16_t packetIdentifier; /**< @brief The packet identifier used with this operation. */
/* Serialized packet and size. */
uint8_t * pMqttPacket; /**< @brief The MQTT packet to send over the network. */
uint8_t * pPacketIdentifierHigh; /**< @brief The location of the high byte of the packet identifier in the MQTT packet. */
size_t packetSize; /**< @brief Size of `pMqttPacket`. */
/* How to notify of an operation's completion. */
union
{
IotSemaphore_t waitSemaphore; /**< @brief Semaphore to be used with @ref mqtt_function_wait. */
IotMqttCallbackInfo_t callback; /**< @brief User-provided callback function and parameter. */
} notify; /**< @brief How to notify of this operation's completion. */
IotMqttError_t status; /**< @brief Result of this operation. This is reported once a response is received. */
union
{
struct
{
uint32_t count; /**< @brief Current number of retries. */
uint32_t limit; /**< @brief Maximum number of retries allowed. */
uint32_t nextPeriodMs; /**< @brief Next retry period. */
} retry; /**< @brief Additional information for PUBLISH retry. */
struct
{
uint32_t failure; /**< @brief Flag tracking keep-alive status. */
uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */
uint32_t nextPeriodMs; /**< @brief Relative delay for next keep-alive job. */
} ping; /**< @brief Additional information for keep-alive pings. */
} periodic; /**< @brief Additional information for periodic operations. */
} operation;
/* If incomingPublish is true, this struct is valid. */
struct
{
IotMqttPublishInfo_t publishInfo; /**< @brief Deserialized PUBLISH. */
const void * pReceivedData; /**< @brief Any buffer associated with this PUBLISH that should be freed. */
} publish;
} u; /**< @brief Valid member depends on _mqttOperation_t.incomingPublish. */
} _mqttOperation_t;
/**
* @brief Represents an MQTT connection.
*/
@ -262,22 +339,16 @@ typedef struct _mqttConnection
const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */
#endif
bool disconnected; /**< @brief Tracks if this connection has been disconnected. */
IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */
int32_t references; /**< @brief Counts callbacks and operations using this connection. */
IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */
IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */
bool disconnected; /**< @brief Tracks if this connection has been disconnected. */
IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */
int32_t references; /**< @brief Counts callbacks and operations using this connection. */
IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */
IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */
IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */
IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */
IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */
IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */
bool keepAliveFailure; /**< @brief Failure flag for keep-alive operation. */
uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */
uint32_t nextKeepAliveMs; /**< @brief Relative delay for next keep-alive job. */
IotTaskPoolJobStorage_t keepAliveJobStorage; /**< @brief Task pool job for processing this connection's keep-alive. */
IotTaskPoolJob_t keepAliveJob; /**< @brief Task pool job for processing this connection's keep-alive. */
uint8_t * pPingreqPacket; /**< @brief An MQTT PINGREQ packet, allocated if keep-alive is active. */
size_t pingreqPacketSize; /**< @brief The size of an allocated PINGREQ packet. */
_mqttOperation_t pingreq; /**< @brief Operation used for MQTT keep-alive. */
} _mqttConnection_t;
/**
@ -312,64 +383,6 @@ typedef struct _mqttSubscription
char pTopicFilter[]; /**< @brief The subscription topic filter. */
} _mqttSubscription_t;
/**
* @brief Internal structure representing a single MQTT operation, such as
* CONNECT, SUBSCRIBE, PUBLISH, etc.
*
* Queues of these structures keeps track of all in-progress MQTT operations.
*/
typedef struct _mqttOperation
{
/* Pointers to neighboring queue elements. */
IotLink_t link; /**< @brief List link member. */
bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */
_mqttConnection_t * pMqttConnection; /**< @brief MQTT connection associated with this operation. */
IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */
IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */
union
{
/* If incomingPublish is false, this struct is valid. */
struct
{
/* Basic operation information. */
int32_t jobReference; /**< @brief Tracks if a job is using this operation. Must always be 0, 1, or 2. */
IotMqttOperationType_t type; /**< @brief What operation this structure represents. */
uint32_t flags; /**< @brief Flags passed to the function that created this operation. */
uint16_t packetIdentifier; /**< @brief The packet identifier used with this operation. */
/* Serialized packet and size. */
uint8_t * pMqttPacket; /**< @brief The MQTT packet to send over the network. */
uint8_t * pPacketIdentifierHigh; /**< @brief The location of the high byte of the packet identifier in the MQTT packet. */
size_t packetSize; /**< @brief Size of `pMqttPacket`. */
/* How to notify of an operation's completion. */
union
{
IotSemaphore_t waitSemaphore; /**< @brief Semaphore to be used with @ref mqtt_function_wait. */
IotMqttCallbackInfo_t callback; /**< @brief User-provided callback function and parameter. */
} notify; /**< @brief How to notify of this operation's completion. */
IotMqttError_t status; /**< @brief Result of this operation. This is reported once a response is received. */
struct
{
uint32_t count;
uint32_t limit;
uint32_t nextPeriod;
} retry;
} operation;
/* If incomingPublish is true, this struct is valid. */
struct
{
IotMqttPublishInfo_t publishInfo; /**< @brief Deserialized PUBLISH. */
const void * pReceivedData; /**< @brief Any buffer associated with this PUBLISH that should be freed. */
} publish;
} u; /**< @brief Valid member depends on _mqttOperation_t.incomingPublish. */
} _mqttOperation_t;
/**
* @brief Represents an MQTT packet received from the network.
*