1 ////////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2011 Bryce Adelstein-Lelbach
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 ////////////////////////////////////////////////////////////////////////////////
7
8 #include <hpx/hpx.hpp>
9 #include <hpx/hpx_init.hpp>
10 #include <hpx/util/lightweight_test.hpp>
11
12 #include <boost/dynamic_bitset.hpp>
13
14 #include <cstdint>
15 #include <chrono>
16 #include <vector>
17
18 using boost::program_options::variables_map;
19 using boost::program_options::options_description;
20 using boost::program_options::value;
21
22 using std::chrono::milliseconds;
23
24 using hpx::naming::id_type;
25
26 using hpx::applier::register_thread_nullary;
27
28 using hpx::lcos::future;
29 using hpx::async;
30
31 using hpx::threads::thread_id_type;
32 using hpx::this_thread::suspend;
33 using hpx::threads::set_thread_state;
34 using hpx::threads::thread_state_ex_enum;
35 using hpx::threads::pending;
36 using hpx::threads::suspended;
37 using hpx::threads::wait_signaled;
38 using hpx::threads::wait_terminate;
39
40 using hpx::init;
41 using hpx::finalize;
42 using hpx::find_here;
43
44 ///////////////////////////////////////////////////////////////////////////////
45 namespace detail
46 {
47 template <typename T1>
wait(std::vector<future<T1>> const & lazy_values,std::int32_t suspend_for=10)48 std::uint64_t wait(
49 std::vector<future<T1> > const& lazy_values
50 , std::int32_t suspend_for = 10
51 )
52 {
53 boost::dynamic_bitset<> handled(lazy_values.size());
54 std::uint64_t handled_count = 0;
55
56 while (handled_count < lazy_values.size())
57 {
58 bool suspended = false;
59
60 for (std::uint64_t i = 0; i < lazy_values.size(); ++i)
61 {
62 // loop over all lazy_values, executing the next as soon as its
63 // value gets available
64 if (!handled[i] && lazy_values[i].is_ready())
65 {
66 handled[i] = true;
67 ++handled_count;
68
69 // give thread-manager a chance to look for more work while
70 // waiting
71 suspend();
72 suspended = true;
73 }
74 }
75
76 // suspend after one full loop over all values, 10ms should be fine
77 // (default parameter)
78 if (!suspended)
79 suspend(milliseconds(suspend_for));
80 }
81 return handled.count();
82 }
83 }
84
85 ///////////////////////////////////////////////////////////////////////////////
change_thread_state(thread_id_type thread)86 void change_thread_state(
87 thread_id_type thread
88 )
89 {
90 set_thread_state(thread, suspended);
91 }
92
93 ///////////////////////////////////////////////////////////////////////////////
tree_boot(std::uint64_t count,std::uint64_t grain_size,thread_id_type thread)94 void tree_boot(
95 std::uint64_t count
96 , std::uint64_t grain_size
97 , thread_id_type thread
98 )
99 {
100 HPX_ASSERT(grain_size);
101 HPX_ASSERT(count);
102
103 std::vector<future<void> > promises;
104
105 std::uint64_t const actors = (count > grain_size) ? grain_size : count;
106
107 std::uint64_t child_count = 0;
108 std::uint64_t children = 0;
109
110 if (count > grain_size)
111 {
112 for (children = grain_size; children != 0; --children)
113 {
114 child_count = ((count - grain_size) / children);
115
116 if (child_count >= grain_size)
117 break;
118 }
119
120 promises.reserve(children + grain_size);
121 }
122 else
123 promises.reserve(count);
124
125 for (std::uint64_t i = 0; i < children; ++i)
126 promises.push_back(async(&tree_boot, child_count, grain_size, thread));
127
128 for (std::uint64_t i = 0; i < actors; ++i)
129 promises.push_back(async(&change_thread_state, thread));
130
131 detail::wait(promises);
132 }
133
134 bool woken = false;
135
136 ///////////////////////////////////////////////////////////////////////////////
test_dummy_thread(std::uint64_t)137 void test_dummy_thread(
138 std::uint64_t
139 )
140 {
141 while (true)
142 {
143 thread_state_ex_enum statex = suspend(suspended);
144
145 if (statex == wait_terminate)
146 {
147 woken = true;
148 return;
149 }
150 }
151 }
152
153 ///////////////////////////////////////////////////////////////////////////////
hpx_main(variables_map & vm)154 int hpx_main(variables_map& vm)
155 {
156 std::uint64_t const futures = vm["futures"].as<std::uint64_t>();
157 std::uint64_t const grain_size = vm["grain-size"].as<std::uint64_t>();
158
159 {
160 thread_id_type thread_id = register_thread_nullary(
161 hpx::util::bind(&test_dummy_thread, futures));
162 HPX_TEST(thread_id != hpx::threads::invalid_thread_id);
163
164 // Flood the queues with suspension operations before the rescheduling
165 // attempt.
166 future<void> before = async(&tree_boot, futures, grain_size, thread_id);
167
168 set_thread_state(thread_id, pending, wait_signaled);
169
170 // Flood the queues with suspension operations after the rescheduling
171 // attempt.
172 future<void> after = async(&tree_boot, futures, grain_size, thread_id);
173
174 before.get();
175 after.get();
176
177 set_thread_state(thread_id, pending, wait_terminate);
178 }
179
180 finalize();
181
182 return 0;
183 }
184
185 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])186 int main(int argc, char* argv[])
187 {
188 // Configure application-specific options
189 options_description cmdline("Usage: " HPX_APPLICATION_STRING " [options]");
190
191 cmdline.add_options()
192 ( "futures"
193 , value<std::uint64_t>()->default_value(64)
194 , "number of futures to invoke before and after the rescheduling")
195
196 ( "grain-size"
197 , value<std::uint64_t>()->default_value(4)
198 , "grain size of the future tree")
199 ;
200
201 // Initialize and run HPX
202 HPX_TEST_EQ(0, init(cmdline, argc, argv));
203
204 HPX_TEST(woken);
205
206 return hpx::util::report_errors();
207 }
208
209