1 // Copyright (c) 2017-2018 John Biddiscombe 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 #if !defined(HPX_RUNTIME_THREADS_POLICIES_SHARED_PRIORITY_QUEUE_SCHEDULER) 7 #define HPX_RUNTIME_THREADS_POLICIES_SHARED_PRIORITY_QUEUE_SCHEDULER 8 9 #include <hpx/config.hpp> 10 #include <hpx/compat/mutex.hpp> 11 #include <hpx/runtime/threads/policies/lockfree_queue_backends.hpp> 12 #include <hpx/runtime/threads/policies/queue_helpers.hpp> 13 #include <hpx/runtime/threads/policies/scheduler_base.hpp> 14 #include <hpx/runtime/threads/policies/thread_queue.hpp> 15 #include <hpx/runtime/threads/thread_data.hpp> 16 #include <hpx/runtime/threads/topology.hpp> 17 #include <hpx/runtime/threads_fwd.hpp> 18 #include <hpx/runtime/threads/detail/thread_num_tss.hpp> 19 #include <hpx/throw_exception.hpp> 20 #include <hpx/util/assert.hpp> 21 #include <hpx/util/logging.hpp> 22 #include <hpx/util_fwd.hpp> 23 24 #include <array> 25 #include <cstddef> 26 #include <cstdint> 27 #include <exception> 28 #include <memory> 29 #include <string> 30 #include <numeric> 31 #include <type_traits> 32 33 #include <hpx/config/warnings_prefix.hpp> 34 35 namespace hpx { 36 namespace threads { 37 namespace policies { 38 /////////////////////////////////////////////////////////////////////////// 39 /// The shared_priority_queue_scheduler maintains a set of high, normal, and 40 /// low priority queues. For each priority level there is a core/queue ratio 41 /// which determines how many cores share a single queue. If the high 42 /// priority core/queue ratio is 4 the first 4 cores will share a single 43 /// high priority queue, the next 4 will share another one and so on. In 44 /// addition, the shared_priority_queue_scheduler is NUMA-aware and takes 45 /// NUMA scheduling hints into account when creating and scheduling work. 46 template <typename Mutex = compat::mutex, 47 typename PendingQueuing = lockfree_fifo, 48 typename StagedQueuing = lockfree_fifo, 49 typename TerminatedQueuing = lockfree_lifo> 50 class shared_priority_queue_scheduler : public scheduler_base 51 { 52 protected: 53 // The maximum number of active threads this thread manager should 54 // create. This number will be a constraint only as long as the work 55 // items queue is not empty. Otherwise the number of active threads 56 // will be incremented in steps equal to the \a min_add_new_count 57 // specified above. 58 // FIXME: this is specified both here, and in thread_queue. 59 enum 60 { 61 max_thread_count = 1000 62 }; 63 64 public: 65 typedef std::false_type has_periodic_maintenance; 66 67 typedef thread_queue<Mutex, PendingQueuing, StagedQueuing, 68 TerminatedQueuing> 69 thread_queue_type; 70 shared_priority_queue_scheduler(std::size_t num_worker_threads,core_ratios cores_per_queue,char const * description,int max_tasks=max_thread_count)71 shared_priority_queue_scheduler( 72 std::size_t num_worker_threads, 73 core_ratios cores_per_queue, 74 char const* description, 75 int max_tasks = max_thread_count) 76 : scheduler_base(num_worker_threads, description) 77 , cores_per_queue_(cores_per_queue) 78 , max_queue_thread_count_(max_tasks) 79 , num_workers_(num_worker_threads) 80 , num_domains_(1) 81 , initialized_(false) 82 { 83 HPX_ASSERT(num_worker_threads != 0); 84 } 85 ~shared_priority_queue_scheduler()86 virtual ~shared_priority_queue_scheduler() {} 87 numa_sensitive() const88 bool numa_sensitive() const override { return true; } has_thread_stealing() const89 virtual bool has_thread_stealing() const override { return true; } 90 get_scheduler_name()91 static std::string get_scheduler_name() 92 { 93 return "shared_priority_queue_scheduler"; 94 } 95 96 #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES get_creation_time(bool reset)97 std::uint64_t get_creation_time(bool reset) 98 { 99 std::uint64_t time = 0; 100 101 for (std::size_t d = 0; d < num_domains_; ++d) { 102 for (auto &queue : lp_queues_[d].queues_) { 103 time += queue->get_creation_time(reset); 104 } 105 106 for (auto &queue : np_queues_[d].queues_) { 107 time += queue->get_creation_time(reset); 108 } 109 110 for (auto &queue : hp_queues_[d].queues_) { 111 time += queue->get_creation_time(reset); 112 } 113 } 114 115 return time; 116 } 117 get_cleanup_time(bool reset)118 std::uint64_t get_cleanup_time(bool reset) 119 { 120 std::uint64_t time = 0; 121 122 for (std::size_t d = 0; d < num_domains_; ++d) { 123 for (auto &queue : lp_queues_[d].queues_) { 124 time += queue->get_cleanup_time(reset); 125 } 126 127 for (auto &queue : np_queues_[d].queues_) { 128 time += queue->get_cleanup_time(reset); 129 } 130 131 for (auto &queue : hp_queues_[d].queues_) { 132 time += queue->get_cleanup_time(reset); 133 } 134 } 135 136 return time; 137 } 138 #endif 139 140 #ifdef HPX_HAVE_THREAD_STEALING_COUNTS get_num_pending_misses(std::size_t num_thread,bool reset)141 std::int64_t get_num_pending_misses( 142 std::size_t num_thread, bool reset) override 143 { 144 std::int64_t num_pending_misses = 0; 145 146 if (num_thread == std::size_t(-1)) 147 { 148 for (std::size_t d = 0; d < num_domains_; ++d) { 149 for (auto &queue : lp_queues_[d].queues_) { 150 num_pending_misses += queue->get_num_pending_misses( 151 reset); 152 } 153 154 for (auto &queue : np_queues_[d].queues_) { 155 num_pending_misses += queue->get_num_pending_misses( 156 reset); 157 } 158 159 for (auto &queue : hp_queues_[d].queues_) { 160 num_pending_misses += queue->get_num_pending_misses( 161 reset); 162 } 163 } 164 165 return num_pending_misses; 166 } 167 168 std::size_t domain_num = d_lookup_[num_thread]; 169 170 num_pending_misses += 171 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]-> 172 get_num_pending_misses(reset); 173 174 num_pending_misses += 175 np_queues_[domain_num].queues_[np_lookup_[num_thread]]-> 176 get_num_pending_misses(reset); 177 178 num_pending_misses += 179 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]-> 180 get_num_pending_misses(reset); 181 182 return num_pending_misses; 183 } 184 get_num_pending_accesses(std::size_t num_thread,bool reset)185 std::int64_t get_num_pending_accesses( 186 std::size_t num_thread, bool reset) override 187 { 188 std::int64_t num_pending_accesses = 0; 189 190 if (num_thread == std::size_t(-1)) 191 { 192 for (std::size_t d = 0; d < num_domains_; ++d) { 193 for (auto &queue : lp_queues_[d].queues_) { 194 num_pending_accesses += queue->get_num_pending_accesses( 195 reset); 196 } 197 198 for (auto &queue : np_queues_[d].queues_) { 199 num_pending_accesses += queue->get_num_pending_accesses( 200 reset); 201 } 202 203 for (auto &queue : hp_queues_[d].queues_) { 204 num_pending_accesses += queue->get_num_pending_accesses( 205 reset); 206 } 207 } 208 209 return num_pending_accesses; 210 } 211 212 std::size_t domain_num = d_lookup_[num_thread]; 213 214 num_pending_accesses += 215 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]-> 216 get_num_pending_accesses(reset); 217 218 num_pending_accesses += 219 np_queues_[domain_num].queues_[np_lookup_[num_thread]]-> 220 get_num_pending_accesses(reset); 221 222 num_pending_accesses += 223 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]-> 224 get_num_pending_accesses(reset); 225 226 return num_pending_accesses; 227 } 228 get_num_stolen_from_pending(std::size_t num_thread,bool reset)229 std::int64_t get_num_stolen_from_pending( 230 std::size_t num_thread, bool reset) override 231 { 232 std::int64_t num_stolen_threads = 0; 233 234 if (num_thread == std::size_t(-1)) 235 { 236 for (std::size_t d = 0; d < num_domains_; ++d) { 237 for (auto &queue : lp_queues_[d].queues_) { 238 num_stolen_threads += queue->get_num_stolen_from_pending( 239 reset); 240 } 241 242 for (auto &queue : np_queues_[d].queues_) { 243 num_stolen_threads += queue->get_num_stolen_from_pending( 244 reset); 245 } 246 247 for (auto &queue : hp_queues_[d].queues_) { 248 num_stolen_threads += queue->get_num_stolen_from_pending( 249 reset); 250 } 251 } 252 253 return num_stolen_threads; 254 } 255 256 std::size_t domain_num = d_lookup_[num_thread]; 257 258 num_stolen_threads += 259 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]-> 260 get_num_stolen_from_pending(reset); 261 262 num_stolen_threads += 263 np_queues_[domain_num].queues_[np_lookup_[num_thread]]-> 264 get_num_stolen_from_pending(reset); 265 266 num_stolen_threads += 267 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]-> 268 get_num_stolen_from_pending(reset); 269 270 return num_stolen_threads; 271 } 272 get_num_stolen_to_pending(std::size_t num_thread,bool reset)273 std::int64_t get_num_stolen_to_pending( 274 std::size_t num_thread, bool reset) override 275 { 276 std::int64_t num_stolen_threads = 0; 277 278 if (num_thread == std::size_t(-1)) 279 { 280 for (std::size_t d = 0; d < num_domains_; ++d) { 281 for (auto &queue : lp_queues_[d].queues_) { 282 num_stolen_threads += queue->get_num_stolen_to_pending( 283 reset); 284 } 285 286 for (auto &queue : np_queues_[d].queues_) { 287 num_stolen_threads += queue->get_num_stolen_to_pending( 288 reset); 289 } 290 291 for (auto &queue : hp_queues_[d].queues_) { 292 num_stolen_threads += queue->get_num_stolen_to_pending( 293 reset); 294 } 295 } 296 297 return num_stolen_threads; 298 } 299 300 std::size_t domain_num = d_lookup_[num_thread]; 301 302 num_stolen_threads += 303 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]-> 304 get_num_stolen_to_pending(reset); 305 306 num_stolen_threads += 307 np_queues_[domain_num].queues_[np_lookup_[num_thread]]-> 308 get_num_stolen_to_pending(reset); 309 310 num_stolen_threads += 311 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]-> 312 get_num_stolen_to_pending(reset); 313 314 return num_stolen_threads; 315 } 316 get_num_stolen_from_staged(std::size_t num_thread,bool reset)317 std::int64_t get_num_stolen_from_staged( 318 std::size_t num_thread, bool reset) override 319 { 320 std::int64_t num_stolen_threads = 0; 321 322 if (num_thread == std::size_t(-1)) 323 { 324 for (std::size_t d = 0; d < num_domains_; ++d) { 325 for (auto &queue : lp_queues_[d].queues_) { 326 num_stolen_threads += queue->get_num_stolen_from_staged( 327 reset); 328 } 329 330 for (auto &queue : np_queues_[d].queues_) { 331 num_stolen_threads += queue->get_num_stolen_from_staged( 332 reset); 333 } 334 335 for (auto &queue : hp_queues_[d].queues_) { 336 num_stolen_threads += queue->get_num_stolen_from_staged( 337 reset); 338 } 339 } 340 341 return num_stolen_threads; 342 } 343 344 std::size_t domain_num = d_lookup_[num_thread]; 345 346 num_stolen_threads += 347 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]-> 348 get_num_stolen_from_staged(reset); 349 350 num_stolen_threads += 351 np_queues_[domain_num].queues_[np_lookup_[num_thread]]-> 352 get_num_stolen_from_staged(reset); 353 354 num_stolen_threads += 355 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]-> 356 get_num_stolen_from_staged(reset); 357 358 return num_stolen_threads; 359 } 360 get_num_stolen_to_staged(std::size_t num_thread,bool reset)361 std::int64_t get_num_stolen_to_staged( 362 std::size_t num_thread, bool reset) override 363 { 364 std::int64_t num_stolen_threads = 0; 365 366 if (num_thread == std::size_t(-1)) 367 { 368 for (std::size_t d = 0; d < num_domains_; ++d) { 369 for (auto &queue : lp_queues_[d].queues_) { 370 num_stolen_threads += queue->get_num_stolen_to_staged( 371 reset); 372 } 373 374 for (auto &queue : np_queues_[d].queues_) { 375 num_stolen_threads += queue->get_num_stolen_to_staged( 376 reset); 377 } 378 379 for (auto &queue : hp_queues_[d].queues_) { 380 num_stolen_threads += queue->get_num_stolen_to_staged( 381 reset); 382 } 383 } 384 385 return num_stolen_threads; 386 } 387 388 std::size_t domain_num = d_lookup_[num_thread]; 389 390 num_stolen_threads += 391 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]-> 392 get_num_stolen_to_staged(reset); 393 394 num_stolen_threads += 395 np_queues_[domain_num].queues_[np_lookup_[num_thread]]-> 396 get_num_stolen_to_staged(reset); 397 398 num_stolen_threads += 399 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]-> 400 get_num_stolen_to_staged(reset); 401 402 return num_stolen_threads; 403 } 404 #endif 405 406 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME 407 /////////////////////////////////////////////////////////////////////// 408 // Queries the current average thread wait time of the queues. get_average_thread_wait_time(std::size_t num_thread=std::size_t (-1)) const409 std::int64_t get_average_thread_wait_time( 410 std::size_t num_thread = std::size_t(-1)) const override 411 { 412 // Return average thread wait time of one specific queue. 413 std::uint64_t wait_time = 0; 414 std::uint64_t count = 0; 415 416 if (num_thread == std::size_t(-1)) 417 { 418 for (std::size_t d = 0; d < num_domains_; ++d) { 419 for (auto &queue : lp_queues_[d].queues_) { 420 wait_time += queue->get_average_thread_wait_time(); 421 ++count; 422 } 423 424 for (auto &queue : np_queues_[d].queues_) { 425 wait_time += queue->get_average_thread_wait_time(); 426 ++count; 427 } 428 429 for (auto &queue : hp_queues_[d].queues_) { 430 wait_time += queue->get_average_thread_wait_time(); 431 ++count; 432 } 433 } 434 435 return wait_time / (count + 1); 436 } 437 438 std::size_t domain_num = d_lookup_[num_thread]; 439 440 wait_time += 441 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]-> 442 get_average_thread_wait_time(); 443 ++count; 444 445 wait_time += 446 np_queues_[domain_num].queues_[np_lookup_[num_thread]]-> 447 get_average_thread_wait_time(); 448 ++count; 449 450 wait_time += 451 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]-> 452 get_average_thread_wait_time(); 453 ++count; 454 455 return wait_time / (count + 1); 456 } 457 458 /////////////////////////////////////////////////////////////////////// 459 // Queries the current average task wait time of the queues. get_average_task_wait_time(std::size_t num_thread=std::size_t (-1)) const460 std::int64_t get_average_task_wait_time( 461 std::size_t num_thread = std::size_t(-1)) const override 462 { 463 // Return average task wait time of one specific queue. 464 std::uint64_t wait_time = 0; 465 std::uint64_t count = 0; 466 467 if (num_thread == std::size_t(-1)) 468 { 469 for (std::size_t d = 0; d < num_domains_; ++d) { 470 for (auto &queue : lp_queues_[d].queues_) { 471 wait_time += queue->get_average_task_wait_time(); 472 ++count; 473 } 474 475 for (auto &queue : np_queues_[d].queues_) { 476 wait_time += queue->get_average_task_wait_time(); 477 ++count; 478 } 479 480 for (auto &queue : hp_queues_[d].queues_) { 481 wait_time += queue->get_average_task_wait_time(); 482 ++count; 483 } 484 } 485 486 return wait_time / (count + 1); 487 } 488 489 std::size_t domain_num = d_lookup_[num_thread]; 490 491 wait_time += 492 lp_queues_[domain_num].queues_[lp_lookup_[num_thread]]-> 493 get_average_task_wait_time(); 494 ++count; 495 496 wait_time += 497 np_queues_[domain_num].queues_[np_lookup_[num_thread]]-> 498 get_average_task_wait_time(); 499 ++count; 500 501 wait_time += 502 hp_queues_[domain_num].queues_[hp_lookup_[num_thread]]-> 503 get_average_task_wait_time(); 504 ++count; 505 506 return wait_time / (count + 1); 507 } 508 #endif 509 510 // ------------------------------------------------------------ abort_all_suspended_threads()511 void abort_all_suspended_threads() override 512 { 513 for (std::size_t d = 0; d < num_domains_; ++d) { 514 for (auto &queue : lp_queues_[d].queues_) { 515 queue->abort_all_suspended_threads(); 516 } 517 518 for (auto &queue : np_queues_[d].queues_) { 519 queue->abort_all_suspended_threads(); 520 } 521 522 for (auto &queue : hp_queues_[d].queues_) { 523 queue->abort_all_suspended_threads(); 524 } 525 } 526 } 527 528 // ------------------------------------------------------------ cleanup_terminated(bool delete_all)529 bool cleanup_terminated(bool delete_all) override 530 { 531 bool empty = true; 532 533 for (std::size_t d=0; d<num_domains_; ++d) { 534 for (auto &queue : lp_queues_[d].queues_) { 535 empty = queue->cleanup_terminated(delete_all) && empty; 536 } 537 538 for (auto &queue : np_queues_[d].queues_) { 539 empty = queue->cleanup_terminated(delete_all) && empty; 540 } 541 542 for (auto &queue : hp_queues_[d].queues_) { 543 empty = queue->cleanup_terminated(delete_all) && empty; 544 } 545 } 546 547 return empty; 548 } 549 cleanup_terminated(std::size_t thread_num,bool delete_all)550 bool cleanup_terminated( 551 std::size_t thread_num, bool delete_all) override 552 { 553 if (thread_num == std::size_t(-1)) { 554 HPX_THROW_EXCEPTION(bad_parameter, 555 "shared_priority_queue_scheduler::cleanup_terminated", 556 "Invalid thread number: " + std::to_string(thread_num)); 557 } 558 bool empty = true; 559 560 // find the numa domain from the local thread index 561 std::size_t domain_num = d_lookup_[thread_num]; 562 563 // cleanup the queues assigned to this thread 564 empty = hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]-> 565 cleanup_terminated(delete_all) && empty; 566 empty = np_queues_[domain_num].queues_[np_lookup_[thread_num]]-> 567 cleanup_terminated(delete_all) && empty; 568 empty = lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]-> 569 cleanup_terminated(delete_all) && empty; 570 return empty; 571 } 572 573 /////////////////////////////////////////////////////////////////////// 574 // create a new thread and schedule it if the initial state 575 // is equal to pending create_thread(thread_init_data & data,thread_id_type * thrd,thread_state_enum initial_state,bool run_now,error_code & ec)576 void create_thread(thread_init_data& data, thread_id_type* thrd, 577 thread_state_enum initial_state, bool run_now, error_code& ec) override 578 { 579 // safety check that task was created by this thread/scheduler 580 HPX_ASSERT(data.scheduler_base == this); 581 582 std::size_t thread_num = 0; 583 std::size_t domain_num = 0; 584 std::size_t q_index = std::size_t(-1); 585 586 std::unique_lock<pu_mutex_type> l; 587 588 using threads::thread_schedule_hint_mode; 589 590 switch (data.schedulehint.mode) { 591 case thread_schedule_hint_mode::thread_schedule_hint_mode_none: 592 { 593 // Create thread on this worker thread if possible 594 std::size_t global_thread_num = 595 threads::detail::thread_num_tss_.get_worker_thread_num(); 596 thread_num = this->global_to_local_thread_index(global_thread_num); 597 if (thread_num>=num_workers_) { 598 // This is a task being injected from a thread on another pool. 599 // Reset thread_num to first queue. 600 thread_num = 0; 601 } 602 thread_num = select_active_pu(l, thread_num); 603 domain_num = d_lookup_[thread_num]; 604 q_index = q_lookup_[thread_num]; 605 break; 606 } 607 case thread_schedule_hint_mode::thread_schedule_hint_mode_thread: 608 { 609 // Create thread on requested worker thread 610 thread_num = select_active_pu(l, data.schedulehint.hint); 611 domain_num = d_lookup_[thread_num]; 612 q_index = q_lookup_[thread_num]; 613 break; 614 } 615 case thread_schedule_hint_mode::thread_schedule_hint_mode_numa: 616 { 617 // Create thread on requested NUMA domain 618 619 // TODO: This case does not handle suspended PUs. 620 domain_num = data.schedulehint.hint % num_domains_; 621 // if the thread creating the new task is on the domain 622 // assigned to the new task - try to reuse the core as well 623 std::size_t global_thread_num = 624 threads::detail::thread_num_tss_.get_worker_thread_num(); 625 thread_num = this->global_to_local_thread_index(global_thread_num); 626 if (d_lookup_[thread_num] == domain_num) { 627 q_index = q_lookup_[thread_num]; 628 } 629 else { 630 q_index = counters_[domain_num]++; 631 } 632 break; 633 } 634 default: 635 HPX_THROW_EXCEPTION(bad_parameter, 636 "shared_priority_queue_scheduler::create_thread", 637 "Invalid schedule hint mode: " + 638 std::to_string(data.schedulehint.mode)); 639 } 640 641 // create the thread using priority to select queue 642 if (data.priority == thread_priority_high || 643 data.priority == thread_priority_high_recursive || 644 data.priority == thread_priority_boost) 645 { 646 // boosted threads return to normal after being queued 647 if (data.priority == thread_priority_boost) 648 { 649 data.priority = thread_priority_normal; 650 } 651 652 hp_queues_[domain_num].queues_[hp_lookup_[ 653 q_index % hp_queues_[domain_num].num_cores]]-> 654 create_thread(data, thrd, initial_state, run_now, ec); 655 656 return; 657 } 658 659 if (data.priority == thread_priority_low) 660 { 661 lp_queues_[domain_num].queues_[lp_lookup_[ 662 q_index % lp_queues_[domain_num].num_cores]]-> 663 create_thread(data, thrd, initial_state, run_now, ec); 664 665 return; 666 } 667 668 // normal priority 669 np_queues_[domain_num].queues_[np_lookup_[ 670 q_index % np_queues_[domain_num].num_cores]]-> 671 create_thread(data, thrd, initial_state, run_now, ec); 672 } 673 674 /// Return the next thread to be executed, return false if none available get_next_thread(std::size_t thread_num,bool running,std::int64_t & idle_loop_count,threads::thread_data * & thrd)675 virtual bool get_next_thread(std::size_t thread_num, 676 bool running, std::int64_t& idle_loop_count, 677 threads::thread_data*& thrd) override 678 { 679 bool result = false; 680 681 if (thread_num == std::size_t(-1)) { 682 HPX_THROW_EXCEPTION(bad_parameter, 683 "shared_priority_queue_scheduler::get_next_thread", 684 "Invalid thread number: " + std::to_string(thread_num)); 685 } 686 687 // find the numa domain from the local thread index 688 std::size_t domain_num = d_lookup_[thread_num]; 689 690 // is there a high priority task, take first from our numa domain 691 // and then try to steal from others 692 for (std::size_t d=0; d<num_domains_; ++d) { 693 std::size_t dom = (domain_num+d) % num_domains_; 694 // set the preferred queue for this domain, if applicable 695 std::size_t q_index = q_lookup_[thread_num]; 696 // get next task, steal if from another domain 697 result = hp_queues_[dom].get_next_thread(q_index, thrd); 698 if (result) break; 699 } 700 701 // try a normal priority task 702 if (!result) { 703 for (std::size_t d=0; d<num_domains_; ++d) { 704 std::size_t dom = (domain_num+d) % num_domains_; 705 // set the preferred queue for this domain, if applicable 706 std::size_t q_index = q_lookup_[thread_num]; 707 // get next task, steal if from another domain 708 result = np_queues_[dom].get_next_thread(q_index, thrd); 709 if (result) break; 710 } 711 } 712 713 // low priority task 714 if (!result) { 715 #ifdef JB_LP_STEALING 716 for (std::size_t d=domain_num; d<domain_num+num_domains_; ++d) { 717 std::size_t dom = d % num_domains_; 718 // set the preferred queue for this domain, if applicable 719 std::size_t q_index = (dom==domain_num) ? 720 q_lookup_[thread_num] : 721 lp_lookup_[(counters_[dom]++ % 722 lp_queues_[dom].num_cores)]; 723 724 result = lp_queues_[dom].get_next_thread(q_index, thrd); 725 if (result) break; 726 } 727 #else 728 // no cross domain stealing for LP queues 729 result = lp_queues_[domain_num].get_next_thread(0, thrd); 730 #endif 731 } 732 if (result) 733 { 734 HPX_ASSERT(thrd->get_scheduler_base() == this); 735 } 736 return result; 737 } 738 739 /// Schedule the passed thread schedule_thread(threads::thread_data * thrd,threads::thread_schedule_hint schedulehint,bool allow_fallback,thread_priority priority=thread_priority_normal)740 void schedule_thread(threads::thread_data* thrd, 741 threads::thread_schedule_hint schedulehint, 742 bool allow_fallback, 743 thread_priority priority = thread_priority_normal) override 744 { 745 HPX_ASSERT(thrd->get_scheduler_base() == this); 746 747 std::size_t thread_num = 0; 748 std::size_t domain_num = 0; 749 std::size_t q_index = std::size_t(-1); 750 751 std::unique_lock<pu_mutex_type> l; 752 753 using threads::thread_schedule_hint_mode; 754 755 switch (schedulehint.mode) { 756 case thread_schedule_hint_mode::thread_schedule_hint_mode_none: 757 { 758 // Create thread on this worker thread if possible 759 std::size_t global_thread_num = 760 threads::detail::thread_num_tss_.get_worker_thread_num(); 761 thread_num = this->global_to_local_thread_index(global_thread_num); 762 if (thread_num>=num_workers_) { 763 // This is a task being injected from a thread on another pool. 764 // Reset thread_num to first queue. 765 thread_num = 0; 766 } 767 thread_num = select_active_pu(l, thread_num, allow_fallback); 768 domain_num = d_lookup_[thread_num]; 769 q_index = q_lookup_[thread_num]; 770 break; 771 } 772 case thread_schedule_hint_mode::thread_schedule_hint_mode_thread: 773 { 774 // Create thread on requested worker thread 775 thread_num = select_active_pu(l, schedulehint.hint, 776 allow_fallback); 777 domain_num = d_lookup_[thread_num]; 778 q_index = q_lookup_[thread_num]; 779 break; 780 } 781 case thread_schedule_hint_mode::thread_schedule_hint_mode_numa: 782 { 783 // Create thread on requested NUMA domain 784 785 // TODO: This case does not handle suspended PUs. 786 domain_num = schedulehint.hint % num_domains_; 787 // if the thread creating the new task is on the domain 788 // assigned to the new task - try to reuse the core as well 789 std::size_t global_thread_num = 790 threads::detail::thread_num_tss_.get_worker_thread_num(); 791 thread_num = this->global_to_local_thread_index(global_thread_num); 792 if (d_lookup_[thread_num] == domain_num) { 793 q_index = q_lookup_[thread_num]; 794 } 795 else { 796 q_index = counters_[domain_num]++; 797 } 798 break; 799 } 800 default: 801 HPX_THROW_EXCEPTION(bad_parameter, 802 "shared_priority_queue_scheduler::schedule_thread", 803 "Invalid schedule hint mode: " + 804 std::to_string(schedulehint.mode)); 805 } 806 807 if (priority == thread_priority_high || 808 priority == thread_priority_high_recursive || 809 priority == thread_priority_boost) 810 { 811 hp_queues_[domain_num].queues_[hp_lookup_[ 812 q_index % hp_queues_[domain_num].num_cores]]-> 813 schedule_thread(thrd, false); 814 } 815 else if (priority == thread_priority_low) 816 { 817 lp_queues_[domain_num].queues_[lp_lookup_[ 818 q_index % lp_queues_[domain_num].num_cores]]-> 819 schedule_thread(thrd, false); 820 } 821 else 822 { 823 np_queues_[domain_num].queues_[np_lookup_[ 824 q_index % np_queues_[domain_num].num_cores]]-> 825 schedule_thread(thrd, false); 826 } 827 } 828 829 /// Put task on the back of the queue schedule_thread_last(threads::thread_data * thrd,threads::thread_schedule_hint schedulehint,bool allow_fallback,thread_priority priority=thread_priority_normal)830 void schedule_thread_last(threads::thread_data* thrd, 831 threads::thread_schedule_hint schedulehint, 832 bool allow_fallback, 833 thread_priority priority = thread_priority_normal) override 834 { 835 HPX_ASSERT(thrd->get_scheduler_base() == this); 836 837 std::size_t thread_num = 0; 838 std::size_t domain_num = 0; 839 std::size_t q_index = std::size_t(-1); 840 841 std::unique_lock<pu_mutex_type> l; 842 843 using threads::thread_schedule_hint_mode; 844 845 switch (schedulehint.mode) { 846 case thread_schedule_hint_mode::thread_schedule_hint_mode_none: 847 { 848 // Create thread on this worker thread if possible 849 std::size_t global_thread_num = 850 threads::detail::thread_num_tss_.get_worker_thread_num(); 851 thread_num = this->global_to_local_thread_index(global_thread_num); 852 if (thread_num>=num_workers_) { 853 // This is a task being injected from a thread on another pool. 854 // Reset thread_num to first queue. 855 thread_num = 0; 856 } 857 thread_num = select_active_pu(l, thread_num, allow_fallback); 858 domain_num = d_lookup_[thread_num]; 859 q_index = q_lookup_[thread_num]; 860 break; 861 } 862 case thread_schedule_hint_mode::thread_schedule_hint_mode_thread: 863 { 864 // Create thread on requested worker thread 865 thread_num = select_active_pu(l, schedulehint.hint, 866 allow_fallback); 867 domain_num = d_lookup_[thread_num]; 868 q_index = q_lookup_[thread_num]; 869 break; 870 } 871 case thread_schedule_hint_mode::thread_schedule_hint_mode_numa: 872 { 873 // Create thread on requested NUMA domain 874 875 // TODO: This case does not handle suspended PUs. 876 domain_num = schedulehint.hint % num_domains_; 877 // if the thread creating the new task is on the domain 878 // assigned to the new task - try to reuse the core as well 879 std::size_t global_thread_num = 880 threads::detail::thread_num_tss_.get_worker_thread_num(); 881 thread_num = this->global_to_local_thread_index(global_thread_num); 882 if (d_lookup_[thread_num] == domain_num) { 883 q_index = q_lookup_[thread_num]; 884 } 885 else { 886 q_index = counters_[domain_num]++; 887 } 888 break; 889 } 890 default: 891 HPX_THROW_EXCEPTION(bad_parameter, 892 "shared_priority_queue_scheduler::schedule_thread_last", 893 "Invalid schedule hint mode: " + 894 std::to_string(schedulehint.mode)); 895 } 896 897 if (priority == thread_priority_high || 898 priority == thread_priority_high_recursive || 899 priority == thread_priority_boost) 900 { 901 hp_queues_[domain_num].queues_[hp_lookup_[ 902 q_index % hp_queues_[domain_num].num_cores]]-> 903 schedule_thread(thrd, true); 904 } 905 else if (priority == thread_priority_low) 906 { 907 lp_queues_[domain_num].queues_[lp_lookup_[ 908 q_index % lp_queues_[domain_num].num_cores]]-> 909 schedule_thread(thrd, true); 910 } 911 else 912 { 913 np_queues_[domain_num].queues_[np_lookup_[ 914 q_index % np_queues_[domain_num].num_cores]]-> 915 schedule_thread(thrd, true); 916 } 917 } 918 919 //--------------------------------------------------------------------- 920 // Destroy the passed thread - as it has been terminated 921 //--------------------------------------------------------------------- destroy_thread(threads::thread_data * thrd,std::int64_t & busy_count)922 void destroy_thread( 923 threads::thread_data* thrd, std::int64_t& busy_count) override 924 { 925 HPX_ASSERT(thrd->get_scheduler_base() == this); 926 thrd->get_queue<thread_queue_type>().destroy_thread(thrd, busy_count); 927 } 928 929 /////////////////////////////////////////////////////////////////////// 930 // This returns the current length of the queues (work items and new 931 // items) 932 //--------------------------------------------------------------------- get_queue_length(std::size_t thread_num=std::size_t (-1)) const933 std::int64_t get_queue_length( 934 std::size_t thread_num = std::size_t(-1)) const override 935 { 936 std::int64_t count = 0; 937 for (std::size_t d=0; d<num_domains_; ++d) { 938 count += hp_queues_[d].get_queue_length(); 939 count += np_queues_[d].get_queue_length(); 940 count += lp_queues_[d].get_queue_length(); 941 } 942 943 if (thread_num != std::size_t(-1)) { 944 // find the numa domain from the local thread index 945 std::size_t domain = d_lookup_[thread_num]; 946 // get next task, steal if from another domain 947 std::int64_t result = 948 hp_queues_[domain].queues_[hp_lookup_[thread_num]]-> 949 get_queue_length(); 950 if (result>0) return result; 951 result = 952 np_queues_[domain].queues_[np_lookup_[thread_num]]-> 953 get_queue_length(); 954 if (result>0) return result; 955 return 956 lp_queues_[domain].queues_[lp_lookup_[thread_num]]-> 957 get_queue_length(); 958 } 959 return count; 960 } 961 962 //--------------------------------------------------------------------- 963 // Queries the current thread count of the queues. 964 //--------------------------------------------------------------------- get_thread_count(thread_state_enum state=unknown,thread_priority priority=thread_priority_default,std::size_t thread_num=std::size_t (-1),bool reset=false) const965 std::int64_t get_thread_count(thread_state_enum state = unknown, 966 thread_priority priority = thread_priority_default, 967 std::size_t thread_num = std::size_t(-1), 968 bool reset = false) const override 969 { 970 std::int64_t count = 0; 971 972 // if a specific worker id was requested 973 if (thread_num != std::size_t(-1)) { 974 std::size_t domain_num = d_lookup_[thread_num]; 975 // 976 switch (priority) { 977 case thread_priority_default: { 978 count += hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]-> 979 get_thread_count(state); 980 count += np_queues_[domain_num].queues_[np_lookup_[thread_num]]-> 981 get_thread_count(state); 982 count += lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]-> 983 get_thread_count(state); 984 return count; 985 } 986 case thread_priority_low: { 987 count += lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]-> 988 get_thread_count(state); 989 return count; 990 } 991 case thread_priority_normal: { 992 count += np_queues_[domain_num].queues_[np_lookup_[thread_num]]-> 993 get_thread_count(state); 994 return count; 995 } 996 case thread_priority_boost: 997 case thread_priority_high: 998 case thread_priority_high_recursive: { 999 count += hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]-> 1000 get_thread_count(state); 1001 return count; 1002 } 1003 default: 1004 case thread_priority_unknown: 1005 HPX_THROW_EXCEPTION(bad_parameter, 1006 "shared_priority_queue_scheduler::get_thread_count", 1007 "unknown thread priority (thread_priority_unknown)"); 1008 return 0; 1009 } 1010 } 1011 1012 switch (priority) { 1013 case thread_priority_default: { 1014 for (std::size_t d=0; d<num_domains_; ++d) { 1015 count += hp_queues_[d].get_thread_count(state); 1016 count += np_queues_[d].get_thread_count(state); 1017 count += lp_queues_[d].get_thread_count(state); 1018 } 1019 return count; 1020 } 1021 case thread_priority_low: { 1022 for (std::size_t d=0; d<num_domains_; ++d) { 1023 count += lp_queues_[d].get_thread_count(state); 1024 } 1025 return count; 1026 } 1027 case thread_priority_normal: { 1028 for (std::size_t d=0; d<num_domains_; ++d) { 1029 count += np_queues_[d].get_thread_count(state); 1030 } 1031 return count; 1032 } 1033 case thread_priority_boost: 1034 case thread_priority_high: 1035 case thread_priority_high_recursive: { 1036 for (std::size_t d=0; d<num_domains_; ++d) { 1037 count += hp_queues_[d].get_thread_count(state); 1038 } 1039 return count; 1040 } 1041 default: 1042 case thread_priority_unknown: 1043 HPX_THROW_EXCEPTION(bad_parameter, 1044 "shared_priority_queue_scheduler::get_thread_count", 1045 "unknown thread priority (thread_priority_unknown)"); 1046 return 0; 1047 } 1048 1049 return count; 1050 } 1051 1052 /////////////////////////////////////////////////////////////////////// 1053 // Enumerate matching threads from all queues enumerate_threads(util::function_nonser<bool (thread_id_type)> const & f,thread_state_enum state=unknown) const1054 bool enumerate_threads( 1055 util::function_nonser<bool(thread_id_type)> const& f, 1056 thread_state_enum state = unknown) const override 1057 { 1058 bool result = true; 1059 1060 for (std::size_t d=0; d<num_domains_; ++d) { 1061 result = result && 1062 hp_queues_[d].enumerate_threads(f, state); 1063 result = result && 1064 np_queues_[d].enumerate_threads(f, state); 1065 result = result && 1066 lp_queues_[d].enumerate_threads(f, state); 1067 } 1068 return result; 1069 } 1070 1071 /// This is a function which gets called periodically by the thread 1072 /// manager to allow for maintenance tasks to be executed in the 1073 /// scheduler. Returns true if the OS thread calling this function 1074 /// has to be terminated (i.e. no more work has to be done). wait_or_add_new(std::size_t thread_num,bool running,std::int64_t & idle_loop_count)1075 virtual bool wait_or_add_new(std::size_t thread_num, 1076 bool running, std::int64_t& idle_loop_count) override 1077 { 1078 std::size_t added = 0; 1079 bool result = true; 1080 1081 if (thread_num == std::size_t(-1)) { 1082 HPX_THROW_EXCEPTION(bad_parameter, 1083 "shared_priority_queue_scheduler::wait_or_add_new", 1084 "Invalid thread number: " + std::to_string(thread_num)); 1085 } 1086 1087 // find the numa domain from the local thread index 1088 std::size_t domain_num = d_lookup_[thread_num]; 1089 1090 // is there a high priority task, take first from our numa domain 1091 // and then try to steal from others 1092 for (std::size_t d=0; d<num_domains_; ++d) { 1093 std::size_t dom = (domain_num+d) % num_domains_; 1094 // set the preferred queue for this domain, if applicable 1095 std::size_t q_index = q_lookup_[thread_num]; 1096 // get next task, steal if from another domain 1097 result = hp_queues_[dom].wait_or_add_new(q_index, running, 1098 idle_loop_count, added); 1099 if (0 != added) return result; 1100 } 1101 1102 // try a normal priority task 1103 if (!result) { 1104 for (std::size_t d=0; d<num_domains_; ++d) { 1105 std::size_t dom = (domain_num+d) % num_domains_; 1106 // set the preferred queue for this domain, if applicable 1107 std::size_t q_index = q_lookup_[thread_num]; 1108 // get next task, steal if from another domain 1109 result = np_queues_[dom].wait_or_add_new(q_index, running, 1110 idle_loop_count, added); 1111 if (0 != added) return result; 1112 } 1113 } 1114 1115 // low priority task 1116 if (!result) { 1117 #ifdef JB_LP_STEALING 1118 for (std::size_t d=domain_num; d<domain_num+num_domains_; ++d) { 1119 std::size_t dom = d % num_domains_; 1120 // set the preferred queue for this domain, if applicable 1121 std::size_t q_index = (dom==domain_num) ? 1122 q_lookup_[thread_num] : 1123 lp_lookup_[(counters_[dom]++ % 1124 lp_queues_[dom].num_cores)]; 1125 1126 result = lp_queues_[dom].wait_or_add_new(q_index, running, 1127 idle_loop_count, added); 1128 if (0 != added) return result; 1129 } 1130 #else 1131 // no cross domain stealing for LP queues 1132 result = lp_queues_[domain_num].wait_or_add_new(0, running, 1133 idle_loop_count, added); 1134 if (0 != added) return result; 1135 #endif 1136 } 1137 1138 return result; 1139 } 1140 1141 /////////////////////////////////////////////////////////////////////// on_start_thread(std::size_t thread_num)1142 void on_start_thread(std::size_t thread_num) override 1143 { 1144 std::unique_lock<hpx::lcos::local::spinlock> lock(init_mutex); 1145 if (!initialized_) 1146 { 1147 initialized_ = true; 1148 1149 auto &rp = resource::get_partitioner(); 1150 auto const& topo = rp.get_topology(); 1151 1152 // For each worker thread, count which each numa domain they 1153 // belong to and build lists of useful indexes/refs 1154 num_domains_ = 1; 1155 std::array<std::size_t, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> q_counts_; 1156 std::fill(d_lookup_.begin(), d_lookup_.end(), 0); 1157 std::fill(q_lookup_.begin(), q_lookup_.end(), 0); 1158 std::fill(q_counts_.begin(), q_counts_.end(), 0); 1159 std::fill(counters_.begin(), counters_.end(), 0); 1160 1161 for (std::size_t local_id=0; local_id!=num_workers_; ++local_id) 1162 { 1163 std::size_t global_id = local_to_global_thread_index(local_id); 1164 std::size_t pu_num = rp.get_pu_num(global_id); 1165 std::size_t domain = topo.get_numa_node_number(pu_num); 1166 d_lookup_[local_id] = domain; 1167 num_domains_ = (std::max)(num_domains_, domain+1); 1168 } 1169 1170 HPX_ASSERT(num_domains_ <= HPX_HAVE_MAX_NUMA_DOMAIN_COUNT); 1171 1172 for (std::size_t local_id=0; local_id!=num_workers_; ++local_id) 1173 { 1174 q_lookup_[local_id] = q_counts_[d_lookup_[local_id]]++; 1175 } 1176 1177 // create queue sets for each numa domain 1178 for (std::size_t i = 0; i < num_domains_; ++i) 1179 { 1180 std::size_t queues = (std::max)( 1181 q_counts_[i] / cores_per_queue_.high_priority, 1182 std::size_t(1)); 1183 hp_queues_[i].init( 1184 q_counts_[i], queues, max_queue_thread_count_); 1185 1186 queues = (std::max)( 1187 q_counts_[i] / cores_per_queue_.normal_priority, 1188 std::size_t(1)); 1189 np_queues_[i].init( 1190 q_counts_[i], queues, max_queue_thread_count_); 1191 1192 queues = 1193 (std::max)(q_counts_[i] / cores_per_queue_.low_priority, 1194 std::size_t(1)); 1195 lp_queues_[i].init( 1196 q_counts_[i], queues, max_queue_thread_count_); 1197 } 1198 1199 // create worker_id to queue lookups for each queue type 1200 for (std::size_t local_id=0; local_id!=num_workers_; ++local_id) 1201 { 1202 hp_lookup_[local_id] = hp_queues_[d_lookup_[local_id]]. 1203 get_queue_index(q_lookup_[local_id]); 1204 np_lookup_[local_id] = np_queues_[d_lookup_[local_id]]. 1205 get_queue_index(q_lookup_[local_id]); 1206 lp_lookup_[local_id] = lp_queues_[d_lookup_[local_id]]. 1207 get_queue_index(q_lookup_[local_id]); 1208 } 1209 } 1210 1211 lock.unlock(); 1212 1213 std::size_t domain_num = d_lookup_[thread_num]; 1214 1215 // NOTE: This may call on_start_thread multiple times for a single 1216 // thread_queue. 1217 lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]-> 1218 on_start_thread(thread_num); 1219 1220 np_queues_[domain_num].queues_[np_lookup_[thread_num]]-> 1221 on_start_thread(thread_num); 1222 1223 hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]-> 1224 on_start_thread(thread_num); 1225 } 1226 on_stop_thread(std::size_t thread_num)1227 void on_stop_thread(std::size_t thread_num) override 1228 { 1229 if (thread_num>num_workers_) { 1230 HPX_THROW_EXCEPTION(bad_parameter, 1231 "shared_priority_queue_scheduler::on_stop_thread", 1232 "Invalid thread number: " + std::to_string(thread_num)); 1233 } 1234 1235 std::size_t domain_num = d_lookup_[thread_num]; 1236 1237 // NOTE: This may call on_stop_thread multiple times for a single 1238 // thread_queue. 1239 lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]-> 1240 on_stop_thread(thread_num); 1241 1242 np_queues_[domain_num].queues_[np_lookup_[thread_num]]-> 1243 on_stop_thread(thread_num); 1244 1245 hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]-> 1246 on_stop_thread(thread_num); 1247 } 1248 on_error(std::size_t thread_num,std::exception_ptr const & e)1249 void on_error( 1250 std::size_t thread_num, std::exception_ptr const& e) override 1251 { 1252 if (thread_num>num_workers_) { 1253 HPX_THROW_EXCEPTION(bad_parameter, 1254 "shared_priority_queue_scheduler::on_error", 1255 "Invalid thread number: " + std::to_string(thread_num)); 1256 } 1257 1258 std::size_t domain_num = d_lookup_[thread_num]; 1259 1260 // NOTE: This may call on_error multiple times for a single 1261 // thread_queue. 1262 lp_queues_[domain_num].queues_[lp_lookup_[thread_num]]-> 1263 on_error(thread_num, e); 1264 1265 np_queues_[domain_num].queues_[np_lookup_[thread_num]]-> 1266 on_error(thread_num, e); 1267 1268 hp_queues_[domain_num].queues_[hp_lookup_[thread_num]]-> 1269 on_error(thread_num, e); 1270 } 1271 reset_thread_distribution()1272 void reset_thread_distribution() override 1273 { 1274 std::fill(counters_.begin(), counters_.end(), 0); 1275 } 1276 1277 protected: 1278 typedef queue_holder<thread_queue_type> numa_queues; 1279 1280 std::array<numa_queues, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> np_queues_; 1281 std::array<numa_queues, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> hp_queues_; 1282 std::array<numa_queues, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> lp_queues_; 1283 std::array<std::size_t, HPX_HAVE_MAX_NUMA_DOMAIN_COUNT> counters_; 1284 1285 // lookup domain from local worker index 1286 std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> d_lookup_; 1287 1288 // index of queue on domain from local worker index 1289 std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> hp_lookup_; 1290 std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> np_lookup_; 1291 std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> lp_lookup_; 1292 1293 // lookup sub domain queue index from local worker index 1294 std::array<std::size_t, HPX_HAVE_MAX_CPU_COUNT> q_lookup_; 1295 1296 // number of cores per queue for HP, NP, LP queues 1297 core_ratios cores_per_queue_; 1298 1299 // max storage size of any queue 1300 std::size_t max_queue_thread_count_; 1301 1302 // number of worker threads assigned to this pool 1303 std::size_t num_workers_; 1304 1305 // number of numa domains that the threads are occupying 1306 std::size_t num_domains_; 1307 1308 // used to make sure the scheduler is only initialized once on a thread 1309 bool initialized_; 1310 hpx::lcos::local::spinlock init_mutex; 1311 }; 1312 }}} 1313 1314 #include <hpx/config/warnings_suffix.hpp> 1315 1316 #endif 1317