1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_concurrent_queue_H 18 #define __TBB_concurrent_queue_H 19 20 #include "detail/_namespace_injection.h" 21 #include "detail/_concurrent_queue_base.h" 22 #include "detail/_allocator_traits.h" 23 #include "detail/_exception.h" 24 #include "detail/_containers_helpers.h" 25 #include "cache_aligned_allocator.h" 26 27 namespace tbb { 28 namespace detail { 29 namespace d2 { 30 31 // A high-performance thread-safe non-blocking concurrent queue. 32 // Multiple threads may each push and pop concurrently. 33 // Assignment construction is not allowed. 
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // The shared representation object is constructed through an allocator rebound to its own type.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    concurrent_queue() : concurrent_queue(allocator_type()) {}

    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        // The representation is allocated cache-aligned; the asserts below verify that its
        // hot atomic counters ended up on the expected alignment boundaries.
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue containing copies of the elements in [begin, end).
    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Move constructors
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        // Steal the source representation; src keeps the freshly constructed empty one.
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear(my_allocator);
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    void push(T&& value) {
        internal_push(std::move(value));
    }

    // Construct an item in place at tail of queue.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        // Concurrent operations can make the signed head/tail difference transiently
        // negative; clamp to zero rather than returning a huge unsigned value.
        return size < 0 ? 0 : size_type(size);
    }

    // Equivalent to size()==0.
    __TBB_nodiscard bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    // Claim a unique ticket from the tail counter, then construct the item in the
    // micro-queue that ticket maps to.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
    }

    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
            // Outer loop retries if the micro-queue pop for ticket k did not yield an item.
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;
    queue_representation_type* my_queue_representation;
}; // class concurrent_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_queue( It, It, Alloc = Alloc() )
-> concurrent_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

class concurrent_monitor;

// The concurrent monitor tags for concurrent_bounded_queue.
// Tag selecting the monitor that producers wait on (free slots) vs. the one
// consumers wait on (available items).
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
} // namespace d2


namespace r1 {
    class concurrent_monitor;

    // Exported entry points implementing the blocking machinery of
    // concurrent_bounded_queue inside the TBB binary.
    TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
    TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
    TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
    TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                                    , std::size_t ticket );
    TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                                    std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1


namespace d2 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    // Block on the monitor selected by monitor_tag until pred returns false
    // (or an abort/exception is delivered through the wait).
    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        d1::delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;

    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        // One allocation holds both the queue representation and, immediately after it,
        // the pair of concurrent monitors used for blocking push/pop.
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation);
        // Default capacity: effectively unbounded, scaled down by the per-item footprint.
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue containing copies of the elements in [begin, end).
    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Move constructors
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear(my_allocator);
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    // Construct an item in place at tail of queue; blocks while the queue is full.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Non-blocking in-place construction; returns false if the queue was full.
    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Dequeue an item from head of queue; blocks until an item is available
    // (or the wait is aborted). Returns true if successful; false otherwise.
    bool pop( T& result ) {
        return internal_pop(&result);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    // Wake all threads blocked in pop()/push() and make them throw user_abort.
    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    // Set the capacity; a negative value means "practically unbounded".
    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    __TBB_nodiscard bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    template <typename... Args>
    void internal_push( Args&&... args ) {
        // Snapshot the abort counter before claiming a ticket; a later change means
        // abort() was called while we were waiting.
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        // The slot for `ticket` becomes free once head_counter passes `target`.
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                // The ticket was already claimed; cancel its slot so consumers don't
                // wait forever on an item that will never be pushed.
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
            });

        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    bool internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                        throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    // Give back the claimed ticket so the queue's accounting stays balanced.
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));

        // A slot was freed; wake a producer that may be waiting for space.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
        return true;
    }

    bool internal_pop_if_present( void* dst ) {
        ticket_type ticket;
        do {
            ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
                    // Queue is empty
                    return false;
                }
                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
        } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        return true;
    }

    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;
    std::ptrdiff_t my_capacity;
    std::atomic<unsigned> my_abort_counter;
    queue_representation_type* my_queue_representation;

    // Points into the same allocation as my_queue_representation (just past it).
    r1::concurrent_monitor* my_monitors;
}; // class concurrent_bounded_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>>
concurrent_bounded_queue( It, It, Alloc = Alloc() )
-> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

} //namespace d2
} // namespace detail

inline namespace v1 {

using detail::d2::concurrent_queue;
using detail::d2::concurrent_bounded_queue;
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_queue_H