1 //////////////////////////////////////////////////////////////////////////////// 2 // Copyright (c) 2017-2018 John Biddiscombe 3 // Copyright (c) 2007-2016 Hartmut Kaiser 4 // Copyright (c) 2011 Bryce Lelbach 5 // 6 // Distributed under the Boost Software License, Version 1.0. (See accompanying 7 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 8 //////////////////////////////////////////////////////////////////////////////// 9 10 #if !defined(HPX_F0153C92_99B1_4F31_8FA9_4208DB2F26CE) 11 #define HPX_F0153C92_99B1_4F31_8FA9_4208DB2F26CE 12 13 #include <hpx/config.hpp> 14 #include <hpx/runtime/threads/thread_data.hpp> 15 #include <hpx/util/logging.hpp> 16 #include <hpx/util/unused.hpp> 17 18 #include <cstddef> 19 #include <cstdint> 20 #include <vector> 21 22 /////////////////////////////////////////////////////////////////////////////// 23 namespace hpx { namespace threads { namespace policies 24 { 25 // Holds core/queue ratios used by schedulers. 26 struct core_ratios 27 { core_ratioshpx::threads::policies::core_ratios28 core_ratios(std::size_t high_priority, std::size_t normal_priority, 29 std::size_t low_priority) 30 : high_priority(high_priority), normal_priority(normal_priority), 31 low_priority(low_priority) {} 32 33 std::size_t high_priority; 34 std::size_t normal_priority; 35 std::size_t low_priority; 36 }; 37 38 // ---------------------------------------------------------------- 39 // Helper class to hold a set of queues. 40 // ---------------------------------------------------------------- 41 template <typename QueueType> 42 struct queue_holder 43 { inithpx::threads::policies::queue_holder44 void init(std::size_t cores, 45 std::size_t queues, 46 std::size_t max_tasks) 47 { 48 num_cores = cores; 49 num_queues = queues; 50 scale = num_cores==1 ? 0 51 : static_cast<double>(num_queues-1)/(num_cores-1); 52 // 53 queues_.resize(num_queues); 54 for (std::size_t i = 0; i < num_queues; ++i) { 55 queues_[i] = new QueueType(max_tasks); 56 } 57 } 58 59 // ---------------------------------------------------------------- ~queue_holderhpx::threads::policies::queue_holder60 ~queue_holder() 61 { 62 for(auto &q : queues_) delete q; 63 queues_.clear(); 64 } 65 66 // ---------------------------------------------------------------- get_queue_indexhpx::threads::policies::queue_holder67 inline std::size_t get_queue_index(std::size_t id) const 68 { 69 return static_cast<std::size_t>(0.5 + id*scale);; 70 } 71 72 // ---------------------------------------------------------------- get_next_threadhpx::threads::policies::queue_holder73 inline bool get_next_thread(std::size_t id, threads::thread_data*& thrd) 74 { 75 // loop over all queues and take one task, 76 // starting with the requested queue 77 // then stealing from any other one in the container 78 for (std::size_t i=0; i<num_queues; ++i) { 79 std::size_t q = (id + i) % num_queues; 80 if (queues_[q]->get_next_thread(thrd)) return true; 81 } 82 return false; 83 } 84 85 // ---------------------------------------------------------------- wait_or_add_newhpx::threads::policies::queue_holder86 inline bool wait_or_add_new(std::size_t id, bool running, 87 std::int64_t& idle_loop_count, std::size_t& added) 88 { 89 // loop over all queues and take one task, 90 // starting with the requested queue 91 // then stealing from any other one in the container 92 bool result = true; 93 for (std::size_t i=0; i<num_queues; ++i) { 94 std::size_t q = (id + i) % num_queues; 95 result = queues_[q]->wait_or_add_new(running, idle_loop_count, 96 added) && result; 97 if (0 != added) return result; 98 } 99 return result; 100 } 101 102 // ---------------------------------------------------------------- get_queue_lengthhpx::threads::policies::queue_holder103 inline std::size_t get_queue_length() const 104 { 105 std::size_t len = 0; 106 for (auto &q : queues_) len += q->get_queue_length(); 107 return len; 108 } 109 110 // ---------------------------------------------------------------- get_thread_counthpx::threads::policies::queue_holder111 inline std::size_t get_thread_count(thread_state_enum state = unknown) const 112 { 113 std::size_t len = 0; 114 for (auto &q : queues_) len += q->get_thread_count(state); 115 return len; 116 } 117 118 // ---------------------------------------------------------------- enumerate_threadshpx::threads::policies::queue_holder119 bool enumerate_threads(util::function_nonser<bool(thread_id_type)> const& f, 120 thread_state_enum state = unknown) const 121 { 122 bool result = true; 123 for (auto &q : queues_) result = result && q->enumerate_threads(f, state); 124 return result; 125 } 126 127 // ---------------------------------------------------------------- sizehpx::threads::policies::queue_holder128 inline std::size_t size() const { 129 return num_queues; 130 } 131 132 // ---------------------------------------------------------------- 133 std::size_t num_cores; 134 std::size_t num_queues; 135 double scale; 136 std::vector<QueueType*> queues_; 137 }; 138 139 struct add_new_tag {}; 140 141 #ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION 142 /////////////////////////////////////////////////////////////////////////// 143 // We globally control whether to do minimal deadlock detection using this 144 // global bool variable. It will be set once by the runtime configuration 145 // startup code 146 extern bool minimal_deadlock_detection; 147 #endif 148 149 /////////////////////////////////////////////////////////////////////////////// 150 namespace detail 151 { 152 /////////////////////////////////////////////////////////////////////////// 153 // debug helper function, logs all suspended threads 154 // this returns true if all threads in the map are currently suspended 155 template <typename Map> 156 bool dump_suspended_threads(std::size_t num_thread, 157 Map& tm, std::int64_t& idle_loop_count, bool running) HPX_COLD; 158 159 template <typename Map> dump_suspended_threads(std::size_t num_thread,Map & tm,std::int64_t & idle_loop_count,bool running)160 bool dump_suspended_threads(std::size_t num_thread, 161 Map& tm, std::int64_t& idle_loop_count, bool running) 162 { 163 #ifndef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION 164 HPX_UNUSED(tm); 165 HPX_UNUSED(idle_loop_count); 166 HPX_UNUSED(running); //-V601 167 return false; 168 #else 169 if (!minimal_deadlock_detection) 170 return false; 171 172 // attempt to output possibly deadlocked threads occasionally only 173 if (HPX_LIKELY((idle_loop_count++ % HPX_IDLE_LOOP_COUNT_MAX) != 0)) 174 return false; 175 176 bool result = false; 177 bool collect_suspended = true; 178 179 bool logged_headline = false; 180 typename Map::const_iterator end = tm.end(); 181 for (typename Map::const_iterator it = tm.begin(); it != end; ++it) 182 { 183 threads::thread_data const* thrd = it->get(); 184 threads::thread_state_enum state = thrd->get_state().state(); 185 threads::thread_state_enum marked_state = thrd->get_marked_state(); 186 187 if (state != marked_state) { 188 // log each thread only once 189 if (!logged_headline) { 190 if (running) { 191 LTM_(error) //-V128 192 << "Listing suspended threads while queue (" 193 << num_thread << ") is empty:"; 194 } 195 else { 196 LHPX_CONSOLE_(hpx::util::logging::level::error) //-V128 197 << " [TM] Listing suspended threads while queue (" 198 << num_thread << ") is empty:\n"; 199 } 200 logged_headline = true; 201 } 202 203 if (running) { 204 LTM_(error) << "queue(" << num_thread << "): " //-V128 205 << get_thread_state_name(state) 206 << "(" << std::hex << std::setw(8) 207 << std::setfill('0') << (*it) 208 << "." << std::hex << std::setw(2) 209 << std::setfill('0') << thrd->get_thread_phase() 210 << "/" << std::hex << std::setw(8) 211 << std::setfill('0') << thrd->get_component_id() 212 << ")" 213 #ifdef HPX_HAVE_THREAD_PARENT_REFERENCE 214 << " P" << std::hex << std::setw(8) 215 << std::setfill('0') << thrd->get_parent_thread_id() 216 #endif 217 << ": " << thrd->get_description() 218 << ": " << thrd->get_lco_description(); 219 } 220 else { 221 LHPX_CONSOLE_(hpx::util::logging::level::error) << " [TM] " //-V128 222 << "queue(" << num_thread << "): " 223 << get_thread_state_name(state) 224 << "(" << std::hex << std::setw(8) 225 << std::setfill('0') << (*it) 226 << "." << std::hex << std::setw(2) 227 << std::setfill('0') << thrd->get_thread_phase() 228 << "/" << std::hex << std::setw(8) 229 << std::setfill('0') << thrd->get_component_id() 230 << ")" 231 #ifdef HPX_HAVE_THREAD_PARENT_REFERENCE 232 << " P" << std::hex << std::setw(8) 233 << std::setfill('0') << thrd->get_parent_thread_id() 234 #endif 235 << ": " << thrd->get_description() 236 << ": " << thrd->get_lco_description() << "\n"; 237 } 238 thrd->set_marked_state(state); 239 240 // result should be true if we found only suspended threads 241 if (collect_suspended) { 242 switch(state) { 243 case threads::suspended: 244 result = true; // at least one is suspended 245 break; 246 247 case threads::pending: 248 case threads::active: 249 result = false; // one is active, no deadlock (yet) 250 collect_suspended = false; 251 break; 252 253 default: 254 // If the thread is terminated we don't care too much 255 // anymore. 256 break; 257 } 258 } 259 } 260 } 261 return result; 262 #endif 263 } 264 } 265 266 }}} 267 268 #endif // HPX_F0153C92_99B1_4F31_8FA9_4208DB2F26CE 269 270