With the help of non blocking algorithms it is possible to manage IPC more effeciently without blocking the threads when a shared resource needs to be accessed simalteneously.
One of the well known non blocking algorithms is based on atomic operations on pointers.
It allows multiple threads to access a singly linked list.
Win32 API on Windows XP has the following functions, which implement this algorithm:
InterlockedPushEntrySList InterlockedPopEntrySList InterlockedFlushSList
Actually MSDN contains example on how to use these functions but it might be not so obvious how useful these routines are.
Assume you are implementing a FIFO or LIFO message queue where multiple threads write messages and one server thread is processing these messages. It seems suitable to use the singly linked list for the messages and one event for signaling to the processing thread.
As illustration to the ideas described above you can think of a log writer thread in a multithreaded application.
Typically multiple threads submit log messages and one thread manages these messages on a FIFO basis (first in, first out).
For this problem let us implement the following functions:
//initialization routine void InitLogWriting(); //log writer thread routine DWORD LogWritingThreadRoutine(LPVOID lpParam); //write message routine which is executed by all threads void WriteLogMessage(char * strMessage); //this routine ends log writer thread void EndLogWriting();
For the sake of simplicity I do not include error handling code in the code examples here.
First, for our shared singly linked list it is needed to define a data item structure, and a few shared variables, which will be used in all thread of the application.
typedef struct _MSG_ITEM{
SLIST_ENTRY ListEntry; // this is needed for Win32 API
char * strText; // here is the place for some data
} MSG_ITEM, *PMSG_ITEM;.
//handle to the signalling event
//this event is used when new messages are
//available for the log writer thread
HANDLE hEvent;
//pointer to the first element in the list
SLIST_HEADER ListHead;
Initalization routine can look like this:
void InitLogWriting(){
//initialize the list
InitializeSListHead(&ListHead);
//create event with manual reset flag turned off
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
}
The log writer thread routine waits for the event to be signalled and then processes all messages fetched from the shared slist.
To process messages in FIFO order the fetched sub list of entries must be
reversed. This is because other threads use InterlockedPushEntrySList to insert new entries at the start of the list.
This function should be ok to reverse the list:
void ReverseSList(PSLIST_ENTRY * pHead){
SLIST_ENTRY * pPrev = NULL;
SLIST_ENTRY * pThis = *pHead;
SLIST_ENTRY * pNext = (*pHead)->Next;
while(pThis->Next){
pNext = pThis->Next;
pThis->Next = pPrev;
pPrev = pThis;
pThis = pNext;
}
//list tail
pThis->Next = pPrev;
*pHead = pThis;
}
So log writer thread routine would be:
DWORD LogWritingThreadRoutine(LPVOID lpParam){
PMSG_ITEM pListItem;
PMSG_ITEM pPrevItem;
char * pszText;
while(WAIT_OBJECT_0==WaitForSingleObject(hEvent, INFINITE)){
//fetch all items from the list
pListItem = (PMSG_ITEM) InterlockedFlushSList(&ListHead);
//reverse fetched sub list to get FIFO order
if (pListItem) ReverseSList(&pListItem);
//process all fetched items
while(pListItem){
pszText = pListItem->strText;
//exit thread if an empty message is received
if (!pszText){
//do not forget to call CloseHandle(hEvent) when
//the entire process is exiting
return 0;
}
//process message text contained in pszText
// for example, MessageBox(NULL, pszText, NULL, 0);
//...
//switch to next list item and
//free memory allocated for the message
pPrevItem = pListItem;
pListItem = (PMSG_ITEM) pListItem->ListEntry.Next;
//now we can delete the entire list item
//including message text
delete [] (pPrevItem);
}
}
return GetLastError();
}
Now comes the function which is called by various threads to store new log message.
void WriteLogMessage(char * strMessage){
//allocate memory for text and for list item
//+1 is needed for zero at the end of the string
UINT iBufferLength = sizeof(MSG_ITEM) + strlen(strMessage) + 1;
char * ptr = new char[iBufferLength];
//initialize new list item
PMSG_ITEM pListItem = (PMSG_ITEM) ptr;
pListItem->strText = ptr + sizeof(MSG_ITEM);
//copy message text
strcpy(pListItem->strText, strMessage);
//insert new item into the list
InterlockedPushEntrySList(&ListHead, &pListItem->ListEntry);
//signal to log writer thread
SetEvent(hEvent);
}
Finally, to stop the log writer thread the empty text message should be added to the shared list.
void EndLogWriting(){
char * ptr = new char[sizeof(MSG_ITEM)];
PMSG_ITEM pListItem = (PMSG_ITEM) ptr;
pListItem->strText = NULL;
//insert new item into the list
InterlockedPushEntrySList(&ListHead, &pListItem->ListEntry);
//Signal emtpy message
SetEvent(hEvent);
}
Usage.
In the example below I create log writer thread and send three messages to be logged.
DWORD tID;
InitLogWriting();
HANDLE hThread = CreateThread(NULL, 0, \
(LPTHREAD_START_ROUTINE) LogWritingThreadRoutine, 0, 0, &tID);
WriteLogMessage("Test message 1");
Sleep(4000);
WriteLogMessage("Test message 2");
Sleep(4000);
WriteLogMessage("Test message 3");
EndLogWriting();
You can also see the full source code in one text file.
Comments on WriteLogMessage:
I think the way you’ve tacked storage for the strText pointer as well as the string itself is pretty clever. I wouldn’t have thought of that.
And although it’s clearer to use new and delete like you did, the SList Interlocked functions require each allocated block to be aligned on a dword boundary, which new doesn’t do. You need to use _aligned_malloc and _aligned_free instead. The requirement for alignment is imposed by the CPU. This is a gotcha that would be very hard to debug….
ChasM, thank you for your useful comment.
MSDN indeed says:
All list items must be aligned on a MEMORY_ALLOCATION_ALIGNMENT boundary; otherwise, this function will behave unpredictably.
Instead of proper allocation (which will cause editing this post) I may add some special requirements for the logging API.
Considering that on Win32 MEMORY_ALLOCATION_ALIGNMENT is 8 that makes my example suitable only for strings with length equal to 3, 11, 19,.. etc :)
On the surface this queue looks like lock free. Just replace:
char * ptr = new char[iBufferLength];
with
MsgItem* msqItem = (MsgItem*)_aligned_malloc(sizeof(MsgItem), MEMORY_ALLOCATION_ALIGNMENT);
and “magic happens”.
In fact treads posting the messages are serialized while allocating memory (new/malloc/_aligned_malloc).
Still a good class, because the alternatives lock also on posting messages.