1 // Copyright (c) 2007-2017 Hartmut Kaiser 2 // Copyright (c) 2013 Agustin Berge 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 7 /// \file lcos/wait_some.hpp 8 9 #if !defined(HPX_LCOS_WAIT_SOME_APR_19_2012_0203PM) 10 #define HPX_LCOS_WAIT_SOME_APR_19_2012_0203PM 11 12 #if defined(DOXYGEN) 13 namespace hpx 14 { 15 /// The function \a wait_some is an operator allowing to join on the result 16 /// of all given futures. It AND-composes all future objects given and 17 /// returns a new future object representing the same list of futures 18 /// after n of them finished executing. 19 /// 20 /// \param n [in] The number of futures out of the arguments which 21 /// have to become ready in order for the returned future 22 /// to get ready. 23 /// \param first [in] The iterator pointing to the first element of a 24 /// sequence of \a future or \a shared_future objects for 25 /// which \a when_all should wait. 26 /// \param last [in] The iterator pointing to the last element of a 27 /// sequence of \a future or \a shared_future objects for 28 /// which \a when_all should wait. 29 /// \param ec [in,out] this represents the error status on exit, if 30 /// this is pre-initialized to \a hpx#throws the function 31 /// will throw on error instead. 32 /// 33 /// \note The future returned by the function \a wait_some becomes ready 34 /// when at least \a n argument futures have become ready. 35 /// 36 /// \return Returns a future holding the same list of futures as has 37 /// been passed to wait_some. 38 /// - future<vector<future<R>>>: If the input cardinality is 39 /// unknown at compile time and the futures are all of the 40 /// same type. 41 /// 42 /// \note Calling this version of \a wait_some where first == last, returns 43 /// a future with an empty vector that is immediately ready. 44 /// Each future and shared_future is waited upon and then copied into 45 /// the collection of the output (returned) future, maintaining the 46 /// order of the futures in the input collection. 47 /// The future returned by \a wait_some will not throw an exception, 48 /// but the futures held in the output collection may. 49 template <typename InputIter> 50 future<vector<future<typename std::iterator_traits<InputIter>::value_type>>> 51 wait_some(std::size_t n, Iterator first, Iterator last, error_code& ec = throws); 52 53 /// The function \a wait_some is an operator allowing to join on the result 54 /// of all given futures. It AND-composes all future objects given and 55 /// returns a new future object representing the same list of futures 56 /// after n of them finished executing. 57 /// 58 /// \param n [in] The number of futures out of the arguments which 59 /// have to become ready in order for the returned future 60 /// to get ready. 61 /// \param futures [in] A vector holding an arbitrary amount of \a future 62 /// or \a shared_future objects for which \a wait_some 63 /// should wait. 64 /// \param ec [in,out] this represents the error status on exit, if 65 /// this is pre-initialized to \a hpx#throws the function 66 /// will throw on error instead. 67 /// 68 /// \note The function \a wait_all returns after \a n futures have become 69 /// ready. All input futures are still valid after \a wait_all 70 /// returns. 71 /// 72 /// \note Each future and shared_future is waited upon and then copied into 73 /// the collection of the output (returned) future, maintaining the 74 /// order of the futures in the input collection. 75 /// The future returned by \a wait_some will not throw an exception, 76 /// but the futures held in the output collection may. 77 template <typename R> 78 void wait_some(std::size_t n, std::vector<future<R>>&& futures, 79 error_code& ec = throws); 80 81 /// The function \a wait_some is an operator allowing to join on the result 82 /// of all given futures. It AND-composes all future objects given and 83 /// returns a new future object representing the same list of futures 84 /// after n of them finished executing. 85 /// 86 /// \param n [in] The number of futures out of the arguments which 87 /// have to become ready in order for the returned future 88 /// to get ready. 89 /// \param futures [in] An array holding an arbitrary amount of \a future 90 /// or \a shared_future objects for which \a wait_some 91 /// should wait. 92 /// \param ec [in,out] this represents the error status on exit, if 93 /// this is pre-initialized to \a hpx#throws the function 94 /// will throw on error instead. 95 /// 96 /// \note The function \a wait_all returns after \a n futures have become 97 /// ready. All input futures are still valid after \a wait_all 98 /// returns. 99 /// 100 /// \note Each future and shared_future is waited upon and then copied into 101 /// the collection of the output (returned) future, maintaining the 102 /// order of the futures in the input collection. 103 /// The future returned by \a wait_some will not throw an exception, 104 /// but the futures held in the output collection may. 105 template <typename R, std::size_t N> 106 void wait_some(std::size_t n, std::array<future<R>, N>&& futures, 107 error_code& ec = throws); 108 109 /// The function \a wait_some is an operator allowing to join on the result 110 /// of all given futures. It AND-composes all future objects given and 111 /// returns a new future object representing the same list of futures 112 /// after n of them finished executing. 113 /// 114 /// \param n [in] The number of futures out of the arguments which 115 /// have to become ready in order for the returned future 116 /// to get ready. 117 /// \param futures [in] An arbitrary number of \a future or \a shared_future 118 /// objects, possibly holding different types for which 119 /// \a wait_some should wait. 120 /// \param ec [in,out] this represents the error status on exit, if 121 /// this is pre-initialized to \a hpx#throws the function 122 /// will throw on error instead. 123 /// 124 /// \note The function \a wait_all returns after \a n futures have become 125 /// ready. All input futures are still valid after \a wait_all 126 /// returns. 127 /// 128 /// \note Calling this version of \a wait_some where first == last, returns 129 /// a future with an empty vector that is immediately ready. 130 /// Each future and shared_future is waited upon and then copied into 131 /// the collection of the output (returned) future, maintaining the 132 /// order of the futures in the input collection. 133 /// The future returned by \a wait_some will not throw an exception, 134 /// but the futures held in the output collection may. 135 template <typename ...T> 136 void wait_some(std::size_t n, T &&... futures, error_code& ec = throws); 137 138 /// The function \a wait_some_n is an operator allowing to join on the result 139 /// of all given futures. It AND-composes all future objects given and 140 /// returns a new future object representing the same list of futures 141 /// after n of them finished executing. 142 /// 143 /// \param n [in] The number of futures out of the arguments which 144 /// have to become ready in order for the returned future 145 /// to get ready. 146 /// \param first [in] The iterator pointing to the first element of a 147 /// sequence of \a future or \a shared_future objects for 148 /// which \a when_all should wait. 149 /// \param count [in] The number of elements in the sequence starting at 150 /// \a first. 151 /// \param ec [in,out] this represents the error status on exit, if 152 /// this is pre-initialized to \a hpx#throws the function 153 /// will throw on error instead. 154 /// 155 /// \note The function \a wait_all returns after \a n futures have become 156 /// ready. All input futures are still valid after \a wait_all 157 /// returns. 158 /// 159 /// \return This function returns an Iterator referring to the first 160 /// element after the last processed input element. 161 /// 162 /// \note Calling this version of \a wait_some_n where count == 0, returns 163 /// a future with the same elements as the arguments that is 164 /// immediately ready. Possibly none of the futures in that vector 165 /// are ready. 166 /// Each future and shared_future is waited upon and then copied into 167 /// the collection of the output (returned) future, maintaining the 168 /// order of the futures in the input collection. 169 /// The future returned by \a wait_some_n will not throw an exception, 170 /// but the futures held in the output collection may. 171 template <typename InputIter> 172 InputIter wait_some_n(std::size_t n, Iterator first, 173 std::size_t count, error_code& ec = throws); 174 } 175 #else 176 177 #include <hpx/config.hpp> 178 #include <hpx/lcos/future.hpp> 179 #include <hpx/runtime/threads/thread.hpp> 180 #include <hpx/throw_exception.hpp> 181 #include <hpx/traits/acquire_shared_state.hpp> 182 #include <hpx/traits/future_access.hpp> 183 #include <hpx/traits/is_future.hpp> 184 #include <hpx/util/always_void.hpp> 185 #include <hpx/util/assert.hpp> 186 #include <hpx/util/deferred_call.hpp> 187 #include <hpx/util/detail/pack.hpp> 188 #include <hpx/util/detail/pp/strip_parens.hpp> 189 #include <hpx/util/tuple.hpp> 190 191 #include <algorithm> 192 #include <array> 193 #include <atomic> 194 #include <cstddef> 195 #include <iterator> 196 #include <memory> 197 #include <type_traits> 198 #include <utility> 199 #include <vector> 200 201 /////////////////////////////////////////////////////////////////////////////// 202 namespace hpx { namespace lcos 203 { 204 namespace detail 205 { 206 /////////////////////////////////////////////////////////////////////// 207 template <typename Sequence> 208 struct wait_some; 209 210 template <typename Sequence> 211 struct set_wait_some_callback_impl 212 { set_wait_some_callback_implhpx::lcos::detail::set_wait_some_callback_impl213 explicit set_wait_some_callback_impl(wait_some<Sequence>& wait) 214 : wait_(wait) 215 {} 216 217 template <typename SharedState> operator ()hpx::lcos::detail::set_wait_some_callback_impl218 void operator()(SharedState& shared_state, 219 typename std::enable_if< 220 traits::is_shared_state<SharedState>::value 221 >::type* = nullptr) const 222 { 223 std::size_t counter = 224 wait_.count_.load(std::memory_order_seq_cst); 225 if (counter < wait_.needed_count_ && 226 shared_state.get() != nullptr && !shared_state->is_ready()) 227 { 228 // handle future only if not enough futures are ready yet 229 // also, do not touch any futures which are already ready 230 231 shared_state->execute_deferred(); 232 233 // execute_deferred might have made the future ready 234 if (!shared_state->is_ready()) 235 { 236 shared_state->set_on_completed( 237 util::deferred_call( 238 &wait_some<Sequence>::on_future_ready, 239 wait_.shared_from_this(), 240 threads::get_self_id())); 241 return; 242 } 243 } 244 if (wait_.count_.fetch_add(1) + 1 == wait_.needed_count_) 245 { 246 wait_.goal_reached_on_calling_thread_ = true; 247 } 248 } 249 250 template <typename Sequence_> 251 HPX_FORCEINLINE operator ()hpx::lcos::detail::set_wait_some_callback_impl252 void operator()(Sequence_& sequence, 253 typename std::enable_if< 254 !traits::is_shared_state<Sequence_>::value 255 >::type* = nullptr) const 256 { 257 apply(sequence); 258 } 259 260 template <typename Tuple, std::size_t ...Is> 261 HPX_FORCEINLINE applyhpx::lcos::detail::set_wait_some_callback_impl262 void apply(Tuple& tuple, util::detail::pack_c<std::size_t, Is...>) const 263 { 264 int const _sequencer[]= { 265 (((*this)(util::get<Is>(tuple))), 0)... 266 }; 267 (void)_sequencer; 268 } 269 270 template <typename ...Ts> 271 HPX_FORCEINLINE applyhpx::lcos::detail::set_wait_some_callback_impl272 void apply(util::tuple<Ts...>& sequence) const 273 { 274 apply(sequence, 275 typename util::detail::make_index_pack<sizeof...(Ts)>::type()); 276 } 277 278 template <typename Sequence_> 279 HPX_FORCEINLINE applyhpx::lcos::detail::set_wait_some_callback_impl280 void apply(Sequence_& sequence) const 281 { 282 std::for_each(sequence.begin(), sequence.end(), *this); 283 } 284 285 wait_some<Sequence>& wait_; 286 }; 287 288 template <typename Sequence> set_on_completed_callback(wait_some<Sequence> & wait)289 void set_on_completed_callback(wait_some<Sequence>& wait) 290 { 291 set_wait_some_callback_impl<Sequence> callback(wait); 292 callback.apply(wait.lazy_values_); 293 } 294 295 template <typename Sequence> 296 struct wait_some : std::enable_shared_from_this<wait_some<Sequence> > //-V690 297 { 298 public: on_future_readyhpx::lcos::detail::wait_some299 void on_future_ready(threads::thread_id_type const& id) 300 { 301 if (count_.fetch_add(1) + 1 == needed_count_) 302 { 303 // reactivate waiting thread only if it's not us 304 if (id != threads::get_self_id()) 305 threads::set_thread_state(id, threads::pending); 306 else 307 goal_reached_on_calling_thread_ = true; 308 } 309 } 310 311 private: 312 // workaround gcc regression wrongly instantiating constructors 313 wait_some(); 314 wait_some(wait_some const&); 315 316 public: 317 typedef Sequence argument_type; 318 wait_somehpx::lcos::detail::wait_some319 wait_some(argument_type && lazy_values, std::size_t n) 320 : lazy_values_(std::move(lazy_values)) 321 , count_(0) 322 , needed_count_(n) 323 , goal_reached_on_calling_thread_(false) 324 {} 325 operator ()hpx::lcos::detail::wait_some326 void operator()() 327 { 328 // set callback functions to executed wait future is ready 329 set_on_completed_callback(*this); 330 331 // if all of the requested futures are already set, our 332 // callback above has already been called often enough, otherwise 333 // we suspend ourselves 334 if (!goal_reached_on_calling_thread_) 335 { 336 // wait for any of the futures to return to become ready 337 this_thread::suspend(threads::suspended, 338 "hpx::detail::wait_some::operator()"); 339 } 340 341 // at least N futures should be ready 342 HPX_ASSERT(count_.load(std::memory_order_seq_cst) >= needed_count_); 343 } 344 345 argument_type lazy_values_; 346 std::atomic<std::size_t> count_; 347 std::size_t const needed_count_; 348 bool goal_reached_on_calling_thread_; 349 }; 350 } 351 352 /////////////////////////////////////////////////////////////////////////// 353 template <typename Future> wait_some(std::size_t n,std::vector<Future> const & lazy_values,error_code & ec=throws)354 void wait_some(std::size_t n, 355 std::vector<Future> const& lazy_values, 356 error_code& ec = throws) 357 { 358 static_assert( 359 traits::is_future<Future>::value, "invalid use of wait_some"); 360 361 typedef 362 typename traits::detail::shared_state_ptr_for<Future>::type 363 shared_state_ptr; 364 typedef std::vector<shared_state_ptr> result_type; 365 366 if (n == 0) 367 { 368 return; 369 } 370 371 if (n > lazy_values.size()) 372 { 373 HPX_THROWS_IF(ec, hpx::bad_parameter, 374 "hpx::lcos::wait_some", 375 "number of results to wait for is out of bounds"); 376 return; 377 } 378 379 result_type lazy_values_; 380 std::transform(lazy_values.begin(), lazy_values.end(), 381 std::back_inserter(lazy_values_), 382 traits::detail::wait_get_shared_state<Future>()); 383 384 std::shared_ptr<detail::wait_some<result_type> > f = 385 std::make_shared<detail::wait_some<result_type> >( 386 std::move(lazy_values_), n); 387 388 return (*f.get())(); 389 } 390 391 template <typename Future> wait_some(std::size_t n,std::vector<Future> & lazy_values,error_code & ec=throws)392 void wait_some(std::size_t n, 393 std::vector<Future>& lazy_values, 394 error_code& ec = throws) 395 { 396 return lcos::wait_some( 397 n, const_cast<std::vector<Future> const&>(lazy_values), ec); 398 } 399 400 template <typename Future> wait_some(std::size_t n,std::vector<Future> && lazy_values,error_code & ec=throws)401 void wait_some(std::size_t n, 402 std::vector<Future> && lazy_values, 403 error_code& ec = throws) 404 { 405 return lcos::wait_some( 406 n, const_cast<std::vector<Future> const&>(lazy_values), ec); 407 } 408 409 /////////////////////////////////////////////////////////////////////////// 410 template <typename Future, std::size_t N> wait_some(std::size_t n,std::array<Future,N> const & lazy_values,error_code & ec=throws)411 void wait_some(std::size_t n, 412 std::array<Future, N> const& lazy_values, 413 error_code& ec = throws) 414 { 415 static_assert( 416 traits::is_future<Future>::value, "invalid use of wait_some"); 417 418 typedef 419 typename traits::detail::shared_state_ptr_for<Future>::type 420 shared_state_ptr; 421 typedef std::array<shared_state_ptr, N> result_type; 422 423 if (n == 0) 424 { 425 return; 426 } 427 428 if (n > lazy_values.size()) 429 { 430 HPX_THROWS_IF(ec, hpx::bad_parameter, 431 "hpx::lcos::wait_some", 432 "number of results to wait for is out of bounds"); 433 return; 434 } 435 436 result_type lazy_values_; 437 std::transform(lazy_values.begin(), lazy_values.end(), 438 lazy_values_.begin(), 439 traits::detail::wait_get_shared_state<Future>()); 440 441 std::shared_ptr<detail::wait_some<result_type> > f = 442 std::make_shared<detail::wait_some<result_type> >( 443 std::move(lazy_values_), n); 444 445 return (*f.get())(); 446 } 447 448 template <typename Future, std::size_t N> wait_some(std::size_t n,std::array<Future,N> & lazy_values,error_code & ec=throws)449 void wait_some(std::size_t n, 450 std::array<Future, N>& lazy_values, 451 error_code& ec = throws) 452 { 453 return lcos::wait_some( 454 n, const_cast<std::array<Future, N> const&>(lazy_values), ec); 455 } 456 457 template <typename Future, std::size_t N> wait_some(std::size_t n,std::array<Future,N> && lazy_values,error_code & ec=throws)458 void wait_some(std::size_t n, 459 std::array<Future, N> && lazy_values, 460 error_code& ec = throws) 461 { 462 return lcos::wait_some( 463 n, const_cast<std::array<Future, N> const&>(lazy_values), ec); 464 } 465 466 /////////////////////////////////////////////////////////////////////////// 467 template <typename Iterator> 468 typename util::always_void< 469 typename lcos::detail::future_iterator_traits<Iterator>::type 470 >::type wait_some(std::size_t n,Iterator begin,Iterator end,error_code & ec=throws)471 wait_some(std::size_t n, Iterator begin, Iterator end, 472 error_code& ec = throws) 473 { 474 typedef 475 typename lcos::detail::future_iterator_traits<Iterator>::type 476 future_type; 477 typedef 478 typename traits::detail::shared_state_ptr_for<future_type>::type 479 shared_state_ptr; 480 typedef std::vector<shared_state_ptr> result_type; 481 482 result_type lazy_values_; 483 std::transform(begin, end, std::back_inserter(lazy_values_), 484 traits::detail::wait_get_shared_state<future_type>()); 485 486 std::shared_ptr<detail::wait_some<result_type> > f = 487 std::make_shared<detail::wait_some<result_type> >( 488 std::move(lazy_values_), n); 489 490 return (*f.get())(); 491 } 492 493 template <typename Iterator> 494 Iterator wait_some_n(std::size_t n,Iterator begin,std::size_t count,error_code & ec=throws)495 wait_some_n(std::size_t n, Iterator begin, 496 std::size_t count, error_code& ec = throws) 497 { 498 typedef 499 typename lcos::detail::future_iterator_traits<Iterator>::type 500 future_type; 501 typedef 502 typename traits::detail::shared_state_ptr_for<future_type>::type 503 shared_state_ptr; 504 typedef std::vector<shared_state_ptr> result_type; 505 506 result_type lazy_values_; 507 lazy_values_.resize(count); 508 traits::detail::wait_get_shared_state<future_type> func; 509 for (std::size_t i = 0; i != count; ++i) 510 lazy_values_.push_back(func(*begin++)); 511 512 std::shared_ptr<detail::wait_some<result_type> > f = 513 std::make_shared<detail::wait_some<result_type> >( 514 std::move(lazy_values_), n); 515 516 (*f.get())(); 517 518 return begin; 519 } 520 wait_some(std::size_t n,error_code & ec=throws)521 inline void wait_some(std::size_t n, error_code& ec = throws) 522 { 523 if (n == 0) 524 { 525 return; 526 } 527 528 HPX_THROWS_IF(ec, hpx::bad_parameter, 529 "hpx::lcos::wait_some", 530 "number of results to wait for is out of bounds"); 531 } 532 533 /////////////////////////////////////////////////////////////////////////// 534 template <typename T> wait_some(std::size_t n,hpx::future<T> && f,error_code & ec=throws)535 void wait_some(std::size_t n, hpx::future<T> && f, error_code& ec = throws) 536 { 537 if (n != 1) 538 { 539 HPX_THROWS_IF(ec, hpx::bad_parameter, 540 "hpx::lcos::wait_some", 541 "number of results to wait for is out of bounds"); 542 return; 543 } 544 545 f.wait(); 546 } 547 548 template <typename T> wait_some(std::size_t n,hpx::shared_future<T> && f,error_code & ec=throws)549 void wait_some(std::size_t n, hpx::shared_future<T> && f, error_code& ec = throws) 550 { 551 if (n != 1) 552 { 553 HPX_THROWS_IF(ec, hpx::bad_parameter, 554 "hpx::lcos::wait_some", 555 "number of results to wait for is out of bounds"); 556 return; 557 } 558 559 f.wait(); 560 } 561 562 /////////////////////////////////////////////////////////////////////////// 563 template <typename... Ts> wait_some(std::size_t n,error_code & ec,Ts &&...ts)564 void wait_some(std::size_t n, error_code& ec, Ts&&...ts) 565 { 566 typedef util::tuple< 567 typename traits::detail::shared_state_ptr_for<Ts>::type... 568 > result_type; 569 570 result_type lazy_values_ = 571 result_type(traits::detail::get_shared_state(ts)...); 572 573 if (n == 0) 574 { 575 return; 576 } 577 578 if (n > sizeof...(Ts)) 579 { 580 HPX_THROWS_IF(ec, hpx::bad_parameter, 581 "hpx::lcos::wait_some", 582 "number of results to wait for is out of bounds"); 583 return; 584 } 585 586 std::shared_ptr<detail::wait_some<result_type> > f = 587 std::make_shared<detail::wait_some<result_type> >( 588 std::move(lazy_values_), n); 589 590 return (*f.get())(); 591 } 592 593 template <typename... Ts> wait_some(std::size_t n,Ts &&...ts)594 void wait_some(std::size_t n, Ts&&...ts) 595 { 596 typedef util::tuple< 597 typename traits::detail::shared_state_ptr_for<Ts>::type... 598 > result_type; 599 600 result_type lazy_values_ = 601 result_type(traits::detail::get_shared_state(ts)...); 602 603 if (n == 0) 604 { 605 return; 606 } 607 608 if (n > sizeof...(Ts)) 609 { 610 HPX_THROW_EXCEPTION(hpx::bad_parameter, 611 "hpx::lcos::wait_some", 612 "number of results to wait for is out of bounds"); 613 return; 614 } 615 616 std::shared_ptr<detail::wait_some<result_type> > f = 617 std::make_shared<detail::wait_some<result_type> >( 618 std::move(lazy_values_), n); 619 620 return (*f.get())(); 621 } 622 }} 623 624 namespace hpx 625 { 626 using lcos::wait_some; 627 using lcos::wait_some_n; 628 } 629 630 #endif // DOXYGEN 631 #endif 632