/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_market_H
#define _TBB_market_H

#include "scheduler_common.h"
#include "market_concurrent_monitor.h"
#include "intrusive_list.h"
#include "rml_tbb.h"
#include "oneapi/tbb/rw_mutex.h"

#include "oneapi/tbb/spin_rw_mutex.h"
#include "oneapi/tbb/task_group.h"

#include <atomic>

#if defined(_MSC_VER) && defined(_Wp64)
    // Workaround for overzealous compiler warnings in /Wp64 mode
    #pragma warning (push)
    #pragma warning (disable: 4244)
#endif

namespace tbb {
namespace detail {

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
namespace d1 {
class task_scheduler_handle;
}
#endif

namespace r1 {

class task_arena_base;
class task_group_context;

//------------------------------------------------------------------------
// Class market
//------------------------------------------------------------------------

class market : no_copy, rml::tbb_client {
    friend class arena;
    friend class task_arena_base;
    template<typename SchedulerTraits> friend class custom_scheduler;
    friend class task_group_context;
    friend class governor;
#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
    friend class lifetime_control;
#endif

public:
    //! Keys for the arena map array. The lower the value, the higher the priority of the arena list.
    static constexpr unsigned num_priority_levels = 3;
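    // Note: per the comment above, index 0 holds the highest-priority arenas. The three
    // levels are presumably meant to correspond to the three task_arena priorities
    // (high, normal, low); the authoritative mapping is defined in market.cpp.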

private:
    friend void ITT_DoUnsafeOneTimeInitialization ();
#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
    friend bool finalize_impl(d1::task_scheduler_handle& handle);
#endif

    typedef intrusive_list<arena> arena_list_type;
    typedef intrusive_list<thread_data> thread_data_list_type;

    //! Currently active global market
    static market* theMarket;

    typedef scheduler_mutex_type global_market_mutex_type;

    //! Mutex guarding creation/destruction of theMarket, insertions/deletions in my_arenas, and cancellation propagation
    static global_market_mutex_type  theMarketMutex;

    //! Lightweight mutex guarding accounting operations on the arenas list
    typedef rw_mutex arenas_list_mutex_type;
    // TODO: introduce fine-grained (per priority list) locking of arenas.
    arenas_list_mutex_type my_arenas_list_mutex;

    //! Pointer to the RML server object that services this TBB instance.
    rml::tbb_server* my_server;

    //! Waiting object for external and coroutine waiters.
    market_concurrent_monitor my_sleep_monitor;

    //! Maximum number of workers allowed for use by the underlying resource manager
    /** It can't be changed after market creation. **/
    unsigned my_num_workers_hard_limit;

    //! Current application-imposed limit on the number of workers (see set_active_num_workers())
    /** It can't be more than my_num_workers_hard_limit. **/
    std::atomic<unsigned> my_num_workers_soft_limit;

    //! Number of workers currently requested from RML
    int my_num_workers_requested;

    //! The target serialization epoch for callers of adjust_job_count_estimate
    int my_adjust_demand_target_epoch;

    //! The current serialization epoch for callers of adjust_job_count_estimate
    d1::waitable_atomic<int> my_adjust_demand_current_epoch;

    //! First unused worker index
    /** Used to assign indices to new workers coming from RML and marks the busy
        part of the my_workers array. **/
    std::atomic<unsigned> my_first_unused_worker_idx;

    //! Total number of workers requested by all arenas across all priority levels
    std::atomic<int> my_total_demand;

    //! Number of workers requested by arenas, per priority level
    int my_priority_level_demand[num_priority_levels];

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    //! How many times mandatory concurrency was requested from the market
    int my_mandatory_num_requested;
#endif

    //! Per priority list of registered arenas
    arena_list_type my_arenas[num_priority_levels];

    //! The first arena to be checked when an idle worker seeks an arena to enter
    /** The check happens in round-robin fashion. **/
    arena *my_next_arena;

    //! ABA prevention marker to assign to newly created arenas
    std::atomic<uintptr_t> my_arenas_aba_epoch;

    //! Reference count controlling market object lifetime
    std::atomic<unsigned> my_ref_count;

    //! Count of external threads attached
    std::atomic<unsigned> my_public_ref_count;

    //! Stack size of worker threads
    std::size_t my_stack_size;

    //! Shutdown mode
    bool my_join_workers;

    //! The value indicating that the soft limit warning is unnecessary
    static const unsigned skip_soft_limit_warning = ~0U;

    //! Either the workers soft limit to be reported via runtime_warning() or skip_soft_limit_warning
    std::atomic<unsigned> my_workers_soft_limit_to_report;

    //! Constructor
    market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size );

    //! Destroys and deallocates market object created by market::create()
    void destroy ();

    //! Recalculates the number of workers requested from RML and updates the allotment.
    int update_workers_request();

    //! Recalculates the number of workers assigned to each arena in the list.
    /** The actual number of workers servicing a particular arena may temporarily
        deviate from the calculated value. **/
    void update_allotment (unsigned effective_soft_limit) {
        int total_demand = my_total_demand.load(std::memory_order_relaxed);
        if (total_demand) {
            update_allotment(my_arenas, total_demand, (int)effective_soft_limit);
        }
    }
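    // A minimal sketch of what the per-list overload declared below is assumed to do
    // (the authoritative implementation lives in market.cpp and also handles mandatory
    // concurrency and rounding):
    //   for each priority level, starting from the highest (index 0):
    //       hand each arena in the level a share of the still-unassigned workers,
    //       roughly proportional to its demand and capped by that demand;
    //       subtract what was handed out before moving to the next level.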

    //! Returns the next arena that needs more workers, or NULL.
    arena* arena_in_need(arena* prev);

    template <typename Pred>
    static void enforce (Pred pred, const char* msg) {
        suppress_unused_warning(pred, msg);
#if TBB_USE_ASSERT
        global_market_mutex_type::scoped_lock lock(theMarketMutex);
        __TBB_ASSERT(pred(), msg);
#endif
    }
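    // Hypothetical usage example (the predicate is evaluated under theMarketMutex and
    // only in TBB_USE_ASSERT builds, so it must be cheap and must not take the lock itself):
    //   enforce([] { return theMarket == nullptr; }, "theMarket is expected to be absent");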

    ////////////////////////////////////////////////////////////////////////////////
    // Helpers to unify code branches dependent on priority feature presence

    arena* select_next_arena( arena* hint );

    void insert_arena_into_list ( arena& a );

    void remove_arena_from_list ( arena& a );

    arena* arena_in_need ( arena_list_type* arenas, arena* hint );

    int update_allotment ( arena_list_type* arenas, int total_demand, int max_workers );

    bool is_arena_in_list( arena_list_type& arenas, arena* a );

    bool is_arena_alive( arena* a );

    ////////////////////////////////////////////////////////////////////////////////
    // Implementation of rml::tbb_client interface methods

    version_type version () const override { return 0; }

    unsigned max_job_count () const override { return my_num_workers_hard_limit; }

    std::size_t min_stack_size () const override { return worker_stack_size(); }

    job* create_one_job () override;

    void cleanup( job& j ) override;

    void acknowledge_close_connection () override;

    void process( job& j ) override;

public:
    //! Factory method creating a new market object
    static market& global_market( bool is_public, unsigned max_num_workers = 0, std::size_t stack_size = 0 );

    //! Adds a reference to the market if theMarket exists
    static bool add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned max_num_workers = 0, std::size_t stack_size = 0 );

    //! Creates an arena object
    /** If necessary, also creates the global market instance and boosts its ref count.
        Each call to create_arena() must be matched by a call to arena::free_arena(). **/
    static arena* create_arena ( int num_slots, int num_reserved_slots,
                                 unsigned arena_index, std::size_t stack_size );
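    // Hypothetical usage sketch (argument values are illustrative only):
    //   arena* a = market::create_arena(/*num_slots*/ 4, /*num_reserved_slots*/ 1,
    //                                   /*arena_index*/ 0, /*stack_size*/ 0);
    //   ...
    //   // Teardown must eventually reach arena::free_arena(), either directly or
    //   // via try_destroy_arena() once no thread references the arena any more.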

    //! Attempts to remove the arena from the market's list and destroy it
    void try_destroy_arena ( arena*, uintptr_t aba_epoch, unsigned priority_level );

    //! Removes the arena from the market's list
    void detach_arena ( arena& );

    //! Decrements the market's refcount and destroys it when the count drops to zero
    bool release ( bool is_public, bool blocking_terminate );

    //! Returns the wait list
    market_concurrent_monitor& get_wait_list() { return my_sleep_monitor; }

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    //! Implementation of mandatory concurrency enabling
    void enable_mandatory_concurrency_impl ( arena *a );

    //! Inform the external thread that there is an arena with mandatory concurrency
    void enable_mandatory_concurrency ( arena *a );

    //! Inform the external thread that the arena is no longer interested in mandatory concurrency
    void disable_mandatory_concurrency_impl(arena* a);

    //! Inform the external thread that the arena is no longer interested in mandatory concurrency
    void mandatory_concurrency_disable ( arena *a );
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */

    //! Requests that the arena's demand for workers be adjusted.
    /** Concurrent invocations are possible only on behalf of different arenas. **/
    void adjust_demand ( arena&, int delta, bool mandatory );

    //! Used when RML asks for the join mode during worker termination.
    bool must_join_workers () const { return my_join_workers; }

    //! Returns the requested stack size of worker threads.
    std::size_t worker_stack_size () const { return my_stack_size; }

    //! Sets the number of active workers
    static void set_active_num_workers( unsigned w );

    //! Reports the active parallelism level according to the user's settings
    static unsigned app_parallelism_limit();

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
    //! Reports whether any active global lifetime references are present
    static unsigned is_lifetime_control_present();
#endif

    //! Finds all contexts affected by the state change and propagates the new state to them.
    /** The propagation is relayed to the market because tasks created by one
        external thread can be passed to and executed by other external threads. This means
        that context trees can span several arenas at once and thus state change
        propagation cannot be generally localized to one arena only. **/
    template <typename T>
    bool propagate_task_group_state (std::atomic<T> d1::task_group_context::*mptr_state, d1::task_group_context& src, T new_state );
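    // A hedged example of what a propagation call might look like (the member pointer
    // and value shown here are illustrative, not taken from this header):
    //   my_market->propagate_task_group_state(
    //       &d1::task_group_context::my_cancellation_requested, ctx, uint32_t(1));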

    //! List of registered external threads
    thread_data_list_type my_masters;

    //! Array of pointers to the registered workers
    /** Used by the cancellation propagation mechanism.
        Must be the last data member of the class market. **/
    std::atomic<thread_data*> my_workers[1];
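    // Note: the single-element declaration suggests the market object is allocated with
    // extra trailing storage so that my_workers can actually hold up to the workers hard
    // limit (a conventional "overallocated last member" trick), which is why it must
    // remain the last data member.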

    static unsigned max_num_workers() {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        return theMarket ? theMarket->my_num_workers_hard_limit : 0;
    }

    void add_external_thread(thread_data& td);

    void remove_external_thread(thread_data& td);
}; // class market

} // namespace r1
} // namespace detail
} // namespace tbb

#if defined(_MSC_VER) && defined(_Wp64)
    // Workaround for overzealous compiler warnings in /Wp64 mode
    #pragma warning (pop)
#endif // warning 4244 is back

#endif /* _TBB_market_H */