//  Copyright (c) 2017 Thomas Heller
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// Simple test verifying basic resource_partitioner functionality.

#include <hpx/hpx_init.hpp>
#include <hpx/include/async.hpp>
#include <hpx/include/lcos.hpp>
#include <hpx/include/resource_partitioner.hpp>
#include <hpx/include/threads.hpp>
#include <hpx/runtime/threads/policies/scheduler_mode.hpp>
#include <hpx/runtime/threads/policies/schedulers.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <cstddef>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

int hpx_main(int argc, char* argv[])
{
    std::size_t const num_threads = hpx::resource::get_num_threads("worker");

    HPX_TEST_EQ(std::size_t(3), num_threads);

    hpx::threads::thread_pool_base& tp =
        hpx::resource::get_thread_pool("worker");

    {
        // Check number of used resources
        for (std::size_t thread_num = 0; thread_num < num_threads - 1; ++thread_num)
        {
            tp.suspend_processing_unit(thread_num).get();
            HPX_TEST_EQ(std::size_t(num_threads - thread_num - 1),
                tp.get_active_os_thread_count());
        }

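        // Resume the suspended processing units one by one. Only one PU is
        // still running at this point, so after resuming PU thread_num the
        // active count should be thread_num + 2.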
        for (std::size_t thread_num = 0; thread_num < num_threads - 1; ++thread_num)
        {
            tp.resume_processing_unit(thread_num).get();
            HPX_TEST_EQ(std::size_t(thread_num + 2),
                tp.get_active_os_thread_count());
        }
    }

    {
        // Check suspending and resuming the same thread without waiting for
        // each to finish.
        for (std::size_t thread_num = 0;
             thread_num < hpx::resource::get_num_threads("worker");
             ++thread_num)
        {
            std::vector<hpx::future<void>> fs;

            fs.push_back(tp.suspend_processing_unit(thread_num));
            fs.push_back(tp.resume_processing_unit(thread_num));

            hpx::wait_all(fs);

            // Suspend is not guaranteed to run before resume, so make sure
            // the processing unit is running
            tp.resume_processing_unit(thread_num).get();

            fs.clear();

            // Launching 4 tasks (i.e. as many as there are threads) may
            // deadlock, as no thread would be available to steal from the
            // current thread.
            fs.push_back(tp.suspend_processing_unit(thread_num));
            fs.push_back(tp.suspend_processing_unit(thread_num));
            fs.push_back(tp.suspend_processing_unit(thread_num));

            hpx::wait_all(fs);

            fs.clear();

            // Launching 4 tasks (i.e. as many as there are threads) may
            // deadlock, as no thread would be available to steal from the
            // current thread.
            fs.push_back(tp.resume_processing_unit(thread_num));
            fs.push_back(tp.resume_processing_unit(thread_num));
            fs.push_back(tp.resume_processing_unit(thread_num));

            hpx::wait_all(fs);
        }
    }

    {
        // Check random scheduling with reducing resources.
        std::size_t thread_num = 0;
        bool up = true;
        std::vector<hpx::future<void>> fs;
        hpx::util::high_resolution_timer t;
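        // For roughly two seconds keep launching small tasks while processing
        // units are alternately suspended one by one and then resumed again.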
        while (t.elapsed() < 2)
        {
            for (std::size_t i = 0;
                i < hpx::resource::get_num_threads("worker") * 10;
                ++i)
            {
                fs.push_back(hpx::async([](){}));
            }

            if (up)
            {
                if (thread_num < hpx::resource::get_num_threads("worker"))
                {
                    tp.suspend_processing_unit(thread_num).get();
                }

                ++thread_num;

                if (thread_num == hpx::resource::get_num_threads("worker"))
                {
                    up = false;
                    --thread_num;
                }
            }
            else
            {
                tp.resume_processing_unit(thread_num - 1).get();

                --thread_num;

                if (thread_num == 0)
                {
                    up = true;
                }
            }
        }

        hpx::when_all(std::move(fs)).get();

        // Don't exit with suspended PUs
        for (std::size_t thread_num_resume = 0; thread_num_resume < thread_num;
            ++thread_num_resume)
        {
            tp.resume_processing_unit(thread_num_resume).get();
        }
    }

    return hpx::finalize();
}

void test_scheduler(int argc, char* argv[],
    hpx::resource::scheduling_policy scheduler)
{
    std::vector<std::string> cfg =
    {
        "hpx.os_threads=4"
    };

    hpx::resource::partitioner rp(argc, argv, std::move(cfg));

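    // Create the worker pool with elasticity enabled so that its processing
    // units can be suspended and resumed while the runtime is up.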
    rp.create_thread_pool("worker", scheduler,
        hpx::threads::policies::scheduler_mode(
            hpx::threads::policies::default_mode |
            hpx::threads::policies::enable_elasticity));

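    // Assign only three of the four OS threads to the worker pool; the
    // remaining processing unit is left for the default pool.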
    int const worker_pool_threads = 3;
    int worker_pool_threads_added = 0;

    for (const hpx::resource::numa_domain& d : rp.numa_domains())
    {
        for (const hpx::resource::core& c : d.cores())
        {
            for (const hpx::resource::pu& p : c.pus())
            {
                if (worker_pool_threads_added < worker_pool_threads)
                {
                    rp.add_resource(p, "worker");
                    ++worker_pool_threads_added;
                }
            }
        }
    }

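    // Start the runtime; hpx::init returns once hpx_main has run and called
    // hpx::finalize.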
    HPX_TEST_EQ(hpx::init(argc, argv), 0);
}

int main(int argc, char* argv[])
{
    std::vector<hpx::resource::scheduling_policy> schedulers =
        {
#if defined(HPX_HAVE_LOCAL_SCHEDULER)
            hpx::resource::scheduling_policy::local,
            hpx::resource::scheduling_policy::local_priority_fifo,
            hpx::resource::scheduling_policy::local_priority_lifo,
#endif
#if defined(HPX_HAVE_ABP_SCHEDULER)
            hpx::resource::scheduling_policy::abp_priority_fifo,
            hpx::resource::scheduling_policy::abp_priority_lifo,
#endif
#if defined(HPX_HAVE_STATIC_SCHEDULER)
            hpx::resource::scheduling_policy::static_,
#endif
#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
            hpx::resource::scheduling_policy::static_priority,
#endif
#if defined(HPX_HAVE_SHARED_PRIORITY_SCHEDULER)
            hpx::resource::scheduling_policy::shared_priority,
#endif
        };

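    // Run the whole suspend/resume test once for every scheduler enabled in
    // this build.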
    for (auto const scheduler : schedulers)
    {
        test_scheduler(argc, argv, scheduler);
    }

    return hpx::util::report_errors();
}