1 // lock-free queue from 2 // Michael, M. M. and Scott, M. L., 3 // "simple, fast and practical non-blocking and blocking concurrent queue algorithms" 4 // 5 // Copyright (C) 2008-2013 Tim Blechmann 6 // 7 // Distributed under the Boost Software License, Version 1.0. (See 8 // accompanying file LICENSE_1_0.txt or copy at 9 // http://www.boost.org/LICENSE_1_0.txt) 10 11 #ifndef BOOST_LOCKFREE_FIFO_HPP_INCLUDED 12 #define BOOST_LOCKFREE_FIFO_HPP_INCLUDED 13 14 #include <boost/assert.hpp> 15 #include <boost/static_assert.hpp> 16 #include <boost/type_traits/has_trivial_assign.hpp> 17 #include <boost/type_traits/has_trivial_destructor.hpp> 18 #include <boost/config.hpp> // for BOOST_LIKELY & BOOST_ALIGNMENT 19 20 #include <boost/lockfree/detail/allocator_rebind_helper.hpp> 21 #include <boost/lockfree/detail/atomic.hpp> 22 #include <boost/lockfree/detail/copy_payload.hpp> 23 #include <boost/lockfree/detail/freelist.hpp> 24 #include <boost/lockfree/detail/parameter.hpp> 25 #include <boost/lockfree/detail/tagged_ptr.hpp> 26 27 #include <boost/lockfree/lockfree_forward.hpp> 28 29 #ifdef BOOST_HAS_PRAGMA_ONCE 30 #pragma once 31 #endif 32 33 34 #if defined(_MSC_VER) 35 #pragma warning(push) 36 #pragma warning(disable: 4324) // structure was padded due to __declspec(align()) 37 #endif 38 39 #if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000) 40 #pragma warning(push) 41 #pragma warning(disable:488) // template parameter unused in declaring parameter types, 42 // gets erronously triggered the queue constructor which 43 // takes an allocator of another type and rebinds it 44 #endif 45 46 47 48 namespace boost { 49 namespace lockfree { 50 namespace detail { 51 52 typedef parameter::parameters<boost::parameter::optional<tag::allocator>, 53 boost::parameter::optional<tag::capacity> 54 > queue_signature; 55 56 } /* namespace detail */ 57 58 59 /** The queue class provides a multi-writer/multi-reader queue, pushing and popping is lock-free, 60 * construction/destruction has to be synchronized. It uses a freelist for memory management, 61 * freed nodes are pushed to the freelist and not returned to the OS before the queue is destroyed. 62 * 63 * \b Policies: 64 * - \ref boost::lockfree::fixed_sized, defaults to \c boost::lockfree::fixed_sized<false> \n 65 * Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. \n 66 * If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and they are addressed 67 * by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index 68 * type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way 69 * to achieve lock-freedom. 70 * 71 * - \ref boost::lockfree::capacity, optional \n 72 * If this template argument is passed to the options, the size of the queue is set at compile-time.\n 73 * This option implies \c fixed_sized<true> 74 * 75 * - \ref boost::lockfree::allocator, defaults to \c boost::lockfree::allocator<std::allocator<void>> \n 76 * Specifies the allocator that is used for the internal freelist 77 * 78 * \b Requirements: 79 * - T must have a copy constructor 80 * - T must have a trivial assignment operator 81 * - T must have a trivial destructor 82 * 83 * */ 84 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 85 template <typename T, class A0, class A1, class A2> 86 #else 87 template <typename T, typename ...Options> 88 #endif 89 class queue 90 { 91 private: 92 #ifndef BOOST_DOXYGEN_INVOKED 93 94 #ifdef BOOST_HAS_TRIVIAL_DESTRUCTOR 95 BOOST_STATIC_ASSERT((boost::has_trivial_destructor<T>::value)); 96 #endif 97 98 #ifdef BOOST_HAS_TRIVIAL_ASSIGN 99 BOOST_STATIC_ASSERT((boost::has_trivial_assign<T>::value)); 100 #endif 101 102 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 103 typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args; 104 #else 105 typedef typename detail::queue_signature::bind<Options...>::type bound_args; 106 #endif 107 108 static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity; 109 static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node 110 static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value; 111 static const bool node_based = !(has_capacity || fixed_sized); 112 static const bool compile_time_sized = has_capacity; 113 BOOST_ALIGNMENT(BOOST_LOCKFREE_CACHELINE_BYTES)114 struct BOOST_ALIGNMENT(BOOST_LOCKFREE_CACHELINE_BYTES) node 115 { 116 typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle; 117 typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type; 118 119 node(T const & v, handle_type null_handle): 120 next(tagged_node_handle(null_handle, 0)), data(v) 121 { 122 /* increment tag to avoid ABA problem */ 123 tagged_node_handle old_next = next.load(memory_order_relaxed); 124 tagged_node_handle new_next (null_handle, old_next.get_next_tag()); 125 next.store(new_next, memory_order_release); 126 } 127 128 node (handle_type null_handle): 129 next(tagged_node_handle(null_handle, 0)) 130 {} 131 132 node(void) 133 {} 134 135 atomic<tagged_node_handle> next; 136 T data; 137 }; 138 139 typedef typename detail::extract_allocator<bound_args, node>::type node_allocator; 140 typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t; 141 typedef typename pool_t::tagged_node_handle tagged_node_handle; 142 typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type; 143 initialize(void)144 void initialize(void) 145 { 146 node * n = pool.template construct<true, false>(pool.null_handle()); 147 tagged_node_handle dummy_node(pool.get_handle(n), 0); 148 head_.store(dummy_node, memory_order_relaxed); 149 tail_.store(dummy_node, memory_order_release); 150 } 151 152 struct implementation_defined 153 { 154 typedef node_allocator allocator; 155 typedef std::size_t size_type; 156 }; 157 158 #endif 159 160 BOOST_DELETED_FUNCTION(queue(queue const&)) 161 BOOST_DELETED_FUNCTION(queue& operator= (queue const&)) 162 163 public: 164 typedef T value_type; 165 typedef typename implementation_defined::allocator allocator; 166 typedef typename implementation_defined::size_type size_type; 167 168 /** 169 * \return true, if implementation is lock-free. 170 * 171 * \warning It only checks, if the queue head and tail nodes and the freelist can be modified in a lock-free manner. 172 * On most platforms, the whole implementation is lock-free, if this is true. Using c++0x-style atomics, there is 173 * no possibility to provide a completely accurate implementation, because one would need to test every internal 174 * node, which is impossible if further nodes will be allocated from the operating system. 175 * */ is_lock_free(void) const176 bool is_lock_free (void) const 177 { 178 return head_.is_lock_free() && tail_.is_lock_free() && pool.is_lock_free(); 179 } 180 181 //! Construct queue 182 // @{ queue(void)183 queue(void): 184 head_(tagged_node_handle(0, 0)), 185 tail_(tagged_node_handle(0, 0)), 186 pool(node_allocator(), capacity) 187 { 188 BOOST_ASSERT(has_capacity); 189 initialize(); 190 } 191 192 template <typename U> queue(typename detail::allocator_rebind_helper<node_allocator,U>::type const & alloc)193 explicit queue(typename detail::allocator_rebind_helper<node_allocator, U>::type const & alloc): 194 head_(tagged_node_handle(0, 0)), 195 tail_(tagged_node_handle(0, 0)), 196 pool(alloc, capacity) 197 { 198 BOOST_STATIC_ASSERT(has_capacity); 199 initialize(); 200 } 201 queue(allocator const & alloc)202 explicit queue(allocator const & alloc): 203 head_(tagged_node_handle(0, 0)), 204 tail_(tagged_node_handle(0, 0)), 205 pool(alloc, capacity) 206 { 207 BOOST_ASSERT(has_capacity); 208 initialize(); 209 } 210 // @} 211 212 //! Construct queue, allocate n nodes for the freelist. 213 // @{ queue(size_type n)214 explicit queue(size_type n): 215 head_(tagged_node_handle(0, 0)), 216 tail_(tagged_node_handle(0, 0)), 217 pool(node_allocator(), n + 1) 218 { 219 BOOST_ASSERT(!has_capacity); 220 initialize(); 221 } 222 223 template <typename U> queue(size_type n,typename detail::allocator_rebind_helper<node_allocator,U>::type const & alloc)224 queue(size_type n, typename detail::allocator_rebind_helper<node_allocator, U>::type const & alloc): 225 head_(tagged_node_handle(0, 0)), 226 tail_(tagged_node_handle(0, 0)), 227 pool(alloc, n + 1) 228 { 229 BOOST_STATIC_ASSERT(!has_capacity); 230 initialize(); 231 } 232 // @} 233 234 /** \copydoc boost::lockfree::stack::reserve 235 * */ reserve(size_type n)236 void reserve(size_type n) 237 { 238 pool.template reserve<true>(n); 239 } 240 241 /** \copydoc boost::lockfree::stack::reserve_unsafe 242 * */ reserve_unsafe(size_type n)243 void reserve_unsafe(size_type n) 244 { 245 pool.template reserve<false>(n); 246 } 247 248 /** Destroys queue, free all nodes from freelist. 249 * */ ~queue(void)250 ~queue(void) 251 { 252 T dummy; 253 while(unsynchronized_pop(dummy)) 254 {} 255 256 pool.template destruct<false>(head_.load(memory_order_relaxed)); 257 } 258 259 /** Check if the queue is empty 260 * 261 * \return true, if the queue is empty, false otherwise 262 * \note The result is only accurate, if no other thread modifies the queue. Therefore it is rarely practical to use this 263 * value in program logic. 264 * */ empty(void) const265 bool empty(void) const 266 { 267 return pool.get_handle(head_.load()) == pool.get_handle(tail_.load()); 268 } 269 270 /** Pushes object t to the queue. 271 * 272 * \post object will be pushed to the queue, if internal node can be allocated 273 * \returns true, if the push operation is successful. 274 * 275 * \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated 276 * from the OS. This may not be lock-free. 277 * */ push(T const & t)278 bool push(T const & t) 279 { 280 return do_push<false>(t); 281 } 282 283 /** Pushes object t to the queue. 284 * 285 * \post object will be pushed to the queue, if internal node can be allocated 286 * \returns true, if the push operation is successful. 287 * 288 * \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail 289 * \throws if memory allocator throws 290 * */ bounded_push(T const & t)291 bool bounded_push(T const & t) 292 { 293 return do_push<true>(t); 294 } 295 296 297 private: 298 #ifndef BOOST_DOXYGEN_INVOKED 299 template <bool Bounded> do_push(T const & t)300 bool do_push(T const & t) 301 { 302 node * n = pool.template construct<true, Bounded>(t, pool.null_handle()); 303 handle_type node_handle = pool.get_handle(n); 304 305 if (n == NULL) 306 return false; 307 308 for (;;) { 309 tagged_node_handle tail = tail_.load(memory_order_acquire); 310 node * tail_node = pool.get_pointer(tail); 311 tagged_node_handle next = tail_node->next.load(memory_order_acquire); 312 node * next_ptr = pool.get_pointer(next); 313 314 tagged_node_handle tail2 = tail_.load(memory_order_acquire); 315 if (BOOST_LIKELY(tail == tail2)) { 316 if (next_ptr == 0) { 317 tagged_node_handle new_tail_next(node_handle, next.get_next_tag()); 318 if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) { 319 tagged_node_handle new_tail(node_handle, tail.get_next_tag()); 320 tail_.compare_exchange_strong(tail, new_tail); 321 return true; 322 } 323 } 324 else { 325 tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag()); 326 tail_.compare_exchange_strong(tail, new_tail); 327 } 328 } 329 } 330 } 331 #endif 332 333 public: 334 335 /** Pushes object t to the queue. 336 * 337 * \post object will be pushed to the queue, if internal node can be allocated 338 * \returns true, if the push operation is successful. 339 * 340 * \note Not Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated 341 * from the OS. This may not be lock-free. 342 * \throws if memory allocator throws 343 * */ unsynchronized_push(T const & t)344 bool unsynchronized_push(T const & t) 345 { 346 node * n = pool.template construct<false, false>(t, pool.null_handle()); 347 348 if (n == NULL) 349 return false; 350 351 for (;;) { 352 tagged_node_handle tail = tail_.load(memory_order_relaxed); 353 tagged_node_handle next = tail->next.load(memory_order_relaxed); 354 node * next_ptr = next.get_ptr(); 355 356 if (next_ptr == 0) { 357 tail->next.store(tagged_node_handle(n, next.get_next_tag()), memory_order_relaxed); 358 tail_.store(tagged_node_handle(n, tail.get_next_tag()), memory_order_relaxed); 359 return true; 360 } 361 else 362 tail_.store(tagged_node_handle(next_ptr, tail.get_next_tag()), memory_order_relaxed); 363 } 364 } 365 366 /** Pops object from queue. 367 * 368 * \post if pop operation is successful, object will be copied to ret. 369 * \returns true, if the pop operation is successful, false if queue was empty. 370 * 371 * \note Thread-safe and non-blocking 372 * */ pop(T & ret)373 bool pop (T & ret) 374 { 375 return pop<T>(ret); 376 } 377 378 /** Pops object from queue. 379 * 380 * \pre type U must be constructible by T and copyable, or T must be convertible to U 381 * \post if pop operation is successful, object will be copied to ret. 382 * \returns true, if the pop operation is successful, false if queue was empty. 383 * 384 * \note Thread-safe and non-blocking 385 * */ 386 template <typename U> pop(U & ret)387 bool pop (U & ret) 388 { 389 for (;;) { 390 tagged_node_handle head = head_.load(memory_order_acquire); 391 node * head_ptr = pool.get_pointer(head); 392 393 tagged_node_handle tail = tail_.load(memory_order_acquire); 394 tagged_node_handle next = head_ptr->next.load(memory_order_acquire); 395 node * next_ptr = pool.get_pointer(next); 396 397 tagged_node_handle head2 = head_.load(memory_order_acquire); 398 if (BOOST_LIKELY(head == head2)) { 399 if (pool.get_handle(head) == pool.get_handle(tail)) { 400 if (next_ptr == 0) 401 return false; 402 403 tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag()); 404 tail_.compare_exchange_strong(tail, new_tail); 405 406 } else { 407 if (next_ptr == 0) 408 /* this check is not part of the original algorithm as published by michael and scott 409 * 410 * however we reuse the tagged_ptr part for the freelist and clear the next part during node 411 * allocation. we can observe a null-pointer here. 412 * */ 413 continue; 414 detail::copy_payload(next_ptr->data, ret); 415 416 tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag()); 417 if (head_.compare_exchange_weak(head, new_head)) { 418 pool.template destruct<true>(head); 419 return true; 420 } 421 } 422 } 423 } 424 } 425 426 /** Pops object from queue. 427 * 428 * \post if pop operation is successful, object will be copied to ret. 429 * \returns true, if the pop operation is successful, false if queue was empty. 430 * 431 * \note Not thread-safe, but non-blocking 432 * 433 * */ unsynchronized_pop(T & ret)434 bool unsynchronized_pop (T & ret) 435 { 436 return unsynchronized_pop<T>(ret); 437 } 438 439 /** Pops object from queue. 440 * 441 * \pre type U must be constructible by T and copyable, or T must be convertible to U 442 * \post if pop operation is successful, object will be copied to ret. 443 * \returns true, if the pop operation is successful, false if queue was empty. 444 * 445 * \note Not thread-safe, but non-blocking 446 * 447 * */ 448 template <typename U> unsynchronized_pop(U & ret)449 bool unsynchronized_pop (U & ret) 450 { 451 for (;;) { 452 tagged_node_handle head = head_.load(memory_order_relaxed); 453 node * head_ptr = pool.get_pointer(head); 454 tagged_node_handle tail = tail_.load(memory_order_relaxed); 455 tagged_node_handle next = head_ptr->next.load(memory_order_relaxed); 456 node * next_ptr = pool.get_pointer(next); 457 458 if (pool.get_handle(head) == pool.get_handle(tail)) { 459 if (next_ptr == 0) 460 return false; 461 462 tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag()); 463 tail_.store(new_tail); 464 } else { 465 if (next_ptr == 0) 466 /* this check is not part of the original algorithm as published by michael and scott 467 * 468 * however we reuse the tagged_ptr part for the freelist and clear the next part during node 469 * allocation. we can observe a null-pointer here. 470 * */ 471 continue; 472 detail::copy_payload(next_ptr->data, ret); 473 tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag()); 474 head_.store(new_head); 475 pool.template destruct<false>(head); 476 return true; 477 } 478 } 479 } 480 481 /** consumes one element via a functor 482 * 483 * pops one element from the queue and applies the functor on this object 484 * 485 * \returns true, if one element was consumed 486 * 487 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking 488 * */ 489 template <typename Functor> consume_one(Functor & f)490 bool consume_one(Functor & f) 491 { 492 T element; 493 bool success = pop(element); 494 if (success) 495 f(element); 496 497 return success; 498 } 499 500 /// \copydoc boost::lockfree::queue::consume_one(Functor & rhs) 501 template <typename Functor> consume_one(Functor const & f)502 bool consume_one(Functor const & f) 503 { 504 T element; 505 bool success = pop(element); 506 if (success) 507 f(element); 508 509 return success; 510 } 511 512 /** consumes all elements via a functor 513 * 514 * sequentially pops all elements from the queue and applies the functor on each object 515 * 516 * \returns number of elements that are consumed 517 * 518 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking 519 * */ 520 template <typename Functor> consume_all(Functor & f)521 size_t consume_all(Functor & f) 522 { 523 size_t element_count = 0; 524 while (consume_one(f)) 525 element_count += 1; 526 527 return element_count; 528 } 529 530 /// \copydoc boost::lockfree::queue::consume_all(Functor & rhs) 531 template <typename Functor> consume_all(Functor const & f)532 size_t consume_all(Functor const & f) 533 { 534 size_t element_count = 0; 535 while (consume_one(f)) 536 element_count += 1; 537 538 return element_count; 539 } 540 541 private: 542 #ifndef BOOST_DOXYGEN_INVOKED 543 atomic<tagged_node_handle> head_; 544 static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle); 545 char padding1[padding_size]; 546 atomic<tagged_node_handle> tail_; 547 char padding2[padding_size]; 548 549 pool_t pool; 550 #endif 551 }; 552 553 } /* namespace lockfree */ 554 } /* namespace boost */ 555 556 #if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000) 557 #pragma warning(pop) 558 #endif 559 560 #if defined(_MSC_VER) 561 #pragma warning(pop) 562 #endif 563 564 #endif /* BOOST_LOCKFREE_FIFO_HPP_INCLUDED */ 565