1 // Copyright (c) 2007-2017 Hartmut Kaiser 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 #if !defined(HPX_LCOS_DETAIL_FUTURE_DATA_MAR_06_2012_1055AM) 7 #define HPX_LCOS_DETAIL_FUTURE_DATA_MAR_06_2012_1055AM 8 9 #include <hpx/config.hpp> 10 #include <hpx/error_code.hpp> 11 #include <hpx/lcos/local/detail/condition_variable.hpp> 12 #include <hpx/lcos/local/spinlock.hpp> 13 #include <hpx/runtime/launch_policy.hpp> 14 #include <hpx/runtime/threads/coroutines/detail/get_stack_pointer.hpp> 15 #include <hpx/runtime/threads/thread_executor.hpp> 16 #include <hpx/runtime/threads/thread_helpers.hpp> 17 #include <hpx/throw_exception.hpp> 18 #include <hpx/traits/future_access.hpp> 19 #include <hpx/traits/get_remote_result.hpp> 20 #include <hpx/util/annotated_function.hpp> 21 #include <hpx/util/assert.hpp> 22 #include <hpx/util/assert_owns_lock.hpp> 23 #include <hpx/util/atomic_count.hpp> 24 #include <hpx/util/bind.hpp> 25 #include <hpx/util/decay.hpp> 26 #include <hpx/util/steady_clock.hpp> 27 #include <hpx/util/unique_function.hpp> 28 #include <hpx/util/unused.hpp> 29 30 #include <boost/intrusive_ptr.hpp> 31 32 // NOTE: small_vector was introduced in 1.58 but seems to be buggy in that 33 // version, so use only from 1.59 onwards. 34 #if BOOST_VERSION < 105900 35 #include <vector> 36 #else 37 #include <boost/container/small_vector.hpp> 38 #endif 39 40 #include <atomic> 41 #include <chrono> 42 #include <cstddef> 43 #include <exception> 44 #include <memory> 45 #include <mutex> 46 #include <string> 47 #include <type_traits> 48 #include <utility> 49 50 #include <hpx/config/warnings_prefix.hpp> 51 52 /////////////////////////////////////////////////////////////////////////////// 53 namespace hpx { namespace lcos 54 { 55 enum class future_status 56 { 57 ready, timeout, deferred, uninitialized 58 }; 59 }} 60 61 /////////////////////////////////////////////////////////////////////////////// 62 namespace hpx { namespace lcos 63 { 64 65 namespace detail 66 { 67 template <typename Result> struct future_data; 68 69 /////////////////////////////////////////////////////////////////////// 70 struct future_data_refcnt_base; 71 72 void intrusive_ptr_add_ref(future_data_refcnt_base* p); 73 void intrusive_ptr_release(future_data_refcnt_base* p); 74 75 /////////////////////////////////////////////////////////////////////// 76 struct HPX_EXPORT future_data_refcnt_base 77 { 78 public: 79 typedef util::unique_function_nonser<void()> completed_callback_type; 80 #if BOOST_VERSION < 105900 81 typedef std::vector<completed_callback_type> 82 completed_callback_vector_type; 83 #else 84 typedef boost::container::small_vector<completed_callback_type, 3> 85 completed_callback_vector_type; 86 #endif 87 88 typedef void has_future_data_refcnt_base; 89 90 virtual ~future_data_refcnt_base(); 91 92 virtual void set_on_completed(completed_callback_type) = 0; 93 requires_deletehpx::lcos::detail::future_data_refcnt_base94 virtual bool requires_delete() 95 { 96 return 0 == --count_; 97 } 98 destroyhpx::lcos::detail::future_data_refcnt_base99 virtual void destroy() 100 { 101 delete this; 102 } 103 104 // This is a tag type used to convey the information that the caller is 105 // _not_ going to addref the future_data instance 106 struct init_no_addref {}; 107 108 protected: future_data_refcnt_basehpx::lcos::detail::future_data_refcnt_base109 future_data_refcnt_base() : count_(0) {} future_data_refcnt_basehpx::lcos::detail::future_data_refcnt_base110 future_data_refcnt_base(init_no_addref) : count_(1) {} 111 112 // reference counting 113 friend void intrusive_ptr_add_ref(future_data_refcnt_base* p); 114 friend void intrusive_ptr_release(future_data_refcnt_base* p); 115 116 util::atomic_count count_; 117 }; 118 119 /// support functions for boost::intrusive_ptr intrusive_ptr_add_ref(future_data_refcnt_base * p)120 inline void intrusive_ptr_add_ref(future_data_refcnt_base* p) 121 { 122 ++p->count_; 123 } intrusive_ptr_release(future_data_refcnt_base * p)124 inline void intrusive_ptr_release(future_data_refcnt_base* p) 125 { 126 if (p->requires_delete()) 127 p->destroy(); 128 } 129 130 /////////////////////////////////////////////////////////////////////////// 131 template <typename Result> 132 struct future_data_result 133 { 134 typedef Result type; 135 136 template <typename U> 137 HPX_FORCEINLINE static sethpx::lcos::detail::future_data_result138 U && set(U && u) 139 { 140 return std::forward<U>(u); 141 } 142 }; 143 144 template <typename Result> 145 struct future_data_result<Result&> 146 { 147 typedef Result* type; 148 149 HPX_FORCEINLINE static sethpx::lcos::detail::future_data_result150 Result* set(Result* u) 151 { 152 return u; 153 } 154 155 HPX_FORCEINLINE static sethpx::lcos::detail::future_data_result156 Result* set(Result& u) 157 { 158 return &u; 159 } 160 }; 161 162 template <> 163 struct future_data_result<void> 164 { 165 typedef util::unused_type type; 166 167 HPX_FORCEINLINE static sethpx::lcos::detail::future_data_result168 util::unused_type set(util::unused_type u) 169 { 170 return u; 171 } 172 }; 173 174 /////////////////////////////////////////////////////////////////////////// 175 template <typename R> 176 struct future_data_storage 177 { 178 typedef typename future_data_result<R>::type value_type; 179 typedef std::exception_ptr error_type; 180 181 // determine the required alignment, define aligned storage of proper 182 // size 183 HPX_STATIC_CONSTEXPR std::size_t max_alignment = 184 (std::alignment_of<value_type>::value > 185 std::alignment_of<error_type>::value) ? 186 std::alignment_of<value_type>::value 187 : std::alignment_of<error_type>::value; 188 189 HPX_STATIC_CONSTEXPR std::size_t max_size = 190 (sizeof(value_type) > sizeof(error_type)) ? 191 sizeof(value_type) : sizeof(error_type); 192 193 typedef typename std::aligned_storage<max_size, max_alignment>::type type; 194 }; 195 196 /////////////////////////////////////////////////////////////////////////// 197 template <typename Result> 198 struct future_data_base; 199 200 template <> 201 struct HPX_EXPORT future_data_base<traits::detail::future_data_void> 202 : future_data_refcnt_base 203 { future_data_basehpx::lcos::detail::future_data_base204 future_data_base() 205 : state_(empty) 206 {} 207 future_data_basehpx::lcos::detail::future_data_base208 future_data_base(init_no_addref no_addref) 209 : future_data_refcnt_base(no_addref), state_(empty) 210 {} 211 212 using future_data_refcnt_base::completed_callback_type; 213 using future_data_refcnt_base::completed_callback_vector_type; 214 typedef lcos::local::spinlock mutex_type; 215 typedef util::unused_type result_type; 216 typedef future_data_refcnt_base::init_no_addref init_no_addref; 217 218 virtual ~future_data_base(); 219 220 enum state 221 { 222 empty = 0, 223 ready = 1, 224 value = 2 | ready, 225 exception = 4 | ready 226 }; 227 228 /// Return whether or not the data is available for this 229 /// \a future. is_readyhpx::lcos::detail::future_data_base230 bool is_ready(std::memory_order order = std::memory_order_acquire) const 231 { 232 return (state_.load(order) & ready) != 0; 233 } 234 has_valuehpx::lcos::detail::future_data_base235 bool has_value() const 236 { 237 return state_.load(std::memory_order_acquire) == value; 238 } 239 has_exceptionhpx::lcos::detail::future_data_base240 bool has_exception() const 241 { 242 return state_.load(std::memory_order_acquire) == exception; 243 } 244 execute_deferredhpx::lcos::detail::future_data_base245 virtual void execute_deferred(error_code& /*ec*/ = throws) {} 246 247 // cancellation is disabled by default cancelablehpx::lcos::detail::future_data_base248 virtual bool cancelable() const 249 { 250 return false; 251 } cancelhpx::lcos::detail::future_data_base252 virtual void cancel() 253 { 254 HPX_THROW_EXCEPTION(future_does_not_support_cancellation, 255 "future_data_base::cancel", 256 "this future does not support cancellation"); 257 } 258 259 result_type* get_result_void(void const* storage, error_code& ec = throws); 260 virtual result_type* get_result_void(error_code& ec = throws) = 0; 261 262 virtual void set_exception(std::exception_ptr data) = 0; 263 264 // continuation support 265 266 // deferred execution of a given continuation 267 static void run_on_completed( 268 completed_callback_type&& on_completed) noexcept; 269 static void run_on_completed( 270 completed_callback_vector_type&& on_completed) noexcept; 271 272 // make sure continuation invocation does not recurse deeper than 273 // allowed 274 template <typename Callback> 275 static void handle_on_completed(Callback&& on_completed); 276 277 /// Set the callback which needs to be invoked when the future becomes 278 /// ready. If the future is ready the function will be invoked 279 /// immediately. 280 void set_on_completed(completed_callback_type data_sink) override; 281 282 virtual state wait(error_code& ec = throws); 283 284 virtual future_status wait_until( 285 util::steady_clock::time_point const& abs_time, error_code& ec = throws); 286 287 virtual std::exception_ptr get_exception_ptr() const = 0; 288 get_registered_namehpx::lcos::detail::future_data_base289 virtual std::string const& get_registered_name() const 290 { 291 HPX_THROW_EXCEPTION(invalid_status, 292 "future_data_base::get_registered_name", 293 "this future does not support name registration"); 294 } register_ashpx::lcos::detail::future_data_base295 virtual void register_as(std::string const& /*name*/, bool /*manage_lifetime*/) 296 { 297 HPX_THROW_EXCEPTION(invalid_status, 298 "future_data_base::set_registered_name", 299 "this future does not support name registration"); 300 } 301 302 protected: 303 mutable mutex_type mtx_; 304 std::atomic<state> state_; // current state 305 completed_callback_vector_type on_completed_; 306 local::detail::condition_variable cond_; // threads waiting in read 307 }; 308 309 template <typename Result> 310 struct future_data_base 311 : future_data_base<traits::detail::future_data_void> 312 { 313 HPX_NON_COPYABLE(future_data_base); 314 315 private: constructhpx::lcos::detail::future_data_base316 static void construct(void* p) 317 { 318 ::new (p) result_type(); 319 } 320 321 template <typename T, typename ... Ts> constructhpx::lcos::detail::future_data_base322 static void construct(void* p, T && t, Ts &&... ts) 323 { 324 ::new (p) result_type( 325 future_data_result<Result>::set(std::forward<T>(t)), 326 std::forward<Ts>(ts)...); 327 } 328 329 public: 330 typedef typename future_data_result<Result>::type result_type; 331 typedef future_data_base<traits::detail::future_data_void> base_type; 332 typedef lcos::local::spinlock mutex_type; 333 typedef typename base_type::init_no_addref init_no_addref; 334 typedef typename base_type::completed_callback_type completed_callback_type; 335 typedef typename base_type::completed_callback_vector_type 336 completed_callback_vector_type; 337 338 future_data_base() = default; 339 future_data_basehpx::lcos::detail::future_data_base340 future_data_base(init_no_addref no_addref) 341 : base_type(no_addref) 342 {} 343 344 struct default_construct {}; 345 future_data_basehpx::lcos::detail::future_data_base346 future_data_base(init_no_addref no_addref, default_construct) 347 : base_type(no_addref) 348 { 349 result_type* value_ptr = reinterpret_cast<result_type*>(&storage_); 350 construct(value_ptr); 351 state_.store(value, std::memory_order_relaxed); 352 } 353 354 template <typename ... Ts> future_data_basehpx::lcos::detail::future_data_base355 future_data_base(init_no_addref no_addref, Ts&&... ts) 356 : base_type(no_addref) 357 { 358 result_type* value_ptr = reinterpret_cast<result_type*>(&storage_); 359 construct(value_ptr, std::forward<Ts>(ts)...); 360 state_.store(value, std::memory_order_relaxed); 361 } 362 future_data_basehpx::lcos::detail::future_data_base363 future_data_base(init_no_addref no_addref, std::exception_ptr const& e) 364 : base_type(no_addref) 365 { 366 std::exception_ptr* exception_ptr = 367 reinterpret_cast<std::exception_ptr*>(&storage_); 368 ::new ((void*)exception_ptr) std::exception_ptr(e); 369 state_.store(exception, std::memory_order_relaxed); 370 } future_data_basehpx::lcos::detail::future_data_base371 future_data_base(init_no_addref no_addref, std::exception_ptr && e) 372 : base_type(no_addref) 373 { 374 std::exception_ptr* exception_ptr = 375 reinterpret_cast<std::exception_ptr*>(&storage_); 376 ::new ((void*)exception_ptr) std::exception_ptr(std::move(e)); 377 state_.store(exception, std::memory_order_relaxed); 378 } 379 ~future_data_basehpx::lcos::detail::future_data_base380 virtual ~future_data_base() noexcept 381 { 382 reset(); 383 } 384 385 /// Get the result of the requested action. This call blocks (yields 386 /// control) if the result is not ready. As soon as the result has been 387 /// returned and the waiting thread has been re-scheduled by the thread 388 /// manager the function will return. 389 /// 390 /// \param ec [in,out] this represents the error status on exit, 391 /// if this is pre-initialized to \a hpx#throws 392 /// the function will throw on error instead. If the 393 /// operation blocks and is aborted because the object 394 /// went out of scope, the code \a hpx#yield_aborted is 395 /// set or thrown. 396 /// 397 /// \note If there has been an error reported (using the action 398 /// \a base_lco#set_exception), this function will throw an 399 /// exception encapsulating the reported error code and 400 /// error description if <code>&ec == &throws</code>. get_resulthpx::lcos::detail::future_data_base401 virtual result_type* get_result(error_code& ec = throws) 402 { 403 if (get_result_void(ec) != nullptr) 404 return reinterpret_cast<result_type*>(&storage_); 405 return nullptr; 406 } 407 get_result_voidhpx::lcos::detail::future_data_base408 util::unused_type* get_result_void(error_code& ec = throws) override 409 { 410 return base_type::get_result_void(&storage_, ec); 411 } 412 413 // Set the result of the requested action. 414 template <typename ... Ts> set_valuehpx::lcos::detail::future_data_base415 void set_value(Ts&& ... ts) 416 { 417 // Note: it is safe to access the data store as no other thread 418 // should access it concurrently. There shouldn't be any 419 // threads attempting to read the value as the state is still 420 // empty. Also, there can be only one thread (this thread) 421 // attempting to set the value by definition. 422 423 // set the data 424 result_type* value_ptr = reinterpret_cast<result_type*>(&storage_); 425 construct(value_ptr, std::forward<Ts>(ts)...); 426 427 // At this point the lock needs to be acquired to safely access the 428 // registered continuations 429 std::unique_lock<mutex_type> l(mtx_); 430 431 // handle all threads waiting for the future to become ready 432 auto on_completed = std::move(on_completed_); 433 on_completed_.clear(); 434 435 // The value has been set, changing the state to 'value' at this 436 // point signals to all other threads that this future is ready. 437 state expected = empty; 438 if (!state_.compare_exchange_strong( 439 expected, value, std::memory_order_release)) 440 { 441 // this future should be 'empty' still (it can't be made ready 442 // more than once). 443 l.unlock(); 444 HPX_THROW_EXCEPTION(promise_already_satisfied, 445 "future_data_base::set_value", 446 "data has already been set for this future"); 447 return; 448 } 449 450 // Note: we use notify_one repeatedly instead of notify_all as we 451 // know: a) that most of the time we have at most one thread 452 // waiting on the future (most futures are not shared), and 453 // b) our implementation of condition_variable::notify_one 454 // relinquishes the lock before resuming the waiting thread 455 // which avoids suspension of this thread when it tries to 456 // re-lock the mutex while exiting from condition_variable::wait 457 while (cond_.notify_one(std::move(l), threads::thread_priority_boost)) 458 { 459 l = std::unique_lock<mutex_type>(mtx_); 460 } 461 462 // Note: cv.notify_one() above 'consumes' the lock 'l' and leaves 463 // it unlocked when returning. 464 465 // invoke the callback (continuation) function 466 if (!on_completed.empty()) 467 handle_on_completed(std::move(on_completed)); 468 } 469 set_exceptionhpx::lcos::detail::future_data_base470 void set_exception(std::exception_ptr data) override 471 { 472 // Note: it is safe to access the data store as no other thread 473 // should access it concurrently. There shouldn't be any 474 // threads attempting to read the value as the state is still 475 // empty. Also, there can be only one thread (this thread) 476 // attempting to set the value by definition. 477 478 // set the data 479 std::exception_ptr* exception_ptr = 480 reinterpret_cast<std::exception_ptr*>(&storage_); 481 ::new ((void*)exception_ptr) std::exception_ptr(std::move(data)); 482 483 // At this point the lock needs to be acquired to safely access the 484 // registered continuations 485 std::unique_lock<mutex_type> l(mtx_); 486 487 // handle all threads waiting for the future to become ready 488 auto on_completed = std::move(on_completed_); 489 on_completed_.clear(); 490 491 // The value has been set, changing the state to 'exception' at this 492 // point signals to all other threads that this future is ready. 493 state expected = empty; 494 if (!state_.compare_exchange_strong( 495 expected, exception, std::memory_order_release)) 496 { 497 // this future should be 'empty' still (it can't be made ready 498 // more than once). 499 l.unlock(); 500 HPX_THROW_EXCEPTION(promise_already_satisfied, 501 "future_data_base::set_exception", 502 "data has already been set for this future"); 503 return; 504 } 505 506 // Note: we use notify_one repeatedly instead of notify_all as we 507 // know: a) that most of the time we have at most one thread 508 // waiting on the future (most futures are not shared), and 509 // b) our implementation of condition_variable::notify_one 510 // relinquishes the lock before resuming the waiting thread 511 // which avoids suspension of this thread when it tries to 512 // re-lock the mutex while exiting from condition_variable::wait 513 while (cond_.notify_one(std::move(l), threads::thread_priority_boost)) 514 { 515 l = std::unique_lock<mutex_type>(mtx_); 516 } 517 518 // Note: cv.notify_one() above 'consumes' the lock 'l' and leaves 519 // it unlocked when returning. 520 521 // invoke the callback (continuation) function 522 if (!on_completed.empty()) 523 handle_on_completed(std::move(on_completed)); 524 } 525 526 // helper functions for setting data (if successful) or the error (if 527 // non-successful) 528 template <typename T> set_datahpx::lcos::detail::future_data_base529 void set_data(T && result) 530 { 531 // set the received result, reset error status 532 try { 533 typedef typename util::decay<T>::type naked_type; 534 535 typedef traits::get_remote_result< 536 result_type, naked_type 537 > get_remote_result_type; 538 539 // store the value 540 set_value(std::move(get_remote_result_type::call( 541 std::forward<T>(result)))); 542 } 543 catch (...) { 544 // store the error instead 545 return set_exception(std::current_exception()); 546 } 547 } 548 549 // trigger the future with the given error condition set_errorhpx::lcos::detail::future_data_base550 void set_error(error e, char const* f, char const* msg) 551 { 552 try { 553 HPX_THROW_EXCEPTION(e, f, msg); 554 } 555 catch (...) { 556 // store the error code 557 set_exception(std::current_exception()); 558 } 559 } 560 561 /// Reset the promise to allow to restart an asynchronous 562 /// operation. Allows any subsequent set_data operation to succeed. resethpx::lcos::detail::future_data_base563 void reset(error_code& /*ec*/ = throws) 564 { 565 // no locking is required as semantics guarantee a single writer 566 // and no reader 567 568 // release any stored data and callback functions 569 switch (state_.exchange(empty)) { 570 case value: 571 { 572 result_type* value_ptr = 573 reinterpret_cast<result_type*>(&storage_); 574 value_ptr->~result_type(); 575 break; 576 } 577 case exception: 578 { 579 std::exception_ptr* exception_ptr = 580 reinterpret_cast<std::exception_ptr*>(&storage_); 581 exception_ptr->~exception_ptr(); 582 break; 583 } 584 default: break; 585 } 586 587 on_completed_.clear(); 588 } 589 get_exception_ptrhpx::lcos::detail::future_data_base590 std::exception_ptr get_exception_ptr() const override 591 { 592 HPX_ASSERT(state_.load(std::memory_order_acquire) == exception); 593 return *reinterpret_cast<std::exception_ptr const*>(&storage_); 594 } 595 596 protected: 597 using base_type::mtx_; 598 using base_type::state_; 599 using base_type::on_completed_; 600 601 private: 602 using base_type::cond_; 603 typename future_data_storage<Result>::type storage_; 604 }; 605 606 /////////////////////////////////////////////////////////////////////////// 607 // Customization point to have the ability for creating distinct shared 608 // states depending on the value type held. 609 template <typename Result> 610 struct future_data : future_data_base<Result> 611 { 612 HPX_NON_COPYABLE(future_data); 613 614 typedef typename future_data_base<Result>::init_no_addref init_no_addref; 615 616 future_data() = default; 617 future_datahpx::lcos::detail::future_data618 future_data(init_no_addref no_addref) 619 : future_data_base<Result>(no_addref) 620 {} 621 622 template <typename ... Ts> future_datahpx::lcos::detail::future_data623 future_data(init_no_addref no_addref, Ts&&... ts) 624 : future_data_base<Result>(no_addref, std::forward<Ts>(ts)...) 625 {} 626 future_datahpx::lcos::detail::future_data627 future_data(init_no_addref no_addref, std::exception_ptr const& e) 628 : future_data_base<Result>(no_addref, e) 629 {} future_datahpx::lcos::detail::future_data630 future_data(init_no_addref no_addref, std::exception_ptr && e) 631 : future_data_base<Result>(no_addref, std::move(e)) 632 {} 633 634 ~future_data() noexcept override = default; 635 }; 636 637 // Specialization for shared state of id_type, additionally (optionally) 638 // holds a registered name for the object it refers to. 639 template <> struct future_data<id_type>; 640 641 /////////////////////////////////////////////////////////////////////////// 642 template <typename Result, typename Allocator> 643 struct future_data_allocator : future_data<Result> 644 { 645 typedef typename future_data<Result>::init_no_addref init_no_addref; 646 typedef typename 647 std::allocator_traits<Allocator>::template 648 rebind_alloc<future_data_allocator> 649 other_allocator; 650 651 typedef typename future_data_base<Result>::default_construct 652 default_construct; 653 future_data_allocatorhpx::lcos::detail::future_data_allocator654 future_data_allocator(other_allocator const& alloc) 655 : future_data<Result>() 656 , alloc_(alloc) 657 {} 658 659 template <typename... T> future_data_allocatorhpx::lcos::detail::future_data_allocator660 future_data_allocator(init_no_addref no_addref, 661 other_allocator const& alloc, T&&... ts) 662 : future_data<Result>(no_addref, std::forward<T>(ts)...) 663 , alloc_(alloc) 664 {} 665 666 template <typename... T> future_data_allocatorhpx::lcos::detail::future_data_allocator667 future_data_allocator(init_no_addref no_addref, 668 default_construct defctr, 669 other_allocator const& alloc, T&&... ts) 670 : future_data<Result>(no_addref, defctr, std::forward<T>(ts)...) 671 , alloc_(alloc) 672 {} 673 future_data_allocatorhpx::lcos::detail::future_data_allocator674 future_data_allocator(init_no_addref no_addref, 675 std::exception_ptr const& e, other_allocator const& alloc) 676 : future_data<Result>(no_addref, e) 677 , alloc_(alloc) 678 {} 679 future_data_allocatorhpx::lcos::detail::future_data_allocator680 future_data_allocator(init_no_addref no_addref, 681 std::exception_ptr && e, other_allocator const& alloc) 682 : future_data<Result>(no_addref, std::move(e)) 683 , alloc_(alloc) 684 {} 685 686 private: destroyhpx::lcos::detail::future_data_allocator687 void destroy() 688 { 689 typedef std::allocator_traits<other_allocator> traits; 690 691 other_allocator alloc(alloc_); 692 traits::destroy(alloc, this); 693 traits::deallocate(alloc, this, 1); 694 } 695 696 private: 697 other_allocator alloc_; 698 }; 699 700 /////////////////////////////////////////////////////////////////////////// 701 template <typename Result> 702 struct timed_future_data : future_data<Result> 703 { 704 public: 705 typedef future_data<Result> base_type; 706 typedef typename base_type::result_type result_type; 707 typedef typename base_type::mutex_type mutex_type; 708 709 public: timed_future_datahpx::lcos::detail::timed_future_data710 timed_future_data() {} 711 712 template <typename Result_> timed_future_datahpx::lcos::detail::timed_future_data713 timed_future_data( 714 util::steady_clock::time_point const& abs_time, 715 Result_&& init) 716 { 717 boost::intrusive_ptr<timed_future_data> this_(this); 718 719 error_code ec; 720 threads::thread_id_type id = threads::register_thread_nullary( 721 [HPX_CAPTURE_MOVE(this_), HPX_CAPTURE_FORWARD(init)]() 722 { 723 this_->set_value(init); 724 }, 725 "timed_future_data<Result>::timed_future_data", 726 threads::suspended, true, threads::thread_priority_boost, 727 threads::thread_schedule_hint(), 728 threads::thread_stacksize_current, ec); 729 if (ec) { 730 // thread creation failed, report error to the new future 731 this->base_type::set_exception(hpx::detail::access_exception(ec)); 732 } 733 734 // start new thread at given point in time 735 threads::set_thread_state(id, abs_time, threads::pending, 736 threads::wait_timeout, threads::thread_priority_boost, ec); 737 if (ec) { 738 // thread scheduling failed, report error to the new future 739 this->base_type::set_exception(hpx::detail::access_exception(ec)); 740 } 741 } 742 }; 743 744 /////////////////////////////////////////////////////////////////////////// 745 template <typename Result> 746 struct task_base : future_data<Result> 747 { 748 protected: 749 typedef future_data<Result> base_type; 750 typedef typename future_data<Result>::mutex_type mutex_type; 751 typedef boost::intrusive_ptr<task_base> future_base_type; 752 typedef typename future_data<Result>::result_type result_type; 753 typedef typename base_type::init_no_addref init_no_addref; 754 755 public: task_basehpx::lcos::detail::task_base756 task_base() 757 : started_(false) 758 {} 759 task_basehpx::lcos::detail::task_base760 task_base(init_no_addref no_addref) 761 : base_type(no_addref), started_(false) 762 {} 763 execute_deferredhpx::lcos::detail::task_base764 virtual void execute_deferred(error_code& /*ec*/ = throws) 765 { 766 if (!started_test_and_set()) 767 this->do_run(); 768 } 769 770 // retrieving the value get_resulthpx::lcos::detail::task_base771 virtual result_type* get_result(error_code& ec = throws) 772 { 773 if (!started_test_and_set()) 774 this->do_run(); 775 return this->future_data<Result>::get_result(ec); 776 } 777 778 // wait support waithpx::lcos::detail::task_base779 virtual typename base_type::state wait(error_code& ec = throws) 780 { 781 if (!started_test_and_set()) 782 this->do_run(); 783 return this->future_data<Result>::wait(ec); 784 } 785 786 virtual future_status wait_untilhpx::lcos::detail::task_base787 wait_until(util::steady_clock::time_point const& abs_time, 788 error_code& ec = throws) 789 { 790 if (!started_test()) 791 return future_status::deferred; //-V110 792 return this->future_data<Result>::wait_until(abs_time, ec); 793 } 794 795 private: started_testhpx::lcos::detail::task_base796 bool started_test() const 797 { 798 std::lock_guard<mutex_type> l(this->mtx_); 799 return started_; 800 } 801 802 template <typename Lock> started_test_and_set_lockedhpx::lcos::detail::task_base803 bool started_test_and_set_locked(Lock& l) 804 { 805 HPX_ASSERT_OWNS_LOCK(l); 806 if (started_) 807 return true; 808 809 started_ = true; 810 return false; 811 } 812 813 protected: started_test_and_sethpx::lcos::detail::task_base814 bool started_test_and_set() 815 { 816 std::lock_guard<mutex_type> l(this->mtx_); 817 return started_test_and_set_locked(l); 818 } 819 check_startedhpx::lcos::detail::task_base820 void check_started() 821 { 822 std::unique_lock<mutex_type> l(this->mtx_); 823 if (started_) { 824 l.unlock(); 825 HPX_THROW_EXCEPTION(task_already_started, 826 "task_base::check_started", 827 "this task has already been started"); 828 return; 829 } 830 started_ = true; 831 } 832 833 public: 834 // run synchronously runhpx::lcos::detail::task_base835 void run() 836 { 837 check_started(); 838 this->do_run(); // always on this thread 839 } 840 841 // run in a separate thread applyhpx::lcos::detail::task_base842 virtual threads::thread_id_type apply(launch /*policy*/, 843 threads::thread_priority /*priority*/, 844 threads::thread_stacksize /*stacksize*/, 845 threads::thread_schedule_hint /*schedulehint*/, 846 error_code& /*ec*/) 847 { 848 HPX_ASSERT(false); // shouldn't ever be called 849 return threads::invalid_thread_id; 850 } 851 852 protected: run_implhpx::lcos::detail::task_base853 static threads::thread_result_type run_impl(future_base_type this_) 854 { 855 this_->do_run(); 856 return threads::thread_result_type( 857 threads::terminated, threads::invalid_thread_id); 858 } 859 860 public: 861 template <typename T> set_datahpx::lcos::detail::task_base862 void set_data(T && result) 863 { 864 this->future_data<Result>::set_data(std::forward<T>(result)); 865 } 866 set_exceptionhpx::lcos::detail::task_base867 void set_exception(std::exception_ptr e) 868 { 869 this->future_data<Result>::set_exception(std::move(e)); 870 } 871 do_runhpx::lcos::detail::task_base872 virtual void do_run() 873 { 874 HPX_ASSERT(false); // shouldn't ever be called 875 } 876 877 protected: 878 bool started_; 879 }; 880 881 /////////////////////////////////////////////////////////////////////////// 882 template <typename Result> 883 struct cancelable_task_base : task_base<Result> 884 { 885 protected: 886 typedef typename task_base<Result>::mutex_type mutex_type; 887 typedef boost::intrusive_ptr<cancelable_task_base> future_base_type; 888 typedef typename future_data<Result>::result_type result_type; 889 typedef typename task_base<Result>::init_no_addref init_no_addref; 890 891 protected: get_thread_idhpx::lcos::detail::cancelable_task_base892 threads::thread_id_type get_thread_id() const 893 { 894 std::lock_guard<mutex_type> l(this->mtx_); 895 return id_; 896 } set_thread_idhpx::lcos::detail::cancelable_task_base897 void set_thread_id(threads::thread_id_type id) 898 { 899 std::lock_guard<mutex_type> l(this->mtx_); 900 id_ = id; 901 } 902 903 public: cancelable_task_basehpx::lcos::detail::cancelable_task_base904 cancelable_task_base() 905 : id_(threads::invalid_thread_id) 906 {} 907 cancelable_task_basehpx::lcos::detail::cancelable_task_base908 cancelable_task_base(init_no_addref no_addref) 909 : task_base<Result>(no_addref), id_(threads::invalid_thread_id) 910 {} 911 912 private: 913 struct reset_id 914 { reset_idhpx::lcos::detail::cancelable_task_base::reset_id915 reset_id(cancelable_task_base& target) 916 : target_(target) 917 { 918 target.set_thread_id(threads::get_self_id()); 919 } ~reset_idhpx::lcos::detail::cancelable_task_base::reset_id920 ~reset_id() 921 { 922 target_.set_thread_id(threads::invalid_thread_id); 923 } 924 cancelable_task_base& target_; 925 }; 926 927 protected: run_implhpx::lcos::detail::cancelable_task_base928 static threads::thread_result_type run_impl(future_base_type this_) 929 { 930 reset_id r(*this_); 931 this_->do_run(); 932 return threads::thread_result_type( 933 threads::terminated, threads::invalid_thread_id); 934 } 935 936 public: 937 // cancellation support cancelablehpx::lcos::detail::cancelable_task_base938 bool cancelable() const 939 { 940 return true; 941 } 942 cancelhpx::lcos::detail::cancelable_task_base943 void cancel() 944 { 945 std::unique_lock<mutex_type> l(this->mtx_); 946 try { 947 if (!this->started_) 948 HPX_THROW_THREAD_INTERRUPTED_EXCEPTION(); 949 950 if (this->is_ready()) 951 return; // nothing we can do 952 953 if (id_ != threads::invalid_thread_id) { 954 // interrupt the executing thread 955 threads::interrupt_thread(id_); 956 957 this->started_ = true; 958 959 l.unlock(); 960 this->set_error(future_cancelled, 961 "task_base<Result>::cancel", 962 "future has been canceled"); 963 } 964 else { 965 l.unlock(); 966 HPX_THROW_EXCEPTION(future_can_not_be_cancelled, 967 "task_base<Result>::cancel", 968 "future can't be canceled at this time"); 969 } 970 } 971 catch (...) { 972 this->started_ = true; 973 this->set_exception(std::current_exception()); 974 throw; 975 } 976 } 977 978 protected: 979 threads::thread_id_type id_; 980 }; 981 }}} 982 983 namespace hpx { namespace traits 984 { 985 namespace detail 986 { 987 template <typename R, typename Allocator> 988 struct shared_state_allocator<lcos::detail::future_data<R>, Allocator> 989 { 990 typedef lcos::detail::future_data_allocator<R, Allocator> type; 991 }; 992 } 993 }} 994 995 #include <hpx/config/warnings_suffix.hpp> 996 997 #endif 998