1 // lock-free single-producer/single-consumer ringbuffer 2 // this algorithm is implemented in various projects (linux kernel) 3 // 4 // Copyright (C) 2009-2013 Tim Blechmann 5 // 6 // Distributed under the Boost Software License, Version 1.0. (See 7 // accompanying file LICENSE_1_0.txt or copy at 8 // http://www.boost.org/LICENSE_1_0.txt) 9 10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED 11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED 12 13 #include <algorithm> 14 #include <memory> 15 16 #include <boost/aligned_storage.hpp> 17 #include <boost/assert.hpp> 18 #include <boost/static_assert.hpp> 19 #include <boost/utility.hpp> 20 #include <boost/utility/enable_if.hpp> 21 #include <boost/config.hpp> // for BOOST_LIKELY 22 23 #include <boost/type_traits/has_trivial_destructor.hpp> 24 #include <boost/type_traits/is_convertible.hpp> 25 26 #include <boost/lockfree/detail/atomic.hpp> 27 #include <boost/lockfree/detail/copy_payload.hpp> 28 #include <boost/lockfree/detail/parameter.hpp> 29 #include <boost/lockfree/detail/prefix.hpp> 30 31 #include <boost/lockfree/lockfree_forward.hpp> 32 33 #ifdef BOOST_HAS_PRAGMA_ONCE 34 #pragma once 35 #endif 36 37 namespace boost { 38 namespace lockfree { 39 namespace detail { 40 41 typedef parameter::parameters<boost::parameter::optional<tag::capacity>, 42 boost::parameter::optional<tag::allocator> 43 > ringbuffer_signature; 44 45 template <typename T> 46 class ringbuffer_base 47 { 48 #ifndef BOOST_DOXYGEN_INVOKED 49 protected: 50 typedef std::size_t size_t; 51 static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t); 52 atomic<size_t> write_index_; 53 char padding1[padding_size]; /* force read_index and write_index to different cache lines */ 54 atomic<size_t> read_index_; 55 56 BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&)) 57 BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&)) 58 59 protected: ringbuffer_base(void)60 ringbuffer_base(void): 61 write_index_(0), read_index_(0) 62 {} 63 next_index(size_t arg,size_t max_size)64 static size_t next_index(size_t arg, size_t max_size) 65 { 66 size_t ret = arg + 1; 67 while (BOOST_UNLIKELY(ret >= max_size)) 68 ret -= max_size; 69 return ret; 70 } 71 read_available(size_t write_index,size_t read_index,size_t max_size)72 static size_t read_available(size_t write_index, size_t read_index, size_t max_size) 73 { 74 if (write_index >= read_index) 75 return write_index - read_index; 76 77 const size_t ret = write_index + max_size - read_index; 78 return ret; 79 } 80 write_available(size_t write_index,size_t read_index,size_t max_size)81 static size_t write_available(size_t write_index, size_t read_index, size_t max_size) 82 { 83 size_t ret = read_index - write_index - 1; 84 if (write_index >= read_index) 85 ret += max_size; 86 return ret; 87 } 88 read_available(size_t max_size) const89 size_t read_available(size_t max_size) const 90 { 91 size_t write_index = write_index_.load(memory_order_acquire); 92 const size_t read_index = read_index_.load(memory_order_relaxed); 93 return read_available(write_index, read_index, max_size); 94 } 95 write_available(size_t max_size) const96 size_t write_available(size_t max_size) const 97 { 98 size_t write_index = write_index_.load(memory_order_relaxed); 99 const size_t read_index = read_index_.load(memory_order_acquire); 100 return write_available(write_index, read_index, max_size); 101 } 102 push(T const & t,T * buffer,size_t max_size)103 bool push(T const & t, T * buffer, size_t max_size) 104 { 105 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread 106 const size_t next = next_index(write_index, max_size); 107 108 if (next == read_index_.load(memory_order_acquire)) 109 return false; /* ringbuffer is full */ 110 111 new (buffer + write_index) T(t); // copy-construct 112 113 write_index_.store(next, memory_order_release); 114 115 return true; 116 } 117 push(const T * input_buffer,size_t input_count,T * internal_buffer,size_t max_size)118 size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size) 119 { 120 return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer; 121 } 122 123 template <typename ConstIterator> push(ConstIterator begin,ConstIterator end,T * internal_buffer,size_t max_size)124 ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size) 125 { 126 // FIXME: avoid std::distance 127 128 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread 129 const size_t read_index = read_index_.load(memory_order_acquire); 130 const size_t avail = write_available(write_index, read_index, max_size); 131 132 if (avail == 0) 133 return begin; 134 135 size_t input_count = std::distance(begin, end); 136 input_count = (std::min)(input_count, avail); 137 138 size_t new_write_index = write_index + input_count; 139 140 const ConstIterator last = boost::next(begin, input_count); 141 142 if (write_index + input_count > max_size) { 143 /* copy data in two sections */ 144 const size_t count0 = max_size - write_index; 145 const ConstIterator midpoint = boost::next(begin, count0); 146 147 std::uninitialized_copy(begin, midpoint, internal_buffer + write_index); 148 std::uninitialized_copy(midpoint, last, internal_buffer); 149 new_write_index -= max_size; 150 } else { 151 std::uninitialized_copy(begin, last, internal_buffer + write_index); 152 153 if (new_write_index == max_size) 154 new_write_index = 0; 155 } 156 157 write_index_.store(new_write_index, memory_order_release); 158 return last; 159 } 160 161 template <typename Functor> consume_one(Functor & functor,T * buffer,size_t max_size)162 bool consume_one(Functor & functor, T * buffer, size_t max_size) 163 { 164 const size_t write_index = write_index_.load(memory_order_acquire); 165 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 166 if ( empty(write_index, read_index) ) 167 return false; 168 169 T & object_to_consume = buffer[read_index]; 170 functor( object_to_consume ); 171 object_to_consume.~T(); 172 173 size_t next = next_index(read_index, max_size); 174 read_index_.store(next, memory_order_release); 175 return true; 176 } 177 178 template <typename Functor> consume_one(Functor const & functor,T * buffer,size_t max_size)179 bool consume_one(Functor const & functor, T * buffer, size_t max_size) 180 { 181 const size_t write_index = write_index_.load(memory_order_acquire); 182 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 183 if ( empty(write_index, read_index) ) 184 return false; 185 186 T & object_to_consume = buffer[read_index]; 187 functor( object_to_consume ); 188 object_to_consume.~T(); 189 190 size_t next = next_index(read_index, max_size); 191 read_index_.store(next, memory_order_release); 192 return true; 193 } 194 195 template <typename Functor> consume_all(Functor const & functor,T * internal_buffer,size_t max_size)196 size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size) 197 { 198 const size_t write_index = write_index_.load(memory_order_acquire); 199 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 200 201 const size_t avail = read_available(write_index, read_index, max_size); 202 203 if (avail == 0) 204 return 0; 205 206 const size_t output_count = avail; 207 208 size_t new_read_index = read_index + output_count; 209 210 if (read_index + output_count > max_size) { 211 /* copy data in two sections */ 212 const size_t count0 = max_size - read_index; 213 const size_t count1 = output_count - count0; 214 215 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor); 216 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor); 217 218 new_read_index -= max_size; 219 } else { 220 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor); 221 222 if (new_read_index == max_size) 223 new_read_index = 0; 224 } 225 226 read_index_.store(new_read_index, memory_order_release); 227 return output_count; 228 } 229 230 template <typename Functor> consume_all(Functor & functor,T * internal_buffer,size_t max_size)231 size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size) 232 { 233 const size_t write_index = write_index_.load(memory_order_acquire); 234 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 235 236 const size_t avail = read_available(write_index, read_index, max_size); 237 238 if (avail == 0) 239 return 0; 240 241 const size_t output_count = avail; 242 243 size_t new_read_index = read_index + output_count; 244 245 if (read_index + output_count > max_size) { 246 /* copy data in two sections */ 247 const size_t count0 = max_size - read_index; 248 const size_t count1 = output_count - count0; 249 250 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor); 251 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor); 252 253 new_read_index -= max_size; 254 } else { 255 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor); 256 257 if (new_read_index == max_size) 258 new_read_index = 0; 259 } 260 261 read_index_.store(new_read_index, memory_order_release); 262 return output_count; 263 } 264 pop(T * output_buffer,size_t output_count,T * internal_buffer,size_t max_size)265 size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size) 266 { 267 const size_t write_index = write_index_.load(memory_order_acquire); 268 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 269 270 const size_t avail = read_available(write_index, read_index, max_size); 271 272 if (avail == 0) 273 return 0; 274 275 output_count = (std::min)(output_count, avail); 276 277 size_t new_read_index = read_index + output_count; 278 279 if (read_index + output_count > max_size) { 280 /* copy data in two sections */ 281 const size_t count0 = max_size - read_index; 282 const size_t count1 = output_count - count0; 283 284 copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer); 285 copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0); 286 287 new_read_index -= max_size; 288 } else { 289 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer); 290 if (new_read_index == max_size) 291 new_read_index = 0; 292 } 293 294 read_index_.store(new_read_index, memory_order_release); 295 return output_count; 296 } 297 298 template <typename OutputIterator> pop_to_output_iterator(OutputIterator it,T * internal_buffer,size_t max_size)299 size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size) 300 { 301 const size_t write_index = write_index_.load(memory_order_acquire); 302 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 303 304 const size_t avail = read_available(write_index, read_index, max_size); 305 if (avail == 0) 306 return 0; 307 308 size_t new_read_index = read_index + avail; 309 310 if (read_index + avail > max_size) { 311 /* copy data in two sections */ 312 const size_t count0 = max_size - read_index; 313 const size_t count1 = avail - count0; 314 315 it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it); 316 copy_and_delete(internal_buffer, internal_buffer + count1, it); 317 318 new_read_index -= max_size; 319 } else { 320 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it); 321 if (new_read_index == max_size) 322 new_read_index = 0; 323 } 324 325 read_index_.store(new_read_index, memory_order_release); 326 return avail; 327 } 328 front(const T * internal_buffer) const329 const T& front(const T * internal_buffer) const 330 { 331 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 332 return *(internal_buffer + read_index); 333 } 334 front(T * internal_buffer)335 T& front(T * internal_buffer) 336 { 337 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 338 return *(internal_buffer + read_index); 339 } 340 #endif 341 342 343 public: 344 /** reset the ringbuffer 345 * 346 * \note Not thread-safe 347 * */ reset(void)348 void reset(void) 349 { 350 if ( !boost::has_trivial_destructor<T>::value ) { 351 // make sure to call all destructors! 352 353 T dummy_element; 354 while (pop(dummy_element)) 355 {} 356 } else { 357 write_index_.store(0, memory_order_relaxed); 358 read_index_.store(0, memory_order_release); 359 } 360 } 361 362 /** Check if the ringbuffer is empty 363 * 364 * \return true, if the ringbuffer is empty, false otherwise 365 * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate. 366 * */ empty(void)367 bool empty(void) 368 { 369 return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed)); 370 } 371 372 /** 373 * \return true, if implementation is lock-free. 374 * 375 * */ is_lock_free(void) const376 bool is_lock_free(void) const 377 { 378 return write_index_.is_lock_free() && read_index_.is_lock_free(); 379 } 380 381 private: empty(size_t write_index,size_t read_index)382 bool empty(size_t write_index, size_t read_index) 383 { 384 return write_index == read_index; 385 } 386 387 template< class OutputIterator > copy_and_delete(T * first,T * last,OutputIterator out)388 OutputIterator copy_and_delete( T * first, T * last, OutputIterator out ) 389 { 390 if (boost::has_trivial_destructor<T>::value) { 391 return std::copy(first, last, out); // will use memcpy if possible 392 } else { 393 for (; first != last; ++first, ++out) { 394 *out = *first; 395 first->~T(); 396 } 397 return out; 398 } 399 } 400 401 template< class Functor > run_functor_and_delete(T * first,T * last,Functor & functor)402 void run_functor_and_delete( T * first, T * last, Functor & functor ) 403 { 404 for (; first != last; ++first) { 405 functor(*first); 406 first->~T(); 407 } 408 } 409 410 template< class Functor > run_functor_and_delete(T * first,T * last,Functor const & functor)411 void run_functor_and_delete( T * first, T * last, Functor const & functor ) 412 { 413 for (; first != last; ++first) { 414 functor(*first); 415 first->~T(); 416 } 417 } 418 }; 419 420 template <typename T, std::size_t MaxSize> 421 class compile_time_sized_ringbuffer: 422 public ringbuffer_base<T> 423 { 424 typedef std::size_t size_type; 425 static const std::size_t max_size = MaxSize + 1; 426 427 typedef typename boost::aligned_storage<max_size * sizeof(T), 428 boost::alignment_of<T>::value 429 >::type storage_type; 430 431 storage_type storage_; 432 data()433 T * data() 434 { 435 return static_cast<T*>(storage_.address()); 436 } 437 data() const438 const T * data() const 439 { 440 return static_cast<const T*>(storage_.address()); 441 } 442 443 protected: max_number_of_elements() const444 size_type max_number_of_elements() const 445 { 446 return max_size; 447 } 448 449 public: push(T const & t)450 bool push(T const & t) 451 { 452 return ringbuffer_base<T>::push(t, data(), max_size); 453 } 454 455 template <typename Functor> consume_one(Functor & f)456 bool consume_one(Functor & f) 457 { 458 return ringbuffer_base<T>::consume_one(f, data(), max_size); 459 } 460 461 template <typename Functor> consume_one(Functor const & f)462 bool consume_one(Functor const & f) 463 { 464 return ringbuffer_base<T>::consume_one(f, data(), max_size); 465 } 466 467 template <typename Functor> consume_all(Functor & f)468 size_type consume_all(Functor & f) 469 { 470 return ringbuffer_base<T>::consume_all(f, data(), max_size); 471 } 472 473 template <typename Functor> consume_all(Functor const & f)474 size_type consume_all(Functor const & f) 475 { 476 return ringbuffer_base<T>::consume_all(f, data(), max_size); 477 } 478 push(T const * t,size_type size)479 size_type push(T const * t, size_type size) 480 { 481 return ringbuffer_base<T>::push(t, size, data(), max_size); 482 } 483 484 template <size_type size> push(T const (& t)[size])485 size_type push(T const (&t)[size]) 486 { 487 return push(t, size); 488 } 489 490 template <typename ConstIterator> push(ConstIterator begin,ConstIterator end)491 ConstIterator push(ConstIterator begin, ConstIterator end) 492 { 493 return ringbuffer_base<T>::push(begin, end, data(), max_size); 494 } 495 pop(T * ret,size_type size)496 size_type pop(T * ret, size_type size) 497 { 498 return ringbuffer_base<T>::pop(ret, size, data(), max_size); 499 } 500 501 template <typename OutputIterator> pop_to_output_iterator(OutputIterator it)502 size_type pop_to_output_iterator(OutputIterator it) 503 { 504 return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size); 505 } 506 front(void) const507 const T& front(void) const 508 { 509 return ringbuffer_base<T>::front(data()); 510 } 511 front(void)512 T& front(void) 513 { 514 return ringbuffer_base<T>::front(data()); 515 } 516 }; 517 518 template <typename T, typename Alloc> 519 class runtime_sized_ringbuffer: 520 public ringbuffer_base<T>, 521 private Alloc 522 { 523 typedef std::size_t size_type; 524 size_type max_elements_; 525 typedef typename Alloc::pointer pointer; 526 pointer array_; 527 528 protected: max_number_of_elements() const529 size_type max_number_of_elements() const 530 { 531 return max_elements_; 532 } 533 534 public: runtime_sized_ringbuffer(size_type max_elements)535 explicit runtime_sized_ringbuffer(size_type max_elements): 536 max_elements_(max_elements + 1) 537 { 538 array_ = Alloc::allocate(max_elements_); 539 } 540 541 template <typename U> runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc,size_type max_elements)542 runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements): 543 Alloc(alloc), max_elements_(max_elements + 1) 544 { 545 array_ = Alloc::allocate(max_elements_); 546 } 547 runtime_sized_ringbuffer(Alloc const & alloc,size_type max_elements)548 runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements): 549 Alloc(alloc), max_elements_(max_elements + 1) 550 { 551 array_ = Alloc::allocate(max_elements_); 552 } 553 ~runtime_sized_ringbuffer(void)554 ~runtime_sized_ringbuffer(void) 555 { 556 // destroy all remaining items 557 T out; 558 while (pop(&out, 1)) {} 559 560 Alloc::deallocate(array_, max_elements_); 561 } 562 push(T const & t)563 bool push(T const & t) 564 { 565 return ringbuffer_base<T>::push(t, &*array_, max_elements_); 566 } 567 568 template <typename Functor> consume_one(Functor & f)569 bool consume_one(Functor & f) 570 { 571 return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_); 572 } 573 574 template <typename Functor> consume_one(Functor const & f)575 bool consume_one(Functor const & f) 576 { 577 return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_); 578 } 579 580 template <typename Functor> consume_all(Functor & f)581 size_type consume_all(Functor & f) 582 { 583 return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_); 584 } 585 586 template <typename Functor> consume_all(Functor const & f)587 size_type consume_all(Functor const & f) 588 { 589 return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_); 590 } 591 push(T const * t,size_type size)592 size_type push(T const * t, size_type size) 593 { 594 return ringbuffer_base<T>::push(t, size, &*array_, max_elements_); 595 } 596 597 template <size_type size> push(T const (& t)[size])598 size_type push(T const (&t)[size]) 599 { 600 return push(t, size); 601 } 602 603 template <typename ConstIterator> push(ConstIterator begin,ConstIterator end)604 ConstIterator push(ConstIterator begin, ConstIterator end) 605 { 606 return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_); 607 } 608 pop(T * ret,size_type size)609 size_type pop(T * ret, size_type size) 610 { 611 return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_); 612 } 613 614 template <typename OutputIterator> pop_to_output_iterator(OutputIterator it)615 size_type pop_to_output_iterator(OutputIterator it) 616 { 617 return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_); 618 } 619 front(void) const620 const T& front(void) const 621 { 622 return ringbuffer_base<T>::front(&*array_); 623 } 624 front(void)625 T& front(void) 626 { 627 return ringbuffer_base<T>::front(&*array_); 628 } 629 }; 630 631 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 632 template <typename T, typename A0, typename A1> 633 #else 634 template <typename T, typename ...Options> 635 #endif 636 struct make_ringbuffer 637 { 638 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 639 typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args; 640 #else 641 typedef typename ringbuffer_signature::bind<Options...>::type bound_args; 642 #endif 643 644 typedef extract_capacity<bound_args> extract_capacity_t; 645 646 static const bool runtime_sized = !extract_capacity_t::has_capacity; 647 static const size_t capacity = extract_capacity_t::capacity; 648 649 typedef extract_allocator<bound_args, T> extract_allocator_t; 650 typedef typename extract_allocator_t::type allocator; 651 652 // allocator argument is only sane, for run-time sized ringbuffers 653 BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>, 654 mpl::bool_<!extract_allocator_t::has_allocator>, 655 mpl::true_ 656 >::type::value)); 657 658 typedef typename mpl::if_c<runtime_sized, 659 runtime_sized_ringbuffer<T, allocator>, 660 compile_time_sized_ringbuffer<T, capacity> 661 >::type ringbuffer_type; 662 }; 663 664 665 } /* namespace detail */ 666 667 668 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free. 669 * 670 * \b Policies: 671 * - \c boost::lockfree::capacity<>, optional <br> 672 * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time. 673 * 674 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br> 675 * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured 676 * to be sized at run-time 677 * 678 * \b Requirements: 679 * - T must have a default constructor 680 * - T must be copyable 681 * */ 682 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 683 template <typename T, class A0, class A1> 684 #else 685 template <typename T, typename ...Options> 686 #endif 687 class spsc_queue: 688 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 689 public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type 690 #else 691 public detail::make_ringbuffer<T, Options...>::ringbuffer_type 692 #endif 693 { 694 private: 695 696 #ifndef BOOST_DOXYGEN_INVOKED 697 698 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 699 typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type; 700 static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized; 701 typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg; 702 #else 703 typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type; 704 static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized; 705 typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg; 706 #endif 707 708 709 struct implementation_defined 710 { 711 typedef allocator_arg allocator; 712 typedef std::size_t size_type; 713 }; 714 #endif 715 716 public: 717 typedef T value_type; 718 typedef typename implementation_defined::allocator allocator; 719 typedef typename implementation_defined::size_type size_type; 720 721 /** Constructs a spsc_queue 722 * 723 * \pre spsc_queue must be configured to be sized at compile-time 724 */ 725 // @{ spsc_queue(void)726 spsc_queue(void) 727 { 728 BOOST_ASSERT(!runtime_sized); 729 } 730 731 template <typename U> spsc_queue(typename allocator::template rebind<U>::other const &)732 explicit spsc_queue(typename allocator::template rebind<U>::other const &) 733 { 734 // just for API compatibility: we don't actually need an allocator 735 BOOST_STATIC_ASSERT(!runtime_sized); 736 } 737 spsc_queue(allocator const &)738 explicit spsc_queue(allocator const &) 739 { 740 // just for API compatibility: we don't actually need an allocator 741 BOOST_ASSERT(!runtime_sized); 742 } 743 // @} 744 745 746 /** Constructs a spsc_queue for element_count elements 747 * 748 * \pre spsc_queue must be configured to be sized at run-time 749 */ 750 // @{ spsc_queue(size_type element_count)751 explicit spsc_queue(size_type element_count): 752 base_type(element_count) 753 { 754 BOOST_ASSERT(runtime_sized); 755 } 756 757 template <typename U> spsc_queue(size_type element_count,typename allocator::template rebind<U>::other const & alloc)758 spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc): 759 base_type(alloc, element_count) 760 { 761 BOOST_STATIC_ASSERT(runtime_sized); 762 } 763 spsc_queue(size_type element_count,allocator_arg const & alloc)764 spsc_queue(size_type element_count, allocator_arg const & alloc): 765 base_type(alloc, element_count) 766 { 767 BOOST_ASSERT(runtime_sized); 768 } 769 // @} 770 771 /** Pushes object t to the ringbuffer. 772 * 773 * \pre only one thread is allowed to push data to the spsc_queue 774 * \post object will be pushed to the spsc_queue, unless it is full. 775 * \return true, if the push operation is successful. 776 * 777 * \note Thread-safe and wait-free 778 * */ push(T const & t)779 bool push(T const & t) 780 { 781 return base_type::push(t); 782 } 783 784 /** Pops one object from ringbuffer. 785 * 786 * \pre only one thread is allowed to pop data to the spsc_queue 787 * \post if ringbuffer is not empty, object will be discarded. 788 * \return true, if the pop operation is successful, false if ringbuffer was empty. 789 * 790 * \note Thread-safe and wait-free 791 */ pop()792 bool pop () 793 { 794 detail::consume_noop consume_functor; 795 return consume_one( consume_functor ); 796 } 797 798 /** Pops one object from ringbuffer. 799 * 800 * \pre only one thread is allowed to pop data to the spsc_queue 801 * \post if ringbuffer is not empty, object will be copied to ret. 802 * \return true, if the pop operation is successful, false if ringbuffer was empty. 803 * 804 * \note Thread-safe and wait-free 805 */ 806 template <typename U> 807 typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type pop(U & ret)808 pop (U & ret) 809 { 810 detail::consume_via_copy<U> consume_functor(ret); 811 return consume_one( consume_functor ); 812 } 813 814 /** Pushes as many objects from the array t as there is space. 815 * 816 * \pre only one thread is allowed to push data to the spsc_queue 817 * \return number of pushed items 818 * 819 * \note Thread-safe and wait-free 820 */ push(T const * t,size_type size)821 size_type push(T const * t, size_type size) 822 { 823 return base_type::push(t, size); 824 } 825 826 /** Pushes as many objects from the array t as there is space available. 827 * 828 * \pre only one thread is allowed to push data to the spsc_queue 829 * \return number of pushed items 830 * 831 * \note Thread-safe and wait-free 832 */ 833 template <size_type size> push(T const (& t)[size])834 size_type push(T const (&t)[size]) 835 { 836 return push(t, size); 837 } 838 839 /** Pushes as many objects from the range [begin, end) as there is space . 840 * 841 * \pre only one thread is allowed to push data to the spsc_queue 842 * \return iterator to the first element, which has not been pushed 843 * 844 * \note Thread-safe and wait-free 845 */ 846 template <typename ConstIterator> push(ConstIterator begin,ConstIterator end)847 ConstIterator push(ConstIterator begin, ConstIterator end) 848 { 849 return base_type::push(begin, end); 850 } 851 852 /** Pops a maximum of size objects from ringbuffer. 853 * 854 * \pre only one thread is allowed to pop data to the spsc_queue 855 * \return number of popped items 856 * 857 * \note Thread-safe and wait-free 858 * */ pop(T * ret,size_type size)859 size_type pop(T * ret, size_type size) 860 { 861 return base_type::pop(ret, size); 862 } 863 864 /** Pops a maximum of size objects from spsc_queue. 865 * 866 * \pre only one thread is allowed to pop data to the spsc_queue 867 * \return number of popped items 868 * 869 * \note Thread-safe and wait-free 870 * */ 871 template <size_type size> pop(T (& ret)[size])872 size_type pop(T (&ret)[size]) 873 { 874 return pop(ret, size); 875 } 876 877 /** Pops objects to the output iterator it 878 * 879 * \pre only one thread is allowed to pop data to the spsc_queue 880 * \return number of popped items 881 * 882 * \note Thread-safe and wait-free 883 * */ 884 template <typename OutputIterator> 885 typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type pop(OutputIterator it)886 pop(OutputIterator it) 887 { 888 return base_type::pop_to_output_iterator(it); 889 } 890 891 /** consumes one element via a functor 892 * 893 * pops one element from the queue and applies the functor on this object 894 * 895 * \returns true, if one element was consumed 896 * 897 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking 898 * */ 899 template <typename Functor> consume_one(Functor & f)900 bool consume_one(Functor & f) 901 { 902 return base_type::consume_one(f); 903 } 904 905 /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs) 906 template <typename Functor> consume_one(Functor const & f)907 bool consume_one(Functor const & f) 908 { 909 return base_type::consume_one(f); 910 } 911 912 /** consumes all elements via a functor 913 * 914 * sequentially pops all elements from the queue and applies the functor on each object 915 * 916 * \returns number of elements that are consumed 917 * 918 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking 919 * */ 920 template <typename Functor> consume_all(Functor & f)921 size_type consume_all(Functor & f) 922 { 923 return base_type::consume_all(f); 924 } 925 926 /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs) 927 template <typename Functor> consume_all(Functor const & f)928 size_type consume_all(Functor const & f) 929 { 930 return base_type::consume_all(f); 931 } 932 933 /** get number of elements that are available for read 934 * 935 * \return number of available elements that can be popped from the spsc_queue 936 * 937 * \note Thread-safe and wait-free, should only be called from the consumer thread 938 * */ read_available() const939 size_type read_available() const 940 { 941 return base_type::read_available(base_type::max_number_of_elements()); 942 } 943 944 /** get write space to write elements 945 * 946 * \return number of elements that can be pushed to the spsc_queue 947 * 948 * \note Thread-safe and wait-free, should only be called from the producer thread 949 * */ write_available() const950 size_type write_available() const 951 { 952 return base_type::write_available(base_type::max_number_of_elements()); 953 } 954 955 /** get reference to element in the front of the queue 956 * 957 * Availability of front element can be checked using read_available(). 958 * 959 * \pre only a consuming thread is allowed to check front element 960 * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method. 961 * \return reference to the first element in the queue 962 * 963 * \note Thread-safe and wait-free 964 */ front() const965 const T& front() const 966 { 967 BOOST_ASSERT(read_available() > 0); 968 return base_type::front(); 969 } 970 971 /// \copydoc boost::lockfree::spsc_queue::front() const front()972 T& front() 973 { 974 BOOST_ASSERT(read_available() > 0); 975 return base_type::front(); 976 } 977 978 /** reset the ringbuffer 979 * 980 * \note Not thread-safe 981 * */ reset(void)982 void reset(void) 983 { 984 if ( !boost::has_trivial_destructor<T>::value ) { 985 // make sure to call all destructors! 986 987 T dummy_element; 988 while (pop(dummy_element)) 989 {} 990 } else { 991 base_type::write_index_.store(0, memory_order_relaxed); 992 base_type::read_index_.store(0, memory_order_release); 993 } 994 } 995 }; 996 997 } /* namespace lockfree */ 998 } /* namespace boost */ 999 1000 1001 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */ 1002