1 ////////////////////////////////////////////////////////////////////////////////
2 //  Copyright (c) 2011 Bryce Adelstein-Lelbach
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 ////////////////////////////////////////////////////////////////////////////////
7 
8 #include <hpx/hpx.hpp>
9 #include <hpx/hpx_init.hpp>
10 
11 #include <boost/dynamic_bitset.hpp>
12 
13 #include <chrono>
14 #include <cstdint>
15 #include <iostream>
16 #include <vector>
17 
18 using boost::program_options::variables_map;
19 using boost::program_options::options_description;
20 using boost::program_options::value;
21 
22 using std::chrono::milliseconds;
23 
24 using hpx::naming::id_type;
25 
26 using hpx::applier::register_thread_nullary;
27 
28 using hpx::lcos::future;
29 using hpx::async;
30 
31 using hpx::threads::thread_id_type;
32 using hpx::threads::thread_data;
33 using hpx::this_thread::suspend;
34 using hpx::threads::set_thread_state;
35 using hpx::threads::thread_state_ex_enum;
36 using hpx::threads::pending;
37 using hpx::threads::suspended;
38 using hpx::threads::wait_signaled;
39 using hpx::threads::wait_timeout;
40 using hpx::threads::wait_terminate;
41 
42 using hpx::init;
43 using hpx::finalize;
44 using hpx::find_here;
45 
46 ///////////////////////////////////////////////////////////////////////////////
47 namespace detail
48 {
49     template <typename T1>
wait(std::vector<future<T1>> const & lazy_values,std::int32_t suspend_for=10)50     std::uint64_t wait(
51         std::vector<future<T1> > const& lazy_values
52       , std::int32_t suspend_for = 10
53         )
54     {
55         boost::dynamic_bitset<> handled(lazy_values.size());
56         std::uint64_t handled_count = 0;
57 
58         while (handled_count < lazy_values.size())
59         {
60             bool suspended = false;
61 
62             for (std::uint64_t i = 0; i < lazy_values.size(); ++i)
63             {
64                 // loop over all lazy_values, executing the next as soon as its
65                 // value gets available
66                 if (!handled[i] && lazy_values[i].is_ready())
67                 {
68                     handled[i] = true;
69                     ++handled_count;
70 
71                     // give thread-manager a chance to look for more work while
72                     // waiting
73                     suspend();
74                     suspended = true;
75                 }
76             }
77 
78             // suspend after one full loop over all values, 10ms should be fine
79             // (default parameter)
80             if (!suspended)
81                 suspend(milliseconds(suspend_for));
82         }
83         return handled.count();
84     }
85 }
86 
87 ///////////////////////////////////////////////////////////////////////////////
change_thread_state(std::uint64_t thread)88 void change_thread_state(
89     std::uint64_t thread
90     )
91 {
92 //    std::cout << "waking up thread (wait_signaled)\n";
93     thread_id_type id(reinterpret_cast<thread_data*>(thread));
94     set_thread_state(id, pending, wait_signaled);
95 
96 //    std::cout << "suspending thread (wait_timeout)\n";
97     set_thread_state(id, suspended, wait_timeout);
98 }
99 
100 HPX_PLAIN_ACTION(change_thread_state, change_thread_state_action)
101 
102 ///////////////////////////////////////////////////////////////////////////////
103 void tree_boot(
104     std::uint64_t count
105   , std::uint64_t grain_size
106   , id_type const& prefix
107   , std::uint64_t thread
108     );
109 
HPX_PLAIN_ACTION(tree_boot,tree_boot_action)110 HPX_PLAIN_ACTION(tree_boot, tree_boot_action)
111 
112 ///////////////////////////////////////////////////////////////////////////////
113 void tree_boot(
114     std::uint64_t count
115   , std::uint64_t grain_size
116   , id_type const& prefix
117   , std::uint64_t thread
118     )
119 {
120     HPX_ASSERT(grain_size);
121     HPX_ASSERT(count);
122 
123     std::vector<future<void> > promises;
124 
125     std::uint64_t const actors = (count > grain_size) ? grain_size : count;
126 
127     std::uint64_t child_count = 0;
128     std::uint64_t children = 0;
129 
130     if (count > grain_size)
131     {
132         for (children = grain_size; children != 0; --children)
133         {
134             child_count = ((count - grain_size) / children);
135 
136             if (child_count >= grain_size)
137                 break;
138         }
139 
140         promises.reserve(children + grain_size);
141     }
142 
143     else
144         promises.reserve(count);
145 
146     for (std::uint64_t i = 0; i < children; ++i)
147     {
148         promises.push_back(async<tree_boot_action>
149             (prefix, child_count, grain_size, prefix, thread));
150     }
151 
152     for (std::uint64_t i = 0; i < actors; ++i)
153         promises.push_back(async<change_thread_state_action>(prefix, thread));
154 
155     detail::wait(promises);
156 }
157 
158 ///////////////////////////////////////////////////////////////////////////////
test_dummy_thread(std::uint64_t futures)159 void test_dummy_thread(
160     std::uint64_t futures
161     )
162 {
163     std::uint64_t woken = 0
164                   , signaled = 0
165                   , timed_out = 0;
166 
167     while (true)
168     {
169         thread_state_ex_enum statex = suspend(suspended);
170 
171         if (statex == wait_signaled)
172         {
173             ++signaled;
174             ++woken;
175         }
176 
177         else if (statex == wait_timeout)
178         {
179             ++timed_out;
180             ++woken;
181         }
182 
183         else if (statex == wait_terminate)
184         {
185             std::cout << "woken:     " << woken << "/" << (futures * 2) << "\n"
186                       << "signaled:  " << signaled << "/" << futures << "\n"
187                       << "timed out: " << timed_out << "/0\n";
188             return;
189         }
190     }
191 }
192 
193 ///////////////////////////////////////////////////////////////////////////////
hpx_main(variables_map & vm)194 int hpx_main(variables_map& vm)
195 {
196     std::uint64_t const futures = vm["futures"].as<std::uint64_t>();
197     std::uint64_t const grain_size = vm["grain-size"].as<std::uint64_t>();
198 
199     {
200         id_type const prefix = find_here();
201 
202         thread_id_type thread = register_thread_nullary
203             (hpx::util::bind(&test_dummy_thread, futures));
204 
205         tree_boot(futures, grain_size, prefix,
206             reinterpret_cast<std::uint64_t>(thread.get()));
207 
208         set_thread_state(thread, pending, wait_terminate);
209     }
210 
211     finalize();
212 
213     return 0;
214 }
215 
216 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])217 int main(int argc, char* argv[])
218 {
219     // Configure application-specific options
220     options_description cmdline("Usage: " HPX_APPLICATION_STRING " [options]");
221 
222     cmdline.add_options()
223         ( "futures"
224         , value<std::uint64_t>()->default_value(64)
225         , "number of futures to invoke")
226 
227         ( "grain-size"
228         , value<std::uint64_t>()->default_value(4)
229         , "grain size of the future tree")
230     ;
231 
232     // Initialize and run HPX
233     return init(cmdline, argc, argv);
234 }
235 
236