// Copyright (c) 2007-2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/error_code.hpp>
#include <hpx/runtime/config_entry.hpp>
#include <hpx/runtime/resource/detail/partitioner.hpp>
#include <hpx/runtime/threads/cpu_mask.hpp>
#include <hpx/runtime/threads/policies/affinity_data.hpp>
#include <hpx/runtime/threads/topology.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/command_line_handling.hpp>
#include <hpx/util/format.hpp>
#include <hpx/util/safe_lexical_cast.hpp>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <string>
#include <vector>

namespace hpx { namespace detail
{
    // Forward declarations; defined elsewhere in the runtime. They extract
    // the affinity domain name and the affinity description string from the
    // parsed command line.
    std::string get_affinity_domain(util::command_line_handling const& cfg);
    std::size_t get_affinity_description(util::command_line_handling const& cfg,
        std::string& affinity_desc);
}}

namespace hpx { namespace threads { namespace policies { namespace detail
{
    // Extract the --hpx:pu-offset value from the parsed command line.
    // Returns std::size_t(-1) if the option was not specified; throws a
    // command_line_error if the given offset is not smaller than the number
    // of available processing units.
    std::size_t get_pu_offset(util::command_line_handling const& cfg)
    {
        std::size_t pu_offset = std::size_t(-1);

        if (cfg.pu_offset_ != std::size_t(-1))
        {
            pu_offset = cfg.pu_offset_;
            if (pu_offset >= hpx::threads::hardware_concurrency())
            {
                throw hpx::detail::command_line_error(
                    "Invalid command line option "
                    "--hpx:pu-offset, value must be smaller than number of "
                    "available processing units.");
            }
        }

        return pu_offset;
    }

    // Extract the --hpx:pu-step value from the parsed command line.
    // Returns 1 (the default) if the option was not specified; throws a
    // command_line_error if the given step is zero or not smaller than the
    // number of available processing units.
    std::size_t get_pu_step(util::command_line_handling const& cfg)
    {
        std::size_t pu_step = 1;

        if (cfg.pu_step_ != 1)
        {
            pu_step = cfg.pu_step_;
            if (pu_step == 0 || pu_step >= hpx::threads::hardware_concurrency())
            {
                throw hpx::detail::command_line_error(
                    "Invalid command line option "
                    "--hpx:pu-step, value must be non-zero and smaller than "
                    "number of available processing units.");
            }
        }

        return pu_step;
    }

    // Count how many of the given masks have at least one bit set, i.e. how
    // many OS threads actually received an affinity binding.
    inline std::size_t count_initialized(std::vector<mask_type> const& masks)
    {
        std::size_t count = 0;
        for (mask_cref_type m : masks)
        {
            if(threads::any(m))
                ++count;
        }
        return count;
    }

    // Default-construct with neutral settings (no threads, no offset, unit
    // step, 'pu' affinity domain). Enforces that at most one affinity_data
    // instance exists at a time, tracked via the static atomic counter.
    affinity_data::affinity_data()
      : num_threads_(0)
      , pu_offset_(std::size_t(-1))
      , pu_step_(1)
      , used_cores_(0)
      , affinity_domain_("pu")
      , affinity_masks_()
      , pu_nums_()
      , no_affinity_()
    {
        // allow only one affinity-data instance
        if (instance_number_counter_++ >= 0)
        {
            throw std::runtime_error(
                "Cannot instantiate more than one affinity data instance");
        }
    }

    // Release the singleton slot so a new instance may be created later.
    affinity_data::~affinity_data()
    {
        --instance_number_counter_;
    }

    // Initialize this instance from the parsed command line configuration:
    // determines pu-offset/pu-step, the affinity domain, and (when an
    // explicit --hpx:bind description was given) per-thread affinity masks.
    // Returns the number of cores this locality should be assumed to use
    // (the larger of the distinct cores actually bound and the configured
    // "hpx.cores" value).
    std::size_t affinity_data::init(util::command_line_handling const& cfg_)
    {
        num_threads_ = cfg_.num_threads_;
        std::size_t num_system_pus = hardware_concurrency();

        // initialize from command line
        std::size_t pu_offset = get_pu_offset(cfg_);
        if (pu_offset == std::size_t(-1))
        {
            pu_offset_ = 0;
        }
        else
        {
            pu_offset_ = pu_offset;
        }

        // keep pu_step_ == 1 on single-PU systems; otherwise wrap the
        // (already validated) step into the valid range
        if(num_system_pus > 1)
            pu_step_ = get_pu_step(cfg_) % num_system_pus;

        affinity_domain_ = hpx::detail::get_affinity_domain(cfg_);
        pu_nums_.clear();

        std::size_t const used_cores = cfg_.rtcfg_.get_first_used_core();
        std::size_t max_cores =
            hpx::util::safe_lexical_cast<std::size_t>(
                get_config_entry("hpx.cores", 0), 0);

        init_cached_pu_nums(num_system_pus);

        auto const& topo = threads::create_topology();

        std::string affinity_desc;
        hpx::detail::get_affinity_description(cfg_, affinity_desc);
        if (affinity_desc == "none")
        {
            // don't use any affinity for any of the os-threads
            threads::resize(no_affinity_, num_system_pus);
            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::set(no_affinity_, get_pu_num(i));
        }
        else if (!affinity_desc.empty())
        {
            // an explicit --hpx:bind description: build one mask per thread
            affinity_masks_.clear();
            affinity_masks_.resize(num_threads_, 0);

            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::resize(affinity_masks_[i], num_system_pus);

            parse_affinity_options(affinity_desc, affinity_masks_,
                used_cores, max_cores, num_threads_, pu_nums_);

            // every requested OS thread must have received a binding
            std::size_t num_initialized = count_initialized(affinity_masks_);
            if (num_initialized != num_threads_) {
                HPX_THROW_EXCEPTION(bad_parameter,
                    "affinity_data::affinity_data",
                    hpx::util::format(
                        "The number of OS threads requested "
                        "({1}) does not match the number of threads to "
                        "bind ({2})", num_threads_, num_initialized));
            }
        }
        else if (pu_offset == std::size_t(-1))
        {
            // calculate the pu offset based on the used cores, but only if its
            // not explicitly specified
            for(std::size_t num_core = 0; num_core != used_cores; ++num_core)
            {
                pu_offset_ += topo.get_number_of_core_pus(num_core);
            }
        }

        // correct used_cores from config data if appropriate
        if (used_cores_ == 0)
        {
            used_cores_ = used_cores;
        }

        pu_offset_ %= num_system_pus;

        // collect the distinct cores the assigned pus fall onto
        std::vector<std::size_t> cores;
        cores.reserve(num_threads_);
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            std::size_t add_me = topo.get_core_number(get_pu_num(i));
            cores.push_back(add_me);
        }

        std::sort(cores.begin(), cores.end());
        std::vector<std::size_t>::iterator it =
            std::unique(cores.begin(), cores.end());
        cores.erase(it, cores.end());

        std::size_t num_unique_cores = cores.size();
        return (std::max)(num_unique_cores, max_cores);
    }

    // Return the affinity mask to use for the given global thread number.
    // Resolution order: the empty mask if affinity is disabled, then any
    // explicitly parsed per-thread mask, then a mask derived from the
    // configured affinity domain ('pu', 'core', 'numa', or 'machine').
    // Note: the domain checks use std::string("pu").find(affinity_domain_)
    // so any prefix of the domain name (e.g. "p", "co") matches as well.
    mask_cref_type affinity_data::get_pu_mask(threads::topology const& topo,
        std::size_t global_thread_num) const
    {
        // --hpx:bind=none disables all affinity
        if (threads::test(no_affinity_, global_thread_num))
        {
            // function-local static: returned by reference, stays empty
            static mask_type m = mask_type();
            return m;
        }

        // if we have individual, predefined affinity masks, return those
        if (!affinity_masks_.empty())
            return affinity_masks_[global_thread_num];

        // otherwise return mask based on affinity domain
        std::size_t pu_num = get_pu_num(global_thread_num);
        if (0 == std::string("pu").find(affinity_domain_))
        {
            // The affinity domain is 'processing unit', just convert the
            // pu-number into a bit-mask.
            return topo.get_thread_affinity_mask(pu_num);
        }
        if (0 == std::string("core").find(affinity_domain_))
        {
            // The affinity domain is 'core', return a bit mask corresponding
            // to all processing units of the core containing the given
            // pu_num.
            return topo.get_core_affinity_mask(pu_num);
        }
        if (0 == std::string("numa").find(affinity_domain_))
        {
            // The affinity domain is 'numa', return a bit mask corresponding
            // to all processing units of the NUMA domain containing the
            // given pu_num.
            return topo.get_numa_node_affinity_mask(pu_num);
        }

        // The affinity domain is 'machine', return a bit mask corresponding
        // to all processing units of the machine.
        HPX_ASSERT(0 == std::string("machine").find(affinity_domain_));
        return topo.get_machine_affinity_mask();
    }

    // Return a mask covering all processing units used by any of the OS
    // threads (the union of all per-thread masks). If affinity is disabled
    // for pu_num, a mask with only that pu set is returned instead.
    mask_type affinity_data::get_used_pus_mask(threads::topology const& topo,
        std::size_t pu_num) const
    {
        mask_type ret = mask_type();
        threads::resize(ret, hardware_concurrency());

        // --hpx:bind=none disables all affinity
        if (threads::test(no_affinity_, pu_num))
        {
            threads::set(ret, pu_num);
            return ret;
        }

        for (std::size_t thread_num = 0; thread_num < num_threads_; ++thread_num)
        {
            ret |= get_pu_mask(topo, thread_num);
        }

        return ret;
    }

    // Count how many OS threads have the given processing unit in their
    // affinity mask. With affinity disabled for pu_num the occupancy is
    // reported as 1.
    std::size_t affinity_data::get_thread_occupancy(
        threads::topology const& topo, std::size_t pu_num) const
    {
        std::size_t count = 0;
        if (threads::test(no_affinity_, pu_num))
        {
            ++count;
        }
        else
        {
            // single-bit mask for pu_num, intersected with every thread mask
            mask_type pu_mask = mask_type();

            threads::resize(pu_mask, hardware_concurrency());
            threads::set(pu_mask, pu_num);

            for (std::size_t num_thread = 0; num_thread < num_threads_; ++num_thread)
            {
                mask_cref_type affinity_mask = get_pu_mask(topo, num_thread);
                if (threads::any(pu_mask & affinity_mask))
                    ++count;
            }
        }
        return count;
    }

    // means of adding a processing unit after initialization
    // NOTE(review): 'thread_num' is used as a bit position inside the mask
    // here, i.e. it appears to denote a pu number — confirm against callers.
    void affinity_data::add_punit(std::size_t virt_core, std::size_t thread_num)
    {
        std::size_t num_system_pus = hardware_concurrency();

        // initialize affinity_masks and set the mask for the given virt_core
        if (affinity_masks_.empty())
        {
            affinity_masks_.resize(num_threads_);
            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::resize(affinity_masks_[i], num_system_pus);
        }
        threads::set(affinity_masks_[virt_core], thread_num);

        // find first used pu, which is then stored as the pu_offset
        std::size_t first_pu = std::size_t(-1);
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            std::size_t first = threads::find_first(affinity_masks_[i]);
            first_pu = (std::min)(first_pu, first);
        }
        if (first_pu != std::size_t(-1))
            pu_offset_ = first_pu;

        init_cached_pu_nums(num_system_pus);
    }

    // Fill the pu_nums_ cache (one entry per OS thread) from the current
    // offset/step settings. No-op if the cache is already populated.
    void affinity_data::init_cached_pu_nums(std::size_t hardware_concurrency)
    {
        if (pu_nums_.empty())
        {
            pu_nums_.resize(num_threads_);
            for (std::size_t i = 0; i != num_threads_; ++i)
            {
                pu_nums_[i] = get_pu_num(i, hardware_concurrency);
            }
        }
    }

    // Map an OS-thread number onto a processing-unit number using the
    // configured pu-offset and pu-step, wrapping around at the number of
    // available processing units.
    std::size_t affinity_data::get_pu_num(std::size_t num_thread,
        std::size_t hardware_concurrency) const
    {
        // The offset shouldn't be larger than the number of available
        // processing units.
        HPX_ASSERT(pu_offset_ < hardware_concurrency);

        // The distance between assigned processing units shouldn't be zero
        HPX_ASSERT(pu_step_ > 0 && pu_step_ <= hardware_concurrency);

        // We 'scale' the thread number to compute the corresponding
        // processing unit number.
        //
        // The base line processing unit number is computed from the given
        // pu-offset and pu-step.
        std::size_t num_pu = pu_offset_ + pu_step_ * num_thread;

        // We add an additional offset, which allows to 'roll over' if the
        // pu number would get larger than the number of available
        // processing units. Note that it does not make sense to 'roll over'
        // farther than the given pu-step.
        std::size_t offset = (num_pu / hardware_concurrency) % pu_step_;

        // The resulting pu number has to be smaller than the available
        // number of processing units.
        return (num_pu + offset) % hardware_concurrency;
    }

    // Starts at -1 so the first constructor call (post-increment yields -1,
    // i.e. < 0) succeeds and any further one throws.
    std::atomic<int> affinity_data::instance_number_counter_(-1);
}}}}