1 /* 2 Copyright (c) 2005-2017 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 16 17 18 19 */ 20 21 #ifndef __TBB_concurrent_priority_queue_H 22 #define __TBB_concurrent_priority_queue_H 23 24 #include "atomic.h" 25 #include "cache_aligned_allocator.h" 26 #include "tbb_exception.h" 27 #include "tbb_stddef.h" 28 #include "tbb_profiling.h" 29 #include "internal/_aggregator_impl.h" 30 #include <vector> 31 #include <iterator> 32 #include <functional> 33 #include __TBB_STD_SWAP_HEADER 34 35 #if __TBB_INITIALIZER_LISTS_PRESENT 36 #include <initializer_list> 37 #endif 38 39 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT 40 #include <type_traits> 41 #endif 42 43 namespace tbb { 44 namespace interface5 { 45 namespace internal { 46 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT 47 template<typename T, bool C = std::is_copy_constructible<T>::value> 48 struct use_element_copy_constructor { 49 typedef tbb::internal::true_type type; 50 }; 51 template<typename T> 52 struct use_element_copy_constructor <T,false> { 53 typedef tbb::internal::false_type type; 54 }; 55 #else 56 template<typename> 57 struct use_element_copy_constructor { 58 typedef tbb::internal::true_type type; 59 }; 60 #endif 61 } // namespace internal 62 63 using namespace tbb::internal; 64 65 //! Concurrent priority queue 66 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> > 67 class concurrent_priority_queue { 68 public: 69 //! Element type in the queue. 70 typedef T value_type; 71 72 //! Reference type 73 typedef T& reference; 74 75 //! Const reference type 76 typedef const T& const_reference; 77 78 //! Integral type for representing size of the queue. 79 typedef size_t size_type; 80 81 //! Difference type for iterator 82 typedef ptrdiff_t difference_type; 83 84 //! Allocator type 85 typedef A allocator_type; 86 87 //! Constructs a new concurrent_priority_queue with default capacity 88 explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a) 89 { 90 my_aggregator.initialize_handler(my_functor_t(this)); 91 } 92 93 //! Constructs a new concurrent_priority_queue with init_sz capacity 94 explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : 95 mark(0), my_size(0), data(a) 96 { 97 data.reserve(init_capacity); 98 my_aggregator.initialize_handler(my_functor_t(this)); 99 } 100 101 //! [begin,end) constructor 102 template<typename InputIterator> 103 concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : 104 mark(0), data(begin, end, a) 105 { 106 my_aggregator.initialize_handler(my_functor_t(this)); 107 heapify(); 108 my_size = data.size(); 109 } 110 111 #if __TBB_INITIALIZER_LISTS_PRESENT 112 //! Constructor from std::initializer_list 113 concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) : 114 mark(0),data(init_list.begin(), init_list.end(), a) 115 { 116 my_aggregator.initialize_handler(my_functor_t(this)); 117 heapify(); 118 my_size = data.size(); 119 } 120 #endif //# __TBB_INITIALIZER_LISTS_PRESENT 121 122 //! Copy constructor 123 /** This operation is unsafe if there are pending concurrent operations on the src queue. */ 124 explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark), 125 my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator()) 126 { 127 my_aggregator.initialize_handler(my_functor_t(this)); 128 heapify(); 129 } 130 131 //! Copy constructor with specific allocator 132 /** This operation is unsafe if there are pending concurrent operations on the src queue. */ 133 concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark), 134 my_size(src.my_size), data(src.data.begin(), src.data.end(), a) 135 { 136 my_aggregator.initialize_handler(my_functor_t(this)); 137 heapify(); 138 } 139 140 //! Assignment operator 141 /** This operation is unsafe if there are pending concurrent operations on the src queue. */ 142 concurrent_priority_queue& operator=(const concurrent_priority_queue& src) { 143 if (this != &src) { 144 vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data); 145 mark = src.mark; 146 my_size = src.my_size; 147 } 148 return *this; 149 } 150 151 #if __TBB_CPP11_RVALUE_REF_PRESENT 152 //! Move constructor 153 /** This operation is unsafe if there are pending concurrent operations on the src queue. */ 154 concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark), 155 my_size(src.my_size), data(std::move(src.data)) 156 { 157 my_aggregator.initialize_handler(my_functor_t(this)); 158 } 159 160 //! Move constructor with specific allocator 161 /** This operation is unsafe if there are pending concurrent operations on the src queue. */ 162 concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark), 163 my_size(src.my_size), 164 #if __TBB_ALLOCATOR_TRAITS_PRESENT 165 data(std::move(src.data), a) 166 #else 167 // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator). 168 // It seems that the reason is absence of support of allocator_traits (stateful allocators). 169 data(a) 170 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT 171 { 172 my_aggregator.initialize_handler(my_functor_t(this)); 173 #if !__TBB_ALLOCATOR_TRAITS_PRESENT 174 if (a != src.data.get_allocator()){ 175 data.reserve(src.data.size()); 176 data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end())); 177 }else{ 178 data = std::move(src.data); 179 } 180 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT 181 } 182 183 //! Move assignment operator 184 /** This operation is unsafe if there are pending concurrent operations on the src queue. */ 185 concurrent_priority_queue& operator=( concurrent_priority_queue&& src) { 186 if (this != &src) { 187 mark = src.mark; 188 my_size = src.my_size; 189 #if !__TBB_ALLOCATOR_TRAITS_PRESENT 190 if (data.get_allocator() != src.data.get_allocator()){ 191 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data); 192 }else 193 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT 194 { 195 data = std::move(src.data); 196 } 197 } 198 return *this; 199 } 200 #endif //__TBB_CPP11_RVALUE_REF_PRESENT 201 202 //! Assign the queue from [begin,end) range, not thread-safe 203 template<typename InputIterator> 204 void assign(InputIterator begin, InputIterator end) { 205 vector_t(begin, end, data.get_allocator()).swap(data); 206 mark = 0; 207 my_size = data.size(); 208 heapify(); 209 } 210 211 #if __TBB_INITIALIZER_LISTS_PRESENT 212 //! Assign the queue from std::initializer_list, not thread-safe 213 void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); } 214 215 //! Assign from std::initializer_list, not thread-safe 216 concurrent_priority_queue& operator=(std::initializer_list<T> il) { 217 this->assign(il.begin(), il.end()); 218 return *this; 219 } 220 #endif //# __TBB_INITIALIZER_LISTS_PRESENT 221 222 //! Returns true if empty, false otherwise 223 /** Returned value may not reflect results of pending operations. 224 This operation reads shared data and will trigger a race condition. */ 225 bool empty() const { return size()==0; } 226 227 //! Returns the current number of elements contained in the queue 228 /** Returned value may not reflect results of pending operations. 229 This operation reads shared data and will trigger a race condition. */ 230 size_type size() const { return __TBB_load_with_acquire(my_size); } 231 232 //! Pushes elem onto the queue, increasing capacity of queue if necessary 233 /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */ 234 void push(const_reference elem) { 235 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT 236 __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." ); 237 #endif 238 cpq_operation op_data(elem, PUSH_OP); 239 my_aggregator.execute(&op_data); 240 if (op_data.status == FAILED) // exception thrown 241 throw_exception(eid_bad_alloc); 242 } 243 244 #if __TBB_CPP11_RVALUE_REF_PRESENT 245 //! Pushes elem onto the queue, increasing capacity of queue if necessary 246 /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */ 247 void push(value_type &&elem) { 248 cpq_operation op_data(elem, PUSH_RVALUE_OP); 249 my_aggregator.execute(&op_data); 250 if (op_data.status == FAILED) // exception thrown 251 throw_exception(eid_bad_alloc); 252 } 253 254 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT 255 //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue */ 256 /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */ 257 template<typename... Args> 258 void emplace(Args&&... args) { 259 push(value_type(std::forward<Args>(args)...)); 260 } 261 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */ 262 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */ 263 264 //! Gets a reference to and removes highest priority element 265 /** If a highest priority element was found, sets elem and returns true, 266 otherwise returns false. 267 This operation can be safely used concurrently with other push, try_pop or emplace operations. */ 268 bool try_pop(reference elem) { 269 cpq_operation op_data(POP_OP); 270 op_data.elem = &elem; 271 my_aggregator.execute(&op_data); 272 return op_data.status==SUCCEEDED; 273 } 274 275 //! Clear the queue; not thread-safe 276 /** This operation is unsafe if there are pending concurrent operations on the queue. 277 Resets size, effectively emptying queue; does not free space. 278 May not clear elements added in pending operations. */ 279 void clear() { 280 data.clear(); 281 mark = 0; 282 my_size = 0; 283 } 284 285 //! Swap this queue with another; not thread-safe 286 /** This operation is unsafe if there are pending concurrent operations on the queue. */ 287 void swap(concurrent_priority_queue& q) { 288 using std::swap; 289 data.swap(q.data); 290 swap(mark, q.mark); 291 swap(my_size, q.my_size); 292 } 293 294 //! Return allocator object 295 allocator_type get_allocator() const { return data.get_allocator(); } 296 297 private: 298 enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP}; 299 enum operation_status { WAIT=0, SUCCEEDED, FAILED }; 300 301 class cpq_operation : public aggregated_operation<cpq_operation> { 302 public: 303 operation_type type; 304 union { 305 value_type *elem; 306 size_type sz; 307 }; 308 cpq_operation(const_reference e, operation_type t) : 309 type(t), elem(const_cast<value_type*>(&e)) {} 310 cpq_operation(operation_type t) : type(t) {} 311 }; 312 313 class my_functor_t { 314 concurrent_priority_queue<T, Compare, A> *cpq; 315 public: 316 my_functor_t() {} 317 my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {} 318 void operator()(cpq_operation* op_list) { 319 cpq->handle_operations(op_list); 320 } 321 }; 322 323 typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t; 324 aggregator_t my_aggregator; 325 //! Padding added to avoid false sharing 326 char padding1[NFS_MaxLineSize - sizeof(aggregator_t)]; 327 //! The point at which unsorted elements begin 328 size_type mark; 329 __TBB_atomic size_type my_size; 330 Compare compare; 331 //! Padding added to avoid false sharing 332 char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)]; 333 //! Storage for the heap of elements in queue, plus unheapified elements 334 /** data has the following structure: 335 336 binary unheapified 337 heap elements 338 ____|_______|____ 339 | | | 340 v v v 341 [_|...|_|_|...|_| |...| ] 342 0 ^ ^ ^ 343 | | |__capacity 344 | |__my_size 345 |__mark 346 347 Thus, data stores the binary heap starting at position 0 through 348 mark-1 (it may be empty). Then there are 0 or more elements 349 that have not yet been inserted into the heap, in positions 350 mark through my_size-1. */ 351 typedef std::vector<value_type, allocator_type> vector_t; 352 vector_t data; 353 354 void handle_operations(cpq_operation *op_list) { 355 cpq_operation *tmp, *pop_list=NULL; 356 357 __TBB_ASSERT(mark == data.size(), NULL); 358 359 // First pass processes all constant (amortized; reallocation may happen) time pushes and pops. 360 while (op_list) { 361 // ITT note: &(op_list->status) tag is used to cover accesses to op_list 362 // node. This thread is going to handle the operation, and so will acquire it 363 // and perform the associated operation w/o triggering a race condition; the 364 // thread that created the operation is waiting on the status field, so when 365 // this thread is done with the operation, it will perform a 366 // store_with_release to give control back to the waiting thread in 367 // aggregator::insert_operation. 368 call_itt_notify(acquired, &(op_list->status)); 369 __TBB_ASSERT(op_list->type != INVALID_OP, NULL); 370 tmp = op_list; 371 op_list = itt_hide_load_word(op_list->next); 372 if (tmp->type == POP_OP) { 373 if (mark < data.size() && 374 compare(data[0], data[data.size()-1])) { 375 // there are newly pushed elems and the last one 376 // is higher than top 377 *(tmp->elem) = tbb::internal::move(data[data.size()-1]); 378 __TBB_store_with_release(my_size, my_size-1); 379 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); 380 data.pop_back(); 381 __TBB_ASSERT(mark<=data.size(), NULL); 382 } 383 else { // no convenient item to pop; postpone 384 itt_hide_store_word(tmp->next, pop_list); 385 pop_list = tmp; 386 } 387 } else { // PUSH_OP or PUSH_RVALUE_OP 388 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" ); 389 __TBB_TRY{ 390 if (tmp->type == PUSH_OP) { 391 push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type()); 392 } else { 393 data.push_back(tbb::internal::move(*(tmp->elem))); 394 } 395 __TBB_store_with_release(my_size, my_size + 1); 396 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); 397 } __TBB_CATCH(...) { 398 itt_store_word_with_release(tmp->status, uintptr_t(FAILED)); 399 } 400 } 401 } 402 403 // second pass processes pop operations 404 while (pop_list) { 405 tmp = pop_list; 406 pop_list = itt_hide_load_word(pop_list->next); 407 __TBB_ASSERT(tmp->type == POP_OP, NULL); 408 if (data.empty()) { 409 itt_store_word_with_release(tmp->status, uintptr_t(FAILED)); 410 } 411 else { 412 __TBB_ASSERT(mark<=data.size(), NULL); 413 if (mark < data.size() && 414 compare(data[0], data[data.size()-1])) { 415 // there are newly pushed elems and the last one is 416 // higher than top 417 *(tmp->elem) = tbb::internal::move(data[data.size()-1]); 418 __TBB_store_with_release(my_size, my_size-1); 419 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); 420 data.pop_back(); 421 } 422 else { // extract top and push last element down heap 423 *(tmp->elem) = tbb::internal::move(data[0]); 424 __TBB_store_with_release(my_size, my_size-1); 425 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED)); 426 reheap(); 427 } 428 } 429 } 430 431 // heapify any leftover pushed elements before doing the next 432 // batch of operations 433 if (mark<data.size()) heapify(); 434 __TBB_ASSERT(mark == data.size(), NULL); 435 } 436 437 //! Merge unsorted elements into heap 438 void heapify() { 439 if (!mark && data.size()>0) mark = 1; 440 for (; mark<data.size(); ++mark) { 441 // for each unheapified element under size 442 size_type cur_pos = mark; 443 value_type to_place = tbb::internal::move(data[mark]); 444 do { // push to_place up the heap 445 size_type parent = (cur_pos-1)>>1; 446 if (!compare(data[parent], to_place)) break; 447 data[cur_pos] = tbb::internal::move(data[parent]); 448 cur_pos = parent; 449 } while( cur_pos ); 450 data[cur_pos] = tbb::internal::move(to_place); 451 } 452 } 453 454 //! Re-heapify after an extraction 455 /** Re-heapify by pushing last element down the heap from the root. */ 456 void reheap() { 457 size_type cur_pos=0, child=1; 458 459 while (child < mark) { 460 size_type target = child; 461 if (child+1 < mark && compare(data[child], data[child+1])) 462 ++target; 463 // target now has the higher priority child 464 if (compare(data[target], data[data.size()-1])) break; 465 data[cur_pos] = tbb::internal::move(data[target]); 466 cur_pos = target; 467 child = (cur_pos<<1)+1; 468 } 469 if (cur_pos != data.size()-1) 470 data[cur_pos] = tbb::internal::move(data[data.size()-1]); 471 data.pop_back(); 472 if (mark > data.size()) mark = data.size(); 473 } 474 475 void push_back_helper(const T& t, tbb::internal::true_type) { 476 data.push_back(t); 477 } 478 479 void push_back_helper(const T&, tbb::internal::false_type) { 480 __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." ); 481 } 482 }; 483 484 } // namespace interface5 485 486 using interface5::concurrent_priority_queue; 487 488 } // namespace tbb 489 490 #endif /* __TBB_concurrent_priority_queue_H */ 491