July 01, 2008
Lock-Free Queues
One thread can write and another read, at the same time!
Petru Marginean
Petru is a Vice President for Morgan Stanley, where he works as a C++ senior programmer in investment banking. He can be contacted at petru.marginean@gmail.com.
This article as written assumes a sequentially consistent model. In particular, the code relies on a specific order of instructions in both the Consumer and Producer methods. However, without proper memory barrier instructions, these instructions can be reordered with unpredictable results (see, for example, the classic Double-Checked Locking problem). Another issue is the use of the standard std::list<T>. While the article mentions that it is the developer's responsibility to check that reading/writing a std::list<T>::iterator is atomic, this turns out to be too restrictive. While gcc/MSVC++2003 has 4-byte iterators, MSVC++2005 has 8-byte iterators in Release Mode and 12-byte iterators in Debug Mode. The solution is to use memory barriers/volatile variables. The downloadable code for the article has fixed that issue. Many thanks to Herb Sutter, who signaled the issue and helped me fix the code. --P.M.
Queues can be useful in a variety of systems involving data-stream processing. Typically, you have a data source producing data (requests coming to a web server, market feeds, or digital telephony packets) at a variable pace, and you need to process the data as fast as possible so there are no losses. To do this, you can push data into a queue using one thread and process it using a different thread, which makes good use of resources on multicore processors. One thread inserts data into the queue, and the other reads/deletes elements from the queue. Your main requirement is that a high-rate data burst does not last longer than the system's ability to accumulate data while the consumer thread handles it. The queue you use has to be threadsafe to prevent race conditions when inserting/removing data from multiple threads. For obvious reasons, it is necessary that the queue's mutual exclusion mechanism add as little overhead as possible.
In this article, I present a lock-free queue (the source code for the lockfreequeue class is available online; see www.ddj.com/code/) in which one thread can write to the queue and another read from it, at the same time, without any locking.
To do this, the code implements these requirements:
Because I use Standard C++, the code is portable under the aforementioned "machine" assumption:
    #include <list>

    template <typename T>
    struct LockFreeQueue
    {
        LockFreeQueue();
        void Produce(const T& t);   // called by the Producer thread only
        bool Consume(T& t);         // called by the Consumer thread only
    private:
        typedef std::list<T> TList;
        TList list;
        typename TList::iterator iHead, iTail;
    };
Considering how simple this code is, you might wonder how it can be threadsafe. The magic is due to design, not implementation. Take a look at the implementation of the Produce() and Consume() methods. The Produce() method looks like this:
    void Produce(const T& t)
    {
        list.push_back(t);                 // add the new element first
        iTail = list.end();                // then update the tail marker
        list.erase(list.begin(), iHead);   // drop elements already consumed
    }
To understand how this works, mentally separate the data from LockFreeQueue<T> into two groups:
Produce() is the only method that changes the list (adding new elements and erasing the consumed elements), and it is essential that only one thread ever calls Produce(): the Producer thread! The iterator iTail (manipulated only by the Producer thread) is changed only after a new element has been added to the list. This way, when the Consumer thread reads iTail, the newly added element is ready to be used. The Consume() method tries to read all the elements between iHead and iTail (excluding both ends).
    bool Consume(T& t)
    {
        typename TList::iterator iNext = iHead;
        ++iNext;                    // first element not yet read
        if (iNext != iTail)
        {
            iHead = iNext;          // mark it as read
            t = *iHead;
            return true;
        }
        return false;               // nothing new to read
    }
This method reads the elements but doesn't remove them from the list. Nor does it access the list directly; it works only through the iterators. std::list<T> iterators are guaranteed to remain valid when the list is modified (except for iterators to erased elements, and Produce() erases only elements before iHead), so no matter what the Producer thread does to the list, you are safe to use them.
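The ordering argument above (an element becomes visible to the Consumer only after it is fully in place) holds only if neither the compiler nor the CPU reorders the writes. What follows is a minimal sketch of the same single-producer/single-consumer idea with that ordering stated explicitly, assuming C++11 std::atomic, which postdates the article. It is not the article's downloadable code: the class name SpscQueue, the hand-rolled node list, and the member names are all hypothetical and chosen for illustration.

```cpp
#include <atomic>

// Hypothetical illustration, not the article's LockFreeQueue<T>:
// exactly one Producer thread and one Consumer thread are assumed.
template <typename T>
class SpscQueue
{
    struct Node
    {
        T value;
        std::atomic<Node*> next;
        explicit Node(const T& v) : value(v), next(nullptr) {}
    };

    Node* first_;                  // Producer only: oldest node still allocated
    Node* last_;                   // Producer only: newest node
    std::atomic<Node*> consumed_;  // written by Consumer: last node already read

public:
    // Start with a placeholder node that counts as "already read".
    SpscQueue() : first_(new Node(T())), last_(first_), consumed_(first_) {}

    ~SpscQueue()
    {
        while (first_ != nullptr)
        {
            Node* next = first_->next.load(std::memory_order_relaxed);
            delete first_;
            first_ = next;
        }
    }

    SpscQueue(const SpscQueue&) = delete;
    SpscQueue& operator=(const SpscQueue&) = delete;

    void Produce(const T& t)
    {
        Node* node = new Node(t);                            // build the element first
        last_->next.store(node, std::memory_order_release);  // then publish it
        last_ = node;

        // Free nodes the Consumer has moved past (never the consumed_ node itself).
        Node* stop = consumed_.load(std::memory_order_acquire);
        while (first_ != stop)
        {
            Node* old = first_;
            first_ = old->next.load(std::memory_order_relaxed);
            delete old;
        }
    }

    bool Consume(T& t)
    {
        Node* current = consumed_.load(std::memory_order_relaxed);
        Node* next = current->next.load(std::memory_order_acquire);
        if (next == nullptr)
            return false;                                    // nothing published yet
        t = next->value;
        consumed_.store(next, std::memory_order_release);    // hand `current` back
        return true;
    }
};
```

The release store in Produce() pairs with the acquire load in Consume(), so the Consumer never sees a node pointer before the node's value has been written. The reverse pairing on consumed_ keeps the Producer from freeing a node that the Consumer is still reading.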
The std::list<T> maintains an element (pointed to by iHead) that is considered already read. For this algorithm to work even when the queue was just created, I add an empty T() element in the constructor of the LockFreeQueue<T> (see Figure 1):
Figure 1: Adding an empty T() element in the constructor of the LockFreeQueue<T>.
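Putting the pieces together, the class might be assembled as in the sketch below. The Produce() and Consume() bodies are the ones shown earlier. The constructor body is an assumption that follows the description above (one empty T() element, with iHead pointing at it). This version contains none of the memory barriers mentioned in the author's note, so it illustrates the bookkeeping only and is exercised here from a single thread.

```cpp
#include <list>

template <typename T>
struct LockFreeQueue
{
    LockFreeQueue()
    {
        list.push_back(T());      // placeholder that counts as "already read"
        iHead = list.begin();     // iHead points at the placeholder
        iTail = list.end();
    }

    void Produce(const T& t)
    {
        list.push_back(t);
        iTail = list.end();
        list.erase(list.begin(), iHead);   // elements before iHead were consumed
    }

    bool Consume(T& t)
    {
        typename TList::iterator iNext = iHead;
        ++iNext;
        if (iNext != iTail)
        {
            iHead = iNext;
            t = *iHead;
            return true;
        }
        return false;
    }

private:
    typedef std::list<T> TList;
    TList list;
    typename TList::iterator iHead, iTail;
};
```

On a freshly constructed queue, Consume() advances past the placeholder, finds the end of the list, and returns false. After the first Produce(), the same step lands on the new element.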
Consume() may fail to read an element (and return false). Unlike traditional lock-based queues, this queue works fast when the queue is not empty, but needs an external locking or polling method to wait for data. Sometimes you want to wait if there is no element available in the queue, and avoid returning false. A naive approach to waiting is:
    T Consume()
    {
        T tmp;
        while (!Consume(tmp))
            ;
        return tmp;
    }
This Consume() method will likely heat up one of your CPUs red-hot to 100-percent use if there are no elements in the queue. Nevertheless, this should have good performance when the queue is not empty. However, if you think of it, a queue that's almost never empty is a sign of systemic trouble: It means the consumer is unable to keep pace with the producer, and sooner or later, the system is doomed to die of memory exhaustion. Call this approach NAIVE_POLLING.
A friendlier Consume() function does some polling and calls some sort of sleep() or yield() function available on your system:
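A minimal sketch of such a polling loop follows, assuming C++11 std::this_thread::sleep_for stands in for the platform sleep call. The free function ConsumeWithSleep and the stand-in type EventuallyReady are hypothetical names introduced only so the example compiles on its own; any queue type offering bool Consume(T&) would fit.

```cpp
#include <chrono>
#include <thread>

// Stand-in for a platform sleep call (assumption: C++11 is available).
inline void DoSleep(int milliseconds)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
}

// Hypothetical helper: Queue is any type with bool Consume(T&).
template <typename T, typename Queue>
T ConsumeWithSleep(Queue& queue, int wait_time = 1 /* milliseconds */)
{
    T tmp;
    while (!queue.Consume(tmp))
    {
        DoSleep(wait_time);   // give up the CPU instead of spinning
    }
    return tmp;
}

// Hypothetical stand-in queue for demonstration: reports "empty" a fixed
// number of times, then yields a value.
struct EventuallyReady
{
    int emptyPolls;

    bool Consume(int& t)
    {
        if (emptyPolls > 0)
        {
            --emptyPolls;
            return false;
        }
        t = 42;
        return true;
    }
};
```

The trade-off is latency: an element that arrives just after a failed poll waits up to wait_time milliseconds before it is picked up.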
DoSleep() can be implemented using nanosleep() (POSIX) or Sleep() (Windows), or even better, using boost::thread::sleep(), which abstracts away system-dependent nomenclature. Call this approach SLEEP. Instead of simple polling, you can use more advanced techniques to signal the Consumer thread that a new element is available. I illustrate this in Listing One using a boost::condition variable.
Listing One
I used the timed_wait() instead of the simpler wait() to solve a possible deadlock when Produce() is called between line A and line B in Listing One. Then wait() will miss the notify_one() call and have to wait for the next produced element to wake up. If this element never comes (no more produced elements or if the Produce() call actually waits for Consume() to return), there's a deadlock. Call this approach TIME_WAIT.
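The timed-wait reasoning can be sketched as follows, assuming C++11 std::condition_variable and its wait_for() in place of boost::condition and timed_wait(). This is not the article's Listing One: the names SignalingQueue, WaitAndConsume, and PlainQueue are hypothetical, and the "line A" and "line B" comments only mark the window the text describes. PlainQueue is a single-threaded stand-in so the example compiles on its own; in the article's setting the Queue parameter would be LockFreeQueue<T>.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical wrapper: Queue is any type with Produce(const T&) and
// bool Consume(T&).
template <typename T, typename Queue>
class SignalingQueue
{
public:
    void Produce(const T& t)
    {
        queue_.Produce(t);
        cond_.notify_one();           // wake the Consumer if it is waiting
    }

    bool Consume(T& t) { return queue_.Consume(t); }

    T WaitAndConsume(int wait_time = 1 /* milliseconds */)
    {
        T tmp;
        if (queue_.Consume(tmp))      // fast path: no lock while data is available
            return tmp;

        std::unique_lock<std::mutex> lock(mutex_);
        while (!queue_.Consume(tmp))  // line A
        {
            // A Produce() that runs here has its notify_one() go unheard,
            // so the wait below is bounded rather than indefinite.
            cond_.wait_for(lock, std::chrono::milliseconds(wait_time));  // line B
        }
        return tmp;
    }

private:
    Queue queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

// Hypothetical single-threaded stand-in, used only to exercise the wrapper.
template <typename T>
struct PlainQueue
{
    void Produce(const T& t) { items.push_back(t); }

    bool Consume(T& t)
    {
        if (items.empty())
            return false;
        t = items.front();
        items.pop_front();
        return true;
    }

    std::deque<T> items;
};
```

Because Produce() never takes the mutex, a notification can slip in between the failed check and the wait. The timeout turns that missed wake-up into a short delay instead of a hang.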
The queue is still wait-free as long as there are elements in it. In this case, the Consumer thread does no waiting and reads data as fast as possible (even while the Producer thread is inserting new elements). Only when the queue is exhausted does locking occur.