1 /*
2     Copyright (c) 2005-2017 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_concurrent_priority_queue_H
22 #define __TBB_concurrent_priority_queue_H
23 
24 #include "atomic.h"
25 #include "cache_aligned_allocator.h"
26 #include "tbb_exception.h"
27 #include "tbb_stddef.h"
28 #include "tbb_profiling.h"
29 #include "internal/_aggregator_impl.h"
30 #include <vector>
31 #include <iterator>
32 #include <functional>
33 #include __TBB_STD_SWAP_HEADER
34 
35 #if __TBB_INITIALIZER_LISTS_PRESENT
36     #include <initializer_list>
37 #endif
38 
39 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
40     #include <type_traits>
41 #endif
42 
43 namespace tbb {
44 namespace interface5 {
45 namespace internal {
46 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
47     template<typename T, bool C = std::is_copy_constructible<T>::value>
48     struct use_element_copy_constructor {
49         typedef tbb::internal::true_type type;
50     };
51     template<typename T>
52     struct use_element_copy_constructor <T,false> {
53         typedef tbb::internal::false_type type;
54     };
55 #else
56     template<typename>
57     struct use_element_copy_constructor {
58         typedef tbb::internal::true_type type;
59     };
60 #endif
61 } // namespace internal
62 
63 using namespace tbb::internal;
64 
65 //! Concurrent priority queue
66 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
67 class concurrent_priority_queue {
68  public:
69     //! Element type in the queue.
70     typedef T value_type;
71 
72     //! Reference type
73     typedef T& reference;
74 
75     //! Const reference type
76     typedef const T& const_reference;
77 
78     //! Integral type for representing size of the queue.
79     typedef size_t size_type;
80 
81     //! Difference type for iterator
82     typedef ptrdiff_t difference_type;
83 
84     //! Allocator type
85     typedef A allocator_type;
86 
87     //! Constructs a new concurrent_priority_queue with default capacity
88     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
89     {
90         my_aggregator.initialize_handler(my_functor_t(this));
91     }
92 
93     //! Constructs a new concurrent_priority_queue with init_sz capacity
94     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
95         mark(0), my_size(0), data(a)
96     {
97         data.reserve(init_capacity);
98         my_aggregator.initialize_handler(my_functor_t(this));
99     }
100 
101     //! [begin,end) constructor
102     template<typename InputIterator>
103     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
104         mark(0), data(begin, end, a)
105     {
106         my_aggregator.initialize_handler(my_functor_t(this));
107         heapify();
108         my_size = data.size();
109     }
110 
111 #if __TBB_INITIALIZER_LISTS_PRESENT
112     //! Constructor from std::initializer_list
113     concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
114         mark(0),data(init_list.begin(), init_list.end(), a)
115     {
116         my_aggregator.initialize_handler(my_functor_t(this));
117         heapify();
118         my_size = data.size();
119     }
120 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
121 
122     //! Copy constructor
123     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
124     explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
125         my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
126     {
127         my_aggregator.initialize_handler(my_functor_t(this));
128         heapify();
129     }
130 
131     //! Copy constructor with specific allocator
132     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
133     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
134         my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
135     {
136         my_aggregator.initialize_handler(my_functor_t(this));
137         heapify();
138     }
139 
140     //! Assignment operator
141     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
142     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
143         if (this != &src) {
144             vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
145             mark = src.mark;
146             my_size = src.my_size;
147         }
148         return *this;
149     }
150 
151 #if __TBB_CPP11_RVALUE_REF_PRESENT
152     //! Move constructor
153     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
154     concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
155         my_size(src.my_size), data(std::move(src.data))
156     {
157         my_aggregator.initialize_handler(my_functor_t(this));
158     }
159 
160     //! Move constructor with specific allocator
161     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
162     concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
163         my_size(src.my_size),
164 #if __TBB_ALLOCATOR_TRAITS_PRESENT
165         data(std::move(src.data), a)
166 #else
167     // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
168     // It seems that the reason is absence of support of allocator_traits (stateful allocators).
169         data(a)
170 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
171     {
172         my_aggregator.initialize_handler(my_functor_t(this));
173 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
174         if (a != src.data.get_allocator()){
175             data.reserve(src.data.size());
176             data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
177         }else{
178             data = std::move(src.data);
179         }
180 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
181     }
182 
183     //! Move assignment operator
184     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
185     concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
186         if (this != &src) {
187             mark = src.mark;
188             my_size = src.my_size;
189 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
190             if (data.get_allocator() != src.data.get_allocator()){
191                 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
192             }else
193 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
194             {
195                 data = std::move(src.data);
196             }
197         }
198         return *this;
199     }
200 #endif //__TBB_CPP11_RVALUE_REF_PRESENT
201 
202     //! Assign the queue from [begin,end) range, not thread-safe
203     template<typename InputIterator>
204     void assign(InputIterator begin, InputIterator end) {
205         vector_t(begin, end, data.get_allocator()).swap(data);
206         mark = 0;
207         my_size = data.size();
208         heapify();
209     }
210 
211 #if __TBB_INITIALIZER_LISTS_PRESENT
212     //! Assign the queue from std::initializer_list, not thread-safe
213     void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
214 
215     //! Assign from std::initializer_list, not thread-safe
216     concurrent_priority_queue& operator=(std::initializer_list<T> il) {
217         this->assign(il.begin(), il.end());
218         return *this;
219     }
220 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
221 
222     //! Returns true if empty, false otherwise
223     /** Returned value may not reflect results of pending operations.
224         This operation reads shared data and will trigger a race condition. */
225     bool empty() const { return size()==0; }
226 
227     //! Returns the current number of elements contained in the queue
228     /** Returned value may not reflect results of pending operations.
229         This operation reads shared data and will trigger a race condition. */
230     size_type size() const { return __TBB_load_with_acquire(my_size); }
231 
232     //! Pushes elem onto the queue, increasing capacity of queue if necessary
233     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
234     void push(const_reference elem) {
235 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
236         __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
237 #endif
238         cpq_operation op_data(elem, PUSH_OP);
239         my_aggregator.execute(&op_data);
240         if (op_data.status == FAILED) // exception thrown
241             throw_exception(eid_bad_alloc);
242     }
243 
244 #if __TBB_CPP11_RVALUE_REF_PRESENT
245     //! Pushes elem onto the queue, increasing capacity of queue if necessary
246     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
247     void push(value_type &&elem) {
248         cpq_operation op_data(elem, PUSH_RVALUE_OP);
249         my_aggregator.execute(&op_data);
250         if (op_data.status == FAILED) // exception thrown
251             throw_exception(eid_bad_alloc);
252     }
253 
254 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
255     //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue */
256     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
257     template<typename... Args>
258     void emplace(Args&&... args) {
259         push(value_type(std::forward<Args>(args)...));
260     }
261 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
262 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
263 
264     //! Gets a reference to and removes highest priority element
265     /** If a highest priority element was found, sets elem and returns true,
266         otherwise returns false.
267         This operation can be safely used concurrently with other push, try_pop or emplace operations. */
268     bool try_pop(reference elem) {
269         cpq_operation op_data(POP_OP);
270         op_data.elem = &elem;
271         my_aggregator.execute(&op_data);
272         return op_data.status==SUCCEEDED;
273     }
274 
275     //! Clear the queue; not thread-safe
276     /** This operation is unsafe if there are pending concurrent operations on the queue.
277         Resets size, effectively emptying queue; does not free space.
278         May not clear elements added in pending operations. */
279     void clear() {
280         data.clear();
281         mark = 0;
282         my_size = 0;
283     }
284 
285     //! Swap this queue with another; not thread-safe
286     /** This operation is unsafe if there are pending concurrent operations on the queue. */
287     void swap(concurrent_priority_queue& q) {
288         using std::swap;
289         data.swap(q.data);
290         swap(mark, q.mark);
291         swap(my_size, q.my_size);
292     }
293 
294     //! Return allocator object
295     allocator_type get_allocator() const { return data.get_allocator(); }
296 
297  private:
298     enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
299     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
300 
301     class cpq_operation : public aggregated_operation<cpq_operation> {
302      public:
303         operation_type type;
304         union {
305             value_type *elem;
306             size_type sz;
307         };
308         cpq_operation(const_reference e, operation_type t) :
309             type(t), elem(const_cast<value_type*>(&e)) {}
310         cpq_operation(operation_type t) : type(t) {}
311     };
312 
313     class my_functor_t {
314         concurrent_priority_queue<T, Compare, A> *cpq;
315      public:
316         my_functor_t() {}
317         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
318         void operator()(cpq_operation* op_list) {
319             cpq->handle_operations(op_list);
320         }
321     };
322 
323     typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
324     aggregator_t my_aggregator;
325     //! Padding added to avoid false sharing
326     char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
327     //! The point at which unsorted elements begin
328     size_type mark;
329     __TBB_atomic size_type my_size;
330     Compare compare;
331     //! Padding added to avoid false sharing
332     char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
333     //! Storage for the heap of elements in queue, plus unheapified elements
334     /** data has the following structure:
335 
336          binary unheapified
337           heap   elements
338         ____|_______|____
339         |       |       |
340         v       v       v
341         [_|...|_|_|...|_| |...| ]
342          0       ^       ^       ^
343                  |       |       |__capacity
344                  |       |__my_size
345                  |__mark
346 
347         Thus, data stores the binary heap starting at position 0 through
348         mark-1 (it may be empty).  Then there are 0 or more elements
349         that have not yet been inserted into the heap, in positions
350         mark through my_size-1. */
351     typedef std::vector<value_type, allocator_type> vector_t;
352     vector_t data;
353 
354     void handle_operations(cpq_operation *op_list) {
355         cpq_operation *tmp, *pop_list=NULL;
356 
357         __TBB_ASSERT(mark == data.size(), NULL);
358 
359         // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
360         while (op_list) {
361             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
362             // node. This thread is going to handle the operation, and so will acquire it
363             // and perform the associated operation w/o triggering a race condition; the
364             // thread that created the operation is waiting on the status field, so when
365             // this thread is done with the operation, it will perform a
366             // store_with_release to give control back to the waiting thread in
367             // aggregator::insert_operation.
368             call_itt_notify(acquired, &(op_list->status));
369             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
370             tmp = op_list;
371             op_list = itt_hide_load_word(op_list->next);
372             if (tmp->type == POP_OP) {
373                 if (mark < data.size() &&
374                     compare(data[0], data[data.size()-1])) {
375                     // there are newly pushed elems and the last one
376                     // is higher than top
377                     *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
378                     __TBB_store_with_release(my_size, my_size-1);
379                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
380                     data.pop_back();
381                     __TBB_ASSERT(mark<=data.size(), NULL);
382                 }
383                 else { // no convenient item to pop; postpone
384                     itt_hide_store_word(tmp->next, pop_list);
385                     pop_list = tmp;
386                 }
387             } else { // PUSH_OP or PUSH_RVALUE_OP
388                 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
389                 __TBB_TRY{
390                     if (tmp->type == PUSH_OP) {
391                         push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
392                     } else {
393                         data.push_back(tbb::internal::move(*(tmp->elem)));
394                     }
395                     __TBB_store_with_release(my_size, my_size + 1);
396                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
397                 } __TBB_CATCH(...) {
398                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
399                 }
400             }
401         }
402 
403         // second pass processes pop operations
404         while (pop_list) {
405             tmp = pop_list;
406             pop_list = itt_hide_load_word(pop_list->next);
407             __TBB_ASSERT(tmp->type == POP_OP, NULL);
408             if (data.empty()) {
409                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
410             }
411             else {
412                 __TBB_ASSERT(mark<=data.size(), NULL);
413                 if (mark < data.size() &&
414                     compare(data[0], data[data.size()-1])) {
415                     // there are newly pushed elems and the last one is
416                     // higher than top
417                     *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
418                     __TBB_store_with_release(my_size, my_size-1);
419                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
420                     data.pop_back();
421                 }
422                 else { // extract top and push last element down heap
423                     *(tmp->elem) = tbb::internal::move(data[0]);
424                     __TBB_store_with_release(my_size, my_size-1);
425                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
426                     reheap();
427                 }
428             }
429         }
430 
431         // heapify any leftover pushed elements before doing the next
432         // batch of operations
433         if (mark<data.size()) heapify();
434         __TBB_ASSERT(mark == data.size(), NULL);
435     }
436 
437     //! Merge unsorted elements into heap
438     void heapify() {
439         if (!mark && data.size()>0) mark = 1;
440         for (; mark<data.size(); ++mark) {
441             // for each unheapified element under size
442             size_type cur_pos = mark;
443             value_type to_place = tbb::internal::move(data[mark]);
444             do { // push to_place up the heap
445                 size_type parent = (cur_pos-1)>>1;
446                 if (!compare(data[parent], to_place)) break;
447                 data[cur_pos] = tbb::internal::move(data[parent]);
448                 cur_pos = parent;
449             } while( cur_pos );
450             data[cur_pos] = tbb::internal::move(to_place);
451         }
452     }
453 
454     //! Re-heapify after an extraction
455     /** Re-heapify by pushing last element down the heap from the root. */
456     void reheap() {
457         size_type cur_pos=0, child=1;
458 
459         while (child < mark) {
460             size_type target = child;
461             if (child+1 < mark && compare(data[child], data[child+1]))
462                 ++target;
463             // target now has the higher priority child
464             if (compare(data[target], data[data.size()-1])) break;
465             data[cur_pos] = tbb::internal::move(data[target]);
466             cur_pos = target;
467             child = (cur_pos<<1)+1;
468         }
469         if (cur_pos != data.size()-1)
470             data[cur_pos] = tbb::internal::move(data[data.size()-1]);
471         data.pop_back();
472         if (mark > data.size()) mark = data.size();
473     }
474 
475     void push_back_helper(const T& t, tbb::internal::true_type) {
476         data.push_back(t);
477     }
478 
479     void push_back_helper(const T&, tbb::internal::false_type) {
480         __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
481     }
482 };
483 
484 } // namespace interface5
485 
486 using interface5::concurrent_priority_queue;
487 
488 } // namespace tbb
489 
490 #endif /* __TBB_concurrent_priority_queue_H */
491