1 ///////////////////////////////////////////////////////////////////////////////
2 // Copyright (c) 2007-2014 Hartmut Kaiser
3 // Copyright (c) 2011 Bryce Adelstein-Lelbach
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 ///////////////////////////////////////////////////////////////////////////////
8
9 // Including 'hpx/hpx_main.hpp' instead of the usual 'hpx/hpx_init.hpp' enables
10 // to use the plain C-main below as the direct main HPX entry point.
11 #include <hpx/hpx_main.hpp>
12 #include <hpx/include/actions.hpp>
13 #include <hpx/include/components.hpp>
14 #include <hpx/include/iostreams.hpp>
15 #include <hpx/include/lcos.hpp>
16 #include <hpx/include/util.hpp>
17
18 #include <cstddef>
19 #include <list>
20 #include <mutex>
21 #include <set>
22 #include <vector>
23
24 ///////////////////////////////////////////////////////////////////////////////
25 // The purpose of this example is to execute a HPX-thread printing "Hello world"
26 // once on each OS-thread on each of the connected localities.
27 //
28 // The function hello_world_foreman_action is executed once on each locality.
29 // It schedules a HPX-thread (encapsulating hello_world_worker) once for each
30 // OS-thread on that locality. The code make sure that the HPX-thread gets
31 // really executed by the requested OS-thread. While the HPX-thread is scheduled
32 // to run on a particular OS-thread, we may have to retry as the HPX-thread may
33 // end up being 'stolen' by another OS-thread.
34
35 ///////////////////////////////////////////////////////////////////////////////
36 //[hello_world_worker
hello_world_worker(std::size_t desired)37 std::size_t hello_world_worker(std::size_t desired)
38 {
39 // Returns the OS-thread number of the worker that is running this
40 // HPX-thread.
41 std::size_t current = hpx::get_worker_thread_num();
42 if (current == desired)
43 {
44 // The HPX-thread has been run on the desired OS-thread.
45 char const* msg = "hello world from OS-thread {1} on locality {2}\n";
46
47 hpx::util::format_to(hpx::cout, msg, desired, hpx::get_locality_id())
48 << hpx::flush;
49
50 return desired;
51 }
52
53 // This HPX-thread has been run by the wrong OS-thread, make the foreman
54 // try again by rescheduling it.
55 return std::size_t(-1);
56 }
57
58 // Define the boilerplate code necessary for the function 'hello_world_worker'
59 // to be invoked as an HPX action (by a HPX future). This macro defines the
60 // type 'hello_world_worker_action'.
61 HPX_PLAIN_ACTION(hello_world_worker, hello_world_worker_action);
62 //]
63
64 ///////////////////////////////////////////////////////////////////////////////
65 //[hello_world_foreman
hello_world_foreman()66 void hello_world_foreman()
67 {
68 // Get the number of worker OS-threads in use by this locality.
69 std::size_t const os_threads = hpx::get_os_thread_count();
70
71 // Find the global name of the current locality.
72 hpx::naming::id_type const here = hpx::find_here();
73
74 // Populate a set with the OS-thread numbers of all OS-threads on this
75 // locality. When the hello world message has been printed on a particular
76 // OS-thread, we will remove it from the set.
77 std::set<std::size_t> attendance;
78 for (std::size_t os_thread = 0; os_thread < os_threads; ++os_thread)
79 attendance.insert(os_thread);
80
81 // As long as there are still elements in the set, we must keep scheduling
82 // HPX-threads. Because HPX features work-stealing task schedulers, we have
83 // no way of enforcing which worker OS-thread will actually execute
84 // each HPX-thread.
85 while (!attendance.empty())
86 {
87 // Each iteration, we create a task for each element in the set of
88 // OS-threads that have not said "Hello world". Each of these tasks
89 // is encapsulated in a future.
90 std::vector<hpx::lcos::future<std::size_t> > futures;
91 futures.reserve(attendance.size());
92
93 for (std::size_t worker : attendance)
94 {
95 // Asynchronously start a new task. The task is encapsulated in a
96 // future, which we can query to determine if the task has
97 // completed.
98 typedef hello_world_worker_action action_type;
99 futures.push_back(hpx::async<action_type>(here, worker));
100 }
101
102 // Wait for all of the futures to finish. The callback version of the
103 // hpx::lcos::wait_each function takes two arguments: a vector of futures,
104 // and a binary callback. The callback takes two arguments; the first
105 // is the index of the future in the vector, and the second is the
106 // return value of the future. hpx::lcos::wait_each doesn't return until
107 // all the futures in the vector have returned.
108 hpx::lcos::local::spinlock mtx;
109 hpx::lcos::wait_each(
110 hpx::util::unwrapping([&](std::size_t t) {
111 if (std::size_t(-1) != t)
112 {
113 std::lock_guard<hpx::lcos::local::spinlock> lk(mtx);
114 attendance.erase(t);
115 }
116 }),
117 futures);
118 }
119 }
120 //]
121
122 //[hello_world_action_wrapper
123 // Define the boilerplate code necessary for the function 'hello_world_foreman'
124 // to be invoked as an HPX action.
125 HPX_PLAIN_ACTION(hello_world_foreman, hello_world_foreman_action);
126 //]
127
128 ///////////////////////////////////////////////////////////////////////////////
129 //[hello_world_hpx_main
130 //` Here is the main entry point. By using the include 'hpx/hpx_main.hpp' HPX
131 //` will invoke the plain old C-main() as its first HPX thread.
main()132 int main()
133 {
134 // Get a list of all available localities.
135 std::vector<hpx::naming::id_type> localities =
136 hpx::find_all_localities();
137
138 // Reserve storage space for futures, one for each locality.
139 std::vector<hpx::lcos::future<void> > futures;
140 futures.reserve(localities.size());
141
142 for (hpx::naming::id_type const& node : localities)
143 {
144 // Asynchronously start a new task. The task is encapsulated in a
145 // future, which we can query to determine if the task has
146 // completed.
147 typedef hello_world_foreman_action action_type;
148 futures.push_back(hpx::async<action_type>(node));
149 }
150
151 // The non-callback version of hpx::lcos::wait_all takes a single parameter,
152 // a vector of futures to wait on. hpx::wait_all only returns when
153 // all of the futures have finished.
154 hpx::wait_all(futures);
155 return 0;
156 }
157 //]
158