1 //  lock-free single-producer/single-consumer ringbuffer
2 //  this algorithm is implemented in various projects (linux kernel)
3 //
4 //  Copyright (C) 2009-2013 Tim Blechmann
5 //
6 //  Distributed under the Boost Software License, Version 1.0. (See
7 //  accompanying file LICENSE_1_0.txt or copy at
8 //  http://www.boost.org/LICENSE_1_0.txt)
9 
10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
12 
13 #include <algorithm>
14 #include <memory>
15 
16 #include <boost/aligned_storage.hpp>
17 #include <boost/assert.hpp>
18 #include <boost/static_assert.hpp>
19 #include <boost/utility.hpp>
20 #include <boost/utility/enable_if.hpp>
21 #include <boost/config.hpp> // for BOOST_LIKELY
22 
23 #include <boost/type_traits/has_trivial_destructor.hpp>
24 #include <boost/type_traits/is_convertible.hpp>
25 
26 #include <boost/lockfree/detail/atomic.hpp>
27 #include <boost/lockfree/detail/copy_payload.hpp>
28 #include <boost/lockfree/detail/parameter.hpp>
29 #include <boost/lockfree/detail/prefix.hpp>
30 
31 #include <boost/lockfree/lockfree_forward.hpp>
32 
33 #ifdef BOOST_HAS_PRAGMA_ONCE
34 #pragma once
35 #endif
36 
37 namespace boost    {
38 namespace lockfree {
39 namespace detail   {
40 
41 typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
42                               boost::parameter::optional<tag::allocator>
43                              > ringbuffer_signature;
44 
/* Index bookkeeping shared by both ringbuffer flavours.
 *
 * This base class owns only the two indices; the element storage (buffer
 * pointer and max_size) is supplied per call by the derived classes
 * (compile_time_sized_ringbuffer / runtime_sized_ringbuffer).
 *
 * Single-producer/single-consumer contract encoded below:
 *  - write_index_ is stored only by the producer thread,
 *  - read_index_  is stored only by the consumer thread,
 *  - one slot is intentionally left unused, so a buffer of max_size slots
 *    holds at most max_size-1 elements:
 *      write_index_ == read_index_              -> empty
 *      next_index(write_index_) == read_index_  -> full
 */
template <typename T>
class ringbuffer_base
{
#ifndef BOOST_DOXYGEN_INVOKED
protected:
    typedef std::size_t size_t;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
    atomic<size_t> write_index_;
    char padding1[padding_size]; /* force read_index and write_index to different cache lines */
    atomic<size_t> read_index_;

    BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
    BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))

protected:
    ringbuffer_base(void):
        write_index_(0), read_index_(0)
    {}

    /* advance an index by one slot, wrapping at max_size; the subtraction
     * loop is equivalent to a modulo but avoids the division in the common
     * (non-wrapping) case */
    static size_t next_index(size_t arg, size_t max_size)
    {
        size_t ret = arg + 1;
        while (BOOST_UNLIKELY(ret >= max_size))
            ret -= max_size;
        return ret;
    }

    /* number of elements available for popping, from a snapshot of both
     * indices; handles the wrapped (write < read) case */
    static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
    {
        if (write_index >= read_index)
            return write_index - read_index;

        const size_t ret = write_index + max_size - read_index;
        return ret;
    }

    /* number of free slots for pushing; the -1 reserves one slot so that a
     * full buffer is distinguishable from an empty one */
    static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
    {
        size_t ret = read_index - write_index - 1;
        if (write_index >= read_index)
            ret += max_size;
        return ret;
    }

    /* consumer-side query: acquire on write_index_ makes elements published
     * by the producer visible; read_index_ is only written by the calling
     * (consumer) thread, hence relaxed */
    size_t read_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed);
        return read_available(write_index, read_index, max_size);
    }

    /* producer-side query: mirror image of read_available(max_size) */
    size_t write_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_relaxed);
        const size_t read_index  = read_index_.load(memory_order_acquire);
        return write_available(write_index, read_index, max_size);
    }

    /* push a single element (producer thread only); returns false without
     * side effects when the buffer is full */
    bool push(T const & t, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
        const size_t next = next_index(write_index, max_size);

        if (next == read_index_.load(memory_order_acquire))
            return false; /* ringbuffer is full */

        new (buffer + write_index) T(t); // copy-construct

        /* release store publishes the element: the consumer's acquire load
         * of write_index_ cannot observe `next` before the construction above */
        write_index_.store(next, memory_order_release);

        return true;
    }

    /* push up to input_count elements from a plain array; returns the number
     * actually pushed */
    size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
    {
        return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
    }

    /* push as many elements of [begin, end) as fit; returns an iterator to
     * the first element that was NOT pushed */
    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
    {
        // FIXME: avoid std::distance

        const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
        const size_t read_index  = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return begin;

        size_t input_count = std::distance(begin, end);
        input_count = (std::min)(input_count, avail);

        size_t new_write_index = write_index + input_count;

        const ConstIterator last = boost::next(begin, input_count);

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - write_index;
            const ConstIterator midpoint = boost::next(begin, count0);

            std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
            std::uninitialized_copy(midpoint, last, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::uninitialized_copy(begin, last, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        /* single release store publishes the whole batch at once */
        write_index_.store(new_write_index, memory_order_release);
        return last;
    }

    /* apply functor to the front element, destroy it, and advance the read
     * index (consumer thread only); non-const functor overload */
    template <typename Functor>
    bool consume_one(Functor & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        /* release store hands the now-destroyed slot back to the producer */
        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }

    /* const-functor overload, duplicated because C++03 has no forwarding
     * references; body is identical to the overload above */
    template <typename Functor>
    bool consume_one(Functor const & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }

    /* apply functor to every available element (destroying each) and advance
     * the read index once at the end; returns the number consumed */
    template <typename Functor>
    size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    /* non-const functor overload of consume_all; body duplicated as above */
    template <typename Functor>
    size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    /* copy up to output_count elements into output_buffer, destroying the
     * originals; returns the number actually popped */
    size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        output_count = (std::min)(output_count, avail);

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
            copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    /* pop every available element through an output iterator; returns the
     * number popped */
    template <typename OutputIterator>
    size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);
        if (avail == 0)
            return 0;

        size_t new_read_index = read_index + avail;

        if (read_index + avail > max_size) {
            /* copy data in two sections; note the iterator is carried over
             * from the first section into the second */
            const size_t count0 = max_size - read_index;
            const size_t count1 = avail - count0;

            it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
            copy_and_delete(internal_buffer, internal_buffer + count1, it);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return avail;
    }

    /* reference to the oldest element; caller must ensure the buffer is
     * non-empty (consumer thread only -- relaxed load of read_index_) */
    const T& front(const T * internal_buffer) const
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }

    T& front(T * internal_buffer)
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }
#endif


public:
    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    void reset(void)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            // make sure to call all destructors!
            // NOTE(review): this class declares no single-argument pop(T&);
            // the call below can only find the four-argument pop member here,
            // so instantiating reset() looks like it cannot compile -- later
            // Boost versions moved reset() into spsc_queue, where pop(U&)
            // exists. Verify against the shipped version of this header.

            T dummy_element;
            while (pop(dummy_element))
            {}
        } else {
            write_index_.store(0, memory_order_relaxed);
            read_index_.store(0, memory_order_release);
        }
    }

    /** Check if the ringbuffer is empty
     *
     * \return true, if the ringbuffer is empty, false otherwise
     * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
     * */
    // NOTE(review): could be declared const (it only loads the two atomics);
    // kept non-const here to match the original interface
    bool empty(void)
    {
        return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
    }

    /**
     * \return true, if implementation is lock-free.
     *
     * */
    bool is_lock_free(void) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }

private:
    // empty <=> both indices point at the same slot
    bool empty(size_t write_index, size_t read_index)
    {
        return write_index == read_index;
    }

    /* copy [first, last) to out and destroy the sources; the destructor
     * calls are skipped entirely for trivially destructible types */
    template< class OutputIterator >
    OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
    {
        if (boost::has_trivial_destructor<T>::value) {
            return std::copy(first, last, out); // will use memcpy if possible
        } else {
            for (; first != last; ++first, ++out) {
                *out = *first;
                first->~T();
            }
            return out;
        }
    }

    /* invoke functor on each of [first, last) and destroy the elements */
    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            first->~T();
        }
    }

    /* const-functor overload, duplicated for C++03 */
    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor const & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            first->~T();
        }
    }
};
419 
420 template <typename T, std::size_t MaxSize>
421 class compile_time_sized_ringbuffer:
422     public ringbuffer_base<T>
423 {
424     typedef std::size_t size_type;
425     static const std::size_t max_size = MaxSize + 1;
426 
427     typedef typename boost::aligned_storage<max_size * sizeof(T),
428                                             boost::alignment_of<T>::value
429                                            >::type storage_type;
430 
431     storage_type storage_;
432 
data()433     T * data()
434     {
435         return static_cast<T*>(storage_.address());
436     }
437 
data() const438     const T * data() const
439     {
440         return static_cast<const T*>(storage_.address());
441     }
442 
443 protected:
max_number_of_elements() const444     size_type max_number_of_elements() const
445     {
446         return max_size;
447     }
448 
449 public:
push(T const & t)450     bool push(T const & t)
451     {
452         return ringbuffer_base<T>::push(t, data(), max_size);
453     }
454 
455     template <typename Functor>
consume_one(Functor & f)456     bool consume_one(Functor & f)
457     {
458         return ringbuffer_base<T>::consume_one(f, data(), max_size);
459     }
460 
461     template <typename Functor>
consume_one(Functor const & f)462     bool consume_one(Functor const & f)
463     {
464         return ringbuffer_base<T>::consume_one(f, data(), max_size);
465     }
466 
467     template <typename Functor>
consume_all(Functor & f)468     size_type consume_all(Functor & f)
469     {
470         return ringbuffer_base<T>::consume_all(f, data(), max_size);
471     }
472 
473     template <typename Functor>
consume_all(Functor const & f)474     size_type consume_all(Functor const & f)
475     {
476         return ringbuffer_base<T>::consume_all(f, data(), max_size);
477     }
478 
push(T const * t,size_type size)479     size_type push(T const * t, size_type size)
480     {
481         return ringbuffer_base<T>::push(t, size, data(), max_size);
482     }
483 
484     template <size_type size>
push(T const (& t)[size])485     size_type push(T const (&t)[size])
486     {
487         return push(t, size);
488     }
489 
490     template <typename ConstIterator>
push(ConstIterator begin,ConstIterator end)491     ConstIterator push(ConstIterator begin, ConstIterator end)
492     {
493         return ringbuffer_base<T>::push(begin, end, data(), max_size);
494     }
495 
pop(T * ret,size_type size)496     size_type pop(T * ret, size_type size)
497     {
498         return ringbuffer_base<T>::pop(ret, size, data(), max_size);
499     }
500 
501     template <typename OutputIterator>
pop_to_output_iterator(OutputIterator it)502     size_type pop_to_output_iterator(OutputIterator it)
503     {
504         return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
505     }
506 
front(void) const507     const T& front(void) const
508     {
509         return ringbuffer_base<T>::front(data());
510     }
511 
front(void)512     T& front(void)
513     {
514         return ringbuffer_base<T>::front(data());
515     }
516 };
517 
518 template <typename T, typename Alloc>
519 class runtime_sized_ringbuffer:
520     public ringbuffer_base<T>,
521     private Alloc
522 {
523     typedef std::size_t size_type;
524     size_type max_elements_;
525     typedef typename Alloc::pointer pointer;
526     pointer array_;
527 
528 protected:
max_number_of_elements() const529     size_type max_number_of_elements() const
530     {
531         return max_elements_;
532     }
533 
534 public:
runtime_sized_ringbuffer(size_type max_elements)535     explicit runtime_sized_ringbuffer(size_type max_elements):
536         max_elements_(max_elements + 1)
537     {
538         array_ = Alloc::allocate(max_elements_);
539     }
540 
541     template <typename U>
runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc,size_type max_elements)542     runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements):
543         Alloc(alloc), max_elements_(max_elements + 1)
544     {
545         array_ = Alloc::allocate(max_elements_);
546     }
547 
runtime_sized_ringbuffer(Alloc const & alloc,size_type max_elements)548     runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
549         Alloc(alloc), max_elements_(max_elements + 1)
550     {
551         array_ = Alloc::allocate(max_elements_);
552     }
553 
~runtime_sized_ringbuffer(void)554     ~runtime_sized_ringbuffer(void)
555     {
556         // destroy all remaining items
557         T out;
558         while (pop(&out, 1)) {}
559 
560         Alloc::deallocate(array_, max_elements_);
561     }
562 
push(T const & t)563     bool push(T const & t)
564     {
565         return ringbuffer_base<T>::push(t, &*array_, max_elements_);
566     }
567 
568     template <typename Functor>
consume_one(Functor & f)569     bool consume_one(Functor & f)
570     {
571         return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
572     }
573 
574     template <typename Functor>
consume_one(Functor const & f)575     bool consume_one(Functor const & f)
576     {
577         return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
578     }
579 
580     template <typename Functor>
consume_all(Functor & f)581     size_type consume_all(Functor & f)
582     {
583         return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
584     }
585 
586     template <typename Functor>
consume_all(Functor const & f)587     size_type consume_all(Functor const & f)
588     {
589         return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
590     }
591 
push(T const * t,size_type size)592     size_type push(T const * t, size_type size)
593     {
594         return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
595     }
596 
597     template <size_type size>
push(T const (& t)[size])598     size_type push(T const (&t)[size])
599     {
600         return push(t, size);
601     }
602 
603     template <typename ConstIterator>
push(ConstIterator begin,ConstIterator end)604     ConstIterator push(ConstIterator begin, ConstIterator end)
605     {
606         return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_);
607     }
608 
pop(T * ret,size_type size)609     size_type pop(T * ret, size_type size)
610     {
611         return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_);
612     }
613 
614     template <typename OutputIterator>
pop_to_output_iterator(OutputIterator it)615     size_type pop_to_output_iterator(OutputIterator it)
616     {
617         return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_);
618     }
619 
front(void) const620     const T& front(void) const
621     {
622         return ringbuffer_base<T>::front(&*array_);
623     }
624 
front(void)625     T& front(void)
626     {
627         return ringbuffer_base<T>::front(&*array_);
628     }
629 };
630 
/* Metafunction that maps the user-supplied option pack onto the concrete
 * ringbuffer implementation: a compile_time_sized_ringbuffer when a
 * capacity<> option is present, a runtime_sized_ringbuffer otherwise. */
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template <typename T, typename A0, typename A1>
#else
template <typename T, typename ...Options>
#endif
struct make_ringbuffer
{
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
#else
    typedef typename ringbuffer_signature::bind<Options...>::type bound_args;
#endif

    typedef extract_capacity<bound_args> extract_capacity_t;

    // no capacity<> option means the buffer is sized at run-time
    static const bool runtime_sized = !extract_capacity_t::has_capacity;
    static const size_t capacity    =  extract_capacity_t::capacity;

    typedef extract_allocator<bound_args, T> extract_allocator_t;
    typedef typename extract_allocator_t::type allocator;

    // allocator argument is only sane, for run-time sized ringbuffers
    BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
                                  mpl::bool_<!extract_allocator_t::has_allocator>,
                                  mpl::true_
                                 >::type::value));

    // the implementation type that spsc_queue will inherit from
    typedef typename mpl::if_c<runtime_sized,
                               runtime_sized_ringbuffer<T, allocator>,
                               compile_time_sized_ringbuffer<T, capacity>
                              >::type ringbuffer_type;
};
663 
664 
665 } /* namespace detail */
666 
667 
668 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
669  *
670  *  \b Policies:
671  *  - \c boost::lockfree::capacity<>, optional <br>
672  *    If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
673  *
674  *  - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
675  *    Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
676  *    to be sized at run-time
677  *
678  *  \b Requirements:
679  *  - T must have a default constructor
680  *  - T must be copyable
681  * */
682 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
683 template <typename T, class A0, class A1>
684 #else
685 template <typename T, typename ...Options>
686 #endif
687 class spsc_queue:
688 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
689     public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
690 #else
691     public detail::make_ringbuffer<T, Options...>::ringbuffer_type
692 #endif
693 {
694 private:
695 
696 #ifndef BOOST_DOXYGEN_INVOKED
697 
698 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
699     typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
700     static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
701     typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
702 #else
703     typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type;
704     static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized;
705     typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg;
706 #endif
707 
708 
709     struct implementation_defined
710     {
711         typedef allocator_arg allocator;
712         typedef std::size_t size_type;
713     };
714 #endif
715 
716 public:
717     typedef T value_type;
718     typedef typename implementation_defined::allocator allocator;
719     typedef typename implementation_defined::size_type size_type;
720 
721     /** Constructs a spsc_queue
722      *
723      *  \pre spsc_queue must be configured to be sized at compile-time
724      */
725     // @{
    spsc_queue(void)
    {
        // default construction only makes sense when the capacity was fixed
        // at compile time; runtime-sized queues need an element count
        BOOST_ASSERT(!runtime_sized);
    }
730 
    // allocator-taking overload for compile-time sized queues; the rebound
    // allocator is accepted for API symmetry only and never used
    template <typename U>
    explicit spsc_queue(typename allocator::template rebind<U>::other const &)
    {
        // just for API compatibility: we don't actually need an allocator
        BOOST_STATIC_ASSERT(!runtime_sized);
    }
737 
    // allocator-taking overload for compile-time sized queues; ignored
    explicit spsc_queue(allocator const &)
    {
        // just for API compatibility: we don't actually need an allocator
        BOOST_ASSERT(!runtime_sized);
    }
743     // @}
744 
745 
746     /** Constructs a spsc_queue for element_count elements
747      *
748      *  \pre spsc_queue must be configured to be sized at run-time
749      */
750     // @{
    explicit spsc_queue(size_type element_count):
        base_type(element_count)
    {
        // an element count is only meaningful for runtime-sized queues
        BOOST_ASSERT(runtime_sized);
    }
756 
    // runtime-sized construction with a rebound allocator instance
    template <typename U>
    spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_STATIC_ASSERT(runtime_sized);
    }
763 
    // runtime-sized construction with an explicit allocator instance
    spsc_queue(size_type element_count, allocator_arg const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_ASSERT(runtime_sized);
    }
769     // @}
770 
771     /** Pushes object t to the ringbuffer.
772      *
773      * \pre only one thread is allowed to push data to the spsc_queue
774      * \post object will be pushed to the spsc_queue, unless it is full.
775      * \return true, if the push operation is successful.
776      *
777      * \note Thread-safe and wait-free
778      * */
    bool push(T const & t)
    {
        // forward to whichever ringbuffer implementation make_ringbuffer chose
        return base_type::push(t);
    }
783 
784     /** Pops one object from ringbuffer.
785      *
     * \pre only one thread is allowed to pop data from the spsc_queue
787      * \post if ringbuffer is not empty, object will be discarded.
788      * \return true, if the pop operation is successful, false if ringbuffer was empty.
789      *
790      * \note Thread-safe and wait-free
791      */
pop()792     bool pop ()
793     {
794         detail::consume_noop consume_functor;
795         return consume_one( consume_functor );
796     }
797 
798     /** Pops one object from ringbuffer.
799      *
     * \pre only one thread is allowed to pop data from the spsc_queue
801      * \post if ringbuffer is not empty, object will be copied to ret.
802      * \return true, if the pop operation is successful, false if ringbuffer was empty.
803      *
804      * \note Thread-safe and wait-free
805      */
    // enable_if keeps this overload out of resolution unless T converts to U,
    // so the OutputIterator overload of pop() below stays unambiguous
    template <typename U>
    typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
    pop (U & ret)
    {
        // consume one element by copy-assigning it into ret
        detail::consume_via_copy<U> consume_functor(ret);
        return consume_one( consume_functor );
    }
813 
814     /** Pushes as many objects from the array t as there is space.
815      *
816      * \pre only one thread is allowed to push data to the spsc_queue
817      * \return number of pushed items
818      *
819      * \note Thread-safe and wait-free
820      */
    size_type push(T const * t, size_type size)
    {
        // bulk push; returns how many of the size elements actually fit
        return base_type::push(t, size);
    }
825 
826     /** Pushes as many objects from the array t as there is space available.
827      *
828      * \pre only one thread is allowed to push data to the spsc_queue
829      * \return number of pushed items
830      *
831      * \note Thread-safe and wait-free
832      */
    // array overload: deduces the element count from the array type
    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }
838 
    /** Pushes as many objects from the range [begin, end) as there is space.
840      *
841      * \pre only one thread is allowed to push data to the spsc_queue
842      * \return iterator to the first element, which has not been pushed
843      *
844      * \note Thread-safe and wait-free
845      */
846     template <typename ConstIterator>
push(ConstIterator begin,ConstIterator end)847     ConstIterator push(ConstIterator begin, ConstIterator end)
848     {
849         return base_type::push(begin, end);
850     }
851 
852     /** Pops a maximum of size objects from ringbuffer.
853      *
854      * \pre only one thread is allowed to pop data to the spsc_queue
855      * \return number of popped items
856      *
857      * \note Thread-safe and wait-free
858      * */
pop(T * ret,size_type size)859     size_type pop(T * ret, size_type size)
860     {
861         return base_type::pop(ret, size);
862     }
863 
864     /** Pops a maximum of size objects from spsc_queue.
865      *
866      * \pre only one thread is allowed to pop data to the spsc_queue
867      * \return number of popped items
868      *
869      * \note Thread-safe and wait-free
870      * */
871     template <size_type size>
pop(T (& ret)[size])872     size_type pop(T (&ret)[size])
873     {
874         return pop(ret, size);
875     }
876 
877     /** Pops objects to the output iterator it
878      *
879      * \pre only one thread is allowed to pop data to the spsc_queue
880      * \return number of popped items
881      *
882      * \note Thread-safe and wait-free
883      * */
884     template <typename OutputIterator>
885     typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
pop(OutputIterator it)886     pop(OutputIterator it)
887     {
888         return base_type::pop_to_output_iterator(it);
889     }
890 
891     /** consumes one element via a functor
892      *
893      *  pops one element from the queue and applies the functor on this object
894      *
895      * \returns true, if one element was consumed
896      *
897      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
898      * */
899     template <typename Functor>
consume_one(Functor & f)900     bool consume_one(Functor & f)
901     {
902         return base_type::consume_one(f);
903     }
904 
905     /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
906     template <typename Functor>
consume_one(Functor const & f)907     bool consume_one(Functor const & f)
908     {
909         return base_type::consume_one(f);
910     }
911 
912     /** consumes all elements via a functor
913      *
914      * sequentially pops all elements from the queue and applies the functor on each object
915      *
916      * \returns number of elements that are consumed
917      *
918      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
919      * */
920     template <typename Functor>
consume_all(Functor & f)921     size_type consume_all(Functor & f)
922     {
923         return base_type::consume_all(f);
924     }
925 
926     /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
927     template <typename Functor>
consume_all(Functor const & f)928     size_type consume_all(Functor const & f)
929     {
930         return base_type::consume_all(f);
931     }
932 
933     /** get number of elements that are available for read
934      *
935      * \return number of available elements that can be popped from the spsc_queue
936      *
937      * \note Thread-safe and wait-free, should only be called from the consumer thread
938      * */
read_available() const939     size_type read_available() const
940     {
941         return base_type::read_available(base_type::max_number_of_elements());
942     }
943 
944     /** get write space to write elements
945      *
946      * \return number of elements that can be pushed to the spsc_queue
947      *
948      * \note Thread-safe and wait-free, should only be called from the producer thread
949      * */
write_available() const950     size_type write_available() const
951     {
952         return base_type::write_available(base_type::max_number_of_elements());
953     }
954 
955     /** get reference to element in the front of the queue
956      *
957      * Availability of front element can be checked using read_available().
958      *
959      * \pre only a consuming thread is allowed to check front element
960      * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
961      * \return reference to the first element in the queue
962      *
963      * \note Thread-safe and wait-free
964      */
front() const965     const T& front() const
966     {
967         BOOST_ASSERT(read_available() > 0);
968         return base_type::front();
969     }
970 
971     /// \copydoc boost::lockfree::spsc_queue::front() const
front()972     T& front()
973     {
974         BOOST_ASSERT(read_available() > 0);
975         return base_type::front();
976     }
977 
978     /** reset the ringbuffer
979      *
980      * \note Not thread-safe
981      * */
reset(void)982     void reset(void)
983     {
984         if ( !boost::has_trivial_destructor<T>::value ) {
985             // make sure to call all destructors!
986 
987             T dummy_element;
988             while (pop(dummy_element))
989             {}
990         } else {
991             base_type::write_index_.store(0, memory_order_relaxed);
992             base_type::read_index_.store(0, memory_order_release);
993         }
994    }
995 };
996 
997 } /* namespace lockfree */
998 } /* namespace boost */
999 
1000 
1001 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */
1002