1 //  Copyright (c) 2016 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #include <hpx/hpx_main.hpp>
7 #include <hpx/include/apply.hpp>
8 #include <hpx/include/lcos.hpp>
9 #include <hpx/util/lightweight_test.hpp>
10 
11 #include <atomic>
12 #include <numeric>
13 #include <string>
14 #include <vector>
15 
16 ///////////////////////////////////////////////////////////////////////////////
sum(std::vector<int> const & s,hpx::lcos::local::channel<int> c)17 void sum(std::vector<int> const& s, hpx::lcos::local::channel<int> c)
18 {
19     c.set(std::accumulate(s.begin(), s.end(), 0));      // send sum to channel
20 }
21 
calculate_sum()22 void calculate_sum()
23 {
24     std::vector<int> s = { 7, 2, 8, -9, 4, 0 };
25     hpx::lcos::local::channel<int> c;
26 
27     hpx::apply(&sum, std::vector<int>(s.begin(), s.begin() + s.size()/2), c);
28     hpx::apply(&sum, std::vector<int>(s.begin() + s.size()/2, s.end()), c);
29 
30     int x = c.get(hpx::launch::sync);    // receive from c
31     int y = c.get(hpx::launch::sync);
32 
33     int expected = std::accumulate(s.begin(), s.end(), 0);
34     HPX_TEST_EQ(expected, x + y);
35 }
36 
37 ///////////////////////////////////////////////////////////////////////////////
ping(hpx::lcos::local::send_channel<std::string> pings,std::string const & msg)38 void ping(
39     hpx::lcos::local::send_channel<std::string> pings,
40     std::string const& msg)
41 {
42     pings.set(msg);
43 }
44 
pong(hpx::lcos::local::receive_channel<std::string> pings,hpx::lcos::local::send_channel<std::string> pongs)45 void pong(
46     hpx::lcos::local::receive_channel<std::string> pings,
47     hpx::lcos::local::send_channel<std::string> pongs)
48 {
49     std::string msg = pings.get(hpx::launch::sync);
50     pongs.set(msg);
51 }
52 
pingpong()53 void pingpong()
54 {
55     hpx::lcos::local::channel<std::string> pings;
56     hpx::lcos::local::channel<std::string> pongs;
57 
58     ping(pings, "passed message");
59     pong(pings, pongs);
60 
61     std::string result = pongs.get(hpx::launch::sync);
62     HPX_TEST_EQ(std::string("passed message"), result);
63 }
64 
pingpong1()65 void pingpong1()
66 {
67     hpx::lcos::local::one_element_channel<std::string> pings;
68     hpx::lcos::local::one_element_channel<std::string> pongs;
69 
70     for (int i = 0; i != 10; ++i)
71     {
72         ping(pings, "passed message");
73         pong(pings, pongs);
74 
75         std::string result = pongs.get(hpx::launch::sync);
76         HPX_TEST_EQ(std::string("passed message"), result);
77     }
78 }
79 
80 ///////////////////////////////////////////////////////////////////////////////
ping_void(hpx::lcos::local::send_channel<> pings)81 void ping_void(hpx::lcos::local::send_channel<> pings)
82 {
83     pings.set();
84 }
85 
pong_void(hpx::lcos::local::receive_channel<> pings,hpx::lcos::local::send_channel<> pongs,bool & pingponged)86 void pong_void(
87     hpx::lcos::local::receive_channel<> pings,
88     hpx::lcos::local::send_channel<> pongs,
89     bool& pingponged)
90 {
91     pings.get(hpx::launch::sync);
92     pongs.set();
93 
94     HPX_TEST(!pingponged);
95     pingponged = true;
96 }
97 
pingpong_void()98 void pingpong_void()
99 {
100     hpx::lcos::local::channel<> pings;
101     hpx::lcos::local::channel<> pongs;
102 
103     bool pingponged = false;
104 
105     ping_void(pings);
106     pong_void(pings, pongs, pingponged);
107 
108     pongs.get(hpx::launch::sync);
109     HPX_TEST(pingponged);
110 }
111 
pingpong_void1()112 void pingpong_void1()
113 {
114     hpx::lcos::local::one_element_channel<> pings;
115     hpx::lcos::local::one_element_channel<> pongs;
116 
117     for (int i = 0; i != 10; ++i)
118     {
119         bool pingponged = false;
120 
121         ping_void(pings);
122         pong_void(pings, pongs, pingponged);
123 
124         pongs.get(hpx::launch::sync);
125         HPX_TEST(pingponged);
126     }
127 }
128 
129 ///////////////////////////////////////////////////////////////////////////////
dispatch_work()130 void dispatch_work()
131 {
132     hpx::lcos::local::channel<int> jobs;
133     hpx::lcos::local::channel<> done;
134 
135     std::atomic<int> received_jobs(0);
136     std::atomic<bool> was_closed(false);
137 
138     hpx::apply(
139         [jobs, done, &received_jobs, &was_closed]() mutable
140         {
141             while(true)
142             {
143                 hpx::error_code ec(hpx::lightweight);
144                 int next = jobs.get(hpx::launch::sync, ec);
145                 (void)next;
146                 if (!ec)
147                 {
148                     ++received_jobs;
149                 }
150                 else
151                 {
152                     was_closed = true;
153                     done.set();
154                     break;
155                 }
156             }
157         });
158 
159     for (int j = 1; j <= 3; ++j)
160     {
161         jobs.set(j);
162     }
163 
164     jobs.close();
165     done.get(hpx::launch::sync);
166 
167     HPX_TEST_EQ(received_jobs.load(), 3);
168     HPX_TEST(was_closed.load());
169 }
170 
171 ///////////////////////////////////////////////////////////////////////////////
channel_range()172 void channel_range()
173 {
174     std::atomic<int> received_elements(0);
175 
176     hpx::lcos::local::channel<std::string> queue;
177     queue.set("one");
178     queue.set("two");
179     queue.set("three");
180     queue.close();
181 
182     for (auto const& elem : queue)
183     {
184         (void)elem;
185         ++received_elements;
186     }
187 
188     HPX_TEST_EQ(received_elements.load(), 3);
189 }
190 
channel_range_void()191 void channel_range_void()
192 {
193     std::atomic<int> received_elements(0);
194 
195     hpx::lcos::local::channel<> queue;
196     queue.set();
197     queue.set();
198     queue.set();
199     queue.close();
200 
201     for (auto const& elem : queue)
202     {
203         (void)elem;
204         ++received_elements;
205     }
206 
207     HPX_TEST_EQ(received_elements.load(), 3);
208 }
209 
210 ///////////////////////////////////////////////////////////////////////////////
deadlock_test()211 void deadlock_test()
212 {
213     bool caught_exception = false;
214     try {
215         hpx::lcos::local::channel<int> c;
216         int value = c.get(hpx::launch::sync);
217         HPX_TEST(false);
218         (void)value;
219     }
220     catch(hpx::exception const&) {
221         caught_exception = true;
222     }
223     HPX_TEST(caught_exception);
224 }
225 
closed_channel_get()226 void closed_channel_get()
227 {
228     bool caught_exception = false;
229     try {
230         hpx::lcos::local::channel<int> c;
231         c.close();
232 
233         int value = c.get(hpx::launch::sync);
234         HPX_TEST(false);
235         (void)value;
236     }
237     catch(hpx::exception const&) {
238         caught_exception = true;
239     }
240     HPX_TEST(caught_exception);
241 }
242 
closed_channel_get_generation()243 void closed_channel_get_generation()
244 {
245     bool caught_exception = false;
246     try {
247         hpx::lcos::local::channel<int> c;
248         c.set(42, 122);         // setting value for generation 122
249         c.close();
250 
251         HPX_TEST_EQ(c.get(hpx::launch::sync, 122), 42);
252 
253         int value = c.get(hpx::launch::sync, 123); // asking for generation 123
254         HPX_TEST(false);
255         (void)value;
256     }
257     catch(hpx::exception const&) {
258         caught_exception = true;
259     }
260     HPX_TEST(caught_exception);
261 }
262 
closed_channel_set()263 void closed_channel_set()
264 {
265     bool caught_exception = false;
266     try {
267         hpx::lcos::local::channel<int> c;
268         c.close();
269 
270         c.set(42);
271         HPX_TEST(false);
272     }
273     catch(hpx::exception const&) {
274         caught_exception = true;
275     }
276     HPX_TEST(caught_exception);
277 }
278 
279 ///////////////////////////////////////////////////////////////////////////////
deadlock_test1()280 void deadlock_test1()
281 {
282     bool caught_exception = false;
283     try {
284         hpx::lcos::local::one_element_channel<int> c;
285         int value = c.get(hpx::launch::sync);
286         HPX_TEST(false);
287         (void)value;
288     }
289     catch(hpx::exception const&) {
290         caught_exception = true;
291     }
292     HPX_TEST(caught_exception);
293 }
294 
closed_channel_get1()295 void closed_channel_get1()
296 {
297     bool caught_exception = false;
298     try {
299         hpx::lcos::local::one_element_channel<int> c;
300         c.close();
301 
302         int value = c.get(hpx::launch::sync);
303         HPX_TEST(false);
304         (void)value;
305     }
306     catch(hpx::exception const&) {
307         caught_exception = true;
308     }
309     HPX_TEST(caught_exception);
310 }
311 
closed_channel_set1()312 void closed_channel_set1()
313 {
314     bool caught_exception = false;
315     try {
316         hpx::lcos::local::one_element_channel<int> c;
317         c.close();
318 
319         c.set(42);
320         HPX_TEST(false);
321     }
322     catch(hpx::exception const&) {
323         caught_exception = true;
324     }
325     HPX_TEST(caught_exception);
326 }
327 
328 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])329 int main(int argc, char* argv[])
330 {
331     calculate_sum();
332     pingpong();
333     pingpong1();
334     pingpong_void();
335     pingpong_void1();
336     dispatch_work();
337     channel_range();
338     channel_range_void();
339 
340     deadlock_test();
341     closed_channel_get();
342     closed_channel_get_generation();
343     closed_channel_set();
344 
345     deadlock_test1();
346     closed_channel_get1();
347     closed_channel_set1();
348 
349     return hpx::util::report_errors();
350 }
351