1 //  Copyright (c) 2017-2018 John Biddiscombe
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #if !defined(HPX_RUNTIME_THREADS_POLICIES_SHARED_PRIORITY_QUEUE_SCHEDULER)
7 #define HPX_RUNTIME_THREADS_POLICIES_SHARED_PRIORITY_QUEUE_SCHEDULER
8 
9 #include <hpx/config.hpp>
10 #include <hpx/compat/mutex.hpp>
11 #include <hpx/runtime/threads/policies/lockfree_queue_backends.hpp>
12 #include <hpx/runtime/threads/policies/queue_helpers.hpp>
13 #include <hpx/runtime/threads/policies/scheduler_base.hpp>
14 #include <hpx/runtime/threads/policies/thread_queue.hpp>
15 #include <hpx/runtime/threads/thread_data.hpp>
16 #include <hpx/runtime/threads/topology.hpp>
17 #include <hpx/runtime/threads_fwd.hpp>
18 #include <hpx/runtime/threads/detail/thread_num_tss.hpp>
19 #include <hpx/throw_exception.hpp>
20 #include <hpx/util/assert.hpp>
21 #include <hpx/util/logging.hpp>
22 #include <hpx/util_fwd.hpp>
23 
24 #include <array>
25 #include <cstddef>
26 #include <cstdint>
27 #include <exception>
28 #include <memory>
29 #include <string>
30 #include <numeric>
31 #include <type_traits>
32 
33 #include <hpx/config/warnings_prefix.hpp>
34 
35 namespace hpx {
36 namespace threads {
37 namespace policies {
38     ///////////////////////////////////////////////////////////////////////////
39     /// The shared_priority_queue_scheduler maintains a set of high, normal, and
40     /// low priority queues. For each priority level there is a core/queue ratio
41     /// which determines how many cores share a single queue. If the high
42     /// priority core/queue ratio is 4 the first 4 cores will share a single
43     /// high priority queue, the next 4 will share another one and so on. In
44     /// addition, the shared_priority_queue_scheduler is NUMA-aware and takes
45     /// NUMA scheduling hints into account when creating and scheduling work.
46     template <typename Mutex = compat::mutex,
47         typename PendingQueuing = lockfree_fifo,
48         typename StagedQueuing = lockfree_fifo,
49         typename TerminatedQueuing = lockfree_lifo>
50     class shared_priority_queue_scheduler : public scheduler_base
51     {
    protected:
        // The maximum number of active threads this thread manager should
        // create. This number will be a constraint only as long as the work
        // items queue is not empty. Otherwise the number of active threads
        // will be incremented in steps equal to the \a min_add_new_count
        // specified above.
        // FIXME: this is specified both here, and in thread_queue.
        enum
        {
            max_thread_count = 1000
        };

    public:
        // this scheduler opts out of the periodic maintenance callbacks
        typedef std::false_type has_periodic_maintenance;

        // the queue type instantiated for every priority level and domain
        typedef thread_queue<Mutex, PendingQueuing, StagedQueuing,
            TerminatedQueuing>
            thread_queue_type;
70 
        /// Construct the scheduler.
        ///
        /// \param num_worker_threads  number of worker (OS) threads serviced
        ///        by this scheduler; must be non-zero
        /// \param cores_per_queue     core/queue sharing ratios for the high,
        ///        normal and low priority queue sets
        /// \param description         text forwarded to scheduler_base for
        ///        logging/diagnostics
        /// \param max_tasks           per-queue active thread limit
        ///        (defaults to \a max_thread_count)
        shared_priority_queue_scheduler(
            std::size_t num_worker_threads,
            core_ratios cores_per_queue,
            char const* description,
            int max_tasks = max_thread_count)
          : scheduler_base(num_worker_threads, description)
          , cores_per_queue_(cores_per_queue)
          , max_queue_thread_count_(max_tasks)
          , num_workers_(num_worker_threads)
          // NOTE(review): starts at a single NUMA domain — presumably updated
          // later during initialization when the topology is known; confirm
          , num_domains_(1)
          , initialized_(false)
        {
            HPX_ASSERT(num_worker_threads != 0);
        }
85 
~shared_priority_queue_scheduler()86         virtual ~shared_priority_queue_scheduler() {}
87 
numa_sensitive() const88         bool numa_sensitive() const override { return true; }
has_thread_stealing() const89         virtual bool has_thread_stealing() const override { return true; }
90 
get_scheduler_name()91         static std::string get_scheduler_name()
92         {
93             return "shared_priority_queue_scheduler";
94         }
95 
96 #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
get_creation_time(bool reset)97         std::uint64_t get_creation_time(bool reset)
98         {
99             std::uint64_t time = 0;
100 
101             for (std::size_t d = 0; d < num_domains_; ++d) {
102                 for (auto &queue : lp_queues_[d].queues_) {
103                     time += queue->get_creation_time(reset);
104                 }
105 
106                 for (auto &queue : np_queues_[d].queues_) {
107                     time += queue->get_creation_time(reset);
108                 }
109 
110                 for (auto &queue : hp_queues_[d].queues_) {
111                     time += queue->get_creation_time(reset);
112                 }
113             }
114 
115             return time;
116         }
117 
get_cleanup_time(bool reset)118         std::uint64_t get_cleanup_time(bool reset)
119         {
120             std::uint64_t time = 0;
121 
122             for (std::size_t d = 0; d < num_domains_; ++d) {
123                 for (auto &queue : lp_queues_[d].queues_) {
124                     time += queue->get_cleanup_time(reset);
125                 }
126 
127                 for (auto &queue : np_queues_[d].queues_) {
128                     time += queue->get_cleanup_time(reset);
129                 }
130 
131                 for (auto &queue : hp_queues_[d].queues_) {
132                     time += queue->get_cleanup_time(reset);
133                 }
134             }
135 
136             return time;
137         }
138 #endif
139 
140 #ifdef HPX_HAVE_THREAD_STEALING_COUNTS
get_num_pending_misses(std::size_t num_thread,bool reset)141         std::int64_t get_num_pending_misses(
142             std::size_t num_thread, bool reset) override
143         {
144             std::int64_t num_pending_misses = 0;
145 
146             if (num_thread == std::size_t(-1))
147             {
148                 for (std::size_t d = 0; d < num_domains_; ++d) {
149                     for (auto &queue : lp_queues_[d].queues_) {
150                         num_pending_misses += queue->get_num_pending_misses(
151                             reset);
152                     }
153 
154                     for (auto &queue : np_queues_[d].queues_) {
155                         num_pending_misses += queue->get_num_pending_misses(
156                             reset);
157                     }
158 
159                     for (auto &queue : hp_queues_[d].queues_) {
160                         num_pending_misses += queue->get_num_pending_misses(
161                             reset);
162                     }
163                 }
164 
165                 return num_pending_misses;
166             }
167 
168             std::size_t domain_num = d_lookup_[num_thread];
169 
170             num_pending_misses +=
171                 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]->
172                     get_num_pending_misses(reset);
173 
174             num_pending_misses +=
175                 np_queues_[domain_num].queues_[np_lookup_[num_thread]]->
176                     get_num_pending_misses(reset);
177 
178             num_pending_misses +=
179                 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]->
180                     get_num_pending_misses(reset);
181 
182             return num_pending_misses;
183         }
184 
get_num_pending_accesses(std::size_t num_thread,bool reset)185         std::int64_t get_num_pending_accesses(
186             std::size_t num_thread, bool reset) override
187         {
188             std::int64_t num_pending_accesses = 0;
189 
190             if (num_thread == std::size_t(-1))
191             {
192                 for (std::size_t d = 0; d < num_domains_; ++d) {
193                     for (auto &queue : lp_queues_[d].queues_) {
194                         num_pending_accesses += queue->get_num_pending_accesses(
195                             reset);
196                     }
197 
198                     for (auto &queue : np_queues_[d].queues_) {
199                         num_pending_accesses += queue->get_num_pending_accesses(
200                             reset);
201                     }
202 
203                     for (auto &queue : hp_queues_[d].queues_) {
204                         num_pending_accesses += queue->get_num_pending_accesses(
205                             reset);
206                     }
207                 }
208 
209                 return num_pending_accesses;
210             }
211 
212             std::size_t domain_num = d_lookup_[num_thread];
213 
214             num_pending_accesses +=
215                 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]->
216                     get_num_pending_accesses(reset);
217 
218             num_pending_accesses +=
219                 np_queues_[domain_num].queues_[np_lookup_[num_thread]]->
220                     get_num_pending_accesses(reset);
221 
222             num_pending_accesses +=
223                 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]->
224                     get_num_pending_accesses(reset);
225 
226             return num_pending_accesses;
227         }
228 
get_num_stolen_from_pending(std::size_t num_thread,bool reset)229         std::int64_t get_num_stolen_from_pending(
230             std::size_t num_thread, bool reset) override
231         {
232             std::int64_t num_stolen_threads = 0;
233 
234             if (num_thread == std::size_t(-1))
235             {
236                 for (std::size_t d = 0; d < num_domains_; ++d) {
237                     for (auto &queue : lp_queues_[d].queues_) {
238                         num_stolen_threads += queue->get_num_stolen_from_pending(
239                             reset);
240                     }
241 
242                     for (auto &queue : np_queues_[d].queues_) {
243                         num_stolen_threads += queue->get_num_stolen_from_pending(
244                             reset);
245                     }
246 
247                     for (auto &queue : hp_queues_[d].queues_) {
248                         num_stolen_threads += queue->get_num_stolen_from_pending(
249                             reset);
250                     }
251                 }
252 
253                 return num_stolen_threads;
254             }
255 
256             std::size_t domain_num = d_lookup_[num_thread];
257 
258             num_stolen_threads +=
259                 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]->
260                     get_num_stolen_from_pending(reset);
261 
262             num_stolen_threads +=
263                 np_queues_[domain_num].queues_[np_lookup_[num_thread]]->
264                     get_num_stolen_from_pending(reset);
265 
266             num_stolen_threads +=
267                 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]->
268                     get_num_stolen_from_pending(reset);
269 
270             return num_stolen_threads;
271         }
272 
get_num_stolen_to_pending(std::size_t num_thread,bool reset)273         std::int64_t get_num_stolen_to_pending(
274             std::size_t num_thread, bool reset) override
275         {
276             std::int64_t num_stolen_threads = 0;
277 
278             if (num_thread == std::size_t(-1))
279             {
280                 for (std::size_t d = 0; d < num_domains_; ++d) {
281                     for (auto &queue : lp_queues_[d].queues_) {
282                         num_stolen_threads += queue->get_num_stolen_to_pending(
283                             reset);
284                     }
285 
286                     for (auto &queue : np_queues_[d].queues_) {
287                         num_stolen_threads += queue->get_num_stolen_to_pending(
288                             reset);
289                     }
290 
291                     for (auto &queue : hp_queues_[d].queues_) {
292                         num_stolen_threads += queue->get_num_stolen_to_pending(
293                             reset);
294                     }
295                 }
296 
297                 return num_stolen_threads;
298             }
299 
300             std::size_t domain_num = d_lookup_[num_thread];
301 
302             num_stolen_threads +=
303                 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]->
304                     get_num_stolen_to_pending(reset);
305 
306             num_stolen_threads +=
307                 np_queues_[domain_num].queues_[np_lookup_[num_thread]]->
308                     get_num_stolen_to_pending(reset);
309 
310             num_stolen_threads +=
311                 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]->
312                     get_num_stolen_to_pending(reset);
313 
314             return num_stolen_threads;
315         }
316 
get_num_stolen_from_staged(std::size_t num_thread,bool reset)317         std::int64_t get_num_stolen_from_staged(
318             std::size_t num_thread, bool reset) override
319         {
320             std::int64_t num_stolen_threads = 0;
321 
322             if (num_thread == std::size_t(-1))
323             {
324                 for (std::size_t d = 0; d < num_domains_; ++d) {
325                     for (auto &queue : lp_queues_[d].queues_) {
326                         num_stolen_threads += queue->get_num_stolen_from_staged(
327                             reset);
328                     }
329 
330                     for (auto &queue : np_queues_[d].queues_) {
331                         num_stolen_threads += queue->get_num_stolen_from_staged(
332                             reset);
333                     }
334 
335                     for (auto &queue : hp_queues_[d].queues_) {
336                         num_stolen_threads += queue->get_num_stolen_from_staged(
337                             reset);
338                     }
339                 }
340 
341                 return num_stolen_threads;
342             }
343 
344             std::size_t domain_num = d_lookup_[num_thread];
345 
346             num_stolen_threads +=
347                 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]->
348                     get_num_stolen_from_staged(reset);
349 
350             num_stolen_threads +=
351                 np_queues_[domain_num].queues_[np_lookup_[num_thread]]->
352                     get_num_stolen_from_staged(reset);
353 
354             num_stolen_threads +=
355                 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]->
356                     get_num_stolen_from_staged(reset);
357 
358             return num_stolen_threads;
359         }
360 
get_num_stolen_to_staged(std::size_t num_thread,bool reset)361         std::int64_t get_num_stolen_to_staged(
362             std::size_t num_thread, bool reset) override
363         {
364             std::int64_t num_stolen_threads = 0;
365 
366             if (num_thread == std::size_t(-1))
367             {
368                 for (std::size_t d = 0; d < num_domains_; ++d) {
369                     for (auto &queue : lp_queues_[d].queues_) {
370                         num_stolen_threads += queue->get_num_stolen_to_staged(
371                             reset);
372                     }
373 
374                     for (auto &queue : np_queues_[d].queues_) {
375                         num_stolen_threads += queue->get_num_stolen_to_staged(
376                             reset);
377                     }
378 
379                     for (auto &queue : hp_queues_[d].queues_) {
380                         num_stolen_threads += queue->get_num_stolen_to_staged(
381                             reset);
382                     }
383                 }
384 
385                 return num_stolen_threads;
386             }
387 
388             std::size_t domain_num = d_lookup_[num_thread];
389 
390             num_stolen_threads +=
391                 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]->
392                     get_num_stolen_to_staged(reset);
393 
394             num_stolen_threads +=
395                 np_queues_[domain_num].queues_[np_lookup_[num_thread]]->
396                     get_num_stolen_to_staged(reset);
397 
398             num_stolen_threads +=
399                 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]->
400                     get_num_stolen_to_staged(reset);
401 
402             return num_stolen_threads;
403         }
404 #endif
405 
406 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
407         ///////////////////////////////////////////////////////////////////////
408         // Queries the current average thread wait time of the queues.
get_average_thread_wait_time(std::size_t num_thread=std::size_t (-1)) const409         std::int64_t get_average_thread_wait_time(
410             std::size_t num_thread = std::size_t(-1)) const override
411         {
412             // Return average thread wait time of one specific queue.
413             std::uint64_t wait_time = 0;
414             std::uint64_t count = 0;
415 
416             if (num_thread == std::size_t(-1))
417             {
418                 for (std::size_t d = 0; d < num_domains_; ++d) {
419                     for (auto &queue : lp_queues_[d].queues_) {
420                         wait_time += queue->get_average_thread_wait_time();
421                         ++count;
422                     }
423 
424                     for (auto &queue : np_queues_[d].queues_) {
425                         wait_time += queue->get_average_thread_wait_time();
426                         ++count;
427                     }
428 
429                     for (auto &queue : hp_queues_[d].queues_) {
430                         wait_time += queue->get_average_thread_wait_time();
431                         ++count;
432                     }
433                 }
434 
435                 return wait_time / (count + 1);
436             }
437 
438             std::size_t domain_num = d_lookup_[num_thread];
439 
440             wait_time +=
441                 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]->
442                     get_average_thread_wait_time();
443             ++count;
444 
445             wait_time +=
446                 np_queues_[domain_num].queues_[np_lookup_[num_thread]]->
447                     get_average_thread_wait_time();
448             ++count;
449 
450             wait_time +=
451                 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]->
452                     get_average_thread_wait_time();
453             ++count;
454 
455             return wait_time / (count + 1);
456         }
457 
458         ///////////////////////////////////////////////////////////////////////
459         // Queries the current average task wait time of the queues.
get_average_task_wait_time(std::size_t num_thread=std::size_t (-1)) const460         std::int64_t get_average_task_wait_time(
461             std::size_t num_thread = std::size_t(-1)) const override
462         {
463             // Return average task wait time of one specific queue.
464             std::uint64_t wait_time = 0;
465             std::uint64_t count = 0;
466 
467             if (num_thread == std::size_t(-1))
468             {
469                 for (std::size_t d = 0; d < num_domains_; ++d) {
470                     for (auto &queue : lp_queues_[d].queues_) {
471                         wait_time += queue->get_average_task_wait_time();
472                         ++count;
473                     }
474 
475                     for (auto &queue : np_queues_[d].queues_) {
476                         wait_time += queue->get_average_task_wait_time();
477                         ++count;
478                     }
479 
480                     for (auto &queue : hp_queues_[d].queues_) {
481                         wait_time += queue->get_average_task_wait_time();
482                         ++count;
483                     }
484                 }
485 
486                 return wait_time / (count + 1);
487             }
488 
489             std::size_t domain_num = d_lookup_[num_thread];
490 
491             wait_time +=
492                 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]->
493                     get_average_task_wait_time();
494             ++count;
495 
496             wait_time +=
497                 np_queues_[domain_num].queues_[np_lookup_[num_thread]]->
498                     get_average_task_wait_time();
499             ++count;
500 
501             wait_time +=
502                 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]->
503                     get_average_task_wait_time();
504             ++count;
505 
506             return wait_time / (count + 1);
507         }
508 #endif
509 
510         // ------------------------------------------------------------
abort_all_suspended_threads()511         void abort_all_suspended_threads() override
512         {
513             for (std::size_t d = 0; d < num_domains_; ++d) {
514                 for (auto &queue : lp_queues_[d].queues_) {
515                      queue->abort_all_suspended_threads();
516                 }
517 
518                 for (auto &queue : np_queues_[d].queues_) {
519                      queue->abort_all_suspended_threads();
520                 }
521 
522                 for (auto &queue : hp_queues_[d].queues_) {
523                      queue->abort_all_suspended_threads();
524                 }
525             }
526         }
527 
528         // ------------------------------------------------------------
cleanup_terminated(bool delete_all)529         bool cleanup_terminated(bool delete_all) override
530         {
531             bool empty = true;
532 
533             for (std::size_t d=0; d<num_domains_; ++d) {
534                 for (auto &queue : lp_queues_[d].queues_) {
535                      empty = queue->cleanup_terminated(delete_all) && empty;
536                 }
537 
538                 for (auto &queue : np_queues_[d].queues_) {
539                      empty = queue->cleanup_terminated(delete_all) && empty;
540                 }
541 
542                 for (auto &queue : hp_queues_[d].queues_) {
543                      empty = queue->cleanup_terminated(delete_all) && empty;
544                 }
545             }
546 
547             return empty;
548         }
549 
cleanup_terminated(std::size_t thread_num,bool delete_all)550         bool cleanup_terminated(
551             std::size_t thread_num, bool delete_all) override
552         {
553             if (thread_num == std::size_t(-1)) {
554                 HPX_THROW_EXCEPTION(bad_parameter,
555                     "shared_priority_queue_scheduler::cleanup_terminated",
556                     "Invalid thread number: " + std::to_string(thread_num));
557             }
558             bool empty = true;
559 
560             // find the numa domain from the local thread index
561             std::size_t domain_num = d_lookup_[thread_num];
562 
563             // cleanup the queues assigned to this thread
564             empty = hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]->
565                     cleanup_terminated(delete_all) && empty;
566             empty = np_queues_[domain_num].queues_[np_lookup_[thread_num]]->
567                     cleanup_terminated(delete_all) && empty;
568             empty = lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]->
569                     cleanup_terminated(delete_all) && empty;
570             return empty;
571         }
572 
573         ///////////////////////////////////////////////////////////////////////
574         // create a new thread and schedule it if the initial state
575         // is equal to pending
        /// Create a new thread and place it on a queue chosen from the
        /// scheduling hint (current worker, explicit worker, or NUMA domain)
        /// combined with the task's priority (high/normal/low).
        void create_thread(thread_init_data& data, thread_id_type* thrd,
            thread_state_enum initial_state, bool run_now, error_code& ec) override
        {
            // safety check that task was created by this thread/scheduler
            HPX_ASSERT(data.scheduler_base == this);

            std::size_t thread_num = 0;
            std::size_t domain_num = 0;
            std::size_t q_index = std::size_t(-1);

            // NOTE(review): presumably acquired by select_active_pu while a
            // PU is chosen and released at end of scope — confirm
            std::unique_lock<pu_mutex_type> l;

            using threads::thread_schedule_hint_mode;

            // translate the hint into (thread_num, domain_num, q_index)
            switch (data.schedulehint.mode) {
            case thread_schedule_hint_mode::thread_schedule_hint_mode_none:
            {
                // Create thread on this worker thread if possible
                std::size_t global_thread_num =
                    threads::detail::thread_num_tss_.get_worker_thread_num();
                thread_num = this->global_to_local_thread_index(global_thread_num);
                if (thread_num>=num_workers_) {
                    // This is a task being injected from a thread on another pool.
                    // Reset thread_num to first queue.
                    thread_num = 0;
                }
                thread_num = select_active_pu(l, thread_num);
                domain_num     = d_lookup_[thread_num];
                q_index        = q_lookup_[thread_num];
                break;
            }
            case thread_schedule_hint_mode::thread_schedule_hint_mode_thread:
            {
                // Create thread on requested worker thread
                thread_num = select_active_pu(l, data.schedulehint.hint);
                domain_num = d_lookup_[thread_num];
                q_index    = q_lookup_[thread_num];
                break;
            }
            case thread_schedule_hint_mode::thread_schedule_hint_mode_numa:
            {
                // Create thread on requested NUMA domain

                // TODO: This case does not handle suspended PUs.
                domain_num = data.schedulehint.hint % num_domains_;
                // if the thread creating the new task is on the domain
                // assigned to the new task - try to reuse the core as well
                std::size_t global_thread_num =
                    threads::detail::thread_num_tss_.get_worker_thread_num();
                thread_num = this->global_to_local_thread_index(global_thread_num);
                if (d_lookup_[thread_num] == domain_num) {
                    q_index = q_lookup_[thread_num];
                }
                else {
                    // round-robin over the domain's queues via the counter
                    q_index = counters_[domain_num]++;
                }
                break;
            }
            default:
                HPX_THROW_EXCEPTION(bad_parameter,
                    "shared_priority_queue_scheduler::create_thread",
                    "Invalid schedule hint mode: " +
                    std::to_string(data.schedulehint.mode));
            }

            // create the thread using priority to select queue
            if (data.priority == thread_priority_high ||
                data.priority == thread_priority_high_recursive ||
                data.priority == thread_priority_boost)
            {
                // boosted threads return to normal after being queued
                if (data.priority == thread_priority_boost)
                {
                    data.priority = thread_priority_normal;
                }

                // q_index is folded into the domain's hp queue count since
                // several cores may share one queue
                hp_queues_[domain_num].queues_[hp_lookup_[
                    q_index % hp_queues_[domain_num].num_cores]]->
                    create_thread(data, thrd, initial_state, run_now, ec);

                return;
            }

            if (data.priority == thread_priority_low)
            {
                lp_queues_[domain_num].queues_[lp_lookup_[
                    q_index % lp_queues_[domain_num].num_cores]]->
                    create_thread(data, thrd, initial_state, run_now, ec);

                return;
            }

            // normal priority
            np_queues_[domain_num].queues_[np_lookup_[
                q_index % np_queues_[domain_num].num_cores]]->
                create_thread(data, thrd, initial_state, run_now, ec);
        }
673 
674         /// Return the next thread to be executed, return false if none available
        // Search order: high priority queues (own domain first, then steal
        // from other domains), then normal priority likewise, then low
        // priority (local domain only unless JB_LP_STEALING is defined).
        // 'running' and 'idle_loop_count' are unused by this implementation.
        virtual bool get_next_thread(std::size_t thread_num,
            bool running, std::int64_t& idle_loop_count,
            threads::thread_data*& thrd) override
        {
            bool result = false;

            if (thread_num == std::size_t(-1)) {
                HPX_THROW_EXCEPTION(bad_parameter,
                    "shared_priority_queue_scheduler::get_next_thread",
                    "Invalid thread number: " + std::to_string(thread_num));
            }

            // find the numa domain from the local thread index
            std::size_t domain_num = d_lookup_[thread_num];

            // is there a high priority task, take first from our numa domain
            // and then try to steal from others
            for (std::size_t d=0; d<num_domains_; ++d) {
                std::size_t dom = (domain_num+d) % num_domains_;
                // set the preferred queue for this domain, if applicable
                // (this thread's own queue index, even on foreign domains)
                std::size_t q_index = q_lookup_[thread_num];
                // get next task, steal if from another domain
                result = hp_queues_[dom].get_next_thread(q_index, thrd);
                if (result) break;
            }

            // try a normal priority task
            if (!result) {
                for (std::size_t d=0; d<num_domains_; ++d) {
                    std::size_t dom = (domain_num+d) % num_domains_;
                    // set the preferred queue for this domain, if applicable
                    std::size_t q_index = q_lookup_[thread_num];
                    // get next task, steal if from another domain
                    result = np_queues_[dom].get_next_thread(q_index, thrd);
                    if (result) break;
                }
            }

            // low priority task
            if (!result) {
#ifdef JB_LP_STEALING
                for (std::size_t d=domain_num; d<domain_num+num_domains_; ++d) {
                    std::size_t dom = d % num_domains_;
                    // set the preferred queue for this domain, if applicable
                    std::size_t q_index = (dom==domain_num) ?
                        q_lookup_[thread_num] :
                        lp_lookup_[(counters_[dom]++ %
                            lp_queues_[dom].num_cores)];

                    result = lp_queues_[dom].get_next_thread(q_index, thrd);
                    if (result) break;
                }
#else
                // no cross domain stealing for LP queues
                result = lp_queues_[domain_num].get_next_thread(0, thrd);
#endif
            }
            if (result)
            {
                // sanity: the task must have been created by this scheduler
                HPX_ASSERT(thrd->get_scheduler_base() == this);
            }
            return result;
        }
738 
739         /// Schedule the passed thread
schedule_thread(threads::thread_data * thrd,threads::thread_schedule_hint schedulehint,bool allow_fallback,thread_priority priority=thread_priority_normal)740         void schedule_thread(threads::thread_data* thrd,
741             threads::thread_schedule_hint schedulehint,
742             bool allow_fallback,
743             thread_priority priority = thread_priority_normal) override
744         {
745             HPX_ASSERT(thrd->get_scheduler_base() == this);
746 
747             std::size_t thread_num = 0;
748             std::size_t domain_num = 0;
749             std::size_t q_index = std::size_t(-1);
750 
751             std::unique_lock<pu_mutex_type> l;
752 
753             using threads::thread_schedule_hint_mode;
754 
755             switch (schedulehint.mode) {
756             case thread_schedule_hint_mode::thread_schedule_hint_mode_none:
757             {
758                 // Create thread on this worker thread if possible
759                 std::size_t global_thread_num =
760                     threads::detail::thread_num_tss_.get_worker_thread_num();
761                 thread_num = this->global_to_local_thread_index(global_thread_num);
762                 if (thread_num>=num_workers_) {
763                     // This is a task being injected from a thread on another pool.
764                     // Reset thread_num to first queue.
765                     thread_num = 0;
766                 }
767                 thread_num = select_active_pu(l, thread_num, allow_fallback);
768                 domain_num     = d_lookup_[thread_num];
769                 q_index        = q_lookup_[thread_num];
770                 break;
771             }
772             case thread_schedule_hint_mode::thread_schedule_hint_mode_thread:
773             {
774                 // Create thread on requested worker thread
775                 thread_num = select_active_pu(l, schedulehint.hint,
776                     allow_fallback);
777                 domain_num = d_lookup_[thread_num];
778                 q_index    = q_lookup_[thread_num];
779                 break;
780             }
781             case thread_schedule_hint_mode::thread_schedule_hint_mode_numa:
782             {
783                 // Create thread on requested NUMA domain
784 
785                 // TODO: This case does not handle suspended PUs.
786                 domain_num = schedulehint.hint % num_domains_;
787                 // if the thread creating the new task is on the domain
788                 // assigned to the new task - try to reuse the core as well
789                 std::size_t global_thread_num =
790                     threads::detail::thread_num_tss_.get_worker_thread_num();
791                 thread_num = this->global_to_local_thread_index(global_thread_num);
792                 if (d_lookup_[thread_num] == domain_num) {
793                     q_index = q_lookup_[thread_num];
794                 }
795                 else {
796                     q_index = counters_[domain_num]++;
797                 }
798                 break;
799             }
800             default:
801                 HPX_THROW_EXCEPTION(bad_parameter,
802                     "shared_priority_queue_scheduler::schedule_thread",
803                     "Invalid schedule hint mode: " +
804                     std::to_string(schedulehint.mode));
805             }
806 
807             if (priority == thread_priority_high ||
808                 priority == thread_priority_high_recursive ||
809                 priority == thread_priority_boost)
810             {
811                 hp_queues_[domain_num].queues_[hp_lookup_[
812                     q_index % hp_queues_[domain_num].num_cores]]->
813                     schedule_thread(thrd, false);
814             }
815             else if (priority == thread_priority_low)
816             {
817                 lp_queues_[domain_num].queues_[lp_lookup_[
818                     q_index % lp_queues_[domain_num].num_cores]]->
819                     schedule_thread(thrd, false);
820             }
821             else
822             {
823                 np_queues_[domain_num].queues_[np_lookup_[
824                     q_index % np_queues_[domain_num].num_cores]]->
825                     schedule_thread(thrd, false);
826             }
827         }
828 
829         /// Put task on the back of the queue
schedule_thread_last(threads::thread_data * thrd,threads::thread_schedule_hint schedulehint,bool allow_fallback,thread_priority priority=thread_priority_normal)830         void schedule_thread_last(threads::thread_data* thrd,
831             threads::thread_schedule_hint schedulehint,
832             bool allow_fallback,
833             thread_priority priority = thread_priority_normal) override
834         {
835             HPX_ASSERT(thrd->get_scheduler_base() == this);
836 
837             std::size_t thread_num = 0;
838             std::size_t domain_num = 0;
839             std::size_t q_index = std::size_t(-1);
840 
841             std::unique_lock<pu_mutex_type> l;
842 
843             using threads::thread_schedule_hint_mode;
844 
845             switch (schedulehint.mode) {
846             case thread_schedule_hint_mode::thread_schedule_hint_mode_none:
847             {
848                 // Create thread on this worker thread if possible
849                 std::size_t global_thread_num =
850                     threads::detail::thread_num_tss_.get_worker_thread_num();
851                 thread_num = this->global_to_local_thread_index(global_thread_num);
852                 if (thread_num>=num_workers_) {
853                     // This is a task being injected from a thread on another pool.
854                     // Reset thread_num to first queue.
855                     thread_num = 0;
856                 }
857                 thread_num = select_active_pu(l, thread_num, allow_fallback);
858                 domain_num     = d_lookup_[thread_num];
859                 q_index        = q_lookup_[thread_num];
860                 break;
861             }
862             case thread_schedule_hint_mode::thread_schedule_hint_mode_thread:
863             {
864                 // Create thread on requested worker thread
865                 thread_num = select_active_pu(l, schedulehint.hint,
866                     allow_fallback);
867                 domain_num = d_lookup_[thread_num];
868                 q_index    = q_lookup_[thread_num];
869                 break;
870             }
871             case thread_schedule_hint_mode::thread_schedule_hint_mode_numa:
872             {
873                 // Create thread on requested NUMA domain
874 
875                 // TODO: This case does not handle suspended PUs.
876                 domain_num = schedulehint.hint % num_domains_;
877                 // if the thread creating the new task is on the domain
878                 // assigned to the new task - try to reuse the core as well
879                 std::size_t global_thread_num =
880                     threads::detail::thread_num_tss_.get_worker_thread_num();
881                 thread_num = this->global_to_local_thread_index(global_thread_num);
882                 if (d_lookup_[thread_num] == domain_num) {
883                     q_index = q_lookup_[thread_num];
884                 }
885                 else {
886                     q_index = counters_[domain_num]++;
887                 }
888                 break;
889             }
890             default:
891                 HPX_THROW_EXCEPTION(bad_parameter,
892                     "shared_priority_queue_scheduler::schedule_thread_last",
893                     "Invalid schedule hint mode: " +
894                     std::to_string(schedulehint.mode));
895             }
896 
897             if (priority == thread_priority_high ||
898                 priority == thread_priority_high_recursive ||
899                 priority == thread_priority_boost)
900             {
901                 hp_queues_[domain_num].queues_[hp_lookup_[
902                     q_index % hp_queues_[domain_num].num_cores]]->
903                     schedule_thread(thrd, true);
904             }
905             else if (priority == thread_priority_low)
906             {
907                 lp_queues_[domain_num].queues_[lp_lookup_[
908                     q_index % lp_queues_[domain_num].num_cores]]->
909                     schedule_thread(thrd, true);
910             }
911             else
912             {
913                 np_queues_[domain_num].queues_[np_lookup_[
914                     q_index % np_queues_[domain_num].num_cores]]->
915                     schedule_thread(thrd, true);
916             }
917         }
918 
919         //---------------------------------------------------------------------
920         // Destroy the passed thread - as it has been terminated
921         //---------------------------------------------------------------------
destroy_thread(threads::thread_data * thrd,std::int64_t & busy_count)922         void destroy_thread(
923             threads::thread_data* thrd, std::int64_t& busy_count) override
924         {
925             HPX_ASSERT(thrd->get_scheduler_base() == this);
926             thrd->get_queue<thread_queue_type>().destroy_thread(thrd, busy_count);
927         }
928 
929         ///////////////////////////////////////////////////////////////////////
930         // This returns the current length of the queues (work items and new
931         // items)
932         //---------------------------------------------------------------------
get_queue_length(std::size_t thread_num=std::size_t (-1)) const933         std::int64_t get_queue_length(
934             std::size_t thread_num = std::size_t(-1)) const override
935         {
936             std::int64_t count = 0;
937             for (std::size_t d=0; d<num_domains_; ++d) {
938                 count += hp_queues_[d].get_queue_length();
939                 count += np_queues_[d].get_queue_length();
940                 count += lp_queues_[d].get_queue_length();
941             }
942 
943             if (thread_num != std::size_t(-1)) {
944                 // find the numa domain from the local thread index
945                 std::size_t domain = d_lookup_[thread_num];
946                 // get next task, steal if from another domain
947                 std::int64_t result =
948                     hp_queues_[domain].queues_[hp_lookup_[thread_num]]->
949                       get_queue_length();
950                 if (result>0) return result;
951                 result =
952                     np_queues_[domain].queues_[np_lookup_[thread_num]]->
953                       get_queue_length();
954                 if (result>0) return result;
955                 return
956                     lp_queues_[domain].queues_[lp_lookup_[thread_num]]->
957                       get_queue_length();
958             }
959             return count;
960         }
961 
962         //---------------------------------------------------------------------
963         // Queries the current thread count of the queues.
964         //---------------------------------------------------------------------
get_thread_count(thread_state_enum state=unknown,thread_priority priority=thread_priority_default,std::size_t thread_num=std::size_t (-1),bool reset=false) const965         std::int64_t get_thread_count(thread_state_enum state = unknown,
966             thread_priority priority = thread_priority_default,
967             std::size_t thread_num = std::size_t(-1),
968             bool reset = false) const override
969         {
970             std::int64_t count = 0;
971 
972             // if a specific worker id was requested
973             if (thread_num != std::size_t(-1)) {
974                 std::size_t domain_num = d_lookup_[thread_num];
975                 //
976                 switch (priority) {
977                 case thread_priority_default: {
978                     count += hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]->
979                         get_thread_count(state);
980                     count += np_queues_[domain_num].queues_[np_lookup_[thread_num]]->
981                         get_thread_count(state);
982                     count += lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]->
983                         get_thread_count(state);
984                     return count;
985                 }
986                 case thread_priority_low: {
987                     count += lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]->
988                         get_thread_count(state);
989                     return count;
990                 }
991                 case thread_priority_normal: {
992                     count += np_queues_[domain_num].queues_[np_lookup_[thread_num]]->
993                         get_thread_count(state);
994                     return count;
995                 }
996                 case thread_priority_boost:
997                 case thread_priority_high:
998                 case thread_priority_high_recursive: {
999                     count += hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]->
1000                         get_thread_count(state);
1001                     return count;
1002                 }
1003                 default:
1004                 case thread_priority_unknown:
1005                     HPX_THROW_EXCEPTION(bad_parameter,
1006                         "shared_priority_queue_scheduler::get_thread_count",
1007                         "unknown thread priority (thread_priority_unknown)");
1008                     return 0;
1009                 }
1010             }
1011 
1012             switch (priority) {
1013             case thread_priority_default: {
1014                 for (std::size_t d=0; d<num_domains_; ++d) {
1015                     count += hp_queues_[d].get_thread_count(state);
1016                     count += np_queues_[d].get_thread_count(state);
1017                     count += lp_queues_[d].get_thread_count(state);
1018                 }
1019                 return count;
1020             }
1021             case thread_priority_low: {
1022                 for (std::size_t d=0; d<num_domains_; ++d) {
1023                     count += lp_queues_[d].get_thread_count(state);
1024                 }
1025                 return count;
1026             }
1027             case thread_priority_normal: {
1028                 for (std::size_t d=0; d<num_domains_; ++d) {
1029                     count += np_queues_[d].get_thread_count(state);
1030                 }
1031                 return count;
1032             }
1033             case thread_priority_boost:
1034             case thread_priority_high:
1035             case thread_priority_high_recursive: {
1036                 for (std::size_t d=0; d<num_domains_; ++d) {
1037                     count += hp_queues_[d].get_thread_count(state);
1038                 }
1039                 return count;
1040             }
1041             default:
1042             case thread_priority_unknown:
1043                 HPX_THROW_EXCEPTION(bad_parameter,
1044                     "shared_priority_queue_scheduler::get_thread_count",
1045                     "unknown thread priority (thread_priority_unknown)");
1046                 return 0;
1047             }
1048 
1049             return count;
1050         }
1051 
1052         ///////////////////////////////////////////////////////////////////////
1053         // Enumerate matching threads from all queues
enumerate_threads(util::function_nonser<bool (thread_id_type)> const & f,thread_state_enum state=unknown) const1054         bool enumerate_threads(
1055             util::function_nonser<bool(thread_id_type)> const& f,
1056             thread_state_enum state = unknown) const override
1057         {
1058             bool result = true;
1059 
1060             for (std::size_t d=0; d<num_domains_; ++d) {
1061                 result = result &&
1062                     hp_queues_[d].enumerate_threads(f, state);
1063                 result = result &&
1064                     np_queues_[d].enumerate_threads(f, state);
1065                 result = result &&
1066                     lp_queues_[d].enumerate_threads(f, state);
1067             }
1068             return result;
1069         }
1070 
1071         /// This is a function which gets called periodically by the thread
1072         /// manager to allow for maintenance tasks to be executed in the
1073         /// scheduler. Returns true if the OS thread calling this function
1074         /// has to be terminated (i.e. no more work has to be done).
wait_or_add_new(std::size_t thread_num,bool running,std::int64_t & idle_loop_count)1075         virtual bool wait_or_add_new(std::size_t thread_num,
1076             bool running, std::int64_t& idle_loop_count) override
1077         {
1078             std::size_t added = 0;
1079             bool result = true;
1080 
1081             if (thread_num == std::size_t(-1)) {
1082                 HPX_THROW_EXCEPTION(bad_parameter,
1083                     "shared_priority_queue_scheduler::wait_or_add_new",
1084                     "Invalid thread number: " + std::to_string(thread_num));
1085             }
1086 
1087             // find the numa domain from the local thread index
1088             std::size_t domain_num = d_lookup_[thread_num];
1089 
1090             // is there a high priority task, take first from our numa domain
1091             // and then try to steal from others
1092             for (std::size_t d=0; d<num_domains_; ++d) {
1093                 std::size_t dom = (domain_num+d) % num_domains_;
1094                 // set the preferred queue for this domain, if applicable
1095                 std::size_t q_index = q_lookup_[thread_num];
1096                 // get next task, steal if from another domain
1097                 result = hp_queues_[dom].wait_or_add_new(q_index, running,
1098                     idle_loop_count, added);
1099                 if (0 != added) return result;
1100             }
1101 
1102             // try a normal priority task
1103             if (!result) {
1104                 for (std::size_t d=0; d<num_domains_; ++d) {
1105                     std::size_t dom = (domain_num+d) % num_domains_;
1106                     // set the preferred queue for this domain, if applicable
1107                     std::size_t q_index = q_lookup_[thread_num];
1108                     // get next task, steal if from another domain
1109                     result = np_queues_[dom].wait_or_add_new(q_index, running,
1110                         idle_loop_count, added);
1111                     if (0 != added) return result;
1112                 }
1113             }
1114 
1115             // low priority task
1116             if (!result) {
1117 #ifdef JB_LP_STEALING
1118                 for (std::size_t d=domain_num; d<domain_num+num_domains_; ++d) {
1119                     std::size_t dom = d % num_domains_;
1120                     // set the preferred queue for this domain, if applicable
1121                     std::size_t q_index = (dom==domain_num) ?
1122                         q_lookup_[thread_num] :
1123                         lp_lookup_[(counters_[dom]++ %
1124                             lp_queues_[dom].num_cores)];
1125 
1126                     result = lp_queues_[dom].wait_or_add_new(q_index, running,
1127                         idle_loop_count, added);
1128                     if (0 != added) return result;
1129                 }
1130 #else
1131                 // no cross domain stealing for LP queues
1132                 result = lp_queues_[domain_num].wait_or_add_new(0, running,
1133                     idle_loop_count, added);
1134                 if (0 != added) return result;
1135 #endif
1136             }
1137 
1138             return result;
1139         }
1140 
1141         ///////////////////////////////////////////////////////////////////////
on_start_thread(std::size_t thread_num)1142         void on_start_thread(std::size_t thread_num) override
1143         {
1144             std::unique_lock<hpx::lcos::local::spinlock> lock(init_mutex);
1145             if (!initialized_)
1146             {
1147                 initialized_ = true;
1148 
1149                 auto &rp = resource::get_partitioner();
1150                 auto const& topo = rp.get_topology();
1151 
1152                 // For each worker thread, count which each numa domain they
1153                 // belong to and build lists of useful indexes/refs
1154                 num_domains_ = 1;
1155                 std::array<std::size_t, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> q_counts_;
1156                 std::fill(d_lookup_.begin(), d_lookup_.end(), 0);
1157                 std::fill(q_lookup_.begin(), q_lookup_.end(), 0);
1158                 std::fill(q_counts_.begin(), q_counts_.end(), 0);
1159                 std::fill(counters_.begin(), counters_.end(), 0);
1160 
1161                 for (std::size_t local_id=0; local_id!=num_workers_; ++local_id)
1162                 {
1163                     std::size_t global_id = local_to_global_thread_index(local_id);
1164                     std::size_t pu_num = rp.get_pu_num(global_id);
1165                     std::size_t domain = topo.get_numa_node_number(pu_num);
1166                     d_lookup_[local_id] = domain;
1167                     num_domains_ = (std::max)(num_domains_, domain+1);
1168                 }
1169 
1170                 HPX_ASSERT(num_domains_ <= HPX_HAVE_MAX_NUMA_DOMAIN_COUNT);
1171 
1172                 for (std::size_t local_id=0; local_id!=num_workers_; ++local_id)
1173                 {
1174                     q_lookup_[local_id] = q_counts_[d_lookup_[local_id]]++;
1175                 }
1176 
1177                 // create queue sets for each numa domain
1178                 for (std::size_t i = 0; i < num_domains_; ++i)
1179                 {
1180                     std::size_t queues = (std::max)(
1181                         q_counts_[i] / cores_per_queue_.high_priority,
1182                         std::size_t(1));
1183                     hp_queues_[i].init(
1184                         q_counts_[i], queues, max_queue_thread_count_);
1185 
1186                     queues = (std::max)(
1187                         q_counts_[i] / cores_per_queue_.normal_priority,
1188                         std::size_t(1));
1189                     np_queues_[i].init(
1190                         q_counts_[i], queues, max_queue_thread_count_);
1191 
1192                     queues =
1193                         (std::max)(q_counts_[i] / cores_per_queue_.low_priority,
1194                             std::size_t(1));
1195                     lp_queues_[i].init(
1196                         q_counts_[i], queues, max_queue_thread_count_);
1197                 }
1198 
1199                 // create worker_id to queue lookups for each queue type
1200                 for (std::size_t local_id=0; local_id!=num_workers_; ++local_id)
1201                 {
1202                     hp_lookup_[local_id] = hp_queues_[d_lookup_[local_id]].
1203                         get_queue_index(q_lookup_[local_id]);
1204                     np_lookup_[local_id] = np_queues_[d_lookup_[local_id]].
1205                         get_queue_index(q_lookup_[local_id]);
1206                     lp_lookup_[local_id] = lp_queues_[d_lookup_[local_id]].
1207                         get_queue_index(q_lookup_[local_id]);
1208                 }
1209             }
1210 
1211             lock.unlock();
1212 
1213             std::size_t domain_num = d_lookup_[thread_num];
1214 
1215             // NOTE: This may call on_start_thread multiple times for a single
1216             // thread_queue.
1217             lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]->
1218                 on_start_thread(thread_num);
1219 
1220             np_queues_[domain_num].queues_[np_lookup_[thread_num]]->
1221                 on_start_thread(thread_num);
1222 
1223             hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]->
1224                 on_start_thread(thread_num);
1225         }
1226 
on_stop_thread(std::size_t thread_num)1227         void on_stop_thread(std::size_t thread_num) override
1228         {
1229             if (thread_num>num_workers_) {
1230                 HPX_THROW_EXCEPTION(bad_parameter,
1231                     "shared_priority_queue_scheduler::on_stop_thread",
1232                     "Invalid thread number: " + std::to_string(thread_num));
1233             }
1234 
1235             std::size_t domain_num = d_lookup_[thread_num];
1236 
1237             // NOTE: This may call on_stop_thread multiple times for a single
1238             // thread_queue.
1239             lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]->
1240                 on_stop_thread(thread_num);
1241 
1242             np_queues_[domain_num].queues_[np_lookup_[thread_num]]->
1243                 on_stop_thread(thread_num);
1244 
1245             hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]->
1246                 on_stop_thread(thread_num);
1247         }
1248 
on_error(std::size_t thread_num,std::exception_ptr const & e)1249         void on_error(
1250             std::size_t thread_num, std::exception_ptr const& e) override
1251         {
1252             if (thread_num>num_workers_) {
1253                 HPX_THROW_EXCEPTION(bad_parameter,
1254                     "shared_priority_queue_scheduler::on_error",
1255                     "Invalid thread number: " + std::to_string(thread_num));
1256             }
1257 
1258             std::size_t domain_num = d_lookup_[thread_num];
1259 
1260             // NOTE: This may call on_error multiple times for a single
1261             // thread_queue.
1262             lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]->
1263                 on_error(thread_num, e);
1264 
1265             np_queues_[domain_num].queues_[np_lookup_[thread_num]]->
1266                 on_error(thread_num, e);
1267 
1268             hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]->
1269                 on_error(thread_num, e);
1270         }
1271 
reset_thread_distribution()1272         void reset_thread_distribution() override
1273         {
1274             std::fill(counters_.begin(), counters_.end(), 0);
1275         }
1276 
    protected:
        // a queue_holder bundles the thread queues of one numa domain
        typedef queue_holder<thread_queue_type> numa_queues;

        // per-domain queue sets: normal, high and low priority work
        std::array<numa_queues, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> np_queues_;
        std::array<numa_queues, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> hp_queues_;
        std::array<numa_queues, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> lp_queues_;
        // per-domain round-robin counters (incremented in schedule_thread*,
        // cleared by reset_thread_distribution)
        std::array<std::size_t, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> counters_;

        // lookup domain from local worker index
        std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> d_lookup_;

        // index of queue on domain from local worker index,
        // one table per priority level (filled in on_start_thread)
        std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> hp_lookup_;
        std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> np_lookup_;
        std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> lp_lookup_;

        // lookup sub domain queue index from local worker index
        std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> q_lookup_;

        // number of cores per queue for HP, NP, LP queues
        core_ratios cores_per_queue_;

        // max storage size of any queue
        std::size_t max_queue_thread_count_;

        // number of worker threads assigned to this pool
        std::size_t num_workers_;

        // number of numa domains that the threads are occupying
        std::size_t num_domains_;

        // used to make sure the scheduler is only initialized once on a thread
        bool initialized_;   // guarded by init_mutex
        hpx::lcos::local::spinlock init_mutex;
1311     };
1312 }}}
1313 
1314 #include <hpx/config/warnings_suffix.hpp>
1315 
1316 #endif
1317