1 //  Copyright (c) 2007-2017 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #if !defined(HPX_RUNTIME_THREADS_DETAIL_SET_THREAD_STATE_JAN_13_2013_0518PM)
7 #define HPX_RUNTIME_THREADS_DETAIL_SET_THREAD_STATE_JAN_13_2013_0518PM
8 
9 #include <hpx/config.hpp>
10 #include <hpx/config/asio.hpp>
11 #include <hpx/error_code.hpp>
12 #include <hpx/runtime/threads/coroutines/coroutine.hpp>
13 #include <hpx/runtime/threads/detail/create_thread.hpp>
14 #include <hpx/runtime/threads/detail/create_work.hpp>
15 #include <hpx/runtime/threads/thread_data.hpp>
16 #include <hpx/runtime/threads/thread_helpers.hpp>
17 #include <hpx/runtime_fwd.hpp>
18 #include <hpx/throw_exception.hpp>
19 #include <hpx/util/assert.hpp>
20 #include <hpx/util/bind_front.hpp>
21 #include <hpx/util/bind.hpp>
22 #include <hpx/util/io_service_pool.hpp>
23 #include <hpx/util/logging.hpp>
24 #include <hpx/util/steady_clock.hpp>
25 
26 #include <boost/asio/basic_waitable_timer.hpp>
27 
28 #include <atomic>
29 #include <chrono>
30 #include <cstddef>
31 #include <functional>
32 #include <memory>
33 #include <sstream>
34 
35 namespace hpx { namespace threads { namespace detail
36 {
37     ///////////////////////////////////////////////////////////////////////////
38     inline thread_state set_thread_state(
39         thread_id_type const& id, thread_state_enum new_state,
40         thread_state_ex_enum new_state_ex, thread_priority priority,
41         thread_schedule_hint schedulehint = thread_schedule_hint(),
42         error_code& ec = throws);
43 
44     ///////////////////////////////////////////////////////////////////////////
set_active_state(thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,thread_state previous_state)45     inline thread_result_type set_active_state(
46         thread_id_type const& thrd, thread_state_enum newstate,
47         thread_state_ex_enum newstate_ex, thread_priority priority,
48         thread_state previous_state)
49     {
50         if (HPX_UNLIKELY(!thrd)) {
51             HPX_THROW_EXCEPTION(null_thread_id,
52                 "threads::detail::set_active_state",
53                 "null thread id encountered");
54             return thread_result_type(terminated, invalid_thread_id);
55         }
56 
57         // make sure that the thread has not been suspended and set active again
58         // in the meantime
59         thread_state current_state = thrd->get_state();
60 
61         if (current_state.state() == previous_state.state() &&
62             current_state != previous_state)
63         {
64             LTM_(warning)
65                 << "set_active_state: thread is still active, however "
66                       "it was non-active since the original set_state "
67                       "request was issued, aborting state change, thread("
68                 << thrd << "), description("
69                 << thrd->get_description() << "), new state("
70                 << get_thread_state_name(newstate) << ")";
71             return thread_result_type(terminated, invalid_thread_id);
72         }
73 
74         // just retry, set_state will create new thread if target is still active
75         error_code ec(lightweight);      // do not throw
76         detail::set_thread_state(thrd, newstate, newstate_ex, priority,
77             thread_schedule_hint(), ec);
78 
79         return thread_result_type(terminated, invalid_thread_id);
80     }
81 
82     ///////////////////////////////////////////////////////////////////////////
set_thread_state(thread_id_type const & thrd,thread_state_enum new_state,thread_state_ex_enum new_state_ex,thread_priority priority,thread_schedule_hint schedulehint,error_code & ec)83     inline thread_state set_thread_state(
84         thread_id_type const& thrd, thread_state_enum new_state,
85         thread_state_ex_enum new_state_ex, thread_priority priority,
86         thread_schedule_hint schedulehint, error_code& ec)
87     {
88         if (HPX_UNLIKELY(!thrd)) {
89             HPX_THROWS_IF(ec, null_thread_id, "threads::detail::set_thread_state",
90                 "null thread id encountered");
91             return thread_state(unknown, wait_unknown);
92         }
93 
94         // set_state can't be used to force a thread into active state
95         if (new_state == threads::active) {
96             std::ostringstream strm;
97             strm << "invalid new state: " << get_thread_state_name(new_state);
98             HPX_THROWS_IF(ec, bad_parameter, "threads::detail::set_thread_state",
99                 strm.str());
100             return thread_state(unknown, wait_unknown);
101         }
102 
103         // we know that the id is actually the pointer to the thread
104         if (!thrd) {
105             if (&ec != &throws)
106                 ec = make_success_code();
107             return thread_state(terminated, wait_unknown);
108             // this thread has already been terminated
109         }
110 
111         thread_state previous_state;
112         do {
113             // action depends on the current state
114             previous_state = thrd->get_state();
115             thread_state_enum previous_state_val = previous_state.state();
116 
117             // nothing to do here if the state doesn't change
118             if (new_state == previous_state_val) {
119                 LTM_(warning)
120                     << "set_thread_state: old thread state is the same as new "
121                        "thread state, aborting state change, thread("
122                     << thrd << "), description("
123                     << thrd->get_description() << "), new state("
124                     << get_thread_state_name(new_state) << ")";
125 
126                 if (&ec != &throws)
127                     ec = make_success_code();
128 
129                 return thread_state(new_state, previous_state.state_ex());
130             }
131 
132             // the thread to set the state for is currently running, so we
133             // schedule another thread to execute the pending set_state
134             switch (previous_state_val) {
135             case active:
136                 {
137                     // schedule a new thread to set the state
138                     LTM_(warning)
139                         << "set_thread_state: thread is currently active, scheduling "
140                             "new thread, thread(" << thrd << "), description("
141                         << thrd->get_description() << "), new state("
142                         << get_thread_state_name(new_state) << ")";
143 
144                     thread_init_data data(
145                         util::bind(&set_active_state,
146                             thrd, new_state, new_state_ex,
147                             priority, previous_state),
148                         "set state for active thread", 0, priority);
149 
150                     create_work(thrd->get_scheduler_base(), data, pending, ec);
151 
152                     if (&ec != &throws)
153                         ec = make_success_code();
154 
155                     return previous_state;     // done
156                 }
157                 break;
158             case terminated:
159                 {
160                     LTM_(warning)
161                         << "set_thread_state: thread is terminated, aborting state "
162                             "change, thread(" << thrd << "), description("
163                         << thrd->get_description() << "), new state("
164                         << get_thread_state_name(new_state) << ")";
165 
166                     if (&ec != &throws)
167                         ec = make_success_code();
168 
169                     // If the thread has been terminated while this set_state was
170                     // pending nothing has to be done anymore.
171                     return previous_state;
172                 }
173                 break;
174             case pending:
175             case pending_boost:
176                 if (suspended == new_state) {
177                     // we do not allow explicit resetting of a state to suspended
178                     // without the thread being executed.
179                     std::ostringstream strm;
180                     strm << "set_thread_state: invalid new state, can't demote a "
181                             "pending thread, "
182                          << "thread(" << thrd << "), description("
183                          << thrd->get_description() << "), new state("
184                          << get_thread_state_name(new_state) << ")";
185 
186                     LTM_(fatal) << strm.str();
187 
188                     HPX_THROWS_IF(ec, bad_parameter,
189                         "threads::detail::set_thread_state",
190                         strm.str());
191                     return thread_state(unknown, wait_unknown);
192                 }
193                 break;
194             case suspended:
195                 break;      // fine, just set the new state
196             case pending_do_not_schedule:
197             default:
198                 HPX_ASSERT(false);    // should not happen
199                 break;
200             }
201 
202             // If the previous state was pending we are supposed to remove the
203             // thread from the queue. But in order to avoid linearly looking
204             // through the queue we defer this to the thread function, which
205             // at some point will ignore this thread by simply skipping it
206             // (if it's not pending anymore).
207 
208             LTM_(info) << "set_thread_state: thread(" << thrd << "), "
209                           "description(" << thrd->get_description() << "), "
210                           "new state(" << get_thread_state_name(new_state) << "), "
211                           "old state(" << get_thread_state_name(previous_state_val)
212                        << ")";
213 
214             // So all what we do here is to set the new state.
215             if (thrd->restore_state(new_state, new_state_ex, previous_state))
216                 break;
217 
218             // state has changed since we fetched it from the thread, retry
219             LTM_(error)
220                 << "set_thread_state: state has been changed since it was fetched, "
221                    "retrying, thread(" << thrd << "), "
222                    "description(" << thrd->get_description() << "), "
223                    "new state(" << get_thread_state_name(new_state) << "), "
224                    "old state(" << get_thread_state_name(previous_state_val)
225                 << ")";
226         } while (true);
227 
228         if (new_state == pending) {
229             // REVIEW: Passing a specific target thread may interfere with the
230             // round robin queuing.
231 
232             thrd->get_scheduler_base()->schedule_thread(thrd.get(),
233                 schedulehint, false, thrd.get()->get_priority());
234             // NOTE: Don't care if the hint is a NUMA hint, just want to wake up
235             // a thread.
236             thrd->get_scheduler_base()->do_some_work(schedulehint.hint);
237         }
238 
239         if (&ec != &throws)
240             ec = make_success_code();
241 
242         return previous_state;
243     }
244 
245     ///////////////////////////////////////////////////////////////////////////
246     /// This thread function is used by the at_timer thread below to trigger
247     /// the required action.
wake_timer_thread(thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,thread_id_type const & timer_id,std::shared_ptr<std::atomic<bool>> const & triggered,thread_state_ex_enum my_statex)248     inline thread_result_type wake_timer_thread(
249         thread_id_type const& thrd, thread_state_enum newstate,
250         thread_state_ex_enum newstate_ex, thread_priority priority,
251         thread_id_type const& timer_id,
252         std::shared_ptr<std::atomic<bool> > const& triggered,
253         thread_state_ex_enum my_statex)
254     {
255         if (HPX_UNLIKELY(!thrd)) {
256             HPX_THROW_EXCEPTION(null_thread_id,
257                 "threads::detail::wake_timer_thread",
258                 "null thread id encountered (id)");
259             return thread_result_type(terminated, invalid_thread_id);
260         }
261         if (HPX_UNLIKELY(!timer_id)) {
262             HPX_THROW_EXCEPTION(null_thread_id,
263                 "threads::detail::wake_timer_thread",
264                 "null thread id encountered (timer_id)");
265             return thread_result_type(terminated, invalid_thread_id);
266         }
267 
268         HPX_ASSERT(my_statex == wait_abort || my_statex == wait_timeout);
269 
270         if (!triggered->load())
271         {
272             error_code ec(lightweight);    // do not throw
273             detail::set_thread_state(timer_id, pending, my_statex,
274                 thread_priority_boost, thread_schedule_hint(), ec);
275         }
276 
277         return thread_result_type(terminated, invalid_thread_id);
278     }
279 
280     /// This thread function initiates the required set_state action (on
281     /// behalf of one of the threads#detail#set_thread_state functions).
282     template <typename SchedulingPolicy>
at_timer(SchedulingPolicy & scheduler,util::steady_clock::time_point & abs_time,thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,std::atomic<bool> * started)283     thread_result_type at_timer(SchedulingPolicy& scheduler,
284         util::steady_clock::time_point& abs_time,
285         thread_id_type const& thrd, thread_state_enum newstate,
286         thread_state_ex_enum newstate_ex, thread_priority priority,
287         std::atomic<bool>* started)
288     {
289         if (HPX_UNLIKELY(!thrd)) {
290             HPX_THROW_EXCEPTION(null_thread_id,
291                 "threads::detail::at_timer",
292                 "null thread id encountered");
293             return thread_result_type(terminated, invalid_thread_id);
294         }
295 
296         // create a new thread in suspended state, which will execute the
297         // requested set_state when timer fires and will re-awaken this thread,
298         // allowing the deadline_timer to go out of scope gracefully
299         thread_id_type self_id = get_self_id();
300 
301         std::shared_ptr<std::atomic<bool> > triggered(
302             std::make_shared<std::atomic<bool> >(false));
303 
304         thread_init_data data(
305             util::bind_front(&wake_timer_thread,
306                 thrd, newstate, newstate_ex, priority,
307                 self_id, triggered),
308             "wake_timer", 0, priority);
309 
310         thread_id_type wake_id = invalid_thread_id;
311         create_thread(&scheduler, data, wake_id, suspended);
312 
313         // create timer firing in correspondence with given time
314         typedef boost::asio::basic_waitable_timer<
315             util::steady_clock> deadline_timer;
316 
317         deadline_timer t (
318             get_thread_pool("timer-pool")->get_io_service(), abs_time);
319 
320         // let the timer invoke the set_state on the new (suspended) thread
321         t.async_wait(
322             [wake_id, priority](const boost::system::error_code& ec)
323             {
324                 if (ec.value() == boost::system::errc::operation_canceled)
325                 {
326                     detail::set_thread_state(wake_id, pending, wait_abort,
327                         priority, thread_schedule_hint(), throws);
328                 }
329                 else
330                 {
331                     detail::set_thread_state(wake_id, pending, wait_timeout,
332                         priority, thread_schedule_hint(), throws);
333                 }
334             });
335 
336         if (started != nullptr)
337             started->store(true);
338 
339         // this waits for the thread to be reactivated when the timer fired
340         // if it returns signaled the timer has been canceled, otherwise
341         // the timer fired and the wake_timer_thread above has been executed
342         thread_state_ex_enum statex =
343             get_self().yield(thread_result_type(suspended, invalid_thread_id));
344 
345         HPX_ASSERT(statex == wait_abort || statex == wait_timeout);
346 
347         if (wait_timeout != statex) //-V601
348         {
349             triggered->store(true);
350             // wake_timer_thread has not been executed yet, cancel timer
351             t.cancel();
352         }
353         else
354         {
355             detail::set_thread_state(thrd, newstate, newstate_ex, priority);
356         }
357 
358         return thread_result_type(terminated, invalid_thread_id);
359     }
360 
361     /// Set a timer to set the state of the given \a thread to the given
362     /// new value after it expired (at the given time)
363     template <typename SchedulingPolicy>
set_thread_state_timed(SchedulingPolicy & scheduler,util::steady_time_point const & abs_time,thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,thread_schedule_hint schedulehint,std::atomic<bool> * started,error_code & ec)364     thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler,
365         util::steady_time_point const& abs_time, thread_id_type const& thrd,
366         thread_state_enum newstate, thread_state_ex_enum newstate_ex,
367         thread_priority priority, thread_schedule_hint schedulehint,
368         std::atomic<bool>* started, error_code& ec)
369     {
370         if (HPX_UNLIKELY(!thrd)) {
371             HPX_THROWS_IF(ec, null_thread_id,
372                 "threads::detail::set_thread_state",
373                 "null thread id encountered");
374             return invalid_thread_id;
375         }
376 
377         // this creates a new thread which creates the timer and handles the
378         // requested actions
379         thread_init_data data(
380             util::bind(&at_timer<SchedulingPolicy>,
381                 std::ref(scheduler), abs_time.value(), thrd, newstate, newstate_ex,
382                 priority, started),
383                 "at_timer (expire at)", 0, priority, schedulehint);
384 
385         thread_id_type newid = invalid_thread_id;
386         create_thread(&scheduler, data, newid, pending, true, ec); //-V601
387         return newid;
388     }
389 
390     template <typename SchedulingPolicy>
set_thread_state_timed(SchedulingPolicy & scheduler,util::steady_time_point const & abs_time,thread_id_type const & id,std::atomic<bool> * started,error_code & ec)391     thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler,
392         util::steady_time_point const& abs_time, thread_id_type const& id,
393         std::atomic<bool>* started, error_code& ec)
394     {
395         return set_thread_state_timed(scheduler, abs_time, id, pending,
396             wait_timeout, thread_priority_normal,
397             thread_schedule_hint(), started, ec);
398     }
399 
400     /// Set a timer to set the state of the given \a thread to the given
401     /// new value after it expired (after the given duration)
402     template <typename SchedulingPolicy>
set_thread_state_timed(SchedulingPolicy & scheduler,util::steady_duration const & rel_time,thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,thread_schedule_hint schedulehint,std::atomic<bool> & started,error_code & ec)403     thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler,
404         util::steady_duration const& rel_time, thread_id_type const& thrd,
405         thread_state_enum newstate, thread_state_ex_enum newstate_ex,
406         thread_priority priority, thread_schedule_hint schedulehint,
407         std::atomic<bool>& started, error_code& ec)
408     {
409         return set_thread_state_timed(scheduler, rel_time.from_now(), thrd,
410             newstate, newstate_ex, priority, schedulehint, started, ec);
411     }
412 
413     template <typename SchedulingPolicy>
set_thread_state_timed(SchedulingPolicy & scheduler,util::steady_duration const & rel_time,thread_id_type const & thrd,std::atomic<bool> * started,error_code & ec)414     thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler,
415         util::steady_duration const& rel_time, thread_id_type const& thrd,
416         std::atomic<bool>* started, error_code& ec)
417     {
418         return set_thread_state_timed(scheduler, rel_time.from_now(), thrd,
419             pending, wait_timeout, thread_priority_normal,
420             thread_schedule_hint(), started, ec);
421     }
422 }}}
423 
424 #endif
425