1 ////////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2011 Bryce Adelstein-Lelbach
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 ////////////////////////////////////////////////////////////////////////////////
7
8 #include <hpx/hpx.hpp>
9 #include <hpx/hpx_init.hpp>
10
11 #include <boost/dynamic_bitset.hpp>
12
13 #include <chrono>
14 #include <cstdint>
15 #include <iostream>
16 #include <vector>
17
18 using boost::program_options::variables_map;
19 using boost::program_options::options_description;
20 using boost::program_options::value;
21
22 using std::chrono::milliseconds;
23
24 using hpx::naming::id_type;
25
26 using hpx::applier::register_thread_nullary;
27
28 using hpx::lcos::future;
29 using hpx::async;
30
31 using hpx::threads::thread_id_type;
32 using hpx::threads::thread_data;
33 using hpx::this_thread::suspend;
34 using hpx::threads::set_thread_state;
35 using hpx::threads::thread_state_ex_enum;
36 using hpx::threads::pending;
37 using hpx::threads::suspended;
38 using hpx::threads::wait_signaled;
39 using hpx::threads::wait_timeout;
40 using hpx::threads::wait_terminate;
41
42 using hpx::init;
43 using hpx::finalize;
44 using hpx::find_here;
45
46 ///////////////////////////////////////////////////////////////////////////////
47 namespace detail
48 {
49 template <typename T1>
wait(std::vector<future<T1>> const & lazy_values,std::int32_t suspend_for=10)50 std::uint64_t wait(
51 std::vector<future<T1> > const& lazy_values
52 , std::int32_t suspend_for = 10
53 )
54 {
55 boost::dynamic_bitset<> handled(lazy_values.size());
56 std::uint64_t handled_count = 0;
57
58 while (handled_count < lazy_values.size())
59 {
60 bool suspended = false;
61
62 for (std::uint64_t i = 0; i < lazy_values.size(); ++i)
63 {
64 // loop over all lazy_values, executing the next as soon as its
65 // value gets available
66 if (!handled[i] && lazy_values[i].is_ready())
67 {
68 handled[i] = true;
69 ++handled_count;
70
71 // give thread-manager a chance to look for more work while
72 // waiting
73 suspend();
74 suspended = true;
75 }
76 }
77
78 // suspend after one full loop over all values, 10ms should be fine
79 // (default parameter)
80 if (!suspended)
81 suspend(milliseconds(suspend_for));
82 }
83 return handled.count();
84 }
85 }
86
87 ///////////////////////////////////////////////////////////////////////////////
change_thread_state(std::uint64_t thread)88 void change_thread_state(
89 std::uint64_t thread
90 )
91 {
92 // std::cout << "waking up thread (wait_signaled)\n";
93 thread_id_type id(reinterpret_cast<thread_data*>(thread));
94 set_thread_state(id, pending, wait_signaled);
95
96 // std::cout << "suspending thread (wait_timeout)\n";
97 set_thread_state(id, suspended, wait_timeout);
98 }
99
100 HPX_PLAIN_ACTION(change_thread_state, change_thread_state_action)
101
102 ///////////////////////////////////////////////////////////////////////////////
103 void tree_boot(
104 std::uint64_t count
105 , std::uint64_t grain_size
106 , id_type const& prefix
107 , std::uint64_t thread
108 );
109
HPX_PLAIN_ACTION(tree_boot,tree_boot_action)110 HPX_PLAIN_ACTION(tree_boot, tree_boot_action)
111
112 ///////////////////////////////////////////////////////////////////////////////
113 void tree_boot(
114 std::uint64_t count
115 , std::uint64_t grain_size
116 , id_type const& prefix
117 , std::uint64_t thread
118 )
119 {
120 HPX_ASSERT(grain_size);
121 HPX_ASSERT(count);
122
123 std::vector<future<void> > promises;
124
125 std::uint64_t const actors = (count > grain_size) ? grain_size : count;
126
127 std::uint64_t child_count = 0;
128 std::uint64_t children = 0;
129
130 if (count > grain_size)
131 {
132 for (children = grain_size; children != 0; --children)
133 {
134 child_count = ((count - grain_size) / children);
135
136 if (child_count >= grain_size)
137 break;
138 }
139
140 promises.reserve(children + grain_size);
141 }
142
143 else
144 promises.reserve(count);
145
146 for (std::uint64_t i = 0; i < children; ++i)
147 {
148 promises.push_back(async<tree_boot_action>
149 (prefix, child_count, grain_size, prefix, thread));
150 }
151
152 for (std::uint64_t i = 0; i < actors; ++i)
153 promises.push_back(async<change_thread_state_action>(prefix, thread));
154
155 detail::wait(promises);
156 }
157
158 ///////////////////////////////////////////////////////////////////////////////
test_dummy_thread(std::uint64_t futures)159 void test_dummy_thread(
160 std::uint64_t futures
161 )
162 {
163 std::uint64_t woken = 0
164 , signaled = 0
165 , timed_out = 0;
166
167 while (true)
168 {
169 thread_state_ex_enum statex = suspend(suspended);
170
171 if (statex == wait_signaled)
172 {
173 ++signaled;
174 ++woken;
175 }
176
177 else if (statex == wait_timeout)
178 {
179 ++timed_out;
180 ++woken;
181 }
182
183 else if (statex == wait_terminate)
184 {
185 std::cout << "woken: " << woken << "/" << (futures * 2) << "\n"
186 << "signaled: " << signaled << "/" << futures << "\n"
187 << "timed out: " << timed_out << "/0\n";
188 return;
189 }
190 }
191 }
192
193 ///////////////////////////////////////////////////////////////////////////////
hpx_main(variables_map & vm)194 int hpx_main(variables_map& vm)
195 {
196 std::uint64_t const futures = vm["futures"].as<std::uint64_t>();
197 std::uint64_t const grain_size = vm["grain-size"].as<std::uint64_t>();
198
199 {
200 id_type const prefix = find_here();
201
202 thread_id_type thread = register_thread_nullary
203 (hpx::util::bind(&test_dummy_thread, futures));
204
205 tree_boot(futures, grain_size, prefix,
206 reinterpret_cast<std::uint64_t>(thread.get()));
207
208 set_thread_state(thread, pending, wait_terminate);
209 }
210
211 finalize();
212
213 return 0;
214 }
215
216 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])217 int main(int argc, char* argv[])
218 {
219 // Configure application-specific options
220 options_description cmdline("Usage: " HPX_APPLICATION_STRING " [options]");
221
222 cmdline.add_options()
223 ( "futures"
224 , value<std::uint64_t>()->default_value(64)
225 , "number of futures to invoke")
226
227 ( "grain-size"
228 , value<std::uint64_t>()->default_value(4)
229 , "grain size of the future tree")
230 ;
231
232 // Initialize and run HPX
233 return init(cmdline, argc, argv);
234 }
235
236