1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "oneapi/tbb/detail/_config.h"
18 #include "oneapi/tbb/detail/_template_helpers.h"
19 
20 #include "oneapi/tbb/global_control.h"
21 #include "oneapi/tbb/tbb_allocator.h"
22 #include "oneapi/tbb/spin_mutex.h"
23 
24 #include "governor.h"
25 #include "market.h"
26 #include "misc.h"
27 
28 #include <atomic>
29 #include <set>
30 
31 namespace tbb {
32 namespace detail {
33 namespace r1 {
34 
35 //! Comparator for a set of global_control objects
struct control_storage_comparator {
    // Ordering predicate used as the comparison type of control_storage::my_list.
    // Only declared here; the definition appears later in this file, after the
    // `controls` table has been defined.
    // Returns true when lhs has to be placed before rhs in the set.
    bool operator()(const global_control* lhs, const global_control* rhs) const;
};
39 
40 class control_storage {
41     friend struct global_control_impl;
42     friend std::size_t global_control_active_value(int);
43 protected:
44     std::size_t my_active_value{0};
45     std::set<global_control*, control_storage_comparator, tbb_allocator<global_control*>> my_list{};
46     spin_mutex my_list_mutex{};
47 public:
48     virtual std::size_t default_value() const = 0;
apply_active(std::size_t new_active)49     virtual void apply_active(std::size_t new_active) {
50         my_active_value = new_active;
51     }
is_first_arg_preferred(std::size_t a,std::size_t b) const52     virtual bool is_first_arg_preferred(std::size_t a, std::size_t b) const {
53         return a>b; // prefer max by default
54     }
active_value()55     virtual std::size_t active_value() {
56         spin_mutex::scoped_lock lock(my_list_mutex); // protect my_list.empty() call
57         return !my_list.empty() ? my_active_value : default_value();
58     }
59 };
60 
61 class alignas(max_nfs_size) allowed_parallelism_control : public control_storage {
default_value() const62     virtual std::size_t default_value() const override {
63         return max(1U, governor::default_num_threads());
64     }
is_first_arg_preferred(std::size_t a,std::size_t b) const65     virtual bool is_first_arg_preferred(std::size_t a, std::size_t b) const override {
66         return a<b; // prefer min allowed parallelism
67     }
apply_active(std::size_t new_active)68     virtual void apply_active(std::size_t new_active) override {
69         control_storage::apply_active(new_active);
70         __TBB_ASSERT( my_active_value>=1, NULL );
71         // -1 to take external thread into account
72         market::set_active_num_workers( my_active_value-1 );
73     }
active_value()74     virtual std::size_t active_value() override {
75         spin_mutex::scoped_lock lock(my_list_mutex); // protect my_list.empty() call
76         if (my_list.empty())
77             return default_value();
78         // non-zero, if market is active
79         const std::size_t workers = market::max_num_workers();
80         // We can't exceed market's maximal number of workers.
81         // +1 to take external thread into account
82         return workers? min(workers+1, my_active_value): my_active_value;
83     }
84 public:
active_value_if_present() const85     std::size_t active_value_if_present() const {
86         return !my_list.empty() ? my_active_value : 0;
87     }
88 };
89 
90 class alignas(max_nfs_size) stack_size_control : public control_storage {
default_value() const91     virtual std::size_t default_value() const override {
92         return ThreadStackSize;
93     }
apply_active(std::size_t new_active)94     virtual void apply_active(std::size_t new_active) override {
95         control_storage::apply_active(new_active);
96 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
97         __TBB_ASSERT( false, "For Windows 8 Store* apps we must not set stack size" );
98 #endif
99     }
100 };
101 
102 class alignas(max_nfs_size) terminate_on_exception_control : public control_storage {
default_value() const103     virtual std::size_t default_value() const override {
104         return 0;
105     }
106 };
107 
108 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
109 class alignas(max_nfs_size) lifetime_control : public control_storage {
is_first_arg_preferred(std::size_t,std::size_t) const110     virtual bool is_first_arg_preferred(std::size_t, std::size_t) const override {
111         return false; // not interested
112     }
default_value() const113     virtual std::size_t default_value() const override {
114         return 0;
115     }
apply_active(std::size_t new_active)116     virtual void apply_active(std::size_t new_active) override {
117         if (new_active == 1) {
118             // reserve the market reference
119             market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex );
120             if (market::theMarket) {
121                 market::add_ref_unsafe(lock, /*is_public*/ true);
122             }
123         } else if (new_active == 0) { // new_active == 0
124             // release the market reference
125             market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex );
126             if (market::theMarket != nullptr) {
127                 lock.release();
128                 market::theMarket->release(/*is_public*/ true, /*blocking_terminate*/ false);
129             }
130         }
131         control_storage::apply_active(new_active);
132     }
133 
134 public:
is_empty()135     bool is_empty() {
136         spin_mutex::scoped_lock lock(my_list_mutex);
137         return my_list.empty();
138     }
139 };
140 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
141 
142 static allowed_parallelism_control allowed_parallelism_ctl;
143 static stack_size_control stack_size_ctl;
144 static terminate_on_exception_control terminate_on_exception_ctl;
145 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
146 static lifetime_control lifetime_ctl;
147 static control_storage *controls[] = {&allowed_parallelism_ctl, &stack_size_ctl, &terminate_on_exception_ctl, &lifetime_ctl};
148 #else
149 static control_storage *controls[] = {&allowed_parallelism_ctl, &stack_size_ctl, &terminate_on_exception_ctl};
150 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
151 
152 //! Comparator for a set of global_control objects
operator ()(const global_control * lhs,const global_control * rhs) const153 inline bool control_storage_comparator::operator()(const global_control* lhs, const global_control* rhs) const {
154     __TBB_ASSERT_RELEASE(lhs->my_param < global_control::parameter_max , NULL);
155     return lhs->my_value < rhs->my_value || (lhs->my_value == rhs->my_value && lhs < rhs);
156 }
157 
app_parallelism_limit()158 unsigned market::app_parallelism_limit() {
159     return allowed_parallelism_ctl.active_value_if_present();
160 }
161 
terminate_on_exception()162 bool terminate_on_exception() {
163     return global_control::active_value(global_control::terminate_on_exception) == 1;
164 }
165 
166 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
is_lifetime_control_present()167 unsigned market::is_lifetime_control_present() {
168     return !lifetime_ctl.is_empty();
169 }
170 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
171 
172 struct global_control_impl {
173 private:
erase_if_presenttbb::detail::r1::global_control_impl174     static bool erase_if_present(control_storage* const c, d1::global_control& gc) {
175         auto it = c->my_list.find(&gc);
176         if (it != c->my_list.end()) {
177             c->my_list.erase(it);
178             return true;
179         }
180         return false;
181     }
182 
183 public:
184 
createtbb::detail::r1::global_control_impl185     static void create(d1::global_control& gc) {
186         __TBB_ASSERT_RELEASE(gc.my_param < global_control::parameter_max, NULL);
187         control_storage* const c = controls[gc.my_param];
188 
189         spin_mutex::scoped_lock lock(c->my_list_mutex);
190         if (c->my_list.empty() || c->is_first_arg_preferred(gc.my_value, c->my_active_value)) {
191             // to guarantee that apply_active() is called with current active value,
192             // calls it here and in internal_destroy() under my_list_mutex
193             c->apply_active(gc.my_value);
194         }
195         c->my_list.insert(&gc);
196     }
197 
destroytbb::detail::r1::global_control_impl198     static void destroy(d1::global_control& gc) {
199         __TBB_ASSERT_RELEASE(gc.my_param < global_control::parameter_max, NULL);
200         control_storage* const c = controls[gc.my_param];
201         // Concurrent reading and changing global parameter is possible.
202         spin_mutex::scoped_lock lock(c->my_list_mutex);
203 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
204         __TBB_ASSERT(gc.my_param == global_control::scheduler_handle || !c->my_list.empty(), NULL);
205 #else
206         __TBB_ASSERT(!c->my_list.empty(), NULL);
207 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
208         std::size_t new_active = (std::size_t)(-1), old_active = c->my_active_value;
209 
210         if (!erase_if_present(c, gc)) {
211 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
212             __TBB_ASSERT(gc.my_param == global_control::scheduler_handle , NULL);
213             return;
214 #else
215             __TBB_ASSERT(false, "Unreachable code");
216 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
217         }
218         if (c->my_list.empty()) {
219             __TBB_ASSERT(new_active == (std::size_t) - 1, NULL);
220             new_active = c->default_value();
221         } else {
222             new_active = (*c->my_list.begin())->my_value;
223         }
224         if (new_active != old_active) {
225             c->apply_active(new_active);
226         }
227     }
228 
remove_and_check_if_emptytbb::detail::r1::global_control_impl229     static bool remove_and_check_if_empty(d1::global_control& gc) {
230         __TBB_ASSERT_RELEASE(gc.my_param < global_control::parameter_max, NULL);
231         control_storage* const c = controls[gc.my_param];
232 
233         spin_mutex::scoped_lock lock(c->my_list_mutex);
234         __TBB_ASSERT(!c->my_list.empty(), NULL);
235         erase_if_present(c, gc);
236         return c->my_list.empty();
237     }
238 #if TBB_USE_ASSERT
is_presenttbb::detail::r1::global_control_impl239     static bool is_present(d1::global_control& gc) {
240         __TBB_ASSERT_RELEASE(gc.my_param < global_control::parameter_max, NULL);
241         control_storage* const c = controls[gc.my_param];
242 
243         spin_mutex::scoped_lock lock(c->my_list_mutex);
244         auto it = c->my_list.find(&gc);
245         if (it != c->my_list.end()) {
246             return true;
247         }
248         return false;
249     }
250 #endif // TBB_USE_ASSERT
251 };
252 
// Entry point marked __TBB_EXPORTED_FUNC: registers gc by forwarding to
// global_control_impl::create().
void __TBB_EXPORTED_FUNC create(d1::global_control& gc) {
    global_control_impl::create(gc);
}
// Entry point marked __TBB_EXPORTED_FUNC: unregisters gc by forwarding to
// global_control_impl::destroy().
void __TBB_EXPORTED_FUNC destroy(d1::global_control& gc) {
    global_control_impl::destroy(gc);
}
259 
// Removes gc from its storage's list and returns true if that list is empty afterwards;
// forwards to global_control_impl::remove_and_check_if_empty().
bool remove_and_check_if_empty(d1::global_control& gc) {
    return global_control_impl::remove_and_check_if_empty(gc);
}
263 #if TBB_USE_ASSERT
// Returns true if gc is registered in its storage's list; forwards to
// global_control_impl::is_present(). Compiled only when TBB_USE_ASSERT is set.
bool is_present(d1::global_control& gc) {
    return global_control_impl::is_present(gc);
}
267 #endif // TBB_USE_ASSERT
global_control_active_value(int param)268 std::size_t __TBB_EXPORTED_FUNC global_control_active_value(int param) {
269     __TBB_ASSERT_RELEASE(param < global_control::parameter_max, NULL);
270     return controls[param]->active_value();
271 }
272 
273 } // namespace r1
274 } // namespace detail
275 } // namespace tbb
276