1 // Copyright (c) 2007-2017 Hartmut Kaiser 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 #if !defined(HPX_RUNTIME_THREADS_DETAIL_SET_THREAD_STATE_JAN_13_2013_0518PM) 7 #define HPX_RUNTIME_THREADS_DETAIL_SET_THREAD_STATE_JAN_13_2013_0518PM 8 9 #include <hpx/config.hpp> 10 #include <hpx/config/asio.hpp> 11 #include <hpx/error_code.hpp> 12 #include <hpx/runtime/threads/coroutines/coroutine.hpp> 13 #include <hpx/runtime/threads/detail/create_thread.hpp> 14 #include <hpx/runtime/threads/detail/create_work.hpp> 15 #include <hpx/runtime/threads/thread_data.hpp> 16 #include <hpx/runtime/threads/thread_helpers.hpp> 17 #include <hpx/runtime_fwd.hpp> 18 #include <hpx/throw_exception.hpp> 19 #include <hpx/util/assert.hpp> 20 #include <hpx/util/bind_front.hpp> 21 #include <hpx/util/bind.hpp> 22 #include <hpx/util/io_service_pool.hpp> 23 #include <hpx/util/logging.hpp> 24 #include <hpx/util/steady_clock.hpp> 25 26 #include <boost/asio/basic_waitable_timer.hpp> 27 28 #include <atomic> 29 #include <chrono> 30 #include <cstddef> 31 #include <functional> 32 #include <memory> 33 #include <sstream> 34 35 namespace hpx { namespace threads { namespace detail 36 { 37 /////////////////////////////////////////////////////////////////////////// 38 inline thread_state set_thread_state( 39 thread_id_type const& id, thread_state_enum new_state, 40 thread_state_ex_enum new_state_ex, thread_priority priority, 41 thread_schedule_hint schedulehint = thread_schedule_hint(), 42 error_code& ec = throws); 43 44 /////////////////////////////////////////////////////////////////////////// set_active_state(thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,thread_state previous_state)45 inline thread_result_type set_active_state( 46 thread_id_type const& thrd, thread_state_enum newstate, 47 thread_state_ex_enum newstate_ex, thread_priority priority, 48 thread_state previous_state) 49 { 50 if (HPX_UNLIKELY(!thrd)) { 51 HPX_THROW_EXCEPTION(null_thread_id, 52 "threads::detail::set_active_state", 53 "null thread id encountered"); 54 return thread_result_type(terminated, invalid_thread_id); 55 } 56 57 // make sure that the thread has not been suspended and set active again 58 // in the meantime 59 thread_state current_state = thrd->get_state(); 60 61 if (current_state.state() == previous_state.state() && 62 current_state != previous_state) 63 { 64 LTM_(warning) 65 << "set_active_state: thread is still active, however " 66 "it was non-active since the original set_state " 67 "request was issued, aborting state change, thread(" 68 << thrd << "), description(" 69 << thrd->get_description() << "), new state(" 70 << get_thread_state_name(newstate) << ")"; 71 return thread_result_type(terminated, invalid_thread_id); 72 } 73 74 // just retry, set_state will create new thread if target is still active 75 error_code ec(lightweight); // do not throw 76 detail::set_thread_state(thrd, newstate, newstate_ex, priority, 77 thread_schedule_hint(), ec); 78 79 return thread_result_type(terminated, invalid_thread_id); 80 } 81 82 /////////////////////////////////////////////////////////////////////////// set_thread_state(thread_id_type const & thrd,thread_state_enum new_state,thread_state_ex_enum new_state_ex,thread_priority priority,thread_schedule_hint schedulehint,error_code & ec)83 inline thread_state set_thread_state( 84 thread_id_type const& thrd, thread_state_enum new_state, 85 thread_state_ex_enum new_state_ex, thread_priority priority, 86 thread_schedule_hint schedulehint, error_code& ec) 87 { 88 if (HPX_UNLIKELY(!thrd)) { 89 HPX_THROWS_IF(ec, null_thread_id, "threads::detail::set_thread_state", 90 "null thread id encountered"); 91 return thread_state(unknown, wait_unknown); 92 } 93 94 // set_state can't be used to force a thread into active state 95 if (new_state == threads::active) { 96 std::ostringstream strm; 97 strm << "invalid new state: " << get_thread_state_name(new_state); 98 HPX_THROWS_IF(ec, bad_parameter, "threads::detail::set_thread_state", 99 strm.str()); 100 return thread_state(unknown, wait_unknown); 101 } 102 103 // we know that the id is actually the pointer to the thread 104 if (!thrd) { 105 if (&ec != &throws) 106 ec = make_success_code(); 107 return thread_state(terminated, wait_unknown); 108 // this thread has already been terminated 109 } 110 111 thread_state previous_state; 112 do { 113 // action depends on the current state 114 previous_state = thrd->get_state(); 115 thread_state_enum previous_state_val = previous_state.state(); 116 117 // nothing to do here if the state doesn't change 118 if (new_state == previous_state_val) { 119 LTM_(warning) 120 << "set_thread_state: old thread state is the same as new " 121 "thread state, aborting state change, thread(" 122 << thrd << "), description(" 123 << thrd->get_description() << "), new state(" 124 << get_thread_state_name(new_state) << ")"; 125 126 if (&ec != &throws) 127 ec = make_success_code(); 128 129 return thread_state(new_state, previous_state.state_ex()); 130 } 131 132 // the thread to set the state for is currently running, so we 133 // schedule another thread to execute the pending set_state 134 switch (previous_state_val) { 135 case active: 136 { 137 // schedule a new thread to set the state 138 LTM_(warning) 139 << "set_thread_state: thread is currently active, scheduling " 140 "new thread, thread(" << thrd << "), description(" 141 << thrd->get_description() << "), new state(" 142 << get_thread_state_name(new_state) << ")"; 143 144 thread_init_data data( 145 util::bind(&set_active_state, 146 thrd, new_state, new_state_ex, 147 priority, previous_state), 148 "set state for active thread", 0, priority); 149 150 create_work(thrd->get_scheduler_base(), data, pending, ec); 151 152 if (&ec != &throws) 153 ec = make_success_code(); 154 155 return previous_state; // done 156 } 157 break; 158 case terminated: 159 { 160 LTM_(warning) 161 << "set_thread_state: thread is terminated, aborting state " 162 "change, thread(" << thrd << "), description(" 163 << thrd->get_description() << "), new state(" 164 << get_thread_state_name(new_state) << ")"; 165 166 if (&ec != &throws) 167 ec = make_success_code(); 168 169 // If the thread has been terminated while this set_state was 170 // pending nothing has to be done anymore. 171 return previous_state; 172 } 173 break; 174 case pending: 175 case pending_boost: 176 if (suspended == new_state) { 177 // we do not allow explicit resetting of a state to suspended 178 // without the thread being executed. 179 std::ostringstream strm; 180 strm << "set_thread_state: invalid new state, can't demote a " 181 "pending thread, " 182 << "thread(" << thrd << "), description(" 183 << thrd->get_description() << "), new state(" 184 << get_thread_state_name(new_state) << ")"; 185 186 LTM_(fatal) << strm.str(); 187 188 HPX_THROWS_IF(ec, bad_parameter, 189 "threads::detail::set_thread_state", 190 strm.str()); 191 return thread_state(unknown, wait_unknown); 192 } 193 break; 194 case suspended: 195 break; // fine, just set the new state 196 case pending_do_not_schedule: 197 default: 198 HPX_ASSERT(false); // should not happen 199 break; 200 } 201 202 // If the previous state was pending we are supposed to remove the 203 // thread from the queue. But in order to avoid linearly looking 204 // through the queue we defer this to the thread function, which 205 // at some point will ignore this thread by simply skipping it 206 // (if it's not pending anymore). 207 208 LTM_(info) << "set_thread_state: thread(" << thrd << "), " 209 "description(" << thrd->get_description() << "), " 210 "new state(" << get_thread_state_name(new_state) << "), " 211 "old state(" << get_thread_state_name(previous_state_val) 212 << ")"; 213 214 // So all what we do here is to set the new state. 215 if (thrd->restore_state(new_state, new_state_ex, previous_state)) 216 break; 217 218 // state has changed since we fetched it from the thread, retry 219 LTM_(error) 220 << "set_thread_state: state has been changed since it was fetched, " 221 "retrying, thread(" << thrd << "), " 222 "description(" << thrd->get_description() << "), " 223 "new state(" << get_thread_state_name(new_state) << "), " 224 "old state(" << get_thread_state_name(previous_state_val) 225 << ")"; 226 } while (true); 227 228 if (new_state == pending) { 229 // REVIEW: Passing a specific target thread may interfere with the 230 // round robin queuing. 231 232 thrd->get_scheduler_base()->schedule_thread(thrd.get(), 233 schedulehint, false, thrd.get()->get_priority()); 234 // NOTE: Don't care if the hint is a NUMA hint, just want to wake up 235 // a thread. 236 thrd->get_scheduler_base()->do_some_work(schedulehint.hint); 237 } 238 239 if (&ec != &throws) 240 ec = make_success_code(); 241 242 return previous_state; 243 } 244 245 /////////////////////////////////////////////////////////////////////////// 246 /// This thread function is used by the at_timer thread below to trigger 247 /// the required action. wake_timer_thread(thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,thread_id_type const & timer_id,std::shared_ptr<std::atomic<bool>> const & triggered,thread_state_ex_enum my_statex)248 inline thread_result_type wake_timer_thread( 249 thread_id_type const& thrd, thread_state_enum newstate, 250 thread_state_ex_enum newstate_ex, thread_priority priority, 251 thread_id_type const& timer_id, 252 std::shared_ptr<std::atomic<bool> > const& triggered, 253 thread_state_ex_enum my_statex) 254 { 255 if (HPX_UNLIKELY(!thrd)) { 256 HPX_THROW_EXCEPTION(null_thread_id, 257 "threads::detail::wake_timer_thread", 258 "null thread id encountered (id)"); 259 return thread_result_type(terminated, invalid_thread_id); 260 } 261 if (HPX_UNLIKELY(!timer_id)) { 262 HPX_THROW_EXCEPTION(null_thread_id, 263 "threads::detail::wake_timer_thread", 264 "null thread id encountered (timer_id)"); 265 return thread_result_type(terminated, invalid_thread_id); 266 } 267 268 HPX_ASSERT(my_statex == wait_abort || my_statex == wait_timeout); 269 270 if (!triggered->load()) 271 { 272 error_code ec(lightweight); // do not throw 273 detail::set_thread_state(timer_id, pending, my_statex, 274 thread_priority_boost, thread_schedule_hint(), ec); 275 } 276 277 return thread_result_type(terminated, invalid_thread_id); 278 } 279 280 /// This thread function initiates the required set_state action (on 281 /// behalf of one of the threads#detail#set_thread_state functions). 282 template <typename SchedulingPolicy> at_timer(SchedulingPolicy & scheduler,util::steady_clock::time_point & abs_time,thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,std::atomic<bool> * started)283 thread_result_type at_timer(SchedulingPolicy& scheduler, 284 util::steady_clock::time_point& abs_time, 285 thread_id_type const& thrd, thread_state_enum newstate, 286 thread_state_ex_enum newstate_ex, thread_priority priority, 287 std::atomic<bool>* started) 288 { 289 if (HPX_UNLIKELY(!thrd)) { 290 HPX_THROW_EXCEPTION(null_thread_id, 291 "threads::detail::at_timer", 292 "null thread id encountered"); 293 return thread_result_type(terminated, invalid_thread_id); 294 } 295 296 // create a new thread in suspended state, which will execute the 297 // requested set_state when timer fires and will re-awaken this thread, 298 // allowing the deadline_timer to go out of scope gracefully 299 thread_id_type self_id = get_self_id(); 300 301 std::shared_ptr<std::atomic<bool> > triggered( 302 std::make_shared<std::atomic<bool> >(false)); 303 304 thread_init_data data( 305 util::bind_front(&wake_timer_thread, 306 thrd, newstate, newstate_ex, priority, 307 self_id, triggered), 308 "wake_timer", 0, priority); 309 310 thread_id_type wake_id = invalid_thread_id; 311 create_thread(&scheduler, data, wake_id, suspended); 312 313 // create timer firing in correspondence with given time 314 typedef boost::asio::basic_waitable_timer< 315 util::steady_clock> deadline_timer; 316 317 deadline_timer t ( 318 get_thread_pool("timer-pool")->get_io_service(), abs_time); 319 320 // let the timer invoke the set_state on the new (suspended) thread 321 t.async_wait( 322 [wake_id, priority](const boost::system::error_code& ec) 323 { 324 if (ec.value() == boost::system::errc::operation_canceled) 325 { 326 detail::set_thread_state(wake_id, pending, wait_abort, 327 priority, thread_schedule_hint(), throws); 328 } 329 else 330 { 331 detail::set_thread_state(wake_id, pending, wait_timeout, 332 priority, thread_schedule_hint(), throws); 333 } 334 }); 335 336 if (started != nullptr) 337 started->store(true); 338 339 // this waits for the thread to be reactivated when the timer fired 340 // if it returns signaled the timer has been canceled, otherwise 341 // the timer fired and the wake_timer_thread above has been executed 342 thread_state_ex_enum statex = 343 get_self().yield(thread_result_type(suspended, invalid_thread_id)); 344 345 HPX_ASSERT(statex == wait_abort || statex == wait_timeout); 346 347 if (wait_timeout != statex) //-V601 348 { 349 triggered->store(true); 350 // wake_timer_thread has not been executed yet, cancel timer 351 t.cancel(); 352 } 353 else 354 { 355 detail::set_thread_state(thrd, newstate, newstate_ex, priority); 356 } 357 358 return thread_result_type(terminated, invalid_thread_id); 359 } 360 361 /// Set a timer to set the state of the given \a thread to the given 362 /// new value after it expired (at the given time) 363 template <typename SchedulingPolicy> set_thread_state_timed(SchedulingPolicy & scheduler,util::steady_time_point const & abs_time,thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,thread_schedule_hint schedulehint,std::atomic<bool> * started,error_code & ec)364 thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler, 365 util::steady_time_point const& abs_time, thread_id_type const& thrd, 366 thread_state_enum newstate, thread_state_ex_enum newstate_ex, 367 thread_priority priority, thread_schedule_hint schedulehint, 368 std::atomic<bool>* started, error_code& ec) 369 { 370 if (HPX_UNLIKELY(!thrd)) { 371 HPX_THROWS_IF(ec, null_thread_id, 372 "threads::detail::set_thread_state", 373 "null thread id encountered"); 374 return invalid_thread_id; 375 } 376 377 // this creates a new thread which creates the timer and handles the 378 // requested actions 379 thread_init_data data( 380 util::bind(&at_timer<SchedulingPolicy>, 381 std::ref(scheduler), abs_time.value(), thrd, newstate, newstate_ex, 382 priority, started), 383 "at_timer (expire at)", 0, priority, schedulehint); 384 385 thread_id_type newid = invalid_thread_id; 386 create_thread(&scheduler, data, newid, pending, true, ec); //-V601 387 return newid; 388 } 389 390 template <typename SchedulingPolicy> set_thread_state_timed(SchedulingPolicy & scheduler,util::steady_time_point const & abs_time,thread_id_type const & id,std::atomic<bool> * started,error_code & ec)391 thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler, 392 util::steady_time_point const& abs_time, thread_id_type const& id, 393 std::atomic<bool>* started, error_code& ec) 394 { 395 return set_thread_state_timed(scheduler, abs_time, id, pending, 396 wait_timeout, thread_priority_normal, 397 thread_schedule_hint(), started, ec); 398 } 399 400 /// Set a timer to set the state of the given \a thread to the given 401 /// new value after it expired (after the given duration) 402 template <typename SchedulingPolicy> set_thread_state_timed(SchedulingPolicy & scheduler,util::steady_duration const & rel_time,thread_id_type const & thrd,thread_state_enum newstate,thread_state_ex_enum newstate_ex,thread_priority priority,thread_schedule_hint schedulehint,std::atomic<bool> & started,error_code & ec)403 thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler, 404 util::steady_duration const& rel_time, thread_id_type const& thrd, 405 thread_state_enum newstate, thread_state_ex_enum newstate_ex, 406 thread_priority priority, thread_schedule_hint schedulehint, 407 std::atomic<bool>& started, error_code& ec) 408 { 409 return set_thread_state_timed(scheduler, rel_time.from_now(), thrd, 410 newstate, newstate_ex, priority, schedulehint, started, ec); 411 } 412 413 template <typename SchedulingPolicy> set_thread_state_timed(SchedulingPolicy & scheduler,util::steady_duration const & rel_time,thread_id_type const & thrd,std::atomic<bool> * started,error_code & ec)414 thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler, 415 util::steady_duration const& rel_time, thread_id_type const& thrd, 416 std::atomic<bool>* started, error_code& ec) 417 { 418 return set_thread_state_timed(scheduler, rel_time.from_now(), thrd, 419 pending, wait_timeout, thread_priority_normal, 420 thread_schedule_hint(), started, ec); 421 } 422 }}} 423 424 #endif 425