1 ////////////////////////////////////////////////////////////////////////////////
2 //  Copyright (c) 2011 Bryce Adelstein-Lelbach
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 ////////////////////////////////////////////////////////////////////////////////
7 
8 #include <hpx/hpx.hpp>
9 #include <hpx/hpx_init.hpp>
10 #include <hpx/util/lightweight_test.hpp>
11 
12 #include <boost/dynamic_bitset.hpp>
13 
14 #include <cstdint>
15 #include <chrono>
16 #include <vector>
17 
18 using boost::program_options::variables_map;
19 using boost::program_options::options_description;
20 using boost::program_options::value;
21 
22 using std::chrono::milliseconds;
23 
24 using hpx::naming::id_type;
25 
26 using hpx::applier::register_thread_nullary;
27 
28 using hpx::lcos::future;
29 using hpx::async;
30 
31 using hpx::threads::thread_id_type;
32 using hpx::this_thread::suspend;
33 using hpx::threads::set_thread_state;
34 using hpx::threads::thread_state_ex_enum;
35 using hpx::threads::pending;
36 using hpx::threads::suspended;
37 using hpx::threads::wait_signaled;
38 using hpx::threads::wait_terminate;
39 
40 using hpx::init;
41 using hpx::finalize;
42 using hpx::find_here;
43 
44 ///////////////////////////////////////////////////////////////////////////////
45 namespace detail
46 {
47     template <typename T1>
wait(std::vector<future<T1>> const & lazy_values,std::int32_t suspend_for=10)48     std::uint64_t wait(
49         std::vector<future<T1> > const& lazy_values
50       , std::int32_t suspend_for = 10
51         )
52     {
53         boost::dynamic_bitset<> handled(lazy_values.size());
54         std::uint64_t handled_count = 0;
55 
56         while (handled_count < lazy_values.size())
57         {
58             bool suspended = false;
59 
60             for (std::uint64_t i = 0; i < lazy_values.size(); ++i)
61             {
62                 // loop over all lazy_values, executing the next as soon as its
63                 // value gets available
64                 if (!handled[i] && lazy_values[i].is_ready())
65                 {
66                     handled[i] = true;
67                     ++handled_count;
68 
69                     // give thread-manager a chance to look for more work while
70                     // waiting
71                     suspend();
72                     suspended = true;
73                 }
74             }
75 
76             // suspend after one full loop over all values, 10ms should be fine
77             // (default parameter)
78             if (!suspended)
79                 suspend(milliseconds(suspend_for));
80         }
81         return handled.count();
82     }
83 }
84 
85 ///////////////////////////////////////////////////////////////////////////////
change_thread_state(thread_id_type thread)86 void change_thread_state(
87     thread_id_type thread
88     )
89 {
90     set_thread_state(thread, suspended);
91 }
92 
93 ///////////////////////////////////////////////////////////////////////////////
tree_boot(std::uint64_t count,std::uint64_t grain_size,thread_id_type thread)94 void tree_boot(
95     std::uint64_t count
96   , std::uint64_t grain_size
97   , thread_id_type thread
98     )
99 {
100     HPX_ASSERT(grain_size);
101     HPX_ASSERT(count);
102 
103     std::vector<future<void> > promises;
104 
105     std::uint64_t const actors = (count > grain_size) ? grain_size : count;
106 
107     std::uint64_t child_count = 0;
108     std::uint64_t children = 0;
109 
110     if (count > grain_size)
111     {
112         for (children = grain_size; children != 0; --children)
113         {
114             child_count = ((count - grain_size) / children);
115 
116             if (child_count >= grain_size)
117                 break;
118         }
119 
120         promises.reserve(children + grain_size);
121     }
122     else
123         promises.reserve(count);
124 
125     for (std::uint64_t i = 0; i < children; ++i)
126         promises.push_back(async(&tree_boot, child_count, grain_size, thread));
127 
128     for (std::uint64_t i = 0; i < actors; ++i)
129         promises.push_back(async(&change_thread_state, thread));
130 
131     detail::wait(promises);
132 }
133 
134 bool woken = false;
135 
136 ///////////////////////////////////////////////////////////////////////////////
test_dummy_thread(std::uint64_t)137 void test_dummy_thread(
138     std::uint64_t
139     )
140 {
141     while (true)
142     {
143         thread_state_ex_enum statex = suspend(suspended);
144 
145         if (statex == wait_terminate)
146         {
147             woken = true;
148             return;
149         }
150     }
151 }
152 
153 ///////////////////////////////////////////////////////////////////////////////
hpx_main(variables_map & vm)154 int hpx_main(variables_map& vm)
155 {
156     std::uint64_t const futures = vm["futures"].as<std::uint64_t>();
157     std::uint64_t const grain_size = vm["grain-size"].as<std::uint64_t>();
158 
159     {
160         thread_id_type thread_id = register_thread_nullary(
161             hpx::util::bind(&test_dummy_thread, futures));
162         HPX_TEST(thread_id != hpx::threads::invalid_thread_id);
163 
164         // Flood the queues with suspension operations before the rescheduling
165         // attempt.
166         future<void> before = async(&tree_boot, futures, grain_size, thread_id);
167 
168         set_thread_state(thread_id, pending, wait_signaled);
169 
170         // Flood the queues with suspension operations after the rescheduling
171         // attempt.
172         future<void> after = async(&tree_boot, futures, grain_size, thread_id);
173 
174         before.get();
175         after.get();
176 
177         set_thread_state(thread_id, pending, wait_terminate);
178     }
179 
180     finalize();
181 
182     return 0;
183 }
184 
185 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])186 int main(int argc, char* argv[])
187 {
188     // Configure application-specific options
189     options_description cmdline("Usage: " HPX_APPLICATION_STRING " [options]");
190 
191     cmdline.add_options()
192         ( "futures"
193         , value<std::uint64_t>()->default_value(64)
194         , "number of futures to invoke before and after the rescheduling")
195 
196         ( "grain-size"
197         , value<std::uint64_t>()->default_value(4)
198         , "grain size of the future tree")
199     ;
200 
201     // Initialize and run HPX
202     HPX_TEST_EQ(0, init(cmdline, argc, argv));
203 
204     HPX_TEST(woken);
205 
206     return hpx::util::report_errors();
207 }
208 
209