1 //  Copyright (c) 2016 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
#include <hpx/hpx.hpp>
#include <hpx/hpx_init.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/iostreams.hpp>
#include <hpx/include/parcel_coalescing.hpp>
#include <hpx/include/performance_counters.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
20 
21 ///////////////////////////////////////////////////////////////////////////////
// Number of doubles in the payload vector the test cases attach to parcels.
constexpr std::size_t vsize_default = 1024;
// Number of parcels each test case creates and sends as one batch.
constexpr std::size_t numparcels_default = 10;
24 
25 ///////////////////////////////////////////////////////////////////////////////
26 template <typename Action, typename T>
27 hpx::parcelset::parcel
generate_parcel(hpx::id_type const & dest_id,hpx::id_type const & cont,T && data)28 generate_parcel(hpx::id_type const& dest_id, hpx::id_type const& cont, T && data)
29 {
30     hpx::naming::address addr;
31     hpx::naming::gid_type dest = dest_id.get_gid();
32     hpx::naming::detail::strip_credits_from_gid(dest);
33     hpx::parcelset::parcel p(hpx::parcelset::detail::create_parcel::call(
34         std::true_type(), std::move(dest), std::move(addr),
35         hpx::actions::typed_continuation<hpx::id_type>(cont),
36         Action(), hpx::threads::thread_priority_normal,
37         std::forward<T>(data)));
38 
39     p.set_source_id(hpx::find_here());
40     p.size() = 4096;
41 
42     return p;
43 }
44 
45 ///////////////////////////////////////////////////////////////////////////////
46 struct test_server : hpx::components::component_base<test_server>
47 {
48     typedef hpx::components::component_base<test_server> base_type;
49 
test1test_server50     hpx::id_type test1(std::vector<double> const& data)
51     {
52         return hpx::find_here();
53     }
54 
55     HPX_DEFINE_COMPONENT_ACTION(test_server, test1, test1_action);
56 };
57 
58 typedef hpx::components::component<test_server> server_type;
59 HPX_REGISTER_COMPONENT(server_type, test_server);
60 
61 typedef test_server::test1_action test1_action;
62 
63 HPX_REGISTER_ACTION_DECLARATION(test1_action);
64 HPX_ACTION_USES_MESSAGE_COALESCING(test1_action);
65 HPX_REGISTER_ACTION(test1_action);
66 
67 ///////////////////////////////////////////////////////////////////////////////
test_plain_argument(hpx::id_type const & id)68 void test_plain_argument(hpx::id_type const& id)
69 {
70     std::vector<double> data(vsize_default);
71     std::generate(data.begin(), data.end(), std::rand);
72 
73     std::vector<hpx::future<hpx::id_type> > results;
74     results.reserve(numparcels_default);
75 
76     hpx::components::client<test_server> c = hpx::new_<test_server>(id);
77 
78     // create parcels
79     std::vector<hpx::parcelset::parcel> parcels;
80     for (std::size_t i = 0; i != numparcels_default; ++i)
81     {
82         hpx::lcos::promise<hpx::id_type> p;
83         auto f = p.get_future();
84 
85         parcels.push_back(
86             generate_parcel<test1_action>(c.get_id(), p.get_id(), data)
87         );
88 
89         results.push_back(std::move(f));
90     }
91 
92     // send parcels
93     hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));
94 
95     // verify all messages got actually sent to the correct locality
96     hpx::wait_all(results);
97 
98     for (hpx::future<hpx::id_type>& f : results)
99     {
100         HPX_TEST(f.get() == id);
101     }
102 }
103 
104 ///////////////////////////////////////////////////////////////////////////////
test2(hpx::future<double> const & data)105 hpx::id_type test2(hpx::future<double> const& data)
106 {
107     return hpx::find_here();
108 }
109 HPX_DECLARE_PLAIN_ACTION(test2, test2_action);
110 HPX_ACTION_USES_MESSAGE_COALESCING(test2_action);
111 HPX_PLAIN_ACTION(test2, test2_action);
112 
test_future_argument(hpx::id_type const & id)113 void test_future_argument(hpx::id_type const& id)
114 {
115     std::vector<hpx::lcos::local::promise<double> > args;
116     args.reserve(numparcels_default);
117 
118     std::vector<hpx::future<hpx::id_type> > results;
119     results.reserve(numparcels_default);
120 
121     // create parcels
122     std::vector<hpx::parcelset::parcel> parcels;
123     for (std::size_t i = 0; i != numparcels_default; ++i)
124     {
125         hpx::lcos::local::promise<double> p_arg;
126         hpx::lcos::promise<hpx::id_type> p_cont;
127         auto f_cont = p_cont.get_future();
128 
129         parcels.push_back(
130             generate_parcel<test2_action>(id, p_cont.get_id(),
131                 p_arg.get_future())
132         );
133 
134         args.push_back(std::move(p_arg));
135         results.push_back(std::move(f_cont));
136     }
137 
138     // send parcels
139     hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));
140 
141     // now make the futures ready
142     for (hpx::lcos::local::promise<double>& arg : args)
143     {
144         arg.set_value(42.0);
145     }
146 
147     // verify all messages got actually sent to the correct locality
148     hpx::wait_all(results);
149 
150     for (hpx::future<hpx::id_type>& f : results)
151     {
152         HPX_TEST(f.get() == id);
153     }
154 }
155 
test_mixed_arguments(hpx::id_type const & id)156 void test_mixed_arguments(hpx::id_type const& id)
157 {
158     std::vector<double> data(vsize_default);
159     std::generate(data.begin(), data.end(), std::rand);
160 
161     std::vector<hpx::lcos::local::promise<double> > args;
162     args.reserve(numparcels_default);
163 
164     std::vector<hpx::future<hpx::id_type> > results;
165     results.reserve(numparcels_default);
166 
167     hpx::components::client<test_server> c = hpx::new_<test_server>(id);
168 
169     // create parcels
170     std::vector<hpx::parcelset::parcel> parcels;
171     for (std::size_t i = 0; i != numparcels_default; ++i)
172     {
173         hpx::lcos::promise<hpx::id_type> p_cont;
174         auto f_cont = p_cont.get_future();
175 
176         if (std::rand() % 2)
177         {
178             parcels.push_back(
179                 generate_parcel<test1_action>(c.get_id(), p_cont.get_id(), data)
180             );
181         }
182         else
183         {
184             hpx::lcos::local::promise<double> p_arg;
185 
186             parcels.push_back(
187                 generate_parcel<test2_action>(id, p_cont.get_id(),
188                     p_arg.get_future())
189             );
190 
191             args.push_back(std::move(p_arg));
192         }
193 
194         results.push_back(std::move(f_cont));
195     }
196 
197     // send parcels
198     hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));
199 
200     // now make the futures ready
201     for (hpx::lcos::local::promise<double>& arg : args)
202     {
203         arg.set_value(42.0);
204     }
205 
206     // verify all messages got actually sent to the correct locality
207     hpx::wait_all(results);
208 
209     for (hpx::future<hpx::id_type>& f : results)
210     {
211         HPX_TEST(f.get() == id);
212     }
213 }
214 
215 ///////////////////////////////////////////////////////////////////////////////
print_counters(char const * name)216 void print_counters(char const* name)
217 {
218     using namespace hpx::performance_counters;
219 
220     std::vector<performance_counter> counters = discover_counters(name);
221 
222     for (performance_counter const& c : counters)
223     {
224         counter_value value = c.get_counter_value(hpx::launch::sync);
225         HPX_TEST_NEQ(value.get_value<double>(), 0.0);
226 
227         hpx::cout
228             << "counter: " << c.get_name(hpx::launch::sync)
229             << ", value: " << value.get_value<double>()
230             << std::endl;
231     }
232 }
233 
234 ///////////////////////////////////////////////////////////////////////////////
hpx_main(boost::program_options::variables_map & vm)235 int hpx_main(boost::program_options::variables_map& vm)
236 {
237     unsigned int seed = (unsigned int)std::time(nullptr);
238     if (vm.count("seed"))
239         seed = vm["seed"].as<unsigned int>();
240 
241     std::cout << "using seed: " << seed << std::endl;
242     std::srand(seed);
243 
244     for (hpx::id_type const& id : hpx::find_remote_localities())
245     {
246         test_plain_argument(id);
247         test_future_argument(id);
248         test_mixed_arguments(id);
249     }
250 
251     // make sure coalescing was actually invoked
252     print_counters("/coalescing{locality#0/total}/count/parcels@test1_action");
253     print_counters("/coalescing{locality#0/total}/count/parcels@test2_action");
254     print_counters("/coalescing{locality#0/total}/count/messages@test1_action");
255     print_counters("/coalescing{locality#0/total}/count/messages@test2_action");
256 
257     return hpx::finalize();
258 }
259 
260 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])261 int main(int argc, char* argv[])
262 {
263     // add command line option which controls the random number generator seed
264     using namespace boost::program_options;
265     options_description desc_commandline(
266         "Usage: " HPX_APPLICATION_STRING " [options]");
267 
268     desc_commandline.add_options()
269         ("seed,s", value<unsigned int>(),
270         "the random number generator seed to use for this run")
271         ;
272 
273     // explicitly enable message handlers (parcel coalescing)
274     std::vector<std::string> const cfg = {
275         "hpx.parcel.message_handlers=1"
276     };
277 
278     // Initialize and run HPX
279     HPX_TEST_EQ_MSG(hpx::init(desc_commandline, argc, argv, cfg), 0,
280         "HPX main exited with non-zero status");
281 
282     return hpx::util::report_errors();
283 }
284