1 //  Copyright (c) 2011-2013 Bryce Adelstein-Lelbach
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 // This test demonstrates the issue described in #1036: Scheduler hangs when
7 // user code attempts to "block" OS-threads
8 
9 #include <hpx/hpx.hpp>
10 #include <hpx/hpx_init.hpp>
11 #include <hpx/util/bind.hpp>
12 #include <hpx/util/high_resolution_timer.hpp>
13 #include <hpx/util/lightweight_test.hpp>
14 #include <hpx/runtime/threads/thread_helpers.hpp>
15 #include <hpx/runtime/threads/topology.hpp>
16 
17 #include <boost/scoped_array.hpp>
18 
19 #include <atomic>
20 #include <cstdint>
21 #include <string>
22 #include <vector>
23 
24 ///////////////////////////////////////////////////////////////////////////////
blocker(std::atomic<std::uint64_t> * entered,std::atomic<std::uint64_t> * started,boost::scoped_array<std::atomic<std::uint64_t>> * blocked_threads,std::uint64_t worker)25 void blocker(
26     std::atomic<std::uint64_t>* entered
27   , std::atomic<std::uint64_t>* started
28   , boost::scoped_array<std::atomic<std::uint64_t> >* blocked_threads
29   , std::uint64_t worker
30     )
31 {
32     // reschedule if we are not on the correct OS thread...
33     if (worker != hpx::get_worker_thread_num())
34     {
35         hpx::threads::register_work(
36             hpx::util::bind(&blocker, entered, started, blocked_threads, worker),
37             "blocker", hpx::threads::pending,
38             hpx::threads::thread_priority_normal,
39             hpx::threads::thread_schedule_hint(worker));
40         return;
41     }
42 
43     (*blocked_threads)[hpx::get_worker_thread_num()].fetch_add(1);
44 
45     entered->fetch_add(1);
46 
47     HPX_TEST_EQ(worker, hpx::get_worker_thread_num());
48 
49     while (started->load() != 1)
50         continue;
51 }
52 
53 ///////////////////////////////////////////////////////////////////////////////
54 volatile int i = 0;
55 std::uint64_t delay = 100;
56 
hpx_main()57 int hpx_main()
58 {
59     {
60         ///////////////////////////////////////////////////////////////////////
61         // Block all other OS threads.
62         std::atomic<std::uint64_t> entered(0);
63         std::atomic<std::uint64_t> started(0);
64 
65         std::uint64_t const os_thread_count = hpx::get_os_thread_count();
66 
67         boost::scoped_array<std::atomic<std::uint64_t> >
68             blocked_threads(
69                 new std::atomic<std::uint64_t>[os_thread_count]);
70 
71         for (std::uint64_t i = 0; i < os_thread_count; ++i)
72             blocked_threads[i].store(0);
73 
74         std::uint64_t scheduled = 0;
75         for (std::uint64_t i = 0; i < os_thread_count; ++i)
76         {
77             if (i == hpx::get_worker_thread_num())
78                 continue;
79 
80             hpx::threads::register_work(
81                 hpx::util::bind(&blocker, &entered, &started, &blocked_threads, i),
82                 "blocker", hpx::threads::pending,
83                 hpx::threads::thread_priority_normal,
84                 hpx::threads::thread_schedule_hint(i));
85             ++scheduled;
86         }
87         HPX_TEST_EQ(scheduled, os_thread_count - 1);
88 
89 
90         while (entered.load() != (os_thread_count - 1))
91             continue;
92 
93         {
94             double delay_sec = delay * 1e-6;
95             hpx::util::high_resolution_timer td;
96 
97             while (true)
98             {
99                 if (td.elapsed() > delay_sec)
100                     break;
101                 else
102                     ++i;
103             }
104         }
105 
106         started.fetch_add(1);
107 
108         for (std::uint64_t i = 0; i < os_thread_count; ++i)
109             HPX_TEST(blocked_threads[i].load() <= 1);
110     }
111 
112     return hpx::finalize();
113 }
114 
115 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])116 int main(
117     int argc
118   , char* argv[]
119     )
120 {
121     using namespace boost::program_options;
122 
123     // Configure application-specific options.
124     options_description cmdline("usage: " HPX_APPLICATION_STRING " [options]");
125 
126     cmdline.add_options()
127         ( "delay"
128         , value<std::uint64_t>(&delay)->default_value(100)
129         , "time in micro-seconds for the delay loop")
130         ;
131 
132     // We force this test to use all available threads by default.
133     std::vector<std::string> const cfg = {
134         "hpx.os_threads=all"
135     };
136 
137     // Initialize and run HPX.
138     return hpx::init(cmdline, argc, argv, cfg);
139 }
140 
141 
142