8. Message Queues
A Message Queue is another kernel object that offers communication between tasks. This object differs from semaphores in many ways:
Unlike semaphore, the Message Queue carries an information (data) from the sending task to the receiving task
That information can be anything: numbers, arrays, structures, pointers... Using pointers as the message content, for instance, even allows exposition of private data to the receiving task
Unlike semaphores, the Message Queue hold history of sent messages within a buffer that is filled by senders, and unfilled by the receiving task when activated. If the queue is correctly sized, no messages are lost
Common use of Message Queues involves several tasks sending messages to a unique destination task as depicted in the figure below
The Message Queue interacts with the RTOS the same way other kernel objects do: On the sender side, the task can be blocked upon sending attempt if the Queue is full. On the receiver side, the task is blocked upon receiving attempt if the Queue is empty.
So, no need to say, Messages Queues are very useful, and involved in several cases of task synchronization.
1. A practical example
In the previous tutorial, we have seen that using a Mutex provides a solution for the shared use of a common resource (i.e. the debug console). In this tutorial, we will address the same issue differently using a Message Queue. To achieve this, we create a third task 'Task_Console' which is the only one having control over the debug console. Task_1 and Task_2 then send messages to Task_Console every time there is something to be printed.
Let us go thru this practical case. First we declare 3 tasks:
// FreeRTOS tasks
void vTask1 (void *pvParameters);
void vTask2 (void *pvParameters);
void vTaskConsole (void *pvParameters);
Then, as usual, the next step is to declare (as global) the Message Queue object:
// Kernel Objects
xQueueHandle xConsoleQueue;
Also, we will define a user type as the message data type. Here, the type msg_t corresponds to an array of 64 unsigned bytes (char) to hold character strings:
// Define the message_t type as an array of 64 char
typedef uint8_t msg_t[64];
Next, the main() function creates the Message Queue. The first parameter is the depth of the queue buffer (i.e. the maximum number of pending messages the queue can hold), the second parameter is the size of each message:
...
// Create Queue to hold console messages
xConsoleQueue = xQueueCreate(4, sizeof(msg_t));
// Give a nice name to the Queue in the trace recorder
vTraceSetQueueName(xConsoleQueue, "Console Queue");
...
Then the main() function creates the tasks, and start the scheduler:
...
// Create Tasks
xTaskCreate(vTask1, "Task_1", 256, NULL, 3, NULL);
xTaskCreate(vTask2, "Task_2", 256, NULL, 2, NULL);
xTaskCreate(vTaskConsole, "Task_Console", 256, NULL, 1, NULL);
// Start the Scheduler
vTaskStartScheduler();
...
Note that Task_Console has been given the lowest priority level. We know that sending messages to the console over UART is a very slow process. Hence, the strategy here is to do it "when time permits", with no priority over other, supposedly important tasks.
Task_1 wants to print a long message every 20ms. Yet it does not access the console directly, it sends a message to the "console" task instead, using the xConsoleQueue Message Queue. my_sprintf() is used to format the string into a local memory buffer.
/*
* Task_1
*/
void vTask1 (void *pvParameters)
{
msg_t msg;
while(1)
{
// Prepare message
my_sprintf((char *)msg, "With great power comes great responsibility\r\n");
// Send message to the Console Queue
xQueueSendToBack(xConsoleQueue, &msg, 0);
// Wait for 20ms
vTaskDelay(20);
}
}
The same way, Task_2 sends a short message to print every 2ms. Let us add a cycling index to the message in order to make sure that no messages are lost:
/*
* Task_2
*/
void vTask2 (void *pvParameters)
{
msg_t msg;
uint8_t index = 0;
while(1)
{
// Prepare message
my_sprintf((char *)msg, "%d# ", index);
// Send message to Console Queue
xQueueSendToBack(xConsoleQueue, &msg, 0);
// Increment index
(index==9) ? index=0 : index++;
// Wait for 2ms
vTaskDelay(2);
}
}
And finally, Task_Console processes incoming messages:
/*
* Task_Console
*/
void vTaskConsole (void *pvParameters)
{
msg_t msg;
while(1)
{
// Wait for something in the message Queue
xQueueReceive(xConsoleQueue, &msg, portMAX_DELAY);
// Send message to console
my_printf((char *)msg);
}
}
There are different ways to manage the Message Queue buffer. Using the xQueueSendToBack() function make the queue working as a FIFO (First In, First Out). Messages will be printed in the order of arrival.
The result:
As expected, we have ten '#' for one long message and no message from Task_2 has been lost on the way!
The detail of Task_2 sending its message for a short print:
Every 20ms, both Task_1 and Task_2 are activated altogether. Task_1 has higher priority level, therefore it goes first, immediately followed by Task_2. When Task_Console gets active, 2 messages are waiting in the queue. The one from Task_1 is popped first and it takes long time to print. In that period of time, 2 new messages from Task_2 come in the queue raising the total of 3 messages now waiting. All 3 are quickly processed by Task_Console as soon as the long message has finished printing, and before it gets blocked due to an now empty queue. Crystal clear!
By double-clicking on any Console Queue event tag, you can get the history of queue usage. As discussed before, in the worst situation, we have 3 messages in the queue, and it is sized to handle up to 4 messages. So the headroom is not big, but so far so good.
- Commit name "Message queue" - Push onto Gitlab |
2. Let's dig a little
Let us investigate how messages are handled in the Message Queue.
The story starts within Task_1 that prepares the original message.
Setting a breakpoint there, we can locate msg in memory @0x20000690
And we find the message there as expected.
Now stepping into the xQueueSendToBack(), we can explore the way the messages are stored into the Queue. There are several pointers in the xConsoleQueue variable, but it is not hard to guess...
In the address segment 0x200001E8→0x200002E8 (256 bytes), we find the actual message queue with 4 slots of 64 bytes for holding messages. Writing into this buffer is operated as with a ring buffer [0]→[1]→[2]→[3]→[0]... At the end of xQueueSendToBack(), the full message is copied at the current slot location. In the below snapshot that's slot[0], but it could be any, and it changes every time. The buffer ins and outs are managed by pointers.
And the story ends within Task_Console.
Here again, we can locate the local msg variable in memory @0x20000F50
And after the xQueueReceive() function returns, we have a local copy of our original message in the destination task.
In conclusion, at this moment of the story, we have 3 copies of the same message in the MCU memory:
One in the sending task (local msg @0x20000690)
A second in the message queue buffer @0x200001E8
A third in the receiving task (local msg @0x20000F50)
That is a very safe way to work with Message Queue because no message can get lost, and no variable is shared among the tasks. Yet, if messages hold a lot of data, this approach produces a waste of memory.
So why not using the Message Queue to send pointers instead of full message? Seems like a good idea...
3. Passing references with messages
So instead of sending full messages, now Task_1 and Task_2 sends an address. First thing to do is to change the size of the queue buffer. It now holds 4 pointers (total 16 bytes) instead of 4 full message (total 256 bytes). That's net saving of 240 bytes on the heap!
// Create Queue to hold console messages
xConsoleQueue = xQueueCreate(4, sizeof(msg_t *));
Then edit Task_1 and Task_2:
/*
* Task_1
*/
void vTask1 (void *pvParameters)
{
msg_t msg;
msg_t *pmsg = NULL;
while(1)
{
// Prepare message
my_sprintf((char *)msg, "With great power comes great responsibility\r\n");
pmsg = &msg;
// Send message to the Console Queue
xQueueSendToBack(xConsoleQueue, &pmsg, 0);
// Wait for 20ms
vTaskDelay(20);
}
}
/*
* Task_2
*/
void vTask2 (void *pvParameters)
{
msg_t msg;
msg_t *pmsg = NULL;
uint8_t index = 0;
while(1)
{
// Prepare message
my_sprintf((char *)msg, "%d# ", index);
pmsg = &msg;
// Send message to Console Queue
xQueueSendToBack(xConsoleQueue, &pmsg, 0);
// Increment index
(index==9) ? index=0 : index++;
// Wait for 2ms
vTaskDelay(2);
}
}
And finally, Task_Console is now getting a pointer from the Message Queue:
/*
* Task_Console
*/
void vTaskConsole (void *pvParameters)
{
msg_t *pmsg = NULL;
while(1)
{
// Wait for something in the message Queue
xQueueReceive(xConsoleQueue, &pmsg, portMAX_DELAY);
// Send message to console
my_printf((char *)pmsg);
}
}
Is that working? Well, yes and no...
We've got the right number of printed messages, but looking closer, you'll see that messages #0 and #1 from Task_2 never print. Instead, we have 3 duplicates of message #2... Weird...
To better understand what is happening here, let us print the Task_2 index value using a user event channel and analyze the trace:
Remember that messages now only carry a pointer to the data to be processed, which data being local to the sending task. After a message is sent to the queue, if that data changes before the message is processed by the receiving task, then the old data is lost, and will never be processed. That's what happens here.
@t=2.280 both Task_1 and Task2 send their message. The Task_Console starts processing the long message from Task_1 first. The message #0 from Task_2 (i.e. the pointer to the msg variable local to Task_2) is queued.
@t=2.282 (2ms later), Task_2 overwrite the content of his msg variable with #1 message, and put another pointer (the same value actually, the msg variable hasn't moved) in the Message Queue.
@t=2.284 (2ms later), Task_2 overwrite again the content of his msg variable with #2 message, and put a third pointer (still the same) in the Message Queue.
Around @t=2.285, Task_Console is done printing the long message from Task_1 and starts popping all remaining queued messages from Task_2. What it gets is 3 times the same pointer, pointing to the msg variable local to Task_2, and holding at that moment the #2 message. Simple!
For now, I can't think of a simple solution to make this working if we want to stick with pointers. We probably need to implement a buffering strategy within Task_2 and somehow having a way for Task_2 to know whether a message has been processed on not.
So :
In some cases, you don't care loosing messages, that have become obsolete anyway
Otherwise, if there is a risk that the sending frequency overload the short-term reading capability (which is the case here), and if you can't afford loosing messages, then pointer should be avoided. Full messages must be queued. So we go back to the first approach.
- Commit name "Message queue with pointers" - Push onto Gitlab |
Another common mistake is to put the message preparation and queuing within a function that is called from the task and then queuing a pointer that is local to that function... Obviously, that doesn't work. Queuing a variable local to a function makes no sense since that variable is destroyed as soon as the function returns, and before the message is processed by the receiving task.