1 // Copyright (c) 2017 Thomas Heller
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6 // Simple test verifying basic resource_partitioner functionality.
7
8 #include <hpx/hpx_init.hpp>
9 #include <hpx/include/async.hpp>
10 #include <hpx/include/lcos.hpp>
11 #include <hpx/include/resource_partitioner.hpp>
12 #include <hpx/include/threads.hpp>
13 #include <hpx/runtime/threads/policies/scheduler_mode.hpp>
14 #include <hpx/runtime/threads/policies/schedulers.hpp>
15 #include <hpx/util/lightweight_test.hpp>
16
17 #include <cstddef>
18 #include <memory>
19 #include <stdexcept>
20 #include <string>
21 #include <utility>
22 #include <vector>
23
hpx_main(int argc,char * argv[])24 int hpx_main(int argc, char* argv[])
25 {
26 std::size_t const num_threads = hpx::resource::get_num_threads("worker");
27
28 HPX_TEST_EQ(std::size_t(3), num_threads);
29
30 hpx::threads::thread_pool_base& tp =
31 hpx::resource::get_thread_pool("worker");
32
33 {
34 // Check number of used resources
35 for (std::size_t thread_num = 0; thread_num < num_threads - 1; ++thread_num)
36 {
37 tp.suspend_processing_unit(thread_num).get();
38 HPX_TEST_EQ(std::size_t(num_threads - thread_num - 1),
39 tp.get_active_os_thread_count());
40 }
41
42 for (std::size_t thread_num = 0; thread_num < num_threads - 1; ++thread_num)
43 {
44 tp.resume_processing_unit(thread_num).get();
45 HPX_TEST_EQ(std::size_t(thread_num + 2),
46 tp.get_active_os_thread_count());
47 }
48 }
49
50 {
51 // Check suspending and resuming the same thread without waiting for
52 // each to finish.
53 for (std::size_t thread_num = 0;
54 thread_num < hpx::resource::get_num_threads("worker");
55 ++thread_num)
56 {
57 std::vector<hpx::future<void>> fs;
58
59 fs.push_back(tp.suspend_processing_unit(thread_num));
60 fs.push_back(tp.resume_processing_unit(thread_num));
61
62 hpx::wait_all(fs);
63
64 // Suspend is not guaranteed to run before resume, so make sure
65 // processing unit is running
66 tp.resume_processing_unit(thread_num).get();
67
68 fs.clear();
69
70 // Launching 4 (i.e. same as number of threads) tasks may deadlock
71 // as no thread is available to steal from the current thread.
72 fs.push_back(tp.suspend_processing_unit(thread_num));
73 fs.push_back(tp.suspend_processing_unit(thread_num));
74 fs.push_back(tp.suspend_processing_unit(thread_num));
75
76 hpx::wait_all(fs);
77
78 fs.clear();
79
80 // Launching 4 (i.e. same as number of threads) tasks may deadlock
81 // as no thread is available to steal from the current thread.
82 fs.push_back(tp.resume_processing_unit(thread_num));
83 fs.push_back(tp.resume_processing_unit(thread_num));
84 fs.push_back(tp.resume_processing_unit(thread_num));
85
86 hpx::wait_all(fs);
87 }
88 }
89
90 {
91 // Check random scheduling with reducing resources.
92 std::size_t thread_num = 0;
93 bool up = true;
94 std::vector<hpx::future<void>> fs;
95 hpx::util::high_resolution_timer t;
96 while (t.elapsed() < 2)
97 {
98 for (std::size_t i = 0;
99 i < hpx::resource::get_num_threads("worker") * 10;
100 ++i)
101 {
102 fs.push_back(hpx::async([](){}));
103 }
104
105 if (up)
106 {
107 if (thread_num < hpx::resource::get_num_threads("worker"))
108 {
109 tp.suspend_processing_unit(thread_num).get();
110 }
111
112 ++thread_num;
113
114 if (thread_num == hpx::resource::get_num_threads("worker"))
115 {
116 up = false;
117 --thread_num;
118 }
119 }
120 else
121 {
122 tp.resume_processing_unit(thread_num - 1).get();
123
124 --thread_num;
125
126 if (thread_num == 0)
127 {
128 up = true;
129 }
130 }
131 }
132
133 hpx::when_all(std::move(fs)).get();
134
135 // Don't exit with suspended pus
136 for (std::size_t thread_num_resume = 0; thread_num_resume < thread_num;
137 ++thread_num_resume)
138 {
139 tp.resume_processing_unit(thread_num_resume).get();
140 }
141 }
142
143 return hpx::finalize();
144 }
145
test_scheduler(int argc,char * argv[],hpx::resource::scheduling_policy scheduler)146 void test_scheduler(int argc, char* argv[],
147 hpx::resource::scheduling_policy scheduler)
148 {
149 std::vector<std::string> cfg =
150 {
151 "hpx.os_threads=4"
152 };
153
154 hpx::resource::partitioner rp(argc, argv, std::move(cfg));
155
156 rp.create_thread_pool("worker", scheduler,
157 hpx::threads::policies::scheduler_mode(
158 hpx::threads::policies::default_mode |
159 hpx::threads::policies::enable_elasticity));
160
161 int const worker_pool_threads = 3;
162 int worker_pool_threads_added = 0;
163
164 for (const hpx::resource::numa_domain& d : rp.numa_domains())
165 {
166 for (const hpx::resource::core& c : d.cores())
167 {
168 for (const hpx::resource::pu& p : c.pus())
169 {
170 if (worker_pool_threads_added < worker_pool_threads)
171 {
172 rp.add_resource(p, "worker");
173 ++worker_pool_threads_added;
174 }
175 }
176 }
177 }
178
179 HPX_TEST_EQ(hpx::init(argc, argv), 0);
180 }
181
main(int argc,char * argv[])182 int main(int argc, char* argv[])
183 {
184 std::vector<hpx::resource::scheduling_policy> schedulers =
185 {
186 #if defined(HPX_HAVE_LOCAL_SCHEDULER)
187 hpx::resource::scheduling_policy::local,
188 hpx::resource::scheduling_policy::local_priority_fifo,
189 hpx::resource::scheduling_policy::local_priority_lifo,
190 #endif
191 #if defined(HPX_HAVE_ABP_SCHEDULER)
192 hpx::resource::scheduling_policy::abp_priority_fifo,
193 hpx::resource::scheduling_policy::abp_priority_lifo,
194 #endif
195 #if defined(HPX_HAVE_STATIC_SCHEDULER)
196 hpx::resource::scheduling_policy::static_,
197 #endif
198 #if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
199 hpx::resource::scheduling_policy::static_priority,
200 #endif
201 #if defined(HPX_HAVE_SHARED_PRIORITY_SCHEDULER)
202 hpx::resource::scheduling_policy::shared_priority,
203 #endif
204 };
205
206 for (auto const scheduler : schedulers)
207 {
208 test_scheduler(argc, argv, scheduler);
209 }
210
211 return hpx::util::report_errors();
212 }
213