/*
    Copyright 2005-2014 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the GNU General Public License for more details.   You should have received a copy of
    the GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software library without
    restriction.  Specifically, if other files instantiate templates or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable, this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License.  This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "atomic.h"
#include "cache_aligned_allocator.h"
#include "tbb_exception.h"
#include "tbb_stddef.h"
#include "tbb_profiling.h"
#include "internal/_aggregator_impl.h"
#include <vector>
#include <iterator>
#include <functional>

#if __TBB_INITIALIZER_LISTS_PRESENT
    #include <initializer_list>
#endif

namespace tbb {
namespace interface5 {

using namespace tbb::internal;

//! Concurrent priority queue
/** A priority queue that is safe for concurrent push/try_pop/emplace.
    Concurrency is achieved via the aggregator pattern: each thread records its
    operation in a cpq_operation node and submits it to my_aggregator; one thread
    at a time becomes the handler and applies a whole batch of pending operations
    to the underlying binary heap (see handle_operations).  Threads that are not
    the handler spin-wait on their operation's status field, so no mutex is needed
    around the heap itself.  All other member functions (assign, clear, swap,
    copy/move construction and assignment) are NOT safe to call while concurrent
    operations are pending. */
template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
class concurrent_priority_queue {
 public:
    //! Element type in the queue.
    typedef T value_type;

    //! Reference type
    typedef T& reference;

    //! Const reference type
    typedef const T& const_reference;

    //! Integral type for representing size of the queue.
    typedef size_t size_type;

    //! Difference type for iterator
    typedef ptrdiff_t difference_type;

    //! Allocator type
    typedef A allocator_type;

    //! Constructs a new concurrent_priority_queue with default capacity
    explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue with init_sz capacity
    explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
        mark(0), my_size(0), data(a)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! [begin,end) constructor
    template<typename InputIterator>
    concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        mark(0), data(begin, end, a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }

#if __TBB_INITIALIZER_LISTS_PRESENT
    //! Constructor from std::initializer_list
    concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
        mark(0),data(init_list.begin(), init_list.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }
#endif //__TBB_INITIALIZER_LISTS_PRESENT

    //! Copy constructor
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
        my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
    }

    //! Copy constructor with specific allocator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
        my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
    }

    //! Assignment operator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
        if (this != &src) {
            // Copy-and-swap: build the new vector first so a throwing copy
            // leaves *this unchanged.
            vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
            mark = src.mark;
            my_size = src.my_size;
        }
        return *this;
    }

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Move constructor
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
        my_size(src.my_size), data(std::move(src.data))
    {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Move constructor with specific allocator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
        my_size(src.my_size),
#if __TBB_ALLOCATOR_TRAITS_PRESENT
        data(std::move(src.data), a)
#else
        // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
        // It seems that the reason is absence of support of allocator_traits (stateful allocators).
        data(a)
#endif //__TBB_ALLOCATOR_TRAITS_PRESENT
    {
        my_aggregator.initialize_handler(my_functor_t(this));
#if !__TBB_ALLOCATOR_TRAITS_PRESENT
        // Fallback path: a vector move is only valid between equal allocators,
        // otherwise the elements must be moved one-by-one into new storage.
        if (a != src.data.get_allocator()){
            data.reserve(src.data.size());
            data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
        }else{
            data = std::move(src.data);
        }
#endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
    }

    //! Move assignment operator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
        if (this != &src) {
            mark = src.mark;
            my_size = src.my_size;
#if !__TBB_ALLOCATOR_TRAITS_PRESENT
            // See the allocator-extended move constructor: unequal allocators
            // force an element-wise move instead of a buffer steal.
            if (data.get_allocator() != src.data.get_allocator()){
                vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
            }else
#endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
            {
                data = std::move(src.data);
            }
        }
        return *this;
    }
#endif //__TBB_CPP11_RVALUE_REF_PRESENT

    //! Assign the queue from [begin,end) range, not thread-safe
    template<typename InputIterator>
    void assign(InputIterator begin, InputIterator end) {
        vector_t(begin, end, data.get_allocator()).swap(data);
        mark = 0;
        my_size = data.size();
        heapify();
    }

#if __TBB_INITIALIZER_LISTS_PRESENT
    //! Assign the queue from std::initializer_list, not thread-safe
    void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }

    //! Assign from std::initializer_list, not thread-safe
    concurrent_priority_queue& operator=(std::initializer_list<T> il) {
        this->assign(il.begin(), il.end());
        return *this;
    }
#endif //__TBB_INITIALIZER_LISTS_PRESENT

    //! Returns true if empty, false otherwise
    /** Returned value may not reflect results of pending operations.
        This operation reads shared data and will trigger a race condition. */
    bool empty() const { return size()==0; }

    //! Returns the current number of elements contained in the queue
    /** Returned value may not reflect results of pending operations.
        This operation reads shared data and will trigger a race condition. */
    size_type size() const { return __TBB_load_with_acquire(my_size); }

    //! Pushes elem onto the queue, increasing capacity of queue if necessary
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push(const_reference elem) {
        // The operation record lives on this thread's stack; execute() blocks
        // until some handler thread has completed it.
        cpq_operation op_data(elem, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) // exception thrown
            throw_exception(eid_bad_alloc);
    }

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Pushes elem onto the queue, increasing capacity of queue if necessary
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push(value_type &&elem) {
        // PUSH_RVALUE_OP tells the handler to move from *elem instead of copying.
        cpq_operation op_data(elem, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) // exception thrown
            throw_exception(eid_bad_alloc);
    }

#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
    //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template<typename... Args>
    void emplace(Args&&... args) {
        // Construct locally, then hand the temporary to the rvalue push overload.
        push(value_type(std::forward<Args>(args)...));
    }
#endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    //! Gets a reference to and removes highest priority element
    /** If a highest priority element was found, sets elem and returns true,
        otherwise returns false.
        This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop(reference elem) {
        cpq_operation op_data(POP_OP);
        op_data.elem = &elem;  // handler writes the popped value through this pointer
        my_aggregator.execute(&op_data);
        return op_data.status==SUCCEEDED;
    }

    //! Clear the queue; not thread-safe
    /** This operation is unsafe if there are pending concurrent operations on the queue.
        Resets size, effectively emptying queue; does not free space.
        May not clear elements added in pending operations. */
    void clear() {
        data.clear();
        mark = 0;
        my_size = 0;
    }

    //! Swap this queue with another; not thread-safe
    /** This operation is unsafe if there are pending concurrent operations on the queue. */
    void swap(concurrent_priority_queue& q) {
        using std::swap;
        data.swap(q.data);
        swap(mark, q.mark);
        swap(my_size, q.my_size);
    }

    //! Return allocator object
    allocator_type get_allocator() const { return data.get_allocator(); }

 private:
    // Operation codes carried by cpq_operation nodes through the aggregator.
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    // WAIT must be 0: it is the state a freshly submitted operation starts in.
    enum operation_status { WAIT=0, SUCCEEDED, FAILED };

    //! One pending operation, linked into the aggregator's batch list.
    class cpq_operation : public aggregated_operation<cpq_operation> {
     public:
        operation_type type;
        union {
            value_type *elem;  // push: source element; pop: destination slot
            size_type sz;
        };
        cpq_operation(const_reference e, operation_type t) :
            type(t), elem(const_cast<value_type*>(&e)) {}
        cpq_operation(operation_type t) : type(t) {}
    };

    //! Handler functor installed into the aggregator; forwards batches to the queue.
    class my_functor_t {
        concurrent_priority_queue<T, Compare, A> *cpq;
     public:
        my_functor_t() {}
        my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
        void operator()(cpq_operation* op_list) {
            cpq->handle_operations(op_list);
        }
    };

    typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
    aggregator_t my_aggregator;
    //! Padding added to avoid false sharing
    char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
    //! The point at which unsorted elements begin
    size_type mark;
    __TBB_atomic size_type my_size;
    Compare compare;
    //! Padding added to avoid false sharing
    char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
        |       |       |
        v       v       v
        [_|...|_|_|...|_| |...| ]
         0       ^       ^       ^
                 |       |       |__capacity
                 |       |__my_size
                 |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty).  Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */
    typedef std::vector<value_type, allocator_type> vector_t;
    vector_t data;

    //! Apply one aggregated batch of operations; runs on exactly one thread at a time.
    /** Two passes: the first services pushes and "cheap" pops (a pop that can be
        satisfied by an unheapified tail element), deferring the rest; the second
        services the deferred pops against the heap proper.  Finally any elements
        pushed during this batch are merged into the heap so the invariant
        mark == data.size() holds between batches. */
    void handle_operations(cpq_operation *op_list) {
        cpq_operation *tmp, *pop_list=NULL;

        __TBB_ASSERT(mark == data.size(), NULL);

        // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
        while (op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
            tmp = op_list;
            // Read the next pointer before completing tmp: releasing tmp's status
            // lets its owner thread destroy the node.
            op_list = itt_hide_load_word(op_list->next);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    compare(data[0], data[data.size()-1])) {
                    // there are newly pushed elems and the last one
                    // is higher than top
                    *(tmp->elem) = move(data[data.size()-1]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    data.pop_back();
                    __TBB_ASSERT(mark<=data.size(), NULL);
                }
                else { // no convenient item to pop; postpone
                    itt_hide_store_word(tmp->next, pop_list);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
                // push_back may throw (e.g. bad_alloc on reallocation); report
                // FAILED so the submitting thread rethrows, leaving the queue intact.
                __TBB_TRY{
                    if (tmp->type == PUSH_OP) {
                        data.push_back(*(tmp->elem));
                    } else {
                        data.push_back(move(*(tmp->elem)));
                    }
                    __TBB_store_with_release(my_size, my_size + 1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                } __TBB_CATCH(...) {
                    itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
                }
            }
        }

        // second pass processes pop operations
        while (pop_list) {
            tmp = pop_list;
            pop_list = itt_hide_load_word(pop_list->next);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
            }
            else {
                __TBB_ASSERT(mark<=data.size(), NULL);
                if (mark < data.size() &&
                    compare(data[0], data[data.size()-1])) {
                    // there are newly pushed elems and the last one is
                    // higher than top
                    *(tmp->elem) = move(data[data.size()-1]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    data.pop_back();
                }
                else { // extract top and push last element down heap
                    *(tmp->elem) = move(data[0]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark<data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
    }

    //! Merge unsorted elements into heap
    /** Sift-up each element in [mark, data.size()) into the binary heap that
        occupies [0, mark); advances mark as each element is placed. */
    void heapify() {
        if (!mark && data.size()>0) mark = 1;  // a single element is trivially a heap
        for (; mark<data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos-1)>>1;
                if (!compare(data[parent], to_place)) break;
                data[cur_pos] = move(data[parent]);
                cur_pos = parent;
            } while( cur_pos );
            data[cur_pos] = move(to_place);
        }
    }

    //! Re-heapify after an extraction
    /** Re-heapify by pushing last element down the heap from the root.
        The root has already been moved out by the caller; this walks the hole
        down (sift-down, bounded by mark) and drops the last element into it. */
    void reheap() {
        size_type cur_pos=0, child=1;

        while (child < mark) {
            size_type target = child;
            if (child+1 < mark && compare(data[child], data[child+1]))
                ++target;
            // target now has the higher priority child
            if (compare(data[target], data[data.size()-1])) break;
            data[cur_pos] = move(data[target]);
            cur_pos = target;
            child = (cur_pos<<1)+1;
        }
        if (cur_pos != data.size()-1)
            data[cur_pos] = move(data[data.size()-1]);
        data.pop_back();
        // If the popped slot was the heap's last element, shrink the heap boundary.
        if (mark > data.size()) mark = data.size();
    }
};

} // namespace interface5

using interface5::concurrent_priority_queue;

} // namespace tbb

#endif /* __TBB_concurrent_priority_queue_H */