1 // Copyright (c) 2011-2013 Bryce Adelstein-Lelbach
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6 // This test demonstrates the issue described in #1036: Scheduler hangs when
7 // user code attempts to "block" OS-threads
8
9 #include <hpx/hpx.hpp>
10 #include <hpx/hpx_init.hpp>
11 #include <hpx/util/bind.hpp>
12 #include <hpx/util/high_resolution_timer.hpp>
13 #include <hpx/util/lightweight_test.hpp>
14 #include <hpx/runtime/threads/thread_helpers.hpp>
15 #include <hpx/runtime/threads/topology.hpp>
16
17 #include <boost/scoped_array.hpp>
18
19 #include <atomic>
20 #include <cstdint>
21 #include <string>
22 #include <vector>
23
24 ///////////////////////////////////////////////////////////////////////////////
blocker(std::atomic<std::uint64_t> * entered,std::atomic<std::uint64_t> * started,boost::scoped_array<std::atomic<std::uint64_t>> * blocked_threads,std::uint64_t worker)25 void blocker(
26 std::atomic<std::uint64_t>* entered
27 , std::atomic<std::uint64_t>* started
28 , boost::scoped_array<std::atomic<std::uint64_t> >* blocked_threads
29 , std::uint64_t worker
30 )
31 {
32 // reschedule if we are not on the correct OS thread...
33 if (worker != hpx::get_worker_thread_num())
34 {
35 hpx::threads::register_work(
36 hpx::util::bind(&blocker, entered, started, blocked_threads, worker),
37 "blocker", hpx::threads::pending,
38 hpx::threads::thread_priority_normal,
39 hpx::threads::thread_schedule_hint(worker));
40 return;
41 }
42
43 (*blocked_threads)[hpx::get_worker_thread_num()].fetch_add(1);
44
45 entered->fetch_add(1);
46
47 HPX_TEST_EQ(worker, hpx::get_worker_thread_num());
48
49 while (started->load() != 1)
50 continue;
51 }
52
53 ///////////////////////////////////////////////////////////////////////////////
54 volatile int i = 0;
55 std::uint64_t delay = 100;
56
hpx_main()57 int hpx_main()
58 {
59 {
60 ///////////////////////////////////////////////////////////////////////
61 // Block all other OS threads.
62 std::atomic<std::uint64_t> entered(0);
63 std::atomic<std::uint64_t> started(0);
64
65 std::uint64_t const os_thread_count = hpx::get_os_thread_count();
66
67 boost::scoped_array<std::atomic<std::uint64_t> >
68 blocked_threads(
69 new std::atomic<std::uint64_t>[os_thread_count]);
70
71 for (std::uint64_t i = 0; i < os_thread_count; ++i)
72 blocked_threads[i].store(0);
73
74 std::uint64_t scheduled = 0;
75 for (std::uint64_t i = 0; i < os_thread_count; ++i)
76 {
77 if (i == hpx::get_worker_thread_num())
78 continue;
79
80 hpx::threads::register_work(
81 hpx::util::bind(&blocker, &entered, &started, &blocked_threads, i),
82 "blocker", hpx::threads::pending,
83 hpx::threads::thread_priority_normal,
84 hpx::threads::thread_schedule_hint(i));
85 ++scheduled;
86 }
87 HPX_TEST_EQ(scheduled, os_thread_count - 1);
88
89
90 while (entered.load() != (os_thread_count - 1))
91 continue;
92
93 {
94 double delay_sec = delay * 1e-6;
95 hpx::util::high_resolution_timer td;
96
97 while (true)
98 {
99 if (td.elapsed() > delay_sec)
100 break;
101 else
102 ++i;
103 }
104 }
105
106 started.fetch_add(1);
107
108 for (std::uint64_t i = 0; i < os_thread_count; ++i)
109 HPX_TEST(blocked_threads[i].load() <= 1);
110 }
111
112 return hpx::finalize();
113 }
114
115 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])116 int main(
117 int argc
118 , char* argv[]
119 )
120 {
121 using namespace boost::program_options;
122
123 // Configure application-specific options.
124 options_description cmdline("usage: " HPX_APPLICATION_STRING " [options]");
125
126 cmdline.add_options()
127 ( "delay"
128 , value<std::uint64_t>(&delay)->default_value(100)
129 , "time in micro-seconds for the delay loop")
130 ;
131
132 // We force this test to use all available threads by default.
133 std::vector<std::string> const cfg = {
134 "hpx.os_threads=all"
135 };
136
137 // Initialize and run HPX.
138 return hpx::init(cmdline, argc, argv, cfg);
139 }
140
141
142