1 /*
2     Copyright 2005-2014 Intel Corporation.  All Rights Reserved.
3 
4     This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5     you can redistribute it and/or modify it under the terms of the GNU General Public License
6     version 2  as  published  by  the  Free Software Foundation.  Threading Building Blocks is
7     distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8     implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9     See  the GNU General Public License for more details.   You should have received a copy of
10     the  GNU General Public License along with Threading Building Blocks; if not, write to the
11     Free Software Foundation, Inc.,  51 Franklin St,  Fifth Floor,  Boston,  MA 02110-1301 USA
12 
13     As a special exception,  you may use this file  as part of a free software library without
14     restriction.  Specifically,  if other files instantiate templates  or use macros or inline
15     functions from this file, or you compile this file and link it with other files to produce
16     an executable,  this file does not by itself cause the resulting executable to be covered
17     by the GNU General Public License. This exception does not however invalidate any other
18     reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB_concurrent_priority_queue_H
22 #define __TBB_concurrent_priority_queue_H
23 
24 #include "atomic.h"
25 #include "cache_aligned_allocator.h"
26 #include "tbb_exception.h"
27 #include "tbb_stddef.h"
28 #include "tbb_profiling.h"
29 #include "internal/_aggregator_impl.h"
30 #include <vector>
31 #include <iterator>
32 #include <functional>
33 
34 #if __TBB_INITIALIZER_LISTS_PRESENT
35     #include <initializer_list>
36 #endif
37 
38 namespace tbb {
39 namespace interface5 {
40 
41 using namespace tbb::internal;
42 
43 //! Concurrent priority queue
44 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
45 class concurrent_priority_queue {
46  public:
47     //! Element type in the queue.
48     typedef T value_type;
49 
50     //! Reference type
51     typedef T& reference;
52 
53     //! Const reference type
54     typedef const T& const_reference;
55 
56     //! Integral type for representing size of the queue.
57     typedef size_t size_type;
58 
59     //! Difference type for iterator
60     typedef ptrdiff_t difference_type;
61 
62     //! Allocator type
63     typedef A allocator_type;
64 
65     //! Constructs a new concurrent_priority_queue with default capacity
66     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
67     {
68         my_aggregator.initialize_handler(my_functor_t(this));
69     }
70 
71     //! Constructs a new concurrent_priority_queue with init_sz capacity
72     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
73         mark(0), my_size(0), data(a)
74     {
75         data.reserve(init_capacity);
76         my_aggregator.initialize_handler(my_functor_t(this));
77     }
78 
79     //! [begin,end) constructor
80     template<typename InputIterator>
81     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
82         mark(0), data(begin, end, a)
83     {
84         my_aggregator.initialize_handler(my_functor_t(this));
85         heapify();
86         my_size = data.size();
87     }
88 
89 #if __TBB_INITIALIZER_LISTS_PRESENT
90     //! Constructor from std::initializer_list
91     concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
92         mark(0),data(init_list.begin(), init_list.end(), a)
93     {
94         my_aggregator.initialize_handler(my_functor_t(this));
95         heapify();
96         my_size = data.size();
97     }
98 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
99 
100     //! Copy constructor
101     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
concurrent_priority_queue(const concurrent_priority_queue & src)102     explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
103         my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
104     {
105         my_aggregator.initialize_handler(my_functor_t(this));
106         heapify();
107     }
108 
109     //! Copy constructor with specific allocator
110     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
concurrent_priority_queue(const concurrent_priority_queue & src,const allocator_type & a)111     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
112         my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
113     {
114         my_aggregator.initialize_handler(my_functor_t(this));
115         heapify();
116     }
117 
118     //! Assignment operator
119     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
120     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
121         if (this != &src) {
122             vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
123             mark = src.mark;
124             my_size = src.my_size;
125         }
126         return *this;
127     }
128 
129 #if __TBB_CPP11_RVALUE_REF_PRESENT
130     //! Move constructor
131     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
concurrent_priority_queue(concurrent_priority_queue && src)132     concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
133         my_size(src.my_size), data(std::move(src.data))
134     {
135         my_aggregator.initialize_handler(my_functor_t(this));
136     }
137 
138     //! Move constructor with specific allocator
139     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
concurrent_priority_queue(concurrent_priority_queue && src,const allocator_type & a)140     concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
141         my_size(src.my_size),
142 #if __TBB_ALLOCATOR_TRAITS_PRESENT
143         data(std::move(src.data), a)
144 #else
145     // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
146     // It seems that the reason is absence of support of allocator_traits (stateful allocators).
147         data(a)
148 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
149     {
150         my_aggregator.initialize_handler(my_functor_t(this));
151 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
152         if (a != src.data.get_allocator()){
153             data.reserve(src.data.size());
154             data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
155         }else{
156             data = std::move(src.data);
157         }
158 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
159     }
160 
161     //! Move assignment operator
162     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
163     concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
164         if (this != &src) {
165             mark = src.mark;
166             my_size = src.my_size;
167 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
168             if (data.get_allocator() != src.data.get_allocator()){
169                 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
170             }else
171 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
172             {
173                 data = std::move(src.data);
174             }
175         }
176         return *this;
177     }
178 #endif //__TBB_CPP11_RVALUE_REF_PRESENT
179 
180     //! Assign the queue from [begin,end) range, not thread-safe
181     template<typename InputIterator>
assign(InputIterator begin,InputIterator end)182     void assign(InputIterator begin, InputIterator end) {
183         vector_t(begin, end, data.get_allocator()).swap(data);
184         mark = 0;
185         my_size = data.size();
186         heapify();
187     }
188 
189 #if __TBB_INITIALIZER_LISTS_PRESENT
190     //! Assign the queue from std::initializer_list, not thread-safe
assign(std::initializer_list<T> il)191     void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
192 
193     //! Assign from std::initializer_list, not thread-safe
194     concurrent_priority_queue& operator=(std::initializer_list<T> il) {
195         this->assign(il.begin(), il.end());
196         return *this;
197     }
198 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
199 
200     //! Returns true if empty, false otherwise
201     /** Returned value may not reflect results of pending operations.
202         This operation reads shared data and will trigger a race condition. */
empty()203     bool empty() const { return size()==0; }
204 
205     //! Returns the current number of elements contained in the queue
206     /** Returned value may not reflect results of pending operations.
207         This operation reads shared data and will trigger a race condition. */
size()208     size_type size() const { return __TBB_load_with_acquire(my_size); }
209 
210     //! Pushes elem onto the queue, increasing capacity of queue if necessary
211     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
push(const_reference elem)212     void push(const_reference elem) {
213         cpq_operation op_data(elem, PUSH_OP);
214         my_aggregator.execute(&op_data);
215         if (op_data.status == FAILED) // exception thrown
216             throw_exception(eid_bad_alloc);
217     }
218 
219 #if __TBB_CPP11_RVALUE_REF_PRESENT
220     //! Pushes elem onto the queue, increasing capacity of queue if necessary
221     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
push(value_type && elem)222     void push(value_type &&elem) {
223         cpq_operation op_data(elem, PUSH_RVALUE_OP);
224         my_aggregator.execute(&op_data);
225         if (op_data.status == FAILED) // exception thrown
226             throw_exception(eid_bad_alloc);
227     }
228 
229 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
230     //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue */
231     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
232     template<typename... Args>
emplace(Args &&...args)233     void emplace(Args&&... args) {
234         push(value_type(std::forward<Args>(args)...));
235     }
236 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
237 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
238 
239     //! Gets a reference to and removes highest priority element
240     /** If a highest priority element was found, sets elem and returns true,
241         otherwise returns false.
242         This operation can be safely used concurrently with other push, try_pop or emplace operations. */
try_pop(reference elem)243     bool try_pop(reference elem) {
244         cpq_operation op_data(POP_OP);
245         op_data.elem = &elem;
246         my_aggregator.execute(&op_data);
247         return op_data.status==SUCCEEDED;
248     }
249 
250     //! Clear the queue; not thread-safe
251     /** This operation is unsafe if there are pending concurrent operations on the queue.
252         Resets size, effectively emptying queue; does not free space.
253         May not clear elements added in pending operations. */
clear()254     void clear() {
255         data.clear();
256         mark = 0;
257         my_size = 0;
258     }
259 
260     //! Swap this queue with another; not thread-safe
261     /** This operation is unsafe if there are pending concurrent operations on the queue. */
swap(concurrent_priority_queue & q)262     void swap(concurrent_priority_queue& q) {
263         using std::swap;
264         data.swap(q.data);
265         swap(mark, q.mark);
266         swap(my_size, q.my_size);
267     }
268 
269     //! Return allocator object
get_allocator()270     allocator_type get_allocator() const { return data.get_allocator(); }
271 
272  private:
273     enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
274     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
275 
276     class cpq_operation : public aggregated_operation<cpq_operation> {
277      public:
278         operation_type type;
279         union {
280             value_type *elem;
281             size_type sz;
282         };
cpq_operation(const_reference e,operation_type t)283         cpq_operation(const_reference e, operation_type t) :
284             type(t), elem(const_cast<value_type*>(&e)) {}
cpq_operation(operation_type t)285         cpq_operation(operation_type t) : type(t) {}
286     };
287 
288     class my_functor_t {
289         concurrent_priority_queue<T, Compare, A> *cpq;
290      public:
my_functor_t()291         my_functor_t() {}
my_functor_t(concurrent_priority_queue<T,Compare,A> * cpq_)292         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
operator()293         void operator()(cpq_operation* op_list) {
294             cpq->handle_operations(op_list);
295         }
296     };
297 
298     typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
299     aggregator_t my_aggregator;
300     //! Padding added to avoid false sharing
301     char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
302     //! The point at which unsorted elements begin
303     size_type mark;
304     __TBB_atomic size_type my_size;
305     Compare compare;
306     //! Padding added to avoid false sharing
307     char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
308     //! Storage for the heap of elements in queue, plus unheapified elements
309     /** data has the following structure:
310 
311          binary unheapified
312           heap   elements
313         ____|_______|____
314         |       |       |
315         v       v       v
316         [_|...|_|_|...|_| |...| ]
317          0       ^       ^       ^
318                  |       |       |__capacity
319                  |       |__my_size
320                  |__mark
321 
322         Thus, data stores the binary heap starting at position 0 through
323         mark-1 (it may be empty).  Then there are 0 or more elements
324         that have not yet been inserted into the heap, in positions
325         mark through my_size-1. */
326     typedef std::vector<value_type, allocator_type> vector_t;
327     vector_t data;
328 
handle_operations(cpq_operation * op_list)329     void handle_operations(cpq_operation *op_list) {
330         cpq_operation *tmp, *pop_list=NULL;
331 
332         __TBB_ASSERT(mark == data.size(), NULL);
333 
334         // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
335         while (op_list) {
336             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
337             // node. This thread is going to handle the operation, and so will acquire it
338             // and perform the associated operation w/o triggering a race condition; the
339             // thread that created the operation is waiting on the status field, so when
340             // this thread is done with the operation, it will perform a
341             // store_with_release to give control back to the waiting thread in
342             // aggregator::insert_operation.
343             call_itt_notify(acquired, &(op_list->status));
344             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
345             tmp = op_list;
346             op_list = itt_hide_load_word(op_list->next);
347             if (tmp->type == POP_OP) {
348                 if (mark < data.size() &&
349                     compare(data[0], data[data.size()-1])) {
350                     // there are newly pushed elems and the last one
351                     // is higher than top
352                     *(tmp->elem) = move(data[data.size()-1]);
353                     __TBB_store_with_release(my_size, my_size-1);
354                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
355                     data.pop_back();
356                     __TBB_ASSERT(mark<=data.size(), NULL);
357                 }
358                 else { // no convenient item to pop; postpone
359                     itt_hide_store_word(tmp->next, pop_list);
360                     pop_list = tmp;
361                 }
362             } else { // PUSH_OP or PUSH_RVALUE_OP
363                 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
364                 __TBB_TRY{
365                     if (tmp->type == PUSH_OP) {
366                         data.push_back(*(tmp->elem));
367                     } else {
368                         data.push_back(move(*(tmp->elem)));
369                     }
370                     __TBB_store_with_release(my_size, my_size + 1);
371                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
372                 } __TBB_CATCH(...) {
373                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
374                 }
375             }
376         }
377 
378         // second pass processes pop operations
379         while (pop_list) {
380             tmp = pop_list;
381             pop_list = itt_hide_load_word(pop_list->next);
382             __TBB_ASSERT(tmp->type == POP_OP, NULL);
383             if (data.empty()) {
384                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
385             }
386             else {
387                 __TBB_ASSERT(mark<=data.size(), NULL);
388                 if (mark < data.size() &&
389                     compare(data[0], data[data.size()-1])) {
390                     // there are newly pushed elems and the last one is
391                     // higher than top
392                     *(tmp->elem) = move(data[data.size()-1]);
393                     __TBB_store_with_release(my_size, my_size-1);
394                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
395                     data.pop_back();
396                 }
397                 else { // extract top and push last element down heap
398                     *(tmp->elem) = move(data[0]);
399                     __TBB_store_with_release(my_size, my_size-1);
400                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
401                     reheap();
402                 }
403             }
404         }
405 
406         // heapify any leftover pushed elements before doing the next
407         // batch of operations
408         if (mark<data.size()) heapify();
409         __TBB_ASSERT(mark == data.size(), NULL);
410     }
411 
412     //! Merge unsorted elements into heap
heapify()413     void heapify() {
414         if (!mark && data.size()>0) mark = 1;
415         for (; mark<data.size(); ++mark) {
416             // for each unheapified element under size
417             size_type cur_pos = mark;
418             value_type to_place = move(data[mark]);
419             do { // push to_place up the heap
420                 size_type parent = (cur_pos-1)>>1;
421                 if (!compare(data[parent], to_place)) break;
422                 data[cur_pos] = move(data[parent]);
423                 cur_pos = parent;
424             } while( cur_pos );
425             data[cur_pos] = move(to_place);
426         }
427     }
428 
429     //! Re-heapify after an extraction
430     /** Re-heapify by pushing last element down the heap from the root. */
reheap()431     void reheap() {
432         size_type cur_pos=0, child=1;
433 
434         while (child < mark) {
435             size_type target = child;
436             if (child+1 < mark && compare(data[child], data[child+1]))
437                 ++target;
438             // target now has the higher priority child
439             if (compare(data[target], data[data.size()-1])) break;
440             data[cur_pos] = move(data[target]);
441             cur_pos = target;
442             child = (cur_pos<<1)+1;
443         }
444         if (cur_pos != data.size()-1)
445             data[cur_pos] = move(data[data.size()-1]);
446         data.pop_back();
447         if (mark > data.size()) mark = data.size();
448     }
449 };
450 
451 } // namespace interface5
452 
453 using interface5::concurrent_priority_queue;
454 
455 } // namespace tbb
456 
457 #endif /* __TBB_concurrent_priority_queue_H */
458