1 //  Copyright (c) 2007-2015 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #include <hpx/hpx_init.hpp>
7 #include <hpx/include/traits.hpp>
8 #include <hpx/include/lcos.hpp>
9 #include <hpx/include/components.hpp>
10 #include <hpx/include/async.hpp>
11 #include <hpx/util/lightweight_test.hpp>
12 
13 #include <atomic>
14 #include <cstdint>
15 #include <utility>
16 #include <vector>
17 
18 ///////////////////////////////////////////////////////////////////////////////
/// Plain function used as the body of increment_action.
///
/// \param i  value to increment
/// \returns  \a i plus one
std::int32_t increment(std::int32_t i)
{
    std::int32_t const successor = i + 1;
    return successor;
}
// Wrap increment() as a plain action; the tests below refer to the resulting
// action type as increment_action.
HPX_PLAIN_ACTION(increment);
24 
increment_with_future(hpx::shared_future<std::int32_t> fi)25 std::int32_t increment_with_future(hpx::shared_future<std::int32_t> fi)
26 {
27     return fi.get() + 1;
28 }
// Wrap increment_with_future() as a plain action; the tests below refer to
// the resulting action type as increment_with_future_action.
HPX_PLAIN_ACTION(increment_with_future);
30 
31 ///////////////////////////////////////////////////////////////////////////////
32 struct decrement_server
33   : hpx::components::managed_component_base<decrement_server>
34 {
calldecrement_server35     std::int32_t call(std::int32_t i) const
36     {
37         return i - 1;
38     }
39 
40     HPX_DEFINE_COMPONENT_ACTION(decrement_server, call);
41 };
42 
43 typedef hpx::components::managed_component<decrement_server> server_type;
44 HPX_REGISTER_COMPONENT(server_type, decrement_server);
45 
46 typedef decrement_server::call_action call_action;
47 HPX_REGISTER_ACTION_DECLARATION(call_action);
48 HPX_REGISTER_ACTION(call_action);
49 
50 ///////////////////////////////////////////////////////////////////////////////
// Component without any members of its own. In this file it is only
// instantiated (through test_client) so that the resulting client can be
// passed to hpx::colocated() as the target of the tested calls.
struct test_server : hpx::components::simple_component_base<test_server>
{
};
54 
55 typedef hpx::components::simple_component<test_server> test_server_type;
56 HPX_REGISTER_COMPONENT(test_server_type, test_server);
57 
58 struct test_client : hpx::components::client_base<test_client, test_server>
59 {
60     typedef hpx::components::client_base<test_client, test_server> base_type;
61 
test_clienttest_client62     test_client(hpx::id_type const& id)
63       : base_type(id)
64     {}
test_clienttest_client65     test_client(hpx::future<hpx::id_type> && id)
66       : base_type(std::move(id))
67     {}
68 };
69 
70 ///////////////////////////////////////////////////////////////////////////////
// Number of times cb() has run. The test resets it to zero before each group
// of async_cb calls and compares it against the number of calls issued.
std::atomic<int> callback_called(0);
72 
cb(boost::system::error_code const & ec,hpx::parcelset::parcel const & p)73 void cb(boost::system::error_code const& ec,
74     hpx::parcelset::parcel const& p)
75 {
76     ++callback_called;
77 }
78 
79 ///////////////////////////////////////////////////////////////////////////////
test_remote_async_cb_colocated(test_client const & target)80 void test_remote_async_cb_colocated(test_client const& target)
81 {
82     {
83         increment_action inc;
84 
85         callback_called.store(0);
86         hpx::future<std::int32_t> f1 =
87             hpx::async_cb(inc, hpx::colocated(target), &cb, 42);
88         HPX_TEST_EQ(f1.get(), 43);
89         HPX_TEST_EQ(callback_called.load(), 1);
90 
91         callback_called.store(0);
92         hpx::future<std::int32_t> f2 =
93             hpx::async_cb(hpx::launch::all, inc, hpx::colocated(target), &cb, 42);
94         HPX_TEST_EQ(f2.get(), 43);
95         HPX_TEST_EQ(callback_called.load(), 1);
96     }
97 
98     {
99         increment_with_future_action inc;
100 
101         hpx::promise<std::int32_t> p;
102         hpx::shared_future<std::int32_t> f = p.get_future();
103 
104         callback_called.store(0);
105         hpx::future<std::int32_t> f1 =
106             hpx::async_cb(inc, hpx::colocated(target), &cb, f);
107         hpx::future<std::int32_t> f2 =
108             hpx::async_cb(hpx::launch::all, inc, hpx::colocated(target), &cb, f);
109 
110         p.set_value(42);
111         HPX_TEST_EQ(f1.get(), 43);
112         HPX_TEST_EQ(f2.get(), 43);
113         HPX_TEST_EQ(callback_called.load(), 2);
114     }
115 
116     {
117         callback_called.store(0);
118         hpx::future<std::int32_t> f1 =
119             hpx::async_cb<increment_action>(hpx::colocated(target), &cb, 42);
120         HPX_TEST_EQ(f1.get(), 43);
121         HPX_TEST_EQ(callback_called.load(), 1);
122 
123         callback_called.store(0);
124         hpx::future<std::int32_t> f2 = hpx::async_cb<increment_action>(
125             hpx::launch::all, hpx::colocated(target), &cb, 42);
126         HPX_TEST_EQ(f2.get(), 43);
127         HPX_TEST_EQ(callback_called.load(), 1);
128     }
129 
130     {
131         hpx::future<hpx::id_type> dec_f =
132             hpx::components::new_<decrement_server>(hpx::colocated(target));
133         hpx::id_type dec = dec_f.get();
134 
135         call_action call;
136 
137         callback_called.store(0);
138         hpx::future<std::int32_t> f1 = hpx::async_cb(call, dec, &cb, 42);
139         HPX_TEST_EQ(f1.get(), 41);
140         HPX_TEST_EQ(callback_called.load(), 1);
141 
142         callback_called.store(0);
143         hpx::future<std::int32_t> f2 =
144             hpx::async_cb(hpx::launch::all, call, dec, &cb, 42);
145         HPX_TEST_EQ(f2.get(), 41);
146         HPX_TEST_EQ(callback_called.load(), 1);
147     }
148 
149     {
150         hpx::future<hpx::id_type> dec_f =
151             hpx::components::new_<decrement_server>(hpx::colocated(target));
152         hpx::id_type dec = dec_f.get();
153 
154         callback_called.store(0);
155         hpx::future<std::int32_t> f1 =
156             hpx::async_cb<call_action>(dec, &cb, 42);
157         HPX_TEST_EQ(f1.get(), 41);
158         HPX_TEST_EQ(callback_called.load(), 1);
159 
160         callback_called.store(0);
161         hpx::future<std::int32_t> f2 =
162             hpx::async_cb<call_action>(hpx::launch::all, dec, &cb, 42);
163         HPX_TEST_EQ(f2.get(), 41);
164         HPX_TEST_EQ(callback_called.load(), 1);
165     }
166 
167     {
168         increment_with_future_action inc;
169         hpx::shared_future<std::int32_t> f =
170             hpx::async(hpx::launch::deferred, hpx::util::bind(&increment, 42));
171 
172         callback_called.store(0);
173         hpx::future<std::int32_t> f1 = hpx::async_cb(
174             inc, hpx::colocated(target), &cb, f);
175         hpx::future<std::int32_t> f2 = hpx::async_cb(
176             hpx::launch::all, inc, hpx::colocated(target), &cb, f);
177 
178         HPX_TEST_EQ(f1.get(), 44);
179         HPX_TEST_EQ(f2.get(), 44);
180         HPX_TEST_EQ(callback_called.load(), 2);
181     }
182 }
183 
hpx_main()184 int hpx_main()
185 {
186     std::vector<hpx::id_type> localities = hpx::find_all_localities();
187     for (hpx::id_type const& id : localities)
188     {
189         test_client client(hpx::new_<test_client>(id));
190         test_remote_async_cb_colocated(client);
191     }
192     return hpx::finalize();
193 }
194 
main(int argc,char * argv[])195 int main(int argc, char* argv[])
196 {
197     // Initialize and run HPX
198     HPX_TEST_EQ_MSG(hpx::init(argc, argv), 0,
199         "HPX main exited with non-zero status");
200 
201     return hpx::util::report_errors();
202 }
203 
204