//  lock-free single-producer/single-consumer ringbuffer
//  this algorithm is used in various projects, e.g. the linux kernel
//
//  Copyright (C) 2009-2013 Tim Blechmann
//
//  Distributed under the Boost Software License, Version 1.0. (See
//  accompanying file LICENSE_1_0.txt or copy at
//  http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
#define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED

#include <algorithm>
#include <memory>

#include <boost/aligned_storage.hpp>
#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/utility.hpp>
#include <boost/utility/enable_if.hpp>

#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/type_traits/is_convertible.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/branch_hints.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/prefix.hpp>

#ifdef BOOST_HAS_PRAGMA_ONCE
#pragma once
#endif

namespace boost    {
namespace lockfree {
namespace detail   {

typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
                              boost::parameter::optional<tag::allocator>
                             > ringbuffer_signature;

template <typename T>
class ringbuffer_base
{
#ifndef BOOST_DOXYGEN_INVOKED
protected:
    typedef std::size_t size_t;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
    atomic<size_t> write_index_;
    char padding1[padding_size]; /* force read_index and write_index to different cache lines */
    atomic<size_t> read_index_;

    BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
    BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))

protected:
    ringbuffer_base(void):
        write_index_(0), read_index_(0)
    {}

    static size_t next_index(size_t arg, size_t max_size)
    {
        size_t ret = arg + 1;
        while (unlikely(ret >= max_size))
            ret -= max_size;
        return ret;
    }
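
    /* note (editorial): the conditional subtraction, rather than a modulo,
     * presumably avoids an integer division; since arg is always < max_size,
     * a single subtraction suffices, e.g. next_index(7, 8) == 0. */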

    static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
    {
        if (write_index >= read_index)
            return write_index - read_index;

        const size_t ret = write_index + max_size - read_index;
        return ret;
    }

    static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
    {
        size_t ret = read_index - write_index - 1;
        if (write_index >= read_index)
            ret += max_size;
        return ret;
    }
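
    /* worked example with max_size == 8: for write_index == 7 and
     * read_index == 2, read_available == 5 elements and
     * write_available == (2 - 7 - 1) + 8 == 2 slots; of the 8 slots one
     * is always kept empty (see push below), so 5 + 2 == 7 are usable. */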

    size_t read_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed);
        return read_available(write_index, read_index, max_size);
    }

    size_t write_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_relaxed);
        const size_t read_index  = read_index_.load(memory_order_acquire);
        return write_available(write_index, read_index, max_size);
    }

    bool push(T const & t, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
        const size_t next = next_index(write_index, max_size);

        if (next == read_index_.load(memory_order_acquire))
            return false; /* ringbuffer is full */

        new (buffer + write_index) T(t); // copy-construct

        write_index_.store(next, memory_order_release);

        return true;
    }
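
    /* memory-ordering note (editorial): the release store to write_index_
     * pairs with the acquire load in the consumer (consume_one/pop below),
     * so the copy-constructed element is visible to the consumer before the
     * updated index is. The buffer counts as full when advancing the write
     * index would make it equal to the read index; this is why one slot
     * always stays unused. */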

    size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
    {
        return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
    {
        // FIXME: avoid std::distance

        const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
        const size_t read_index  = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return begin;

        size_t input_count = std::distance(begin, end);
        input_count = (std::min)(input_count, avail);

        size_t new_write_index = write_index + input_count;

        const ConstIterator last = boost::next(begin, input_count);

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - write_index;
            const ConstIterator midpoint = boost::next(begin, count0);

            std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
            std::uninitialized_copy(midpoint, last, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::uninitialized_copy(begin, last, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        write_index_.store(new_write_index, memory_order_release);
        return last;
    }
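
    /* worked example for the wrapped case, max_size == 8: for
     * write_index == 6 and input_count == 4, count0 == 2 elements go to
     * slots [6,7] and the remaining 2 to slots [0,1]; new_write_index
     * becomes 10 - 8 == 2. */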

    template <typename Functor>
    bool consume_one(Functor & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }

    template <typename Functor>
    bool consume_one(Functor const & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }
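
    /* note (editorial): the Functor& / Functor const& overload pairs
     * throughout this file are presumably a C++03 substitute for perfect
     * forwarding: together they accept both mutable function objects and
     * temporaries. */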

    template <typename Functor>
    size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    template <typename Functor>
    size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        output_count = (std::min)(output_count, avail);

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
            copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    template <typename OutputIterator>
    size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);
        if (avail == 0)
            return 0;

        size_t new_read_index = read_index + avail;

        if (read_index + avail > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = avail - count0;

            it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
            copy_and_delete(internal_buffer, internal_buffer + count1, it);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return avail;
    }

    const T& front(const T * internal_buffer) const
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }

    T& front(T * internal_buffer)
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }
#endif


public:
    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    void reset(void)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            // make sure to call all destructors!

            T dummy_element;
            while (pop(dummy_element))
            {}
        } else {
            write_index_.store(0, memory_order_relaxed);
            read_index_.store(0, memory_order_release);
        }
    }

    /** Check if the ringbuffer is empty
     *
     * \return true, if the ringbuffer is empty, false otherwise
     * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
     * */
    bool empty(void)
    {
        return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
    }

    /**
     * \return true, if implementation is lock-free.
     *
     * */
    bool is_lock_free(void) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }

private:
    bool empty(size_t write_index, size_t read_index)
    {
        return write_index == read_index;
    }

    template< class OutputIterator >
    OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
    {
        if (boost::has_trivial_destructor<T>::value) {
            return std::copy(first, last, out); // will use memcpy if possible
        } else {
            for (; first != last; ++first, ++out) {
                *out = *first;
                first->~T();
            }
            return out;
        }
    }

    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            first->~T();
        }
    }

    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor const & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            first->~T();
        }
    }
};

template <typename T, std::size_t MaxSize>
class compile_time_sized_ringbuffer:
    public ringbuffer_base<T>
{
    typedef std::size_t size_type;
    static const std::size_t max_size = MaxSize + 1;
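    /* note (editorial): one slot more than the requested capacity is
     * allocated, because the ringbuffer always keeps one slot empty to tell
     * a full buffer (next_index(write) == read) apart from an empty one
     * (write == read). */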

    typedef typename boost::aligned_storage<max_size * sizeof(T),
                                            boost::alignment_of<T>::value
                                           >::type storage_type;

    storage_type storage_;

    T * data()
    {
        return static_cast<T*>(storage_.address());
    }

    const T * data() const
    {
        return static_cast<const T*>(storage_.address());
    }

protected:
    size_type max_number_of_elements() const
    {
        return max_size;
    }

public:
    bool push(T const & t)
    {
        return ringbuffer_base<T>::push(t, data(), max_size);
    }

    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return ringbuffer_base<T>::consume_one(f, data(), max_size);
    }

    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return ringbuffer_base<T>::consume_one(f, data(), max_size);
    }

    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return ringbuffer_base<T>::consume_all(f, data(), max_size);
    }

    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return ringbuffer_base<T>::consume_all(f, data(), max_size);
    }

    size_type push(T const * t, size_type size)
    {
        return ringbuffer_base<T>::push(t, size, data(), max_size);
    }

    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return ringbuffer_base<T>::push(begin, end, data(), max_size);
    }

    size_type pop(T * ret, size_type size)
    {
        return ringbuffer_base<T>::pop(ret, size, data(), max_size);
    }

    template <typename OutputIterator>
    size_type pop_to_output_iterator(OutputIterator it)
    {
        return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
    }

    const T& front(void) const
    {
        return ringbuffer_base<T>::front(data());
    }

    T& front(void)
    {
        return ringbuffer_base<T>::front(data());
    }
};

template <typename T, typename Alloc>
class runtime_sized_ringbuffer:
    public ringbuffer_base<T>,
    private Alloc
{
    typedef std::size_t size_type;
    size_type max_elements_;
    typedef typename Alloc::pointer pointer;
    pointer array_;

protected:
    size_type max_number_of_elements() const
    {
        return max_elements_;
    }

public:
    explicit runtime_sized_ringbuffer(size_type max_elements):
        max_elements_(max_elements + 1)
    {
        array_ = Alloc::allocate(max_elements_);
    }

    template <typename U>
    runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements):
        Alloc(alloc), max_elements_(max_elements + 1)
    {
        array_ = Alloc::allocate(max_elements_);
    }

    runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
        Alloc(alloc), max_elements_(max_elements + 1)
    {
        array_ = Alloc::allocate(max_elements_);
    }

    ~runtime_sized_ringbuffer(void)
    {
        // destroy all remaining items
        T out;
        while (pop(&out, 1)) {}

        Alloc::deallocate(array_, max_elements_);
    }

    bool push(T const & t)
    {
        return ringbuffer_base<T>::push(t, &*array_, max_elements_);
    }

    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
    }

    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
    }

    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
    }

    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
    }

    size_type push(T const * t, size_type size)
    {
        return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
    }

    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return ringbuffer_base<T>::push(begin, end, array_, max_elements_);
    }

    size_type pop(T * ret, size_type size)
    {
        return ringbuffer_base<T>::pop(ret, size, array_, max_elements_);
    }

    template <typename OutputIterator>
    size_type pop_to_output_iterator(OutputIterator it)
    {
        return ringbuffer_base<T>::pop_to_output_iterator(it, array_, max_elements_);
    }

    const T& front(void) const
    {
        return ringbuffer_base<T>::front(array_);
    }

    T& front(void)
    {
        return ringbuffer_base<T>::front(array_);
    }
};

template <typename T, typename A0, typename A1>
struct make_ringbuffer
{
    typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;

    typedef extract_capacity<bound_args> extract_capacity_t;

    static const bool runtime_sized = !extract_capacity_t::has_capacity;
    static const size_t capacity    =  extract_capacity_t::capacity;

    typedef extract_allocator<bound_args, T> extract_allocator_t;
    typedef typename extract_allocator_t::type allocator;

    // an allocator argument is only sane for run-time sized ringbuffers
    BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
                                  mpl::bool_<!extract_allocator_t::has_allocator>,
                                  mpl::true_
                                 >::type::value));

    typedef typename mpl::if_c<runtime_sized,
                               runtime_sized_ringbuffer<T, allocator>,
                               compile_time_sized_ringbuffer<T, capacity>
                              >::type ringbuffer_type;
};
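
/* a sketch of the resulting type selection (std::allocator is the
 * documented default allocator, see the spsc_queue documentation below):
 *
 *   spsc_queue<int, capacity<1024> >  // -> compile_time_sized_ringbuffer<int, 1024>
 *   spsc_queue<int>                   // -> runtime_sized_ringbuffer<int, std::allocator<int> >
 */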


} /* namespace detail */


/** The spsc_queue class provides a single-writer/single-reader fifo queue; pushing and popping are wait-free.
 *
 *  \b Policies:
 *  - \c boost::lockfree::capacity<>, optional <br>
 *    If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
 *
 *  - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
 *    Specifies the allocator that is used to allocate the ringbuffer. This option is only valid if the ringbuffer is configured
 *    to be sized at run-time.
 *
 *  \b Requirements:
 *  - T must have a default constructor
 *  - T must be copyable
 * */
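/* a minimal usage sketch (illustrative, not part of the interface; assumes a
 * compile-time capacity and that the two functions run on exactly one
 * producer and one consumer thread, respectively):
 *
 *   boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024> > queue;
 *
 *   void producer(void)   // the single writer thread
 *   {
 *       for (int i = 0; i != 100; ++i)
 *           while (!queue.push(i))
 *               ;   // queue full: retry
 *   }
 *
 *   void consumer(void)   // the single reader thread
 *   {
 *       int value;
 *       for (int received = 0; received != 100; )
 *           if (queue.pop(value))
 *               ++received;
 *   }
 */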
#ifndef BOOST_DOXYGEN_INVOKED
template <typename T,
          class A0 = boost::parameter::void_,
          class A1 = boost::parameter::void_>
#else
template <typename T, ...Options>
#endif
class spsc_queue:
    public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
{
private:

#ifndef BOOST_DOXYGEN_INVOKED
    typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
    static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
    typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;

    struct implementation_defined
    {
        typedef allocator_arg allocator;
        typedef std::size_t size_type;
    };
#endif

public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /** Constructs a spsc_queue
     *
     *  \pre spsc_queue must be configured to be sized at compile-time
     */
    // @{
    spsc_queue(void)
    {
        BOOST_ASSERT(!runtime_sized);
    }

    template <typename U>
    explicit spsc_queue(typename allocator::template rebind<U>::other const & alloc)
    {
        // just for API compatibility: we don't actually need an allocator
        BOOST_STATIC_ASSERT(!runtime_sized);
    }

    explicit spsc_queue(allocator const & alloc)
    {
        // just for API compatibility: we don't actually need an allocator
        BOOST_ASSERT(!runtime_sized);
    }
    // @}


    /** Constructs a spsc_queue for element_count elements
     *
     *  \pre spsc_queue must be configured to be sized at run-time
     */
    // @{
    explicit spsc_queue(size_type element_count):
        base_type(element_count)
    {
        BOOST_ASSERT(runtime_sized);
    }

    template <typename U>
    spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_STATIC_ASSERT(runtime_sized);
    }

    spsc_queue(size_type element_count, allocator_arg const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_ASSERT(runtime_sized);
    }
    // @}

    /** Pushes object t to the ringbuffer.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \post object will be pushed to the spsc_queue, unless it is full.
     * \return true, if the push operation is successful.
     *
     * \note Thread-safe and wait-free
     * */
    bool push(T const & t)
    {
        return base_type::push(t);
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be discarded.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    bool pop ()
    {
        detail::consume_noop consume_functor;
        return consume_one( consume_functor );
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be copied to ret.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    template <typename U>
    typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
    pop (U & ret)
    {
        detail::consume_via_copy<U> consume_functor(ret);
        return consume_one( consume_functor );
    }
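
    /* note (editorial): the enable_if guard restricts this overload to
     * targets that T converts to; it presumably keeps the overload from
     * competing with pop(OutputIterator) below, which uses the mirror-image
     * disable_if condition. */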

    /** Pushes as many objects from the array t as there is space.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    size_type push(T const * t, size_type size)
    {
        return base_type::push(t, size);
    }

    /** Pushes as many objects from the array t as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    /** Pushes as many objects from the range [begin, end) as there is space.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return iterator to the first element, which has not been pushed
     *
     * \note Thread-safe and wait-free
     */
    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return base_type::push(begin, end);
    }

    /** Pops a maximum of size objects from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    size_type pop(T * ret, size_type size)
    {
        return base_type::pop(ret, size);
    }

    /** Pops a maximum of size objects from spsc_queue.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    template <size_type size>
    size_type pop(T (&ret)[size])
    {
        return pop(ret, size);
    }

    /** Pops objects to the output iterator it
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    template <typename OutputIterator>
    typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
    pop(OutputIterator it)
    {
        return base_type::pop_to_output_iterator(it);
    }

    /** consumes one element via a functor
     *
     *  pops one element from the queue and applies the functor on this object
     *
     * \returns true, if one element was consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return base_type::consume_one(f);
    }

    /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return base_type::consume_one(f);
    }

    /** consumes all elements via a functor
     *
     * sequentially pops all elements from the queue and applies the functor on each object
     *
     * \returns number of elements that are consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return base_type::consume_all(f);
    }

    /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return base_type::consume_all(f);
    }
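
    /* consume_one/consume_all accept any callable taking a T; a minimal
     * sketch with a hand-written function object (C++03-compatible; a
     * 'queue' of element type int and <cstdio> are assumed):
     *
     *   struct printer {
     *       void operator()(int value) const { std::printf("%d\n", value); }
     *   };
     *
     *   queue.consume_one(printer());   // pops and prints one element, if any
     *   queue.consume_all(printer());   // drains the whole queue
     */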

    /** get number of elements that are available for read
     *
     * \return number of available elements that can be popped from the spsc_queue
     *
     * \note Thread-safe and wait-free, should only be called from the consumer thread
     * */
    size_type read_available() const
    {
        return base_type::read_available(base_type::max_number_of_elements());
    }

    /** get the space available for writing
     *
     * \return number of elements that can be pushed to the spsc_queue
     *
     * \note Thread-safe and wait-free, should only be called from the producer thread
     * */
    size_type write_available() const
    {
        return base_type::write_available(base_type::max_number_of_elements());
    }

    /** get reference to element in the front of the queue
     *
     * Availability of the front element can be checked using read_available().
     *
     * \pre only a consuming thread is allowed to check the front element
     * \pre read_available() > 0. If the ringbuffer is empty, it is undefined behaviour to invoke this method.
     * \return reference to the first element in the queue
     *
     * \note Thread-safe and wait-free
     */
    const T& front() const
    {
        BOOST_ASSERT(read_available() > 0);
        return base_type::front();
    }

    /// \copydoc boost::lockfree::spsc_queue::front() const
    T& front()
    {
        BOOST_ASSERT(read_available() > 0);
        return base_type::front();
    }
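
    /* a sketch of the intended peek-then-pop pattern in the consumer thread
     * (a 'queue' of element type int is assumed):
     *
     *   if (queue.read_available() > 0) {
     *       const int & head = queue.front(); // inspect without removing
     *       // ... use head ...
     *       queue.pop();                      // then discard it
     *   }
     */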

    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    void reset(void)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            // make sure to call all destructors!

            T dummy_element;
            while (pop(dummy_element))
            {}
        } else {
            base_type::write_index_.store(0, memory_order_relaxed);
            base_type::read_index_.store(0, memory_order_release);
        }
    }
};

} /* namespace lockfree */
} /* namespace boost */


#endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */