1 //  Copyright (c)      2013 Thomas Heller
2 //  Copyright (c) 2007-2017 Hartmut Kaiser
3 //  Copyright (c) 2011      Bryce Lelbach
4 //
5 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
6 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 
8 #if !defined(HPX_THREADMANAGER_SCHEDULING_STATIC_PRIOTITY_QUEUE_HPP)
9 #define HPX_THREADMANAGER_SCHEDULING_STATIC_PRIOTITY_QUEUE_HPP
10 
11 #include <hpx/config.hpp>
12 
13 #if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
14 #include <hpx/compat/mutex.hpp>
15 #include <hpx/runtime/threads/policies/lockfree_queue_backends.hpp>
16 #include <hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp>
17 #include <hpx/runtime/threads_fwd.hpp>
18 #include <hpx/util/assert.hpp>
19 
20 #include <cstddef>
21 #include <cstdint>
22 #include <string>
23 
24 #include <hpx/config/warnings_prefix.hpp>
25 
26 ///////////////////////////////////////////////////////////////////////////////
27 namespace hpx { namespace threads { namespace policies
28 {
29     ///////////////////////////////////////////////////////////////////////////
30     /// The static_priority_queue_scheduler maintains exactly one queue of work
31     /// items (threads) per OS thread, where this OS thread pulls its next work
32     /// from. Additionally it maintains separate queues: several for high
33     /// priority threads and one for low priority threads.
34     /// High priority threads are executed by the first N OS threads before any
35     /// other work is executed. Low priority threads are executed by the last
36     /// OS thread whenever no other work is available.
37     /// This scheduler does not do any work stealing.
38     template <typename Mutex = compat::mutex,
39         typename PendingQueuing = lockfree_fifo,
40         typename StagedQueuing = lockfree_fifo,
41         typename TerminatedQueuing = lockfree_lifo>
42     class HPX_EXPORT static_priority_queue_scheduler
43         : public local_priority_queue_scheduler<
44             Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing
45           >
46     {
47     public:
48         typedef local_priority_queue_scheduler<
49             Mutex, PendingQueuing, StagedQueuing, TerminatedQueuing
50         > base_type;
51 
52         typedef typename base_type::init_parameter_type
53             init_parameter_type;
54 
static_priority_queue_scheduler(init_parameter_type const & init,bool deferred_initialization=true)55         static_priority_queue_scheduler(init_parameter_type const& init,
56                 bool deferred_initialization = true)
57           : base_type(init, deferred_initialization)
58         {}
59 
has_thread_stealing() const60         virtual bool has_thread_stealing() const override { return false; }
61 
get_scheduler_name()62         static std::string get_scheduler_name()
63         {
64             return "static_priority_queue_scheduler";
65         }
66 
67         /// Return the next thread to be executed, return false if non is
68         /// available
get_next_thread(std::size_t num_thread,bool running,std::int64_t & idle_loop_count,threads::thread_data * & thrd)69         bool get_next_thread(std::size_t num_thread, bool running,
70             std::int64_t& idle_loop_count, threads::thread_data*& thrd) override
71         {
72             std::size_t queues_size = this->queues_.size();
73 
74             typedef typename base_type::thread_queue_type thread_queue_type;
75 
76             if (num_thread < this->high_priority_queues_.size())
77             {
78                 thread_queue_type* q = this->high_priority_queues_[num_thread];
79 
80                 q->increment_num_pending_accesses();
81                 if (q->get_next_thread(thrd))
82                     return true;
83                 q->increment_num_pending_misses();
84             }
85 
86             {
87                 HPX_ASSERT(num_thread < queues_size);
88                 thread_queue_type* q = this->queues_[num_thread];
89 
90                 q->increment_num_pending_accesses();
91                 if (q->get_next_thread(thrd))
92                     return true;
93                 q->increment_num_pending_misses();
94 
95                 // Give up, we should have work to convert.
96                 if (q->get_staged_queue_length(std::memory_order_relaxed) != 0)
97                     return false;
98             }
99 
100             // Limit access to the low priority queue to one worker thread
101             if ((queues_size - 1) == num_thread)
102                 return this->low_priority_queue_.get_next_thread(thrd);
103 
104             return false;
105         }
106 
107         /// This is a function which gets called periodically by the thread
108         /// manager to allow for maintenance tasks to be executed in the
109         /// scheduler. Returns true if the OS thread calling this function
110         /// has to be terminated (i.e. no more work has to be done).
wait_or_add_new(std::size_t num_thread,bool running,std::int64_t & idle_loop_count)111         bool wait_or_add_new(std::size_t num_thread, bool running,
112             std::int64_t& idle_loop_count) override
113         {
114             HPX_ASSERT(num_thread < this->queues_.size());
115 
116             std::size_t added = 0;
117             bool result = true;
118 
119             if (num_thread < this->high_priority_queues_.size())
120             {
121                 result = this->high_priority_queues_[num_thread]->
122                     wait_or_add_new(running, idle_loop_count, added) && result;
123                 if (0 != added) return result;
124             }
125 
126             result = this->queues_[num_thread]->wait_or_add_new(running,
127                 idle_loop_count, added) && result;
128             if (0 != added) return result;
129 
130             // Check if we have been disabled
131             if (!running)
132             {
133                 return true;
134             }
135 
136 #ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
137             // no new work is available, are we deadlocked?
138             if (HPX_UNLIKELY(minimal_deadlock_detection && LHPX_ENABLED(error)))
139             {
140                 bool suspended_only = true;
141 
142                 for (std::size_t i = 0;
143                      suspended_only && i != this->queues_.size(); ++i)
144                 {
145                     suspended_only = this->queues_[i]->dump_suspended_threads(
146                         i, idle_loop_count, running);
147                 }
148 
149                 if (HPX_UNLIKELY(suspended_only)) {
150                     if (running) {
151                         LTM_(error) //-V128
152                             << "queue(" << num_thread << "): "
153                             << "no new work available, are we deadlocked?";
154                     }
155                     else {
156                         LHPX_CONSOLE_(hpx::util::logging::level::error) //-V128
157                               << "  [TM] queue(" << num_thread << "): "
158                               << "no new work available, are we deadlocked?\n";
159                     }
160                 }
161             }
162 #endif
163 
164             result = this->low_priority_queue_.wait_or_add_new(running,
165                 idle_loop_count, added) && result;
166             if (0 != added) return result;
167 
168             return result;
169         }
170     };
171 }}}
172 
173 #include <hpx/config/warnings_suffix.hpp>
174 
175 #endif
176 #endif
177 
178