1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_priority_queue_H
18 #define __TBB_concurrent_priority_queue_H
19 
20 #define __TBB_concurrent_priority_queue_H_include_area
21 #include "internal/_warning_suppress_enable_notice.h"
22 
23 #include "atomic.h"
24 #include "cache_aligned_allocator.h"
25 #include "tbb_exception.h"
26 #include "tbb_stddef.h"
27 #include "tbb_profiling.h"
28 #include "internal/_aggregator_impl.h"
29 #include "internal/_template_helpers.h"
30 #include "internal/_allocator_traits.h"
31 #include <vector>
32 #include <iterator>
33 #include <functional>
34 #include __TBB_STD_SWAP_HEADER
35 
36 #if __TBB_INITIALIZER_LISTS_PRESENT
37     #include <initializer_list>
38 #endif
39 
40 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
41     #include <type_traits>
42 #endif
43 
44 namespace tbb {
45 namespace interface5 {
46 namespace internal {
47 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
48     template<typename T, bool C = std::is_copy_constructible<T>::value>
49     struct use_element_copy_constructor {
50         typedef tbb::internal::true_type type;
51     };
52     template<typename T>
53     struct use_element_copy_constructor <T,false> {
54         typedef tbb::internal::false_type type;
55     };
56 #else
57     template<typename>
58     struct use_element_copy_constructor {
59         typedef tbb::internal::true_type type;
60     };
61 #endif
62 } // namespace internal
63 
64 using namespace tbb::internal;
65 
66 //! Concurrent priority queue
67 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
68 class concurrent_priority_queue {
69  public:
70     //! Element type in the queue.
71     typedef T value_type;
72 
73     //! Reference type
74     typedef T& reference;
75 
76     //! Const reference type
77     typedef const T& const_reference;
78 
79     //! Integral type for representing size of the queue.
80     typedef size_t size_type;
81 
82     //! Difference type for iterator
83     typedef ptrdiff_t difference_type;
84 
85     //! Allocator type
86     typedef A allocator_type;
87 
88     //! Constructs a new concurrent_priority_queue with default capacity
89     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(), data(a)
90     {
91         my_aggregator.initialize_handler(my_functor_t(this));
92     }
93 
94     //! Constructs a new concurrent_priority_queue with default capacity
95     explicit concurrent_priority_queue(const Compare& c, const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(c), data(a)
96     {
97         my_aggregator.initialize_handler(my_functor_t(this));
98     }
99 
100     //! Constructs a new concurrent_priority_queue with init_sz capacity
101     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
102         mark(0), my_size(0), compare(), data(a)
103     {
104         data.reserve(init_capacity);
105         my_aggregator.initialize_handler(my_functor_t(this));
106     }
107 
108     //! Constructs a new concurrent_priority_queue with init_sz capacity
109     explicit concurrent_priority_queue(size_type init_capacity, const Compare& c, const allocator_type& a = allocator_type()) :
110         mark(0), my_size(0), compare(c), data(a)
111     {
112         data.reserve(init_capacity);
113         my_aggregator.initialize_handler(my_functor_t(this));
114     }
115 
116     //! [begin,end) constructor
117     template<typename InputIterator>
118     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
119         mark(0), compare(), data(begin, end, a)
120     {
121         my_aggregator.initialize_handler(my_functor_t(this));
122         heapify();
123         my_size = data.size();
124     }
125 
126     //! [begin,end) constructor
127     template<typename InputIterator>
128     concurrent_priority_queue(InputIterator begin, InputIterator end, const Compare& c, const allocator_type& a = allocator_type()) :
129         mark(0), compare(c), data(begin, end, a)
130     {
131         my_aggregator.initialize_handler(my_functor_t(this));
132         heapify();
133         my_size = data.size();
134     }
135 
136 #if __TBB_INITIALIZER_LISTS_PRESENT
137     //! Constructor from std::initializer_list
138     concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
139         mark(0), compare(), data(init_list.begin(), init_list.end(), a)
140     {
141         my_aggregator.initialize_handler(my_functor_t(this));
142         heapify();
143         my_size = data.size();
144     }
145 
146     //! Constructor from std::initializer_list
147     concurrent_priority_queue(std::initializer_list<T> init_list, const Compare& c, const allocator_type &a = allocator_type()) :
148         mark(0), compare(c), data(init_list.begin(), init_list.end(), a)
149     {
150         my_aggregator.initialize_handler(my_functor_t(this));
151         heapify();
152         my_size = data.size();
153     }
154 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
155 
156     //! Copy constructor
157     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
158     concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
159         my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
160     {
161         my_aggregator.initialize_handler(my_functor_t(this));
162         heapify();
163     }
164 
165     //! Copy constructor with specific allocator
166     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
167     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
168         my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
169     {
170         my_aggregator.initialize_handler(my_functor_t(this));
171         heapify();
172     }
173 
174     //! Assignment operator
175     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
176     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
177         if (this != &src) {
178             vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
179             mark = src.mark;
180             my_size = src.my_size;
181         }
182         return *this;
183     }
184 
185 #if __TBB_CPP11_RVALUE_REF_PRESENT
186     //! Move constructor
187     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
188     concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
189         my_size(src.my_size), data(std::move(src.data))
190     {
191         my_aggregator.initialize_handler(my_functor_t(this));
192     }
193 
194     //! Move constructor with specific allocator
195     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
196     concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
197         my_size(src.my_size),
198 #if __TBB_ALLOCATOR_TRAITS_PRESENT
199         data(std::move(src.data), a)
200 #else
201     // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
202     // It seems that the reason is absence of support of allocator_traits (stateful allocators).
203         data(a)
204 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
205     {
206         my_aggregator.initialize_handler(my_functor_t(this));
207 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
208         if (a != src.data.get_allocator()){
209             data.reserve(src.data.size());
210             data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
211         }else{
212             data = std::move(src.data);
213         }
214 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
215     }
216 
217     //! Move assignment operator
218     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
219     concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
220         if (this != &src) {
221             mark = src.mark;
222             my_size = src.my_size;
223 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
224             if (data.get_allocator() != src.data.get_allocator()){
225                 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
226             }else
227 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
228             {
229                 data = std::move(src.data);
230             }
231         }
232         return *this;
233     }
234 #endif //__TBB_CPP11_RVALUE_REF_PRESENT
235 
236     //! Assign the queue from [begin,end) range, not thread-safe
237     template<typename InputIterator>
238     void assign(InputIterator begin, InputIterator end) {
239         vector_t(begin, end, data.get_allocator()).swap(data);
240         mark = 0;
241         my_size = data.size();
242         heapify();
243     }
244 
245 #if __TBB_INITIALIZER_LISTS_PRESENT
246     //! Assign the queue from std::initializer_list, not thread-safe
247     void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
248 
249     //! Assign from std::initializer_list, not thread-safe
250     concurrent_priority_queue& operator=(std::initializer_list<T> il) {
251         this->assign(il.begin(), il.end());
252         return *this;
253     }
254 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
255 
256     //! Returns true if empty, false otherwise
257     /** Returned value may not reflect results of pending operations.
258         This operation reads shared data and will trigger a race condition. */
259     bool empty() const { return size()==0; }
260 
261     //! Returns the current number of elements contained in the queue
262     /** Returned value may not reflect results of pending operations.
263         This operation reads shared data and will trigger a race condition. */
264     size_type size() const { return __TBB_load_with_acquire(my_size); }
265 
266     //! Pushes elem onto the queue, increasing capacity of queue if necessary
267     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
268     void push(const_reference elem) {
269 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
270         __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
271 #endif
272         cpq_operation op_data(elem, PUSH_OP);
273         my_aggregator.execute(&op_data);
274         if (op_data.status == FAILED) // exception thrown
275             throw_exception(eid_bad_alloc);
276     }
277 
278 #if __TBB_CPP11_RVALUE_REF_PRESENT
279     //! Pushes elem onto the queue, increasing capacity of queue if necessary
280     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
281     void push(value_type &&elem) {
282         cpq_operation op_data(elem, PUSH_RVALUE_OP);
283         my_aggregator.execute(&op_data);
284         if (op_data.status == FAILED) // exception thrown
285             throw_exception(eid_bad_alloc);
286     }
287 
288 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
289     //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue */
290     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
291     template<typename... Args>
292     void emplace(Args&&... args) {
293         push(value_type(std::forward<Args>(args)...));
294     }
295 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
296 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
297 
298     //! Gets a reference to and removes highest priority element
299     /** If a highest priority element was found, sets elem and returns true,
300         otherwise returns false.
301         This operation can be safely used concurrently with other push, try_pop or emplace operations. */
302     bool try_pop(reference elem) {
303         cpq_operation op_data(POP_OP);
304         op_data.elem = &elem;
305         my_aggregator.execute(&op_data);
306         return op_data.status==SUCCEEDED;
307     }
308 
309     //! Clear the queue; not thread-safe
310     /** This operation is unsafe if there are pending concurrent operations on the queue.
311         Resets size, effectively emptying queue; does not free space.
312         May not clear elements added in pending operations. */
313     void clear() {
314         data.clear();
315         mark = 0;
316         my_size = 0;
317     }
318 
319     //! Swap this queue with another; not thread-safe
320     /** This operation is unsafe if there are pending concurrent operations on the queue. */
321     void swap(concurrent_priority_queue& q) {
322         using std::swap;
323         data.swap(q.data);
324         swap(mark, q.mark);
325         swap(my_size, q.my_size);
326     }
327 
328     //! Return allocator object
329     allocator_type get_allocator() const { return data.get_allocator(); }
330 
331  private:
332     enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
333     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
334 
335     class cpq_operation : public aggregated_operation<cpq_operation> {
336      public:
337         operation_type type;
338         union {
339             value_type *elem;
340             size_type sz;
341         };
342         cpq_operation(const_reference e, operation_type t) :
343             type(t), elem(const_cast<value_type*>(&e)) {}
344         cpq_operation(operation_type t) : type(t) {}
345     };
346 
347     class my_functor_t {
348         concurrent_priority_queue<T, Compare, A> *cpq;
349      public:
350         my_functor_t() {}
351         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
352         void operator()(cpq_operation* op_list) {
353             cpq->handle_operations(op_list);
354         }
355     };
356 
357     typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
358     aggregator_t my_aggregator;
359     //! Padding added to avoid false sharing
360     char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
361     //! The point at which unsorted elements begin
362     size_type mark;
363     __TBB_atomic size_type my_size;
364     Compare compare;
365     //! Padding added to avoid false sharing
366     char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
367     //! Storage for the heap of elements in queue, plus unheapified elements
368     /** data has the following structure:
369 
370          binary unheapified
371           heap   elements
372         ____|_______|____
373         |       |       |
374         v       v       v
375         [_|...|_|_|...|_| |...| ]
376          0       ^       ^       ^
377                  |       |       |__capacity
378                  |       |__my_size
379                  |__mark
380 
381         Thus, data stores the binary heap starting at position 0 through
382         mark-1 (it may be empty).  Then there are 0 or more elements
383         that have not yet been inserted into the heap, in positions
384         mark through my_size-1. */
385     typedef std::vector<value_type, allocator_type> vector_t;
386     vector_t data;
387 
388     void handle_operations(cpq_operation *op_list) {
389         cpq_operation *tmp, *pop_list=NULL;
390 
391         __TBB_ASSERT(mark == data.size(), NULL);
392 
393         // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
394         while (op_list) {
395             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
396             // node. This thread is going to handle the operation, and so will acquire it
397             // and perform the associated operation w/o triggering a race condition; the
398             // thread that created the operation is waiting on the status field, so when
399             // this thread is done with the operation, it will perform a
400             // store_with_release to give control back to the waiting thread in
401             // aggregator::insert_operation.
402             call_itt_notify(acquired, &(op_list->status));
403             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
404             tmp = op_list;
405             op_list = itt_hide_load_word(op_list->next);
406             if (tmp->type == POP_OP) {
407                 if (mark < data.size() &&
408                     compare(data[0], data[data.size()-1])) {
409                     // there are newly pushed elems and the last one
410                     // is higher than top
411                     *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
412                     __TBB_store_with_release(my_size, my_size-1);
413                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
414                     data.pop_back();
415                     __TBB_ASSERT(mark<=data.size(), NULL);
416                 }
417                 else { // no convenient item to pop; postpone
418                     itt_hide_store_word(tmp->next, pop_list);
419                     pop_list = tmp;
420                 }
421             } else { // PUSH_OP or PUSH_RVALUE_OP
422                 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
423                 __TBB_TRY{
424                     if (tmp->type == PUSH_OP) {
425                         push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
426                     } else {
427                         data.push_back(tbb::internal::move(*(tmp->elem)));
428                     }
429                     __TBB_store_with_release(my_size, my_size + 1);
430                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
431                 } __TBB_CATCH(...) {
432                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
433                 }
434             }
435         }
436 
437         // second pass processes pop operations
438         while (pop_list) {
439             tmp = pop_list;
440             pop_list = itt_hide_load_word(pop_list->next);
441             __TBB_ASSERT(tmp->type == POP_OP, NULL);
442             if (data.empty()) {
443                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
444             }
445             else {
446                 __TBB_ASSERT(mark<=data.size(), NULL);
447                 if (mark < data.size() &&
448                     compare(data[0], data[data.size()-1])) {
449                     // there are newly pushed elems and the last one is
450                     // higher than top
451                     *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
452                     __TBB_store_with_release(my_size, my_size-1);
453                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
454                     data.pop_back();
455                 }
456                 else { // extract top and push last element down heap
457                     *(tmp->elem) = tbb::internal::move(data[0]);
458                     __TBB_store_with_release(my_size, my_size-1);
459                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
460                     reheap();
461                 }
462             }
463         }
464 
465         // heapify any leftover pushed elements before doing the next
466         // batch of operations
467         if (mark<data.size()) heapify();
468         __TBB_ASSERT(mark == data.size(), NULL);
469     }
470 
471     //! Merge unsorted elements into heap
472     void heapify() {
473         if (!mark && data.size()>0) mark = 1;
474         for (; mark<data.size(); ++mark) {
475             // for each unheapified element under size
476             size_type cur_pos = mark;
477             value_type to_place = tbb::internal::move(data[mark]);
478             do { // push to_place up the heap
479                 size_type parent = (cur_pos-1)>>1;
480                 if (!compare(data[parent], to_place)) break;
481                 data[cur_pos] = tbb::internal::move(data[parent]);
482                 cur_pos = parent;
483             } while( cur_pos );
484             data[cur_pos] = tbb::internal::move(to_place);
485         }
486     }
487 
488     //! Re-heapify after an extraction
489     /** Re-heapify by pushing last element down the heap from the root. */
490     void reheap() {
491         size_type cur_pos=0, child=1;
492 
493         while (child < mark) {
494             size_type target = child;
495             if (child+1 < mark && compare(data[child], data[child+1]))
496                 ++target;
497             // target now has the higher priority child
498             if (compare(data[target], data[data.size()-1])) break;
499             data[cur_pos] = tbb::internal::move(data[target]);
500             cur_pos = target;
501             child = (cur_pos<<1)+1;
502         }
503         if (cur_pos != data.size()-1)
504             data[cur_pos] = tbb::internal::move(data[data.size()-1]);
505         data.pop_back();
506         if (mark > data.size()) mark = data.size();
507     }
508 
509     void push_back_helper(const T& t, tbb::internal::true_type) {
510         data.push_back(t);
511     }
512 
513     void push_back_helper(const T&, tbb::internal::false_type) {
514         __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
515     }
516 };
517 
518 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
519 namespace internal {
520 
521 template<typename T, typename... Args>
522 using priority_queue_t = concurrent_priority_queue<
523     T,
524     std::conditional_t< (sizeof...(Args)>0) && !is_allocator_v< pack_element_t<0, Args...> >,
525                         pack_element_t<0, Args...>, std::less<T> >,
526     std::conditional_t< (sizeof...(Args)>0) && is_allocator_v< pack_element_t<sizeof...(Args)-1, Args...> >,
527                          pack_element_t<sizeof...(Args)-1, Args...>, cache_aligned_allocator<T> >
528 >;
529 }
530 
531 // Deduction guide for the constructor from two iterators
532 template<typename InputIterator,
533          typename T = typename std::iterator_traits<InputIterator>::value_type,
534          typename... Args
535 > concurrent_priority_queue(InputIterator, InputIterator, Args...)
536 -> internal::priority_queue_t<T, Args...>;
537 
538 template<typename T, typename CompareOrAllocalor>
539 concurrent_priority_queue(std::initializer_list<T> init_list, CompareOrAllocalor)
540 -> internal::priority_queue_t<T, CompareOrAllocalor>;
541 
542 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
543 } // namespace interface5
544 
545 using interface5::concurrent_priority_queue;
546 
547 } // namespace tbb
548 
549 #include "internal/_warning_suppress_disable_notice.h"
550 #undef __TBB_concurrent_priority_queue_H_include_area
551 
552 #endif /* __TBB_concurrent_priority_queue_H */
553