/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#define __TBB_concurrent_priority_queue_H_include_area
#include "internal/_warning_suppress_enable_notice.h"

#include "atomic.h"
#include "cache_aligned_allocator.h"
#include "tbb_exception.h"
#include "tbb_stddef.h"
#include "tbb_profiling.h"
#include "internal/_aggregator_impl.h"
#include "internal/_template_helpers.h"
#include "internal/_allocator_traits.h"
#include <vector>
#include <iterator>
#include <functional>
#include __TBB_STD_SWAP_HEADER

#if __TBB_INITIALIZER_LISTS_PRESENT
#include <initializer_list>
#endif

#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
#include <type_traits>
#endif

namespace tbb {
namespace interface5 {
namespace internal {
#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
    //! Compile-time tag selecting whether T may be pushed by copy.
    /** Maps T to tbb::internal::true_type/false_type depending on
        std::is_copy_constructible<T>; used to dispatch to the matching
        push_back_helper overload inside the queue. */
    template<typename T, bool C = std::is_copy_constructible<T>::value>
    struct use_element_copy_constructor {
        typedef tbb::internal::true_type type;
    };
    template<typename T>
    struct use_element_copy_constructor <T,false> {
        typedef tbb::internal::false_type type;
    };
#else
    //! Without std::is_copy_constructible, conservatively assume T is copy constructible.
    template<typename>
    struct use_element_copy_constructor {
        typedef tbb::internal::true_type type;
    };
#endif
} // namespace internal

using namespace tbb::internal;
//! Concurrent priority queue
/** Elements are ordered by Compare (a max-heap by default via std::less).
    Thread-safe operations (push/try_pop/emplace) are funneled through an
    aggregator: each calling thread publishes an operation record, and a
    single thread at a time drains the whole batch on behalf of everyone. */
template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
class concurrent_priority_queue {
 public:
    //! Element type in the queue.
    typedef T value_type;

    //! Reference type
    typedef T& reference;

    //! Const reference type
    typedef const T& const_reference;

    //! Integral type for representing size of the queue.
    typedef size_t size_type;

    //! Difference type for iterator
    typedef ptrdiff_t difference_type;

    //! Allocator type
    typedef A allocator_type;

    //! Constructs a new concurrent_priority_queue with default capacity
    explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(), data(a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue with default capacity
    explicit concurrent_priority_queue(const Compare& c, const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(c), data(a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue with init_sz capacity
    explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
        mark(0), my_size(0), compare(), data(a)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue with init_sz capacity
    explicit concurrent_priority_queue(size_type init_capacity, const Compare& c, const allocator_type& a = allocator_type()) :
        mark(0), my_size(0), compare(c), data(a)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! [begin,end) constructor
    template<typename InputIterator>
    concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        mark(0), compare(), data(begin, end, a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }

    //! [begin,end) constructor
    template<typename InputIterator>
    concurrent_priority_queue(InputIterator begin, InputIterator end, const Compare& c, const allocator_type& a = allocator_type()) :
        mark(0), compare(c), data(begin, end, a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }

#if __TBB_INITIALIZER_LISTS_PRESENT
    //! Constructor from std::initializer_list
    concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
        mark(0), compare(), data(init_list.begin(), init_list.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }

    //! Constructor from std::initializer_list
    concurrent_priority_queue(std::initializer_list<T> init_list, const Compare& c, const allocator_type &a = allocator_type()) :
        mark(0), compare(c), data(init_list.begin(), init_list.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }
#endif //# __TBB_INITIALIZER_LISTS_PRESENT

    //! Copy constructor
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
        my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
    }

    //! Copy constructor with specific allocator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
        my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
    }

    //! Assignment operator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
        if (this != &src) {
            // Copy-and-swap: build the new vector first, then swap, for exception safety.
            vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
            mark = src.mark;
            my_size = src.my_size;
        }
        return *this;
    }

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Move constructor
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
        my_size(src.my_size), data(std::move(src.data))
    {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Move constructor with specific allocator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
        my_size(src.my_size),
#if __TBB_ALLOCATOR_TRAITS_PRESENT
        data(std::move(src.data), a)
#else
        // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
        // It seems that the reason is absence of support of allocator_traits (stateful allocators).
        data(a)
#endif //__TBB_ALLOCATOR_TRAITS_PRESENT
    {
        my_aggregator.initialize_handler(my_functor_t(this));
#if !__TBB_ALLOCATOR_TRAITS_PRESENT
        // Fallback path: element-wise move when allocators differ, wholesale move otherwise.
        if (a != src.data.get_allocator()){
            data.reserve(src.data.size());
            data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
        }else{
            data = std::move(src.data);
        }
#endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
    }

    //! Move assignment operator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
        if (this != &src) {
            mark = src.mark;
            my_size = src.my_size;
#if !__TBB_ALLOCATOR_TRAITS_PRESENT
            // Allocators differ: must move elements one-by-one into storage owned by our allocator.
            if (data.get_allocator() != src.data.get_allocator()){
                vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
            }else
#endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
            {
                data = std::move(src.data);
            }
        }
        return *this;
    }
#endif //__TBB_CPP11_RVALUE_REF_PRESENT

    //! Assign the queue from [begin,end) range, not thread-safe
    template<typename InputIterator>
    void assign(InputIterator begin, InputIterator end) {
        vector_t(begin, end, data.get_allocator()).swap(data);
        mark = 0;
        my_size = data.size();
        heapify();
    }

#if __TBB_INITIALIZER_LISTS_PRESENT
    //! Assign the queue from std::initializer_list, not thread-safe
    void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }

    //! Assign from std::initializer_list, not thread-safe
    concurrent_priority_queue& operator=(std::initializer_list<T> il) {
        this->assign(il.begin(), il.end());
        return *this;
    }
#endif //# __TBB_INITIALIZER_LISTS_PRESENT

    //! Returns true if empty, false otherwise
    /** Returned value may not reflect results of pending operations.
        This operation reads shared data and will trigger a race condition. */
    bool empty() const { return size()==0; }

    //! Returns the current number of elements contained in the queue
    /** Returned value may not reflect results of pending operations.
        This operation reads shared data and will trigger a race condition. */
    size_type size() const { return __TBB_load_with_acquire(my_size); }

    //! Pushes elem onto the queue, increasing capacity of queue if necessary
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push(const_reference elem) {
#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
        __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
#endif
        cpq_operation op_data(elem, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) // exception thrown
            throw_exception(eid_bad_alloc);
    }

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Pushes elem onto the queue, increasing capacity of queue if necessary
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push(value_type &&elem) {
        cpq_operation op_data(elem, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) // exception thrown
            throw_exception(eid_bad_alloc);
    }

#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
    //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template<typename... Args>
    void emplace(Args&&... args) {
        push(value_type(std::forward<Args>(args)...));
    }
#endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    //! Gets a reference to and removes highest priority element
    /** If a highest priority element was found, sets elem and returns true,
        otherwise returns false.
        This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop(reference elem) {
        cpq_operation op_data(POP_OP);
        op_data.elem = &elem;
        my_aggregator.execute(&op_data);
        return op_data.status==SUCCEEDED;
    }

    //! Clear the queue; not thread-safe
    /** This operation is unsafe if there are pending concurrent operations on the queue.
        Resets size, effectively emptying queue; does not free space.
        May not clear elements added in pending operations. */
    void clear() {
        data.clear();
        mark = 0;
        my_size = 0;
    }

    //! Swap this queue with another; not thread-safe
    /** This operation is unsafe if there are pending concurrent operations on the queue. */
    void swap(concurrent_priority_queue& q) {
        using std::swap;
        data.swap(q.data);
        swap(mark, q.mark);
        swap(my_size, q.my_size);
    }

    //! Return allocator object
    allocator_type get_allocator() const { return data.get_allocator(); }

 private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status { WAIT=0, SUCCEEDED, FAILED };

    //! One pending push/pop request submitted through the aggregator.
    class cpq_operation : public aggregated_operation<cpq_operation> {
     public:
        operation_type type;
        union {
            value_type *elem; // element to push from, or destination to pop into
            size_type sz;
        };
        cpq_operation(const_reference e, operation_type t) :
            type(t), elem(const_cast<value_type*>(&e)) {}
        cpq_operation(operation_type t) : type(t) {}
    };

    //! Handler invoked by the aggregator on the thread that wins the batch.
    class my_functor_t {
        concurrent_priority_queue<T, Compare, A> *cpq;
     public:
        my_functor_t() {}
        my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
        void operator()(cpq_operation* op_list) {
            cpq->handle_operations(op_list);
        }
    };

    typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
    aggregator_t my_aggregator;
    //! Padding added to avoid false sharing
    char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
    //! The point at which unsorted elements begin
    size_type mark;
    __TBB_atomic size_type my_size;
    Compare compare;
    //! Padding added to avoid false sharing
    char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
       |       |        |
       v       v        v
    [_|...|_|_|...|_| |...| ]
     0       ^       ^     ^
             |       |     |__capacity
             |       |__my_size
             |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty).  Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */
    typedef std::vector<value_type, allocator_type> vector_t;
    vector_t data;

    //! Drain a batch of queued operations; runs on exactly one thread at a time.
    /** Pushes and "cheap" pops are served in the first pass; pops that need a
        real heap extraction are deferred to a second pass, and any elements
        pushed during the batch are folded into the heap at the end. */
    void handle_operations(cpq_operation *op_list) {
        cpq_operation *tmp, *pop_list=NULL;

        __TBB_ASSERT(mark == data.size(), NULL);

        // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
        while (op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
            tmp = op_list;
            // Read the next link before releasing status: the waiting thread may
            // reclaim its operation record as soon as status is published.
            op_list = itt_hide_load_word(op_list->next);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    compare(data[0], data[data.size()-1])) {
                    // there are newly pushed elems and the last one
                    // is higher than top
                    *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    data.pop_back();
                    __TBB_ASSERT(mark<=data.size(), NULL);
                }
                else { // no convenient item to pop; postpone
                    itt_hide_store_word(tmp->next, pop_list);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
                __TBB_TRY{
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
                    } else {
                        data.push_back(tbb::internal::move(*(tmp->elem)));
                    }
                    __TBB_store_with_release(my_size, my_size + 1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                } __TBB_CATCH(...) {
                    itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
                }
            }
        }

        // second pass processes pop operations
        while (pop_list) {
            tmp = pop_list;
            pop_list = itt_hide_load_word(pop_list->next);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
            }
            else {
                __TBB_ASSERT(mark<=data.size(), NULL);
                if (mark < data.size() &&
                    compare(data[0], data[data.size()-1])) {
                    // there are newly pushed elems and the last one is
                    // higher than top
                    *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    data.pop_back();
                }
                else { // extract top and push last element down heap
                    *(tmp->elem) = tbb::internal::move(data[0]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark<data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
    }

    //! Merge unsorted elements into heap
    void heapify() {
        if (!mark && data.size()>0) mark = 1;
        for (; mark<data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = tbb::internal::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos-1)>>1;
                if (!compare(data[parent], to_place)) break;
                data[cur_pos] = tbb::internal::move(data[parent]);
                cur_pos = parent;
            } while( cur_pos );
            data[cur_pos] = tbb::internal::move(to_place);
        }
    }

    //! Re-heapify after an extraction
    /** Re-heapify by pushing last element down the heap from the root. */
    void reheap() {
        size_type cur_pos=0, child=1;

        // Only the heapified prefix [0, mark) participates in sift-down.
        while (child < mark) {
            size_type target = child;
            if (child+1 < mark && compare(data[child], data[child+1]))
                ++target;
            // target now has the higher priority child
            if (compare(data[target], data[data.size()-1])) break;
            data[cur_pos] = tbb::internal::move(data[target]);
            cur_pos = target;
            child = (cur_pos<<1)+1;
        }
        if (cur_pos != data.size()-1)
            data[cur_pos] = tbb::internal::move(data[data.size()-1]);
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

    //! Copy-push for copy-constructible element types.
    void push_back_helper(const T& t, tbb::internal::true_type) {
        data.push_back(t);
    }

    //! Overload selected for non-copy-constructible types; must never be reached.
    void push_back_helper(const T&, tbb::internal::false_type) {
        __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
    }
};

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
namespace internal {

// Resolves Compare and Allocator for class template argument deduction:
// the first trailing argument (if not an allocator) is the comparator,
// the last trailing argument (if an allocator) is the allocator.
template<typename T, typename... Args>
using priority_queue_t = concurrent_priority_queue<
    T,
    std::conditional_t< (sizeof...(Args)>0) && !is_allocator_v< pack_element_t<0, Args...> >,
                        pack_element_t<0, Args...>, std::less<T> >,
    std::conditional_t< (sizeof...(Args)>0) && is_allocator_v< pack_element_t<sizeof...(Args)-1, Args...> >,
                        pack_element_t<sizeof...(Args)-1, Args...>, cache_aligned_allocator<T> >
>;
}

// Deduction guide for the constructor from two iterators
template<typename InputIterator,
         typename T = typename std::iterator_traits<InputIterator>::value_type,
         typename... Args
> concurrent_priority_queue(InputIterator, InputIterator, Args...)
-> internal::priority_queue_t<T, Args...>;

// Deduction guide for the constructor from an initializer_list.
template<typename T, typename CompareOrAllocalor>
concurrent_priority_queue(std::initializer_list<T> init_list, CompareOrAllocalor)
-> internal::priority_queue_t<T, CompareOrAllocalor>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
} // namespace interface5

using interface5::concurrent_priority_queue;

} // namespace tbb

#include "internal/_warning_suppress_disable_notice.h"
#undef __TBB_concurrent_priority_queue_H_include_area

#endif /* __TBB_concurrent_priority_queue_H */