1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef _TBB_task_stream_H
18 #define _TBB_task_stream_H
19
20 //! This file is a possible future replacement for the task_stream class implemented in
21 //! task_stream.h. It refactors the code and extends task_stream capabilities by moving lane
22 //! management to the caller side. Although the new implementation is not expected to affect
23 //! the performance of the original task stream, no analysis of this was performed at the
24 //! time it was developed. In addition, it is not currently clear that this container would
25 //! be suitable for critical tasks, given the linear time complexity of its operations.
26
27 #include "oneapi/tbb/detail/_utils.h"
28 #include "oneapi/tbb/cache_aligned_allocator.h"
29 #include "oneapi/tbb/mutex.h"
30
31 #include "scheduler_common.h"
32 #include "misc.h" // for FastRandom
33
34 #include <deque>
35 #include <climits>
36 #include <atomic>
37
38 namespace tbb {
39 namespace detail {
40 namespace r1 {
41
42 //! Essentially, this is just a pair of a queue and a mutex to protect the queue.
43 /** The reason std::pair is not used is that the code would look less clean
44 if field names were replaced with 'first' and 'second'. **/
45 template< typename T, typename mutex_t >
alignas(max_nfs_size)46 struct alignas(max_nfs_size) queue_and_mutex {
47 typedef std::deque< T, cache_aligned_allocator<T> > queue_base_t;
48
49 queue_base_t my_queue{};
50 mutex_t my_mutex{};
51 };
52
53 using population_t = uintptr_t;
54 const population_t one = 1;
55
set_one_bit(std::atomic<population_t> & dest,int pos)56 inline void set_one_bit( std::atomic<population_t>& dest, int pos ) {
57 __TBB_ASSERT( pos>=0, NULL );
58 __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
59 dest.fetch_or( one<<pos );
60 }
61
clear_one_bit(std::atomic<population_t> & dest,int pos)62 inline void clear_one_bit( std::atomic<population_t>& dest, int pos ) {
63 __TBB_ASSERT( pos>=0, NULL );
64 __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
65 dest.fetch_and( ~(one<<pos) );
66 }
67
is_bit_set(population_t val,int pos)68 inline bool is_bit_set( population_t val, int pos ) {
69 __TBB_ASSERT( pos>=0, NULL );
70 __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
71 return (val & (one<<pos)) != 0;
72 }
73
74 struct random_lane_selector :
75 #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
76 no_assign
77 #else
78 no_copy
79 #endif
80 {
random_lane_selectorrandom_lane_selector81 random_lane_selector( FastRandom& random ) : my_random( random ) {}
operatorrandom_lane_selector82 unsigned operator()( unsigned out_of ) const {
83 __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
84 return my_random.get() & (out_of-1);
85 }
86 private:
87 FastRandom& my_random;
88 };
89
90 struct lane_selector_base :
91 #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
92 no_assign
93 #else
94 no_copy
95 #endif
96 {
97 unsigned& my_previous;
lane_selector_baselane_selector_base98 lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
99 };
100
101 struct subsequent_lane_selector : lane_selector_base {
subsequent_lane_selectorsubsequent_lane_selector102 subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
operatorsubsequent_lane_selector103 unsigned operator()( unsigned out_of ) const {
104 __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
105 return (++my_previous &= out_of-1);
106 }
107 };
108
109 struct preceding_lane_selector : lane_selector_base {
preceding_lane_selectorpreceding_lane_selector110 preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
operatorpreceding_lane_selector111 unsigned operator()( unsigned out_of ) const {
112 __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
113 return (--my_previous &= (out_of-1));
114 }
115 };
116
117 //! Specializes from which side of the underlying container elements are retrieved. Method must be
118 //! called under corresponding mutex locked.
119 template<task_stream_accessor_type accessor>
120 class task_stream_accessor : no_copy {
121 protected:
122 using lane_t = queue_and_mutex <d1::task*, mutex>;
get_item(lane_t::queue_base_t & queue)123 d1::task* get_item( lane_t::queue_base_t& queue ) {
124 d1::task* result = queue.front();
125 queue.pop_front();
126 return result;
127 }
128 };
129
130 template<>
131 class task_stream_accessor< back_nonnull_accessor > : no_copy {
132 protected:
133 using lane_t = queue_and_mutex <d1::task*, mutex>;
get_item(lane_t::queue_base_t & queue)134 d1::task* get_item( lane_t::queue_base_t& queue ) {
135 d1::task* result = nullptr;
136 __TBB_ASSERT(!queue.empty(), nullptr);
137 // Isolated task can put zeros in queue see look_specific
138 do {
139 result = queue.back();
140 queue.pop_back();
141 } while ( !result && !queue.empty() );
142 return result;
143 }
144 };
145
146 //! The container for "fairness-oriented" aka "enqueued" tasks.
147 template<task_stream_accessor_type accessor>
148 class task_stream : public task_stream_accessor< accessor > {
149 using lane_t = typename task_stream_accessor<accessor>::lane_t;
150 std::atomic<population_t> population{};
151 lane_t* lanes{nullptr};
152 unsigned N{};
153
154 public:
155 task_stream() = default;
156
initialize(unsigned n_lanes)157 void initialize( unsigned n_lanes ) {
158 const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
159
160 N = n_lanes >= max_lanes ? max_lanes : n_lanes > 2 ? 1 << (tbb::detail::log2(n_lanes - 1) + 1) : 2;
161 __TBB_ASSERT( N == max_lanes || (N >= n_lanes && ((N - 1) & N) == 0), "number of lanes miscalculated" );
162 __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
163 lanes = static_cast<lane_t*>(cache_aligned_allocate(sizeof(lane_t) * N));
164 for (unsigned i = 0; i < N; ++i) {
165 new (lanes + i) lane_t;
166 }
167 __TBB_ASSERT( !population.load(std::memory_order_relaxed), NULL );
168 }
169
~task_stream()170 ~task_stream() {
171 if (lanes) {
172 for (unsigned i = 0; i < N; ++i) {
173 lanes[i].~lane_t();
174 }
175 cache_aligned_deallocate(lanes);
176 }
177 }
178
179 //! Push a task into a lane. Lane selection is performed by passed functor.
180 template<typename lane_selector_t>
push(d1::task * source,const lane_selector_t & next_lane)181 void push(d1::task* source, const lane_selector_t& next_lane ) {
182 bool succeed = false;
183 unsigned lane = 0;
184 do {
185 lane = next_lane( /*out_of=*/N );
186 __TBB_ASSERT( lane < N, "Incorrect lane index." );
187 } while( ! (succeed = try_push( source, lane )) );
188 }
189
190 //! Try finding and popping a task using passed functor for lane selection. Last used lane is
191 //! updated inside lane selector.
192 template<typename lane_selector_t>
pop(const lane_selector_t & next_lane)193 d1::task* pop( const lane_selector_t& next_lane ) {
194 d1::task* popped = NULL;
195 unsigned lane = 0;
196 do {
197 lane = next_lane( /*out_of=*/N );
198 __TBB_ASSERT( lane < N, "Incorrect lane index." );
199 } while( !empty() && !(popped = try_pop( lane )) );
200 return popped;
201 }
202
203 //! Try finding and popping a related task.
pop_specific(unsigned & last_used_lane,isolation_type isolation)204 d1::task* pop_specific( unsigned& last_used_lane, isolation_type isolation ) {
205 d1::task* result = NULL;
206 // Lane selection is round-robin in backward direction.
207 unsigned idx = last_used_lane & (N-1);
208 do {
209 if( is_bit_set( population.load(std::memory_order_relaxed), idx ) ) {
210 lane_t& lane = lanes[idx];
211 mutex::scoped_lock lock;
212 if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
213 result = look_specific( lane.my_queue, isolation );
214 if( lane.my_queue.empty() )
215 clear_one_bit( population, idx );
216 if( result )
217 break;
218 }
219 }
220 idx=(idx-1)&(N-1);
221 } while( !empty() && idx != last_used_lane );
222 last_used_lane = idx;
223 return result;
224 }
225
226 //! Checks existence of a task.
empty()227 bool empty() {
228 return !population.load(std::memory_order_relaxed);
229 }
230
231 private:
232 //! Returns true on successful push, otherwise - false.
try_push(d1::task * source,unsigned lane_idx)233 bool try_push(d1::task* source, unsigned lane_idx ) {
234 mutex::scoped_lock lock;
235 if( lock.try_acquire( lanes[lane_idx].my_mutex ) ) {
236 lanes[lane_idx].my_queue.push_back( source );
237 set_one_bit( population, lane_idx ); // TODO: avoid atomic op if the bit is already set
238 return true;
239 }
240 return false;
241 }
242
243 //! Returns pointer to task on successful pop, otherwise - NULL.
try_pop(unsigned lane_idx)244 d1::task* try_pop( unsigned lane_idx ) {
245 if( !is_bit_set( population.load(std::memory_order_relaxed), lane_idx ) )
246 return NULL;
247 d1::task* result = NULL;
248 lane_t& lane = lanes[lane_idx];
249 mutex::scoped_lock lock;
250 if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
251 result = this->get_item( lane.my_queue );
252 if( lane.my_queue.empty() )
253 clear_one_bit( population, lane_idx );
254 }
255 return result;
256 }
257
258 // TODO: unify '*_specific' logic with 'pop' methods above
look_specific(typename lane_t::queue_base_t & queue,isolation_type isolation)259 d1::task* look_specific( typename lane_t::queue_base_t& queue, isolation_type isolation ) {
260 __TBB_ASSERT( !queue.empty(), NULL );
261 // TODO: add a worst-case performance test and consider an alternative container with better
262 // performance for isolation search.
263 typename lane_t::queue_base_t::iterator curr = queue.end();
264 do {
265 // TODO: consider logic from get_task to simplify the code.
266 d1::task* result = *--curr;
267 if( result && task_accessor::isolation(*result) == isolation ) {
268 if( queue.end() - curr == 1 )
269 queue.pop_back(); // a little of housekeeping along the way
270 else
271 *curr = 0; // grabbing task with the same isolation
272 // TODO: move one of the container's ends instead if the task has been found there
273 return result;
274 }
275 } while( curr != queue.begin() );
276 return NULL;
277 }
278
279 }; // task_stream
280
281 } // namespace r1
282 } // namespace detail
283 } // namespace tbb
284
285 #endif /* _TBB_task_stream_H */
286