//  Copyright (c) 2007-2017 Hartmut Kaiser
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/error_code.hpp>
#include <hpx/runtime/config_entry.hpp>
#include <hpx/runtime/resource/detail/partitioner.hpp>
#include <hpx/runtime/threads/cpu_mask.hpp>
#include <hpx/runtime/threads/policies/affinity_data.hpp>
#include <hpx/runtime/threads/topology.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/command_line_handling.hpp>
#include <hpx/util/format.hpp>
#include <hpx/util/safe_lexical_cast.hpp>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <string>
#include <vector>

namespace hpx { namespace detail
{
    std::string get_affinity_domain(util::command_line_handling const& cfg);
    std::size_t get_affinity_description(util::command_line_handling const& cfg,
        std::string& affinity_desc);
}}

namespace hpx { namespace threads { namespace policies { namespace detail
{
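    // Return the processing unit offset requested via --hpx:pu-offset, or
    // std::size_t(-1) if no offset was given on the command line.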
    std::size_t get_pu_offset(util::command_line_handling const& cfg)
    {
        std::size_t pu_offset = std::size_t(-1);

        if (cfg.pu_offset_ != std::size_t(-1))
        {
            pu_offset = cfg.pu_offset_;
            if (pu_offset >= hpx::threads::hardware_concurrency())
            {
                throw hpx::detail::command_line_error(
                    "Invalid command line option "
                    "--hpx:pu-offset, value must be smaller than number of "
                    "available processing units.");
            }
        }

        return pu_offset;
    }

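    // Return the step between assigned processing units requested via
    // --hpx:pu-step (defaults to 1, i.e. consecutive processing units).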
    std::size_t get_pu_step(util::command_line_handling const& cfg)
    {
        std::size_t pu_step = 1;

        if (cfg.pu_step_ != 1)
        {
            pu_step = cfg.pu_step_;
            if (pu_step == 0 || pu_step >= hpx::threads::hardware_concurrency())
            {
                throw hpx::detail::command_line_error(
                    "Invalid command line option "
                    "--hpx:pu-step, value must be non-zero and smaller than "
                    "number of available processing units.");
            }
        }

        return pu_step;
    }

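    // Count the masks that have at least one bit set, i.e. the number of
    // threads for which an affinity mask was actually initialized.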
    inline std::size_t count_initialized(std::vector<mask_type> const& masks)
    {
        std::size_t count = 0;
        for (mask_cref_type m : masks)
        {
            if (threads::any(m))
                ++count;
        }
        return count;
    }

    affinity_data::affinity_data()
      : num_threads_(0)
      , pu_offset_(std::size_t(-1))
      , pu_step_(1)
      , used_cores_(0)
      , affinity_domain_("pu")
      , affinity_masks_()
      , pu_nums_()
      , no_affinity_()
    {
        // allow only one affinity-data instance
        if (instance_number_counter_++ >= 0)
        {
            throw std::runtime_error(
                "Cannot instantiate more than one affinity data instance");
        }
    }

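    // release the single-instance guard so that a new affinity_data object
    // may be constructed later on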
    affinity_data::~affinity_data()
    {
        --instance_number_counter_;
    }

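    // Initialize the affinity data from the parsed command line settings.
    // Returns the number of cores to mark as used: the larger of the number
    // of distinct cores the threads are bound to and the configured
    // 'hpx.cores' value.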
    std::size_t affinity_data::init(util::command_line_handling const& cfg_)
    {
        num_threads_ = cfg_.num_threads_;
        std::size_t num_system_pus = hardware_concurrency();

        // initialize from command line
        std::size_t pu_offset = get_pu_offset(cfg_);
        if (pu_offset == std::size_t(-1))
        {
            pu_offset_ = 0;
        }
        else
        {
            pu_offset_ = pu_offset;
        }

        if (num_system_pus > 1)
            pu_step_ = get_pu_step(cfg_) % num_system_pus;

        affinity_domain_ = hpx::detail::get_affinity_domain(cfg_);
        pu_nums_.clear();

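        // first core to use, as reported by the runtime configuration, and
        // the maximal number of cores, as given by the 'hpx.cores' config
        // entry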
        std::size_t const used_cores = cfg_.rtcfg_.get_first_used_core();
        std::size_t max_cores =
            hpx::util::safe_lexical_cast<std::size_t>(
                get_config_entry("hpx.cores", 0), 0);

        init_cached_pu_nums(num_system_pus);

        auto const& topo = threads::create_topology();

        std::string affinity_desc;
        hpx::detail::get_affinity_description(cfg_, affinity_desc);
        if (affinity_desc == "none")
        {
            // don't use any affinity for any of the os-threads
            threads::resize(no_affinity_, num_system_pus);
            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::set(no_affinity_, get_pu_num(i));
        }
        else if (!affinity_desc.empty())
        {
            affinity_masks_.clear();
            affinity_masks_.resize(num_threads_, 0);

            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::resize(affinity_masks_[i], num_system_pus);

            parse_affinity_options(affinity_desc, affinity_masks_,
                used_cores, max_cores, num_threads_, pu_nums_);

            std::size_t num_initialized = count_initialized(affinity_masks_);
            if (num_initialized != num_threads_) {
                HPX_THROW_EXCEPTION(bad_parameter,
                    "affinity_data::affinity_data",
                    hpx::util::format(
                        "The number of OS threads requested "
                        "({1}) does not match the number of threads to "
                        "bind ({2})", num_threads_, num_initialized));
            }
        }
        else if (pu_offset == std::size_t(-1))
        {
            // calculate the pu offset based on the used cores, but only if it
            // was not explicitly specified
            for (std::size_t num_core = 0; num_core != used_cores; ++num_core)
            {
                pu_offset_ += topo.get_number_of_core_pus(num_core);
            }
        }

        // correct used_cores from config data if appropriate
        if (used_cores_ == 0)
        {
            used_cores_ = used_cores;
        }

        pu_offset_ %= num_system_pus;

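        // count the number of distinct cores the assigned processing units
        // belong to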
        std::vector<std::size_t> cores;
        cores.reserve(num_threads_);
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            std::size_t add_me = topo.get_core_number(get_pu_num(i));
            cores.push_back(add_me);
        }

        std::sort(cores.begin(), cores.end());
        std::vector<std::size_t>::iterator it =
            std::unique(cores.begin(), cores.end());
        cores.erase(it, cores.end());

        std::size_t num_unique_cores = cores.size();
        return (std::max)(num_unique_cores, max_cores);
    }

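    // Return the affinity mask for the given OS thread: an empty mask if
    // affinity is disabled, the explicitly parsed mask if one was given, or
    // otherwise a mask derived from the configured affinity domain
    // (pu, core, numa, or machine).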
    mask_cref_type affinity_data::get_pu_mask(threads::topology const& topo,
        std::size_t global_thread_num) const
    {
        // --hpx:bind=none disables all affinity
        if (threads::test(no_affinity_, global_thread_num))
        {
            static mask_type m = mask_type();
            return m;
        }

        // if we have individual, predefined affinity masks, return those
        if (!affinity_masks_.empty())
            return affinity_masks_[global_thread_num];

        // otherwise return mask based on affinity domain
        std::size_t pu_num = get_pu_num(global_thread_num);
        if (0 == std::string("pu").find(affinity_domain_))
        {
            // The affinity domain is 'processing unit', just convert the
            // pu-number into a bit-mask.
            return topo.get_thread_affinity_mask(pu_num);
        }
        if (0 == std::string("core").find(affinity_domain_))
        {
            // The affinity domain is 'core', return a bit mask corresponding
            // to all processing units of the core containing the given
            // pu_num.
            return topo.get_core_affinity_mask(pu_num);
        }
        if (0 == std::string("numa").find(affinity_domain_))
        {
            // The affinity domain is 'numa', return a bit mask corresponding
            // to all processing units of the NUMA domain containing the
            // given pu_num.
            return topo.get_numa_node_affinity_mask(pu_num);
        }

        // The affinity domain is 'machine', return a bit mask corresponding
        // to all processing units of the machine.
        HPX_ASSERT(0 == std::string("machine").find(affinity_domain_));
        return topo.get_machine_affinity_mask();
    }

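    // Return the union of the affinity masks of all OS threads, i.e. the set
    // of processing units used by this affinity_data instance; if affinity is
    // disabled for the given pu, return a mask containing only that pu.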
    mask_type affinity_data::get_used_pus_mask(threads::topology const& topo,
        std::size_t pu_num) const
    {
        mask_type ret = mask_type();
        threads::resize(ret, hardware_concurrency());

        // --hpx:bind=none disables all affinity
        if (threads::test(no_affinity_, pu_num))
        {
            threads::set(ret, pu_num);
            return ret;
        }

        for (std::size_t thread_num = 0; thread_num < num_threads_; ++thread_num)
        {
            ret |= get_pu_mask(topo, thread_num);
        }

        return ret;
    }

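    // Count how many OS threads have the given processing unit in their
    // affinity mask.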
    std::size_t affinity_data::get_thread_occupancy(
        threads::topology const& topo, std::size_t pu_num) const
    {
        std::size_t count = 0;
        if (threads::test(no_affinity_, pu_num))
        {
            ++count;
        }
        else
        {
            mask_type pu_mask = mask_type();

            threads::resize(pu_mask, hardware_concurrency());
            threads::set(pu_mask, pu_num);

            for (std::size_t num_thread = 0; num_thread < num_threads_; ++num_thread)
            {
                mask_cref_type affinity_mask = get_pu_mask(topo, num_thread);
                if (threads::any(pu_mask & affinity_mask))
                    ++count;
            }
        }
        return count;
    }

    // means of adding a processing unit after initialization
    void affinity_data::add_punit(std::size_t virt_core, std::size_t thread_num)
    {
        std::size_t num_system_pus = hardware_concurrency();

        // initialize affinity_masks and set the mask for the given virt_core
        if (affinity_masks_.empty())
        {
            affinity_masks_.resize(num_threads_);
            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::resize(affinity_masks_[i], num_system_pus);
        }
        threads::set(affinity_masks_[virt_core], thread_num);

        // find first used pu, which is then stored as the pu_offset
        std::size_t first_pu = std::size_t(-1);
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            std::size_t first = threads::find_first(affinity_masks_[i]);
            first_pu = (std::min)(first_pu, first);
        }
        if (first_pu != std::size_t(-1))
            pu_offset_ = first_pu;

        init_cached_pu_nums(num_system_pus);
    }

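    // Pre-compute and cache the pu number for each OS thread (only if the
    // cache is still empty).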
    void affinity_data::init_cached_pu_nums(std::size_t hardware_concurrency)
    {
        if (pu_nums_.empty())
        {
            pu_nums_.resize(num_threads_);
            for (std::size_t i = 0; i != num_threads_; ++i)
            {
                pu_nums_[i] = get_pu_num(i, hardware_concurrency);
            }
        }
    }

    std::size_t affinity_data::get_pu_num(std::size_t num_thread,
        std::size_t hardware_concurrency) const
    {
        // The offset must be smaller than the number of available processing
        // units.
        HPX_ASSERT(pu_offset_ < hardware_concurrency);

        // The distance between assigned processing units must be non-zero and
        // not larger than the number of available processing units.
        HPX_ASSERT(pu_step_ > 0 && pu_step_ <= hardware_concurrency);

        // We 'scale' the thread number to compute the corresponding
        // processing unit number.
        //
        // The base line processing unit number is computed from the given
        // pu-offset and pu-step.
        std::size_t num_pu = pu_offset_ + pu_step_ * num_thread;

        // We add an additional offset, which allows the assignment to 'roll
        // over' once the pu number reaches the number of available processing
        // units. Note that it does not make sense to 'roll over' farther than
        // the given pu-step.
        std::size_t offset = (num_pu / hardware_concurrency) % pu_step_;
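
        // For example, with 4 processing units, pu_offset_ == 0, and
        // pu_step_ == 2, threads 0, 1, 2, 3 yield num_pu == 0, 2, 4, 6 and
        // offsets 0, 0, 1, 1, mapping the threads to pus 0, 2, 1, 3.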

        // The resulting pu number has to be smaller than the available
        // number of processing units.
        return (num_pu + offset) % hardware_concurrency;
    }

    std::atomic<int> affinity_data::instance_number_counter_(-1);
}}}}