1 // Copyright (c) 2013 Thomas Heller 2 // Copyright (c) 2007-2017 Hartmut Kaiser 3 // Copyright (c) 2011 Bryce Lelbach 4 // 5 // Distributed under the Boost Software License, Version 1.0. (See accompanying 6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 7 8 #if !defined(HPX_THREADMANAGER_SCHEDULING_STATIC_PRIOTITY_QUEUE_HPP) 9 #define HPX_THREADMANAGER_SCHEDULING_STATIC_PRIOTITY_QUEUE_HPP 10 11 #include <hpx/config.hpp> 12 13 #if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER) 14 #include <hpx/compat/mutex.hpp> 15 #include <hpx/runtime/threads/policies/lockfree_queue_backends.hpp> 16 #include <hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp> 17 #include <hpx/runtime/threads_fwd.hpp> 18 #include <hpx/util/assert.hpp> 19 20 #include <cstddef> 21 #include <cstdint> 22 #include <string> 23 24 #include <hpx/config/warnings_prefix.hpp> 25 26 /////////////////////////////////////////////////////////////////////////////// 27 namespace hpx { namespace threads { namespace policies 28 { 29 /////////////////////////////////////////////////////////////////////////// 30 /// The static_priority_queue_scheduler maintains exactly one queue of work 31 /// items (threads) per OS thread, where this OS thread pulls its next work 32 /// from. Additionally it maintains separate queues: several for high 33 /// priority threads and one for low priority threads. 34 /// High priority threads are executed by the first N OS threads before any 35 /// other work is executed. Low priority threads are executed by the last 36 /// OS thread whenever no other work is available. 37 /// This scheduler does not do any work stealing. 38 template <typename Mutex = compat::mutex, 39 typename PendingQueuing = lockfree_fifo, 40 typename StagedQueuing = lockfree_fifo, 41 typename TerminatedQueuing = lockfree_lifo> 42 class HPX_EXPORT static_priority_queue_scheduler 43 : public local_priority_queue_scheduler< 44 Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing 45 > 46 { 47 public: 48 typedef local_priority_queue_scheduler< 49 Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing 50 > base_type; 51 52 typedef typename base_type::init_parameter_type 53 init_parameter_type; 54 static_priority_queue_scheduler(init_parameter_type const & init,bool deferred_initialization=true)55 static_priority_queue_scheduler(init_parameter_type const& init, 56 bool deferred_initialization = true) 57 : base_type(init, deferred_initialization) 58 {} 59 has_thread_stealing() const60 virtual bool has_thread_stealing() const override { return false; } 61 get_scheduler_name()62 static std::string get_scheduler_name() 63 { 64 return "static_priority_queue_scheduler"; 65 } 66 67 /// Return the next thread to be executed, return false if non is 68 /// available get_next_thread(std::size_t num_thread,bool running,std::int64_t & idle_loop_count,threads::thread_data * & thrd)69 bool get_next_thread(std::size_t num_thread, bool running, 70 std::int64_t& idle_loop_count, threads::thread_data*& thrd) override 71 { 72 std::size_t queues_size = this->queues_.size(); 73 74 typedef typename base_type::thread_queue_type thread_queue_type; 75 76 if (num_thread < this->high_priority_queues_.size()) 77 { 78 thread_queue_type* q = this->high_priority_queues_[num_thread]; 79 80 q->increment_num_pending_accesses(); 81 if (q->get_next_thread(thrd)) 82 return true; 83 q->increment_num_pending_misses(); 84 } 85 86 { 87 HPX_ASSERT(num_thread < queues_size); 88 thread_queue_type* q = this->queues_[num_thread]; 89 90 q->increment_num_pending_accesses(); 91 if (q->get_next_thread(thrd)) 92 return true; 93 q->increment_num_pending_misses(); 94 95 // Give up, we should have work to convert. 96 if (q->get_staged_queue_length(std::memory_order_relaxed) != 0) 97 return false; 98 } 99 100 // Limit access to the low priority queue to one worker thread 101 if ((queues_size - 1) == num_thread) 102 return this->low_priority_queue_.get_next_thread(thrd); 103 104 return false; 105 } 106 107 /// This is a function which gets called periodically by the thread 108 /// manager to allow for maintenance tasks to be executed in the 109 /// scheduler. Returns true if the OS thread calling this function 110 /// has to be terminated (i.e. no more work has to be done). wait_or_add_new(std::size_t num_thread,bool running,std::int64_t & idle_loop_count)111 bool wait_or_add_new(std::size_t num_thread, bool running, 112 std::int64_t& idle_loop_count) override 113 { 114 HPX_ASSERT(num_thread < this->queues_.size()); 115 116 std::size_t added = 0; 117 bool result = true; 118 119 if (num_thread < this->high_priority_queues_.size()) 120 { 121 result = this->high_priority_queues_[num_thread]-> 122 wait_or_add_new(running, idle_loop_count, added) && result; 123 if (0 != added) return result; 124 } 125 126 result = this->queues_[num_thread]->wait_or_add_new(running, 127 idle_loop_count, added) && result; 128 if (0 != added) return result; 129 130 // Check if we have been disabled 131 if (!running) 132 { 133 return true; 134 } 135 136 #ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION 137 // no new work is available, are we deadlocked? 138 if (HPX_UNLIKELY(minimal_deadlock_detection && LHPX_ENABLED(error))) 139 { 140 bool suspended_only = true; 141 142 for (std::size_t i = 0; 143 suspended_only && i != this->queues_.size(); ++i) 144 { 145 suspended_only = this->queues_[i]->dump_suspended_threads( 146 i, idle_loop_count, running); 147 } 148 149 if (HPX_UNLIKELY(suspended_only)) { 150 if (running) { 151 LTM_(error) //-V128 152 << "queue(" << num_thread << "): " 153 << "no new work available, are we deadlocked?"; 154 } 155 else { 156 LHPX_CONSOLE_(hpx::util::logging::level::error) //-V128 157 << " [TM] queue(" << num_thread << "): " 158 << "no new work available, are we deadlocked?\n"; 159 } 160 } 161 } 162 #endif 163 164 result = this->low_priority_queue_.wait_or_add_new(running, 165 idle_loop_count, added) && result; 166 if (0 != added) return result; 167 168 return result; 169 } 170 }; 171 }}} 172 173 #include <hpx/config/warnings_suffix.hpp> 174 175 #endif 176 #endif 177 178