消息队列

1 消息队列基础

1.1 消息队列的运作机制

创建消息队列时FreeRTOS会先给消息队列分配一块内存空间,这块内存的大小等于消息队列控制块大小加上(单个消息空间大小与消息队列长度的乘积),接着再初始化消息队列,此时消息队列为空。

任务或者中断服务程序都可以给消息队列发送消息,当发送消息时:

  • 如果队列未满或者允许覆盖入队,FreeRTOS会将消息拷贝到消息队列队尾
  • 否则(队列已满),会根据用户指定的阻塞超时时间进行阻塞,在这段时间中,如果队列一直不允许入队,该任务将保持阻塞状态以等待队列允许入队。
    • 当其它任务从其等待的队列中读取入了数据(队列未满),该任务将自动由阻塞态转换为就绪态。
    • 当等待的时间超过了指定的阻塞时间,即使队列中还不允许入队,任务也会自动从阻塞态转移为就绪态,此时发送消息的任务或者中断程序会收到一个错误码errQUEUE_FULL

发送紧急消息的过程与发送消息几乎一样,唯一的不同是,当发送紧急消息时,发送的位置是消息队列队头而非队尾,这样,接收者就能够优先接收到紧急消息,从而及时进行消息处理。

1.2 消息队列的阻塞机制

我们使用的消息队列一般不是属于某个任务的队列,在很多时候,我们创建的队列,是每个任务都可以去对他进行读写操作的,但是为了保护每个任务对它进行读写操作的过程,我们必须要有阻塞机制,在某个任务对它读写操作的时候,必须保证该任务能正常完成读写操作,而不受后来的任务干扰。那么,如何实现这个机制呢,其实FreeRTOS已经为我们做好了,每个对消息队列读写的函数,都有这种机制,我称之为阻塞机制。

1.2.1 接收消息

假设有一个任务A对某个队列进行读操作的时候(也就是我们所说的出队),发现它没有消息,那么此时任务A有3个选择:

  • 任务A扭头就走,既然队列没有消息,那我也不等了,干其它事情去,这样子任务A不会进入阻塞态。
  • 任务A还是在这里等等吧,可能过一会队列就有消息,此时任务A会进入阻塞状态,在等待着消息的道来,而任务A的等待时间就由我们自己定义,比如设置1000个系统时钟节拍tick的等待,在这1000个tick到来之前任务A都是处于阻塞态,当阻塞的这段时间任务A等到了队列的消息,那么任务A就会从阻塞态变成就绪态,如果此时任务A比当前运行的任务优先级还高,那么,任务A就会得到消息并且运行;假如1000个tick都过去了,队列还没消息,那任务A就不等了,从阻塞态中唤醒,返回一个没等到消息的错误代码,然后继续执行任务A的其他代码。
  • 任务A死等,不等到消息就不走了,这样子任务A就会进入阻塞态,直到完成读取队列的消息。

1.2.2 发送消息

发送消息操作的时候,为了保护数据,当且仅当队列允许入队的时候,发送者才能成功发送消息

  • 队列中无可用消息空间时,说明消息队列已满,此时,系统会根据用户指定的阻塞超时时间将任务阻塞,在指定的超时时间内如果还不能完成入队操作,发送消息的任务或者中断服务程序会收到一个错误码errQUEUE_FULL,然后解除阻塞状态;
  • 当然,只有在任务中发送消息才允许进行阻塞状态,而在中断中发送消息不允许带有阻塞机制的,需要调用在中断中发送消息的API函数接口,因为发送消息的上下文环境是在中断中,不允许有阻塞的情况。

假如有多个任务阻塞在一个消息队列中,那么这些阻塞的任务将按照任务优先级进行排序,优先级高的任务将优先获得队列的访问权。

2 消息队列创建

2.1 消息队列控制块

来看一下消息队列控制块(结构体)的结构组成:

FreeRTOS的消息队列控制块由多个元素组成,当消息队列被创建时,系统会为控制块分配对应的内存空间,用于保存消息队列的一些信息如消息的存储位置,头指针pcHead、尾指针pcTail、消息大小uxItemSize以及队列长度uxLength等。

每个消息队列都与消息空间在同一段连续的内存空间中,在创建成功的时候,这些内存就被占用了,只有删除了消息队列的时候,这段内存才会被释放掉,创建成功的时候就已经分配好每个消息空间与消息队列的容量,无法更改,每个消息空间可以存放不大于消息大小uxItemSize的任意类型的数据,所有消息队列中的消息空间总数即是消息队列的长度,这个长度可在消息队列创建时指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
typedef struct QueueDefinition
{
int8_t *pcHead; /* 指向队列消息存储区起始位置,即第一个消息空间 */
int8_t *pcTail; /* 指向队列消息存储区结束位置地址。一旦分配的字节超过了存储队列项的需要,就将其用作一个标记 */
int8_t *pcWriteTo; /* 指向队列消息存储区下一个可以写入消息的位置*/

union /* 使用联合体用来确保两个互斥的结构体成员不会同时出现 */
{
int8_t *pcReadFrom; /* 当结构体用于队列时,pcReadFrom指向出队消息空间的最后一个 */
UBaseType_t uxRecursiveCallCount;/* 当结构体用于互斥量时,uxRecursiveCallCount用于计数,记录递归互斥量被“调用”的次数 */
} u;

List_t xTasksWaitingToSend; /* 一个发送消息的阻塞列表,用于保存阻塞在此队列的任务,任务按照优先级进行排序,由于队列已满,想要发送消息的任务无法发送消息 */
List_t xTasksWaitingToReceive; /* 一个获取消息的阻塞列表,用于保存阻塞在此队列的任务,任务按照优先级进行排序,由于队列是空的,想要获取消息的任务无法获取到消息 */

volatile UBaseType_t uxMessagesWaiting;/* 记录当前消息队列的消息个数,如果消息队列被用于信号量的时候,这个值就表示有效信号量个数*/
UBaseType_t uxLength; /* 队列的长度,也就是能存放多少消息 */
UBaseType_t uxItemSize; /* 单个消息的大小 */

volatile int8_t cRxLock; /* 队列上锁后,储存从队列收到的列表项数目,也就是出队的数量,如果队列没有上锁,设置为queueUNLOCKED */
volatile int8_t cTxLock; /* 队列上锁后,储存发送到队列的列表项数目,也就是入队的数量,如果队列没有上锁,设置为queueUNLOCKED */

#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
uint8_t ucStaticallyAllocated; /* 如果队列使用的内存是静态分配的,以确保不会尝试释放内存,则设置为pdTRUE */
#endif

#if ( configUSE_QUEUE_SETS == 1 )
struct QueueDefinition *pxQueueSetContainer; /* 队列集 */
#endif

#if ( configUSE_TRACE_FACILITY == 1 )
UBaseType_t uxQueueNumber;
uint8_t ucQueueType; /* 队列的类型(用途) */
#endif

} xQUEUE;

/* 旧版本的FreeRTOS使用xQUEUE,新版本使用Queue_t代替*/
typedef xQUEUE Queue_t;

队列的类型定义:

1
2
3
4
5
6
#define queueQUEUE_TYPE_BASE				( ( uint8_t ) 0U ) /* 基础的队列 */
#define queueQUEUE_TYPE_SET ( ( uint8_t ) 0U )
#define queueQUEUE_TYPE_MUTEX ( ( uint8_t ) 1U ) /* 互斥信号量 */
#define queueQUEUE_TYPE_COUNTING_SEMAPHORE ( ( uint8_t ) 2U ) /* 计数信号量 */
#define queueQUEUE_TYPE_BINARY_SEMAPHORE ( ( uint8_t ) 3U ) /* 二值信号量 */
#define queueQUEUE_TYPE_RECURSIVE_MUTEX ( ( uint8_t ) 4U ) /* 递归互斥信号量 */

queueQUEUE_TYPE_BASE即基本的消息队列,另外,信号量机制也是通过队列实现的,因此当用于互斥信号量,二值信号量等时,会标记对于的队列类型。

2.2 创建消息队列函数

创建消息队列的函数实际为xQueueGenericCreate()这个函数,该函数首先进行队列的内存分配,然后调用prvInitialiseNewQueue()进行队列的初始化:

输入参数:

  • uxQueueLength:队列的长度
  • uxItemSize:单个消息的大小
  • ucQueueType:队列的类型(用途)

返回值:

  • pxNewQueue:消息队列控制块,一个结构体指针(QueueHandle_t句柄,实际是void*),也即消息队列在内存中的地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#define xQueueCreate( uxQueueLength, uxItemSize ) xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )

QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, const uint8_t ucQueueType )
{
Queue_t *pxNewQueue; /* 消息队列控制块,一个结构体指针*/
size_t xQueueSizeInBytes; /* 需要分配的内存大小,Bytes为单位 */
uint8_t *pucQueueStorage; /* 实际存放消息的地址,即消息队列控制块的后面 */

configASSERT( uxQueueLength > ( UBaseType_t ) 0 ); /*判断要创建的消息队列的长度是否大于0*/
if( uxItemSize == ( UBaseType_t ) 0 )
{ /* 若消息队列单个项的大小为0,则不会分配队列存储区域 */
xQueueSizeInBytes = ( size_t ) 0;
}
else
{ /* 计算需要分配的内存大小:队列的长度*单个消息的大小 */
xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );
}

/* 调用pvPortMalloc分配内存:其大小为消息队列控制块的大小+实际的消息占用的大小 */
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );
/* 若成功分配了内存,则pxNewQueue这个结构体指针(消息队列控制块)不为NULL */
if( pxNewQueue != NULL )
{ /* 跳过消息队列控制块以获取实际的队列存储区域的指针 */
pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t );
#if( configSUPPORT_STATIC_ALLOCATION == 1 )
{ /* 队列可以静态创建,也可以动态创建,因此请注意,此任务是动态创建的,以防以后删除 */
pxNewQueue->ucStaticallyAllocated = pdFALSE;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */

/*初始化一个队列*/
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}

return pxNewQueue;
}

2.3 初始化队列准备

prvInitialiseNewQueue()函数首先初始化队列的长度和消息大小等参数,然后调用xQueueGenericReset()函数进行队列复位(初始化):

输入参数:

  • uxQueueLength:队列的长度
  • uxItemSize:单个消息的大小
  • pucQueueStorage:实际存放消息的地址
  • ucQueueType:队列的类型(用途)
  • pxNewQueue:消息队列控制块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, uint8_t *pucQueueStorage, const uint8_t ucQueueType, Queue_t *pxNewQueue )
{
( void ) ucQueueType;
/* 判断单个消息的大小是否为0*/
if( uxItemSize == ( UBaseType_t ) 0 )
{
/* 没有为队列存储区域分配RAM,但是pcHead不能设置为NULL,因为NULL被用作一个键来表示队列被用作互斥锁。
因此,只需将pcHead设置为消息队列控制块的地址 */
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
else
{ /* 将pcHead设置为实际存放消息的地址 */
pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
}

/* 按照定义队列类型的位置初始化队列成员 */
pxNewQueue->uxLength = uxQueueLength;
pxNewQueue->uxItemSize = uxItemSize;
( void ) xQueueGenericReset( pxNewQueue, pdTRUE ); /*队列复位(初始化)*/

#if ( configUSE_TRACE_FACILITY == 1 )
{
pxNewQueue->ucQueueType = ucQueueType;
}
#endif /* configUSE_TRACE_FACILITY */

#if( configUSE_QUEUE_SETS == 1 )
{
pxNewQueue->pxQueueSetContainer = NULL;
}
#endif /* configUSE_QUEUE_SETS */

traceQUEUE_CREATE( pxNewQueue );
}

2.4 队列复位(初始化)

xQueueGenericReset()函数中会调用vListInitialise()函数构建列表结构:

输入参数:

  • xQueue:消息队列控制块(内存中分配的消息队列的地址)
  • xNewQueue:是否为新的队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
BaseType_t xQueueGenericReset( QueueHandle_t xQueue, BaseType_t xNewQueue )
{
Queue_t * const pxQueue = ( Queue_t * ) xQueue; /* 将内存地址转为消息队列控制块结构体类型,并赋值给局部变量pxQueue */

configASSERT( pxQueue );
taskENTER_CRITICAL(); /* 进入临界区 */
{
/* 队列尾地址 = 头地址 + 队列长度*单个消息的大小 */
pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
/* 当前消息队列的消息个数初始化为0 */
pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
/* 可写入消息的位置,初始为pcHead这个位置 */
pxQueue->pcWriteTo = pxQueue->pcHead;
/* 可读取消息的位置,初始为消息队列的队尾位置? */
pxQueue->u.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize );
pxQueue->cRxLock = queueUNLOCKED;
pxQueue->cTxLock = queueUNLOCKED;

if( xNewQueue == pdFALSE ) /* 不是新的队列 */
{
/* 如果有阻塞的任务等待从队列读取,那么这些任务将保持阻塞状态,因为在此函数退出后,队列仍然是空的。
如果有被阻塞的任务等待写入队列,那么应该解除阻塞,因为在这个函数退出后,将可能写入它*/
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
}
}
else /* 是新的队列,则构建消息队列结构 */
{
vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
}
}
taskEXIT_CRITICAL(); /* 退出临界区 */
return pdPASS;
}

2.5 构建列表结构

这里先来看一下列表结构体的定义:

1
2
3
4
5
6
7
8
typedef struct xLIST
{
listFIRST_LIST_INTEGRITY_CHECK_VALUE /*configUSE_LIST_DATA_INTEGRITY_CHECK_BYTES是否被设置为1 */
configLIST_VOLATILE UBaseType_t uxNumberOfItems; /* 链表节点计数器 */
ListItem_t * configLIST_VOLATILE pxIndex; /* 链表节点索引指针 */
MiniListItem_t xListEnd; /* 链表最后一个节点 */
listSECOND_LIST_INTEGRITY_CHECK_VALUE /*configUSE_LIST_DATA_INTEGRITY_CHECK_BYTES是否被设置为1 */
} List_t;

再来看vListInitialise这个函数:

输入参数:

  • pxList:发送或接收消息的阻塞列表的结构体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void vListInitialise( List_t * const pxList )
{
/* 列表结构包含一个用于标记列表结束的列表项。要初始化列表,将列表末尾作为唯一的列表条目插入。 */
pxList->pxIndex = ( ListItem_t * ) &( pxList->xListEnd );

/* 列表结束值是列表中可能的最高值,以确保它保持在列表的末尾. */
pxList->xListEnd.xItemValue = portMAX_DELAY;

/* 链表的下一个和前一个指针都指向它自己,所以我们知道链表什么时候是空的 */
pxList->xListEnd.pxNext = ( ListItem_t * ) &( pxList->xListEnd );
pxList->xListEnd.pxPrevious = ( ListItem_t * ) &( pxList->xListEnd );

pxList->uxNumberOfItems = ( UBaseType_t ) 0U;

/*如果configUSE_LIST_DATA_INTEGRITY_CHECK_BYTES=1,将已知值写入列表 */
listSET_LIST_INTEGRITY_CHECK_1_VALUE( pxList );
listSET_LIST_INTEGRITY_CHECK_2_VALUE( pxList );
}

3 消息队列发送与接收

在消息队列的发送和接收过程中,可能会进行阻塞延时(发送消息但队列已满,结束消息但队列中没有消息),此时会进行任务切换,关于任务切换的内容可参考:

3.1 发送消息队列函数

xQueueSend()函数实际是使用xQueueGenericSend()这个函数:

输入参数:

  • xQueue:消息队列控制块
  • pvItemToQueue:要发送的消息数据
  • xTicksToWait:发送允许的阻塞时间
  • xCopyPosition:发送到消息队列的位置

先来看一下发送允许的位置,包括发送到队尾,发送到队头,覆盖写入3种。

1
2
3
#define	queueSEND_TO_BACK		( ( BaseType_t ) 0 )
#define queueSEND_TO_FRONT ( ( BaseType_t ) 1 )
#define queueOVERWRITE ( ( BaseType_t ) 2 )

再来看一下xQueueGenericSend这个函数,xQueueSend默认是发送到队尾:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#define xQueueSend( xQueue, pvItemToQueue, xTicksToWait ) xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_BACK )

BaseType_t xQueueGenericSend( QueueHandle_t xQueue, const void * const pvItemToQueue, TickType_t xTicksToWait, const BaseType_t xCopyPosition )
{
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired; /* 超时时间是否已设置 */
TimeOut_t xTimeOut; /* 阻塞等待的超时时间结构体*/
Queue_t * const pxQueue = ( Queue_t * ) xQueue; /* 消息队列控制块*/

configASSERT( pxQueue );
configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) );
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{
configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif

/* 这个函数放松了编码标准,允许在函数本身中使用返回语句。这样做是为了提高执行时间效率 */
for( ;; )
{
taskENTER_CRITICAL();
{
/*当前消息队列的个数未满或设置了可以覆盖写入 */
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
traceQUEUE_SEND( pxQueue );
/* 复制数据到队列中 */
xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );

#if ( configUSE_QUEUE_SETS == 1 )
{
/* 队列所属队列集不为空 */
if( pxQueue->pxQueueSetContainer != NULL )
{
/* 向队列集发送通知消息 */
if( prvNotifyQueueSetContainer( pxQueue, xCopyPosition ) != pdFALSE )
{
/* 该队列是队列集的成员,向队列集发送消息会导致一个优先级更高的任务解除阻塞,需要上下文切换 */
queueYIELD_IF_USING_PREEMPTION();
}
}
/* 队列所属队列集为空 */
else
{
/* 如果有任务等待数据到达队列,那么现在解除阻塞 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* 未阻塞的任务的优先级比我们自己的要高,所以要立即执行。
是的,在临界区内这样做是没问题的——内核会负责这一点。 */
queueYIELD_IF_USING_PREEMPTION();
}
}
else if( xYieldRequired != pdFALSE )
{
/* 此路径是一种特殊情况,只有在任务持有多个互斥锁并且这些互斥锁返回的顺序与获取它们的顺序不同时才会执行 */
queueYIELD_IF_USING_PREEMPTION();
}
}
}
#else /* configUSE_QUEUE_SETS */
//...省略部分
#endif /* configUSE_QUEUE_SETS */

taskEXIT_CRITICAL();
return pdPASS;
}
else /* 队列已满,无法写入时 */
{
if( xTicksToWait == ( TickType_t ) 0 ) /* 若设置的阻塞等待时间为0 */
{ /* 那就不写了,直接退出临界区 */
taskEXIT_CRITICAL();
/* 退出函数之前返回原始权限级别 */
traceQUEUE_SEND_FAILED( pxQueue );
/* 返回“队列满”的提示 */
return errQUEUE_FULL;
}
else if( xEntryTimeSet == pdFALSE ) /* 若设置的阻塞等待时间不为0,且进入时间还未设置 */
{ /* 队列已满,并且指定了一个块时间,因此配置超时结构体 */
vTaskSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
}
}
taskEXIT_CRITICAL();

/* 现在临界区已经退出,中断和其他任务可以向队列发送和从队列接收 */
vTaskSuspendAll();
prvLockQueue( pxQueue );
/* 更新超时状态,看看它是否已经过期 */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) /*未超时*/
{
if( prvIsQueueFull( pxQueue ) != pdFALSE )
{
traceBLOCKING_ON_QUEUE_SEND( pxQueue );
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );

/* 解除队列锁定意味着队列事件可以影响事件列表。现在发生的中断可能会再次将该任务从事件列表中删除
——但由于调度器被挂起,该任务将进入挂起的最后一个准备列表,而不是实际的准备列表。 */
prvUnlockQueue( pxQueue );

/*恢复任务调度器将从等待就绪列表进入就绪列表,所以它是可行的,这个任务已经就绪列表之前产量
——在这种情况下,不会导致上下文切换,除非也在等待一个更高优先级的任务就绪列表 */
if( xTaskResumeAll() == pdFALSE )
{
portYIELD_WITHIN_API();
}
}
else
{ /* 再试一次 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else /* 已经超时 */
{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();

traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL;
}
}
}

3.2 接收消息队列

xQueueReceive()函数实际是使用xQueueGenericReceive()这个函数:

输入参数:

  • xQueue:消息队列控制块
  • pvBuffer:要接收的消息数据
  • xTicksToWait:接收允许的阻塞时间
  • xJustPeeking:是否是Peek(只获取不删除)模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#define xQueueReceive( xQueue, pvBuffer, xTicksToWait ) xQueueGenericReceive( ( xQueue ), ( pvBuffer ), ( xTicksToWait ), pdFALSE )

BaseType_t xQueueGenericReceive( QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait, const BaseType_t xJustPeeking )
{
BaseType_t xEntryTimeSet = pdFALSE;
TimeOut_t xTimeOut;
int8_t *pcOriginalReadPosition;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;

configASSERT( pxQueue );
configASSERT( !( ( pvBuffer == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{
configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif

/* 这个函数放松了编码标准,允许在函数本身中使用返回语句。这样做是为了提高执行时间效率 */
for( ;; )
{
taskENTER_CRITICAL();
{
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;
/* 现在队列中是否有数据?要运行调用任务,必须是希望访问队列的优先级最高的任务 */
if( uxMessagesWaiting > ( UBaseType_t ) 0 ) /*消息队列中有数据*/
{
pcOriginalReadPosition = pxQueue->u.pcReadFrom;/* 记住读取位置,以防队列被读取 */
prvCopyDataFromQueue( pxQueue, pvBuffer );/* 从队列中复制数据 */
if( xJustPeeking == pdFALSE ) /*不是Peeking模式读取消息*/
{
traceQUEUE_RECEIVE( pxQueue );
pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1;/* 实际上删除数据,而不仅仅是读取 */
#if ( configUSE_MUTEXES == 1 )
{
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
{ /* 如果有必要,记录实现优先级继承所需的信息 */
pxQueue->pxMutexHolder = ( int8_t * ) pvTaskIncrementMutexHeldCount();
}
}
#endif /* configUSE_MUTEXES */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{ queueYIELD_IF_USING_PREEMPTION(); }
}
}
else /*是Peeking模式读取消息(只读取,不删除)*/
{
traceQUEUE_PEEK( pxQueue ); /*执行PEEK*/
pxQueue->u.pcReadFrom = pcOriginalReadPosition;/* 数据没有被移除,所以重置读指针 */
/* 数据被留在队列中,因此请查看是否有其他任务在等待该数据 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{ /* 等待的任务具有比该任务更高的优先级 */
queueYIELD_IF_USING_PREEMPTION(); /*进行任务切换*/
}
}
}
taskEXIT_CRITICAL();
return pdPASS;
}
else /*消息队列中没有数据*/
{
if( xTicksToWait == ( TickType_t ) 0 )
{ /* 队列是空的,没有指定块时间(或者块时间已经过期),所以现在离开 */
taskEXIT_CRITICAL();
traceQUEUE_RECEIVE_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
else if( xEntryTimeSet == pdFALSE )
{ /* 对列是空的,并且指定了块时间,因此配置超时结构 */
vTaskSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
}
}
taskEXIT_CRITICAL();

/* 临界区已经退出,中断和其他任务可以向队列发送或从队列接收消息 */
vTaskSuspendAll();
prvLockQueue( pxQueue );
/* 更新超时状态,查看它是否已经过期 */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )//阻塞等待未超时
{
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )//消息队列为空
{
#if ( configUSE_MUTEXES == 1 )
{
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
{
taskENTER_CRITICAL();
{ vTaskPriorityInherit( ( void * ) pxQueue->pxMutexHolder ); }
taskEXIT_CRITICAL();
}
}
#endif
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );/*添加任务到阻塞列表*/
prvUnlockQueue( pxQueue );
if( xTaskResumeAll() == pdFALSE )
{ portYIELD_WITHIN_API(); }
}
else //消息队列为空
{ /* 再试一次 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else//阻塞等待已超时
{
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )//消息队列为空
{
traceQUEUE_RECEIVE_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
}
}
}