//  lock-free queue from
//  Michael, M. M. and Scott, M. L.,
//  "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"
//
//  Copyright (C) 2008-2013 Tim Blechmann
//
//  Distributed under the Boost Software License, Version 1.0. (See
//  accompanying file LICENSE_1_0.txt or copy at
//  http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_FIFO_HPP_INCLUDED
#define BOOST_LOCKFREE_FIFO_HPP_INCLUDED

#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits/has_trivial_assign.hpp>
#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY & BOOST_ALIGNMENT

#include <boost/lockfree/detail/allocator_rebind_helper.hpp>
#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/freelist.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/tagged_ptr.hpp>

#include <boost/lockfree/lockfree_forward.hpp>

#ifdef BOOST_HAS_PRAGMA_ONCE
#pragma once
#endif


#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#endif
#if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000)
#pragma warning(push)
#pragma warning(disable:488) // template parameter unused in declaring parameter types,
                             // gets erroneously triggered by the queue constructor, which
                             // takes an allocator of another type and rebinds it
#endif



namespace boost    {
namespace lockfree {
namespace detail   {

typedef parameter::parameters<boost::parameter::optional<tag::allocator>,
                              boost::parameter::optional<tag::capacity>
                             > queue_signature;

} /* namespace detail */


/** The queue class provides a multi-writer/multi-reader queue; pushing and popping are lock-free,
 *  while construction and destruction have to be synchronized. It uses a freelist for memory management:
 *  freed nodes are pushed to the freelist and not returned to the OS before the queue is destroyed.
 *
 *  \b Policies:
 *  - \ref boost::lockfree::fixed_sized, defaults to \c boost::lockfree::fixed_sized<false> \n
 *    Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. \n
 *    If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and they are addressed
 *    by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index
 *    type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way
 *    to achieve lock-freedom.
 *
 *  - \ref boost::lockfree::capacity, optional \n
 *    If this template argument is passed to the options, the size of the queue is set at compile-time.\n
 *    This option implies \c fixed_sized<true>
 *
 *  - \ref boost::lockfree::allocator, defaults to \c boost::lockfree::allocator<std::allocator<void>> \n
 *    Specifies the allocator that is used for the internal freelist
 *
 *  \b Requirements:
 *   - T must have a copy constructor
 *   - T must have a trivial assignment operator
 *   - T must have a trivial destructor
 *
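 *  \b Example: \n
 *  A minimal usage sketch (illustrative, not normative), assuming the compile-time capacity policy:
 *  \code
 *  #include <boost/lockfree/queue.hpp>
 *
 *  boost::lockfree::queue<int, boost::lockfree::capacity<1024> > q;
 *
 *  void example(void)
 *  {
 *      q.push(42);        // lock-free; never allocates, the capacity is fixed at compile-time
 *      int out;
 *      while (q.pop(out)) {
 *          // process out
 *      }
 *  }
 *  \endcode
 *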
 * */
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template <typename T, class A0, class A1, class A2>
#else
template <typename T, typename ...Options>
#endif
class queue
{
private:
#ifndef BOOST_DOXYGEN_INVOKED

#ifdef BOOST_HAS_TRIVIAL_DESTRUCTOR
    BOOST_STATIC_ASSERT((boost::has_trivial_destructor<T>::value));
#endif

#ifdef BOOST_HAS_TRIVIAL_ASSIGN
    BOOST_STATIC_ASSERT((boost::has_trivial_assign<T>::value));
#endif

#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args;
#else
    typedef typename detail::queue_signature::bind<Options...>::type bound_args;
#endif

    static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity;
    static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node
    static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value;
    static const bool node_based = !(has_capacity || fixed_sized);
    static const bool compile_time_sized = has_capacity;

    struct BOOST_ALIGNMENT(BOOST_LOCKFREE_CACHELINE_BYTES) node
    {
        typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle;
        typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

        node(T const & v, handle_type null_handle):
            next(tagged_node_handle(null_handle, 0)), data(v)
        {
            /* increment tag to avoid ABA problem */
            tagged_node_handle old_next = next.load(memory_order_relaxed);
            tagged_node_handle new_next (null_handle, old_next.get_next_tag());
            next.store(new_next, memory_order_release);
        }

        node (handle_type null_handle):
            next(tagged_node_handle(null_handle, 0))
        {}

        node(void)
        {}

        atomic<tagged_node_handle> next;
        T data;
    };
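
    /* A tagged_node_handle couples a node handle (a pointer, or an array index
     * in the fixed-sized case) with a version tag. Recycling a node with an
     * incremented tag makes a stale compare_exchange fail even if the same
     * node address reappears, the usual counter-based ABA mitigation. */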

    typedef typename detail::extract_allocator<bound_args, node>::type node_allocator;
    typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t;
    typedef typename pool_t::tagged_node_handle tagged_node_handle;
    typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

    void initialize(void)
    {
        node * n = pool.template construct<true, false>(pool.null_handle());
        tagged_node_handle dummy_node(pool.get_handle(n), 0);
        head_.store(dummy_node, memory_order_relaxed);
        tail_.store(dummy_node, memory_order_release);
    }
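
    /* head_ and tail_ initially refer to one shared dummy node, so the queue
     * is observed as empty exactly when both designate the same node. The
     * dummy node is only reclaimed in the destructor. */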

    struct implementation_defined
    {
        typedef node_allocator allocator;
        typedef std::size_t size_type;
    };

#endif

    BOOST_DELETED_FUNCTION(queue(queue const&))
    BOOST_DELETED_FUNCTION(queue& operator= (queue const&))

public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /**
     * \return true, if implementation is lock-free.
     *
     * \warning It only checks whether the queue head and tail nodes and the freelist can be modified in a lock-free manner.
     *       On most platforms, the whole implementation is lock-free if this is true. Using C++11-style atomics, there is
     *       no way to provide a completely accurate implementation, because one would need to test every internal
     *       node, which is impossible if further nodes will be allocated from the operating system.
     * */
    bool is_lock_free (void) const
    {
        return head_.is_lock_free() && tail_.is_lock_free() && pool.is_lock_free();
    }
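
    /* A program that depends on lock-freedom can verify it at startup, as in
     * this sketch (lock-freedom of the atomics is a platform property):
     *
     *     boost::lockfree::queue<int> q(128);
     *     assert(q.is_lock_free());
     */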

    //! Construct queue
    // @{
    queue(void):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), capacity)
    {
        BOOST_ASSERT(has_capacity);
        initialize();
    }

    template <typename U>
    explicit queue(typename detail::allocator_rebind_helper<node_allocator, U>::type const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        BOOST_STATIC_ASSERT(has_capacity);
        initialize();
    }

    explicit queue(allocator const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        BOOST_ASSERT(has_capacity);
        initialize();
    }
    // @}

    //! Construct queue, allocate n nodes for the freelist.
    // @{
    explicit queue(size_type n):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), n + 1)
    {
        BOOST_ASSERT(!has_capacity);
        initialize();
    }

    template <typename U>
    queue(size_type n, typename detail::allocator_rebind_helper<node_allocator, U>::type const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, n + 1)
    {
        BOOST_STATIC_ASSERT(!has_capacity);
        initialize();
    }
    // @}
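
    /* Sketch of the runtime-sized form, which is only valid when no
     * boost::lockfree::capacity<> policy is given:
     *
     *     boost::lockfree::queue<int> q(1024); // freelist holds 1024 nodes plus the dummy
     */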

    /** \copydoc boost::lockfree::stack::reserve
     * */
    void reserve(size_type n)
    {
        pool.template reserve<true>(n);
    }

    /** \copydoc boost::lockfree::stack::reserve_unsafe
     * */
    void reserve_unsafe(size_type n)
    {
        pool.template reserve<false>(n);
    }

    /** Destroys queue, freeing all nodes from the freelist.
     * */
    ~queue(void)
    {
        T dummy;
        while(unsynchronized_pop(dummy))
        {}

        pool.template destruct<false>(head_.load(memory_order_relaxed));
    }

    /** Check if the queue is empty
     *
     * \return true, if the queue is empty, false otherwise
     * \note The result is only accurate if no other thread modifies the queue. Therefore it is rarely practical to use this
     *       value in program logic.
     * */
    bool empty(void) const
    {
        return pool.get_handle(head_.load()) == pool.get_handle(tail_.load());
    }

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
     *                    from the OS. This may not be lock-free.
     * */
    bool push(T const & t)
    {
        return do_push<false>(t);
    }

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail
     * \throws if memory allocator throws
     * */
    bool bounded_push(T const & t)
    {
        return do_push<true>(t);
    }
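
    /* The practical difference between the two: with a non-fixed-sized queue,
     * push() may fall back to allocating a fresh node from the OS when the
     * freelist is exhausted (possibly not lock-free), while bounded_push()
     * fails instead:
     *
     *     boost::lockfree::queue<int> q(16);
     *     bool ok1 = q.push(1);         // may allocate a new node
     *     bool ok2 = q.bounded_push(2); // fails once all reserved nodes are in use
     */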


private:
#ifndef BOOST_DOXYGEN_INVOKED
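    /* do_push is the Michael/Scott enqueue: read tail_ and tail->next, re-read
     * tail_ to validate the snapshot, then
     * (1) if tail_ pointed at the last node, try to link the new node with a
     *     CAS on tail->next and afterwards swing tail_ forward, or
     * (2) if tail_ lags behind, help the other producer by swinging tail_ to
     *     the observed successor and retry. */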
    template <bool Bounded>
    bool do_push(T const & t)
    {
        node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
        if (n == NULL)
            return false;

        handle_type node_handle = pool.get_handle(n);

        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_acquire);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle tail2 = tail_.load(memory_order_acquire);
            if (BOOST_LIKELY(tail == tail2)) {
                if (next_ptr == 0) {
                    tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
                    if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
                        tagged_node_handle new_tail(node_handle, tail.get_next_tag());
                        tail_.compare_exchange_strong(tail, new_tail);
                        return true;
                    }
                }
                else {
                    tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);
                }
            }
        }
    }
#endif

public:

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Not thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
     *       from the OS. This may not be lock-free.
     * \throws if memory allocator throws
     * */
    bool unsynchronized_push(T const & t)
    {
        node * n = pool.template construct<false, false>(t, pool.null_handle());

        if (n == NULL)
            return false;

        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_relaxed);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_relaxed);
            node * next_ptr = pool.get_pointer(next);

            if (next_ptr == 0) {
                tail_node->next.store(tagged_node_handle(pool.get_handle(n), next.get_next_tag()), memory_order_relaxed);
                tail_.store(tagged_node_handle(pool.get_handle(n), tail.get_next_tag()), memory_order_relaxed);
                return true;
            }
            else
                tail_.store(tagged_node_handle(pool.get_handle(next_ptr), tail.get_next_tag()), memory_order_relaxed);
        }
    }

    /** Pops object from queue.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool pop (T & ret)
    {
        return pop<T>(ret);
    }

    /** Pops object from queue.
     *
     * \pre type U must be constructible by T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename U>
    bool pop (U & ret)
    {
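        /* This is the Michael/Scott dequeue: snapshot head_, tail_ and
         * head->next, validate that head_ is unchanged, then either help
         * advance a lagging tail_ (head == tail but a successor exists) or
         * copy the payload out of the successor node and swing head_ to it.
         * The payload must be copied before the CAS on head_: once head_
         * moves, the old node may be recycled by another thread. */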
        for (;;) {
            tagged_node_handle head = head_.load(memory_order_acquire);
            node * head_ptr = pool.get_pointer(head);

            tagged_node_handle tail = tail_.load(memory_order_acquire);
            tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle head2 = head_.load(memory_order_acquire);
            if (BOOST_LIKELY(head == head2)) {
                if (pool.get_handle(head) == pool.get_handle(tail)) {
                    if (next_ptr == 0)
                        return false;

                    tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);

                } else {
                    if (next_ptr == 0)
                        /* this check is not part of the original algorithm as published by Michael and Scott
                         *
                         * however we reuse the tagged_ptr part for the freelist and clear the next part during node
                         * allocation. we can observe a null-pointer here.
                         * */
                        continue;
                    detail::copy_payload(next_ptr->data, ret);

                    tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                    if (head_.compare_exchange_weak(head, new_head)) {
                        pool.template destruct<true>(head);
                        return true;
                    }
                }
            }
        }
    }

    /** Pops object from queue.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    bool unsynchronized_pop (T & ret)
    {
        return unsynchronized_pop<T>(ret);
    }

    /** Pops object from queue.
     *
     * \pre type U must be constructible by T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    template <typename U>
    bool unsynchronized_pop (U & ret)
    {
        for (;;) {
            tagged_node_handle head = head_.load(memory_order_relaxed);
            node * head_ptr = pool.get_pointer(head);
            tagged_node_handle tail = tail_.load(memory_order_relaxed);
            tagged_node_handle next = head_ptr->next.load(memory_order_relaxed);
            node * next_ptr = pool.get_pointer(next);

            if (pool.get_handle(head) == pool.get_handle(tail)) {
                if (next_ptr == 0)
                    return false;

                tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                tail_.store(new_tail);
            } else {
                if (next_ptr == 0)
                    /* this check is not part of the original algorithm as published by Michael and Scott
                     *
                     * however we reuse the tagged_ptr part for the freelist and clear the next part during node
                     * allocation. we can observe a null-pointer here.
                     * */
                    continue;
                detail::copy_payload(next_ptr->data, ret);
                tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                head_.store(new_head);
                pool.template destruct<false>(head);
                return true;
            }
        }
    }

    /** consumes one element via a functor
     *
     *  pops one element from the queue and applies the functor on this object
     *
     * \returns true, if one element was consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template <typename Functor>
    bool consume_one(Functor & f)
    {
        T element;
        bool success = pop(element);
        if (success)
            f(element);

        return success;
    }

    /// \copydoc boost::lockfree::queue::consume_one(Functor & rhs)
    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        T element;
        bool success = pop(element);
        if (success)
            f(element);

        return success;
    }

    /** consumes all elements via a functor
     *
     * sequentially pops all elements from the queue and applies the functor on each object
     *
     * \returns number of elements that are consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template <typename Functor>
    size_t consume_all(Functor & f)
    {
        size_t element_count = 0;
        while (consume_one(f))
            element_count += 1;

        return element_count;
    }

    /// \copydoc boost::lockfree::queue::consume_all(Functor & rhs)
    template <typename Functor>
    size_t consume_all(Functor const & f)
    {
        size_t element_count = 0;
        while (consume_one(f))
            element_count += 1;

        return element_count;
    }
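
    /* Usage sketch: with C++11, any callable can serve as the functor, e.g.
     *
     *     boost::lockfree::queue<int> q(64);
     *     q.push(1); q.push(2);
     *     size_t n = q.consume_all([](int v) { std::printf("%d\n", v); });
     */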

private:
#ifndef BOOST_DOXYGEN_INVOKED
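    /* head_ and tail_ are kept on separate cache lines by explicit padding:
     * producers contend on tail_ while consumers contend on head_, and the
     * padding keeps the two hot words from false-sharing one cache line. */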
    atomic<tagged_node_handle> head_;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
    char padding1[padding_size];
    atomic<tagged_node_handle> tail_;
    char padding2[padding_size];

    pool_t pool;
#endif
};

} /* namespace lockfree */
} /* namespace boost */

#if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000)
#pragma warning(pop)
#endif

#if defined(_MSC_VER)
#pragma warning(pop)
#endif

#endif /* BOOST_LOCKFREE_FIFO_HPP_INCLUDED */