/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_task_stream_H
#define _TBB_task_stream_H

//! This file is a possible future replacement for the task_stream class implemented in
//! task_stream.h. It refactors the code and extends task_stream capabilities by moving lane
//! management to the caller side. Although the new implementation is not expected to affect
//! the performance of the original task stream, no performance analysis had been done at the
//! time it was developed. It is also not yet clear whether this container is suitable for
//! critical tasks, because its operations have linear time complexity.

#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/cache_aligned_allocator.h"
#include "oneapi/tbb/mutex.h"

#include "scheduler_common.h"
#include "misc.h" // for FastRandom

#include <deque>
#include <climits>
#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

//! Essentially, this is just a pair of a queue and a mutex to protect the queue.
/** The reason std::pair is not used is that the code would look less clean
    if field names were replaced with 'first' and 'second'. The alignment on
    max_nfs_size keeps each lane instance in its own cache line(s), so that
    operations on different lanes do not cause false sharing. **/
template< typename T, typename mutex_t >
struct alignas(max_nfs_size) queue_and_mutex {
    typedef std::deque< T, cache_aligned_allocator<T> > queue_base_t;

    queue_base_t my_queue{};
    mutex_t      my_mutex{};
};

using population_t = uintptr_t;
const population_t one = 1;

inline void set_one_bit( std::atomic<population_t>& dest, int pos ) {
    __TBB_ASSERT( pos>=0, nullptr );
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr );
    dest.fetch_or( one<<pos );
}

inline void clear_one_bit( std::atomic<population_t>& dest, int pos ) {
    __TBB_ASSERT( pos>=0, nullptr );
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr );
    dest.fetch_and( ~(one<<pos) );
}

inline bool is_bit_set( population_t val, int pos ) {
    __TBB_ASSERT( pos>=0, nullptr );
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr );
    return (val & (one<<pos)) != 0;
}
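
// Illustrative sketch (not part of the library API): the helpers above maintain a
// bitmask in which bit i is set while lane i is believed to hold tasks.
//
//     std::atomic<population_t> population{};
//     set_one_bit( population, 3 );    // lane 3 received a task
//     bool busy = is_bit_set( population.load(std::memory_order_relaxed), 3 );
//     clear_one_bit( population, 3 );  // lane 3 drained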

struct random_lane_selector :
#if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
        no_assign
#else
        no_copy
#endif
{
    random_lane_selector( FastRandom& random ) : my_random( random ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not a power of two" );
        return my_random.get() & (out_of-1);
    }
private:
    FastRandom& my_random;
};

struct lane_selector_base :
#if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
        no_assign
#else
        no_copy
#endif
{
    unsigned& my_previous;
    lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
};

struct subsequent_lane_selector : lane_selector_base {
    subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not a power of two" );
        return (++my_previous &= out_of-1);
    }
};

struct preceding_lane_selector : lane_selector_base {
    preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not a power of two" );
        return (--my_previous &= (out_of-1));
    }
};
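
// Usage sketch (illustrative only): task_stream::push/pop below invoke a selector
// with the lane count N and use the returned index. The seed and lane count here
// are placeholder values.
//
//     FastRandom random( /*seed=*/42 );
//     unsigned last_used_lane = 0;
//     random_lane_selector     scatter( random );               // random lane, for push
//     subsequent_lane_selector round_robin( last_used_lane );   // next lane, for pop
//     unsigned lane = scatter( /*out_of=*/8 );                  // any of lanes 0..7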

//! Specializes from which side of the underlying container elements are retrieved.
//! Methods must be called with the corresponding mutex locked.
template<task_stream_accessor_type accessor>
class task_stream_accessor : no_copy {
protected:
    using lane_t = queue_and_mutex <d1::task*, mutex>;
    d1::task* get_item( lane_t::queue_base_t& queue ) {
        d1::task* result = queue.front();
        queue.pop_front();
        return result;
    }
};

template<>
class task_stream_accessor< back_nonnull_accessor > : no_copy {
protected:
    using lane_t = queue_and_mutex <d1::task*, mutex>;
    d1::task* get_item( lane_t::queue_base_t& queue ) {
        d1::task* result = nullptr;
        __TBB_ASSERT( !queue.empty(), nullptr );
        // Isolated tasks may leave nullptr entries in the queue; see look_specific.
        do {
            result = queue.back();
            queue.pop_back();
        } while ( !result && !queue.empty() );
        return result;
    }
};
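
// Note (explanatory, not from the original sources): unlike the front accessor above,
// the back_nonnull accessor pops from the back of a lane and skips the nullptr "holes"
// that look_specific leaves behind when it grabs a task from the middle of a queue.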

//! The container for "fairness-oriented" aka "enqueued" tasks.
template<task_stream_accessor_type accessor>
class task_stream : public task_stream_accessor< accessor > {
    using lane_t = typename task_stream_accessor<accessor>::lane_t;
    std::atomic<population_t> population{};
    lane_t* lanes{nullptr};
    unsigned N{};

public:
    task_stream() = default;

    void initialize( unsigned n_lanes ) {
        const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;

        N = n_lanes >= max_lanes ? max_lanes : n_lanes > 2 ? 1 << (tbb::detail::log2(n_lanes - 1) + 1) : 2;
        __TBB_ASSERT( N == max_lanes || (N >= n_lanes && ((N - 1) & N) == 0), "number of lanes miscalculated" );
        __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, nullptr );
        lanes = static_cast<lane_t*>(cache_aligned_allocate(sizeof(lane_t) * N));
        for (unsigned i = 0; i < N; ++i) {
            new (lanes + i) lane_t;
        }
        __TBB_ASSERT( !population.load(std::memory_order_relaxed), nullptr );
    }
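
    // Illustrative examples of the lane-count rounding above, assuming a 64-bit
    // population_t (max_lanes == 64):
    //     n_lanes = 1   -> N = 2
    //     n_lanes = 5   -> N = 8   (next power of two)
    //     n_lanes = 64  -> N = 64
    //     n_lanes = 100 -> N = 64  (clamped to max_lanes)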

    ~task_stream() {
        if (lanes) {
            for (unsigned i = 0; i < N; ++i) {
                lanes[i].~lane_t();
            }
            cache_aligned_deallocate(lanes);
        }
    }

    //! Push a task into a lane. Lane selection is performed by the passed functor.
    template<typename lane_selector_t>
    void push( d1::task* source, const lane_selector_t& next_lane ) {
        bool succeed = false;
        unsigned lane = 0;
        do {
            lane = next_lane( /*out_of=*/N );
            __TBB_ASSERT( lane < N, "Incorrect lane index." );
        } while( ! (succeed = try_push( source, lane )) );
    }

    //! Try to find and pop a task, using the passed functor for lane selection. The last
    //! used lane is updated inside the lane selector.
    template<typename lane_selector_t>
    d1::task* pop( const lane_selector_t& next_lane ) {
        d1::task* popped = nullptr;
        unsigned lane = 0;
        do {
            lane = next_lane( /*out_of=*/N );
            __TBB_ASSERT( lane < N, "Incorrect lane index." );
        } while( !empty() && !(popped = try_pop( lane )) );
        return popped;
    }

    //! Try to find and pop a task with the specified isolation.
    d1::task* pop_specific( unsigned& last_used_lane, isolation_type isolation ) {
        d1::task* result = nullptr;
        // Lane selection is round-robin in the backward direction.
        unsigned idx = last_used_lane & (N-1);
        do {
            if( is_bit_set( population.load(std::memory_order_relaxed), idx ) ) {
                lane_t& lane = lanes[idx];
                mutex::scoped_lock lock;
                if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
                    result = look_specific( lane.my_queue, isolation );
                    if( lane.my_queue.empty() )
                        clear_one_bit( population, idx );
                    if( result )
                        break;
                }
            }
            idx = (idx-1) & (N-1);
        } while( !empty() && idx != last_used_lane );
        last_used_lane = idx;
        return result;
    }

    //! Checks existence of a task.
    bool empty() {
        return !population.load(std::memory_order_relaxed);
    }

private:
    //! Returns true on successful push, otherwise false.
    bool try_push( d1::task* source, unsigned lane_idx ) {
        mutex::scoped_lock lock;
        if( lock.try_acquire( lanes[lane_idx].my_mutex ) ) {
            lanes[lane_idx].my_queue.push_back( source );
            set_one_bit( population, lane_idx ); // TODO: avoid the atomic op if the bit is already set
            return true;
        }
        return false;
    }

    //! Returns a pointer to the task on successful pop, otherwise nullptr.
    d1::task* try_pop( unsigned lane_idx ) {
        if( !is_bit_set( population.load(std::memory_order_relaxed), lane_idx ) )
            return nullptr;
        d1::task* result = nullptr;
        lane_t& lane = lanes[lane_idx];
        mutex::scoped_lock lock;
        if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
            result = this->get_item( lane.my_queue );
            if( lane.my_queue.empty() )
                clear_one_bit( population, lane_idx );
        }
        return result;
    }

    // TODO: unify '*_specific' logic with the 'pop' methods above.
    d1::task* look_specific( typename lane_t::queue_base_t& queue, isolation_type isolation ) {
        __TBB_ASSERT( !queue.empty(), nullptr );
        // TODO: add a worst-case performance test and consider an alternative container with
        // better performance for the isolation search.
        typename lane_t::queue_base_t::iterator curr = queue.end();
        do {
            // TODO: consider logic from get_task to simplify the code.
            d1::task* result = *--curr;
            if( result && task_accessor::isolation(*result) == isolation ) {
                if( queue.end() - curr == 1 )
                    queue.pop_back();   // a little housekeeping along the way
                else
                    *curr = nullptr;    // grabbing a task with the same isolation
                // TODO: move one of the container's ends instead if the task has been found there
                return result;
            }
        } while( curr != queue.begin() );
        return nullptr;
    }

}; // task_stream
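
// Minimal usage sketch (illustrative only; real callers live in the arena/scheduler
// code). 'front_accessor' is the accessor constant from scheduler_common.h; the lane
// count, seed, and 'some_task' below are placeholders.
//
//     task_stream<front_accessor> stream;
//     stream.initialize( /*n_lanes=*/4 );
//     FastRandom random( /*seed=*/42 );
//     unsigned last_used_lane = 0;
//     stream.push( some_task, random_lane_selector( random ) );
//     d1::task* t = stream.pop( subsequent_lane_selector( last_used_lane ) );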

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* _TBB_task_stream_H */