1 ///////////////////////////////////////////////////////////////////////////////
2 //  Copyright (c) 2007-2014 Hartmut Kaiser
3 //  Copyright (c) 2011 Bryce Adelstein-Lelbach
4 //
5 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
6 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 ///////////////////////////////////////////////////////////////////////////////
8 
// Including 'hpx/hpx_main.hpp' instead of the usual 'hpx/hpx_init.hpp' allows
// the plain C-main below to be used as the direct main HPX entry point.
11 #include <hpx/hpx_main.hpp>
12 #include <hpx/include/actions.hpp>
13 #include <hpx/include/components.hpp>
14 #include <hpx/include/iostreams.hpp>
15 #include <hpx/include/lcos.hpp>
16 #include <hpx/include/util.hpp>
17 
18 #include <cstddef>
19 #include <list>
20 #include <mutex>
21 #include <set>
22 #include <vector>
23 
24 ///////////////////////////////////////////////////////////////////////////////
// The purpose of this example is to execute an HPX-thread printing
// "Hello world" once on each OS-thread on each of the connected localities.
27 //
// The function hello_world_foreman_action is executed once on each locality.
// It schedules an HPX-thread (encapsulating hello_world_worker) once for each
// OS-thread on that locality. The code makes sure that the HPX-thread really
// gets executed by the requested OS-thread. While the HPX-thread is scheduled
// to run on a particular OS-thread, we may have to retry as the HPX-thread may
// end up being 'stolen' by another OS-thread.
34 
35 ///////////////////////////////////////////////////////////////////////////////
36 //[hello_world_worker
hello_world_worker(std::size_t desired)37 std::size_t hello_world_worker(std::size_t desired)
38 {
39     // Returns the OS-thread number of the worker that is running this
40     // HPX-thread.
41     std::size_t current = hpx::get_worker_thread_num();
42     if (current == desired)
43     {
44         // The HPX-thread has been run on the desired OS-thread.
45         char const* msg = "hello world from OS-thread {1} on locality {2}\n";
46 
47         hpx::util::format_to(hpx::cout, msg, desired, hpx::get_locality_id())
48             << hpx::flush;
49 
50         return desired;
51     }
52 
53     // This HPX-thread has been run by the wrong OS-thread, make the foreman
54     // try again by rescheduling it.
55     return std::size_t(-1);
56 }
57 
58 // Define the boilerplate code necessary for the function 'hello_world_worker'
59 // to be invoked as an HPX action (by a HPX future). This macro defines the
60 // type 'hello_world_worker_action'.
61 HPX_PLAIN_ACTION(hello_world_worker, hello_world_worker_action);
62 //]
63 
64 ///////////////////////////////////////////////////////////////////////////////
65 //[hello_world_foreman
hello_world_foreman()66 void hello_world_foreman()
67 {
68     // Get the number of worker OS-threads in use by this locality.
69     std::size_t const os_threads = hpx::get_os_thread_count();
70 
71     // Find the global name of the current locality.
72     hpx::naming::id_type const here = hpx::find_here();
73 
74     // Populate a set with the OS-thread numbers of all OS-threads on this
75     // locality. When the hello world message has been printed on a particular
76     // OS-thread, we will remove it from the set.
77     std::set<std::size_t> attendance;
78     for (std::size_t os_thread = 0; os_thread < os_threads; ++os_thread)
79         attendance.insert(os_thread);
80 
81     // As long as there are still elements in the set, we must keep scheduling
82     // HPX-threads. Because HPX features work-stealing task schedulers, we have
83     // no way of enforcing which worker OS-thread will actually execute
84     // each HPX-thread.
85     while (!attendance.empty())
86     {
87         // Each iteration, we create a task for each element in the set of
88         // OS-threads that have not said "Hello world". Each of these tasks
89         // is encapsulated in a future.
90         std::vector<hpx::lcos::future<std::size_t> > futures;
91         futures.reserve(attendance.size());
92 
93         for (std::size_t worker : attendance)
94         {
95             // Asynchronously start a new task. The task is encapsulated in a
96             // future, which we can query to determine if the task has
97             // completed.
98             typedef hello_world_worker_action action_type;
99             futures.push_back(hpx::async<action_type>(here, worker));
100         }
101 
102         // Wait for all of the futures to finish. The callback version of the
103         // hpx::lcos::wait_each function takes two arguments: a vector of futures,
104         // and a binary callback.  The callback takes two arguments; the first
105         // is the index of the future in the vector, and the second is the
106         // return value of the future. hpx::lcos::wait_each doesn't return until
107         // all the futures in the vector have returned.
108         hpx::lcos::local::spinlock mtx;
109         hpx::lcos::wait_each(
110             hpx::util::unwrapping([&](std::size_t t) {
111                 if (std::size_t(-1) != t)
112                 {
113                     std::lock_guard<hpx::lcos::local::spinlock> lk(mtx);
114                     attendance.erase(t);
115                 }
116             }),
117             futures);
118     }
119 }
120 //]
121 
122 //[hello_world_action_wrapper
123 // Define the boilerplate code necessary for the function 'hello_world_foreman'
124 // to be invoked as an HPX action.
125 HPX_PLAIN_ACTION(hello_world_foreman, hello_world_foreman_action);
126 //]
127 
128 ///////////////////////////////////////////////////////////////////////////////
129 //[hello_world_hpx_main
130 //` Here is the main entry point. By using the include 'hpx/hpx_main.hpp' HPX
131 //` will invoke the plain old C-main() as its first HPX thread.
main()132 int main()
133 {
134     // Get a list of all available localities.
135     std::vector<hpx::naming::id_type> localities =
136         hpx::find_all_localities();
137 
138     // Reserve storage space for futures, one for each locality.
139     std::vector<hpx::lcos::future<void> > futures;
140     futures.reserve(localities.size());
141 
142     for (hpx::naming::id_type const& node : localities)
143     {
144         // Asynchronously start a new task. The task is encapsulated in a
145         // future, which we can query to determine if the task has
146         // completed.
147         typedef hello_world_foreman_action action_type;
148         futures.push_back(hpx::async<action_type>(node));
149     }
150 
151     // The non-callback version of hpx::lcos::wait_all takes a single parameter,
152     // a vector of futures to wait on. hpx::wait_all only returns when
153     // all of the futures have finished.
154     hpx::wait_all(futures);
155     return 0;
156 }
157 //]
158