消息队列
1 消息队列基础
1.1 消息队列的运作机制
创建消息队列时FreeRTOS会先给消息队列分配一块内存空间,这块内存的大小等于消息队列控制块大小加上(单个消息空间大小与消息队列长度的乘积),接着再初始化消息队列,此时消息队列为空。
任务或者中断服务程序都可以给消息队列发送消息,当发送消息时:
- 如果队列未满或者允许覆盖入队,FreeRTOS会将消息拷贝到消息队列队尾
- 否则(队列已满),会根据用户指定的阻塞超时时间进行阻塞,在这段时间中,如果队列一直不允许入队,该任务将保持阻塞状态以等待队列允许入队。
- 当其它任务从其等待的队列中读取入了数据(队列未满),该任务将自动由阻塞态转换为就绪态。
- 当等待的时间超过了指定的阻塞时间,即使队列中还不允许入队,任务也会自动从阻塞态转移为就绪态,此时发送消息的任务或者中断程序会收到一个错误码
errQUEUE_FULL
。
发送紧急消息的过程与发送消息几乎一样,唯一的不同是,当发送紧急消息时,发送的位置是消息队列队头而非队尾,这样,接收者就能够优先接收到紧急消息,从而及时进行消息处理。
1.2 消息队列的阻塞机制
我们使用的消息队列一般不是属于某个任务的队列,在很多时候,我们创建的队列,是每个任务都可以去对他进行读写操作的,但是为了保护每个任务对它进行读写操作的过程,我们必须要有阻塞机制,在某个任务对它读写操作的时候,必须保证该任务能正常完成读写操作,而不受后来的任务干扰。那么,如何实现这个机制呢,其实FreeRTOS已经为我们做好了,每个对消息队列读写的函数,都有这种机制,我称之为阻塞机制。
1.2.1 接收消息
假设有一个任务A对某个队列进行读操作的时候(也就是我们所说的出队),发现它没有消息,那么此时任务A有3个选择:
- 任务A扭头就走,既然队列没有消息,那我也不等了,干其它事情去,这样子任务A不会进入阻塞态。
- 任务A还是在这里等等吧,可能过一会队列就有消息,此时任务A会进入阻塞状态,在等待着消息的道来,而任务A的等待时间就由我们自己定义,比如设置1000个系统时钟节拍tick的等待,在这1000个tick到来之前任务A都是处于阻塞态,当阻塞的这段时间任务A等到了队列的消息,那么任务A就会从阻塞态变成就绪态,如果此时任务A比当前运行的任务优先级还高,那么,任务A就会得到消息并且运行;假如1000个tick都过去了,队列还没消息,那任务A就不等了,从阻塞态中唤醒,返回一个没等到消息的错误代码,然后继续执行任务A的其他代码。
- 任务A死等,不等到消息就不走了,这样子任务A就会进入阻塞态,直到完成读取队列的消息。
1.2.2 发送消息
在发送消息操作的时候,为了保护数据,当且仅当队列允许入队的时候,发送者才能成功发送消息
- 队列中无可用消息空间时,说明消息队列已满,此时,系统会根据用户指定的阻塞超时时间将任务阻塞,在指定的超时时间内如果还不能完成入队操作,发送消息的任务或者中断服务程序会收到一个错误码errQUEUE_FULL,然后解除阻塞状态;
- 当然,只有在任务中发送消息才允许进行阻塞状态,而在中断中发送消息不允许带有阻塞机制的,需要调用在中断中发送消息的API函数接口,因为发送消息的上下文环境是在中断中,不允许有阻塞的情况。
假如有多个任务阻塞在一个消息队列中,那么这些阻塞的任务将按照任务优先级进行排序,优先级高的任务将优先获得队列的访问权。
2 消息队列创建
2.1 消息队列控制块
来看一下消息队列控制块(结构体)的结构组成:
FreeRTOS的消息队列控制块由多个元素组成,当消息队列被创建时,系统会为控制块分配对应的内存空间,用于保存消息队列的一些信息如消息的存储位置,头指针pcHead
、尾指针pcTail
、消息大小uxItemSize
以及队列长度uxLength
等。
每个消息队列都与消息空间在同一段连续的内存空间中,在创建成功的时候,这些内存就被占用了,只有删除了消息队列的时候,这段内存才会被释放掉,创建成功的时候就已经分配好每个消息空间与消息队列的容量,无法更改,每个消息空间可以存放不大于消息大小uxItemSize
的任意类型的数据,所有消息队列中的消息空间总数即是消息队列的长度,这个长度可在消息队列创建时指定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| typedef struct QueueDefinition { int8_t *pcHead; int8_t *pcTail; int8_t *pcWriteTo;
union /* 使用联合体用来确保两个互斥的结构体成员不会同时出现 */ { int8_t *pcReadFrom; UBaseType_t uxRecursiveCallCount; } u;
List_t xTasksWaitingToSend; List_t xTasksWaitingToReceive;
volatile UBaseType_t uxMessagesWaiting; UBaseType_t uxLength; UBaseType_t uxItemSize;
volatile int8_t cRxLock; volatile int8_t cTxLock;
#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) ) uint8_t ucStaticallyAllocated; #endif
#if ( configUSE_QUEUE_SETS == 1 ) struct QueueDefinition *pxQueueSetContainer; #endif
#if ( configUSE_TRACE_FACILITY == 1 ) UBaseType_t uxQueueNumber; uint8_t ucQueueType; #endif
} xQUEUE;
typedef xQUEUE Queue_t;
|
队列的类型定义:
1 2 3 4 5 6
| #define queueQUEUE_TYPE_BASE ( ( uint8_t ) 0U ) #define queueQUEUE_TYPE_SET ( ( uint8_t ) 0U ) #define queueQUEUE_TYPE_MUTEX ( ( uint8_t ) 1U ) #define queueQUEUE_TYPE_COUNTING_SEMAPHORE ( ( uint8_t ) 2U ) #define queueQUEUE_TYPE_BINARY_SEMAPHORE ( ( uint8_t ) 3U ) #define queueQUEUE_TYPE_RECURSIVE_MUTEX ( ( uint8_t ) 4U )
|
queueQUEUE_TYPE_BASE即基本的消息队列,另外,信号量机制也是通过队列实现的,因此当用于互斥信号量,二值信号量等时,会标记对于的队列类型。
2.2 创建消息队列函数
创建消息队列的函数实际为xQueueGenericCreate()这个函数,该函数首先进行队列的内存分配,然后调用prvInitialiseNewQueue()进行队列的初始化:
输入参数:
- uxQueueLength:队列的长度
- uxItemSize:单个消息的大小
- ucQueueType:队列的类型(用途)
返回值:
- pxNewQueue:消息队列控制块,一个结构体指针(QueueHandle_t句柄,实际是void*),也即消息队列在内存中的地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #define xQueueCreate( uxQueueLength, uxItemSize ) xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )
QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, const uint8_t ucQueueType ) { Queue_t *pxNewQueue; size_t xQueueSizeInBytes; uint8_t *pucQueueStorage;
configASSERT( uxQueueLength > ( UBaseType_t ) 0 ); if( uxItemSize == ( UBaseType_t ) 0 ) { xQueueSizeInBytes = ( size_t ) 0; } else { xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize ); }
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes ); if( pxNewQueue != NULL ) { pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t ); #if( configSUPPORT_STATIC_ALLOCATION == 1 ) { pxNewQueue->ucStaticallyAllocated = pdFALSE; } #endif
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue ); }
return pxNewQueue; }
|
2.3 初始化队列准备
prvInitialiseNewQueue()函数首先初始化队列的长度和消息大小等参数,然后调用xQueueGenericReset()函数进行队列复位(初始化):
输入参数:
- uxQueueLength:队列的长度
- uxItemSize:单个消息的大小
- pucQueueStorage:实际存放消息的地址
- ucQueueType:队列的类型(用途)
- pxNewQueue:消息队列控制块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, uint8_t *pucQueueStorage, const uint8_t ucQueueType, Queue_t *pxNewQueue ) { ( void ) ucQueueType; if( uxItemSize == ( UBaseType_t ) 0 ) {
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue; } else { pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage; }
pxNewQueue->uxLength = uxQueueLength; pxNewQueue->uxItemSize = uxItemSize; ( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
#if ( configUSE_TRACE_FACILITY == 1 ) { pxNewQueue->ucQueueType = ucQueueType; } #endif
#if( configUSE_QUEUE_SETS == 1 ) { pxNewQueue->pxQueueSetContainer = NULL; } #endif
traceQUEUE_CREATE( pxNewQueue ); }
|
2.4 队列复位(初始化)
xQueueGenericReset()函数中会调用vListInitialise()函数构建列表结构:
输入参数:
- xQueue:消息队列控制块(内存中分配的消息队列的地址)
- xNewQueue:是否为新的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| BaseType_t xQueueGenericReset( QueueHandle_t xQueue, BaseType_t xNewQueue ) { Queue_t * const pxQueue = ( Queue_t * ) xQueue;
configASSERT( pxQueue ); taskENTER_CRITICAL(); { pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize ); pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U; pxQueue->pcWriteTo = pxQueue->pcHead; pxQueue->u.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize ); pxQueue->cRxLock = queueUNLOCKED; pxQueue->cTxLock = queueUNLOCKED;
if( xNewQueue == pdFALSE ) {
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) { if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) { queueYIELD_IF_USING_PREEMPTION(); } } } else { vListInitialise( &( pxQueue->xTasksWaitingToSend ) ); vListInitialise( &( pxQueue->xTasksWaitingToReceive ) ); } } taskEXIT_CRITICAL(); return pdPASS; }
|
2.5 构建列表结构
这里先来看一下列表结构体的定义:
1 2 3 4 5 6 7 8
| typedef struct xLIST { listFIRST_LIST_INTEGRITY_CHECK_VALUE configLIST_VOLATILE UBaseType_t uxNumberOfItems; ListItem_t * configLIST_VOLATILE pxIndex; MiniListItem_t xListEnd; listSECOND_LIST_INTEGRITY_CHECK_VALUE } List_t;
|
再来看vListInitialise这个函数:
输入参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void vListInitialise( List_t * const pxList ) { pxList->pxIndex = ( ListItem_t * ) &( pxList->xListEnd );
pxList->xListEnd.xItemValue = portMAX_DELAY;
pxList->xListEnd.pxNext = ( ListItem_t * ) &( pxList->xListEnd ); pxList->xListEnd.pxPrevious = ( ListItem_t * ) &( pxList->xListEnd );
pxList->uxNumberOfItems = ( UBaseType_t ) 0U;
listSET_LIST_INTEGRITY_CHECK_1_VALUE( pxList ); listSET_LIST_INTEGRITY_CHECK_2_VALUE( pxList ); }
|
3 消息队列发送与接收
在消息队列的发送和接收过程中,可能会进行阻塞延时(发送消息但队列已满,结束消息但队列中没有消息),此时会进行任务切换,关于任务切换的内容可参考:
3.1 发送消息队列函数
xQueueSend()函数实际是使用xQueueGenericSend()这个函数:
输入参数:
- xQueue:消息队列控制块
- pvItemToQueue:要发送的消息数据
- xTicksToWait:发送允许的阻塞时间
- xCopyPosition:发送到消息队列的位置
先来看一下发送允许的位置,包括发送到队尾,发送到队头,覆盖写入3种。
1 2 3
| #define queueSEND_TO_BACK ( ( BaseType_t ) 0 ) #define queueSEND_TO_FRONT ( ( BaseType_t ) 1 ) #define queueOVERWRITE ( ( BaseType_t ) 2 )
|
再来看一下xQueueGenericSend这个函数,xQueueSend默认是发送到队尾:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| #define xQueueSend( xQueue, pvItemToQueue, xTicksToWait ) xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_BACK )
BaseType_t xQueueGenericSend( QueueHandle_t xQueue, const void * const pvItemToQueue, TickType_t xTicksToWait, const BaseType_t xCopyPosition ) { BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired; TimeOut_t xTimeOut; Queue_t * const pxQueue = ( Queue_t * ) xQueue;
configASSERT( pxQueue ); configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) ); configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) ); #if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) ) { configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) ); } #endif
for( ;; ) { taskENTER_CRITICAL(); { if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) ) { traceQUEUE_SEND( pxQueue ); xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
#if ( configUSE_QUEUE_SETS == 1 ) { if( pxQueue->pxQueueSetContainer != NULL ) { if( prvNotifyQueueSetContainer( pxQueue, xCopyPosition ) != pdFALSE ) { queueYIELD_IF_USING_PREEMPTION(); } } else { if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) { if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) {
queueYIELD_IF_USING_PREEMPTION(); } } else if( xYieldRequired != pdFALSE ) { queueYIELD_IF_USING_PREEMPTION(); } } } #else #endif
taskEXIT_CRITICAL(); return pdPASS; } else { if( xTicksToWait == ( TickType_t ) 0 ) { taskEXIT_CRITICAL(); traceQUEUE_SEND_FAILED( pxQueue ); return errQUEUE_FULL; } else if( xEntryTimeSet == pdFALSE ) { vTaskSetTimeOutState( &xTimeOut ); xEntryTimeSet = pdTRUE; } } } taskEXIT_CRITICAL(); vTaskSuspendAll(); prvLockQueue( pxQueue ); if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) { if( prvIsQueueFull( pxQueue ) != pdFALSE ) { traceBLOCKING_ON_QUEUE_SEND( pxQueue ); vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );
prvUnlockQueue( pxQueue );
if( xTaskResumeAll() == pdFALSE ) { portYIELD_WITHIN_API(); } } else { prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll(); } } else { prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll();
traceQUEUE_SEND_FAILED( pxQueue ); return errQUEUE_FULL; } } }
|
3.2 接收消息队列
xQueueReceive()函数实际是使用xQueueGenericReceive()这个函数:
输入参数:
- xQueue:消息队列控制块
- pvBuffer:要接收的消息数据
- xTicksToWait:接收允许的阻塞时间
- xJustPeeking:是否是Peek(只获取不删除)模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
| #define xQueueReceive( xQueue, pvBuffer, xTicksToWait ) xQueueGenericReceive( ( xQueue ), ( pvBuffer ), ( xTicksToWait ), pdFALSE )
BaseType_t xQueueGenericReceive( QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait, const BaseType_t xJustPeeking ) { BaseType_t xEntryTimeSet = pdFALSE; TimeOut_t xTimeOut; int8_t *pcOriginalReadPosition; Queue_t * const pxQueue = ( Queue_t * ) xQueue;
configASSERT( pxQueue ); configASSERT( !( ( pvBuffer == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) ); #if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) ) { configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) ); } #endif
for( ;; ) { taskENTER_CRITICAL(); { const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting; if( uxMessagesWaiting > ( UBaseType_t ) 0 ) { pcOriginalReadPosition = pxQueue->u.pcReadFrom; prvCopyDataFromQueue( pxQueue, pvBuffer ); if( xJustPeeking == pdFALSE ) { traceQUEUE_RECEIVE( pxQueue ); pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1; #if ( configUSE_MUTEXES == 1 ) { if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX ) { pxQueue->pxMutexHolder = ( int8_t * ) pvTaskIncrementMutexHeldCount(); } } #endif if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) { if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) { queueYIELD_IF_USING_PREEMPTION(); } } } else { traceQUEUE_PEEK( pxQueue ); pxQueue->u.pcReadFrom = pcOriginalReadPosition; if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) { if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) { queueYIELD_IF_USING_PREEMPTION(); } } } taskEXIT_CRITICAL(); return pdPASS; } else { if( xTicksToWait == ( TickType_t ) 0 ) { taskEXIT_CRITICAL(); traceQUEUE_RECEIVE_FAILED( pxQueue ); return errQUEUE_EMPTY; } else if( xEntryTimeSet == pdFALSE ) { vTaskSetTimeOutState( &xTimeOut ); xEntryTimeSet = pdTRUE; } } } taskEXIT_CRITICAL(); vTaskSuspendAll(); prvLockQueue( pxQueue ); if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) { if( prvIsQueueEmpty( pxQueue ) != pdFALSE ) { #if ( configUSE_MUTEXES == 1 ) { if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX ) { taskENTER_CRITICAL(); { vTaskPriorityInherit( ( void * ) pxQueue->pxMutexHolder ); } taskEXIT_CRITICAL(); } } #endif vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait ); prvUnlockQueue( pxQueue ); if( xTaskResumeAll() == pdFALSE ) { portYIELD_WITHIN_API(); } } else { prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll(); } } else { prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll(); if( prvIsQueueEmpty( pxQueue ) != pdFALSE ) { traceQUEUE_RECEIVE_FAILED( pxQueue ); return errQUEUE_EMPTY; } } } }
|