1 //  Copyright (c) 2007-2017 Hartmut Kaiser
2 //  Copyright (c) 2013 Agustin Berge
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
7 /// \file lcos/wait_some.hpp
8 
9 #if !defined(HPX_LCOS_WAIT_SOME_APR_19_2012_0203PM)
10 #define HPX_LCOS_WAIT_SOME_APR_19_2012_0203PM
11 
12 #if defined(DOXYGEN)
13 namespace hpx
14 {
15     /// The function \a wait_some is an operator allowing to join on the result
16     /// of all given futures. It AND-composes all future objects given and
17     /// returns a new future object representing the same list of futures
18     /// after n of them finished executing.
19     ///
20     /// \param n        [in] The number of futures out of the arguments which
21     ///                 have to become ready in order for the returned future
22     ///                 to get ready.
23     /// \param first    [in] The iterator pointing to the first element of a
24     ///                 sequence of \a future or \a shared_future objects for
25     ///                 which \a when_all should wait.
26     /// \param last     [in] The iterator pointing to the last element of a
27     ///                 sequence of \a future or \a shared_future objects for
28     ///                 which \a when_all should wait.
29     /// \param ec       [in,out] this represents the error status on exit, if
30     ///                 this is pre-initialized to \a hpx#throws the function
31     ///                 will throw on error instead.
32     ///
33     /// \note The future returned by the function \a wait_some becomes ready
34     ///       when at least \a n argument futures have become ready.
35     ///
36     /// \return   Returns a future holding the same list of futures as has
37     ///           been passed to wait_some.
38     ///           - future<vector<future<R>>>: If the input cardinality is
39     ///             unknown at compile time and the futures are all of the
40     ///             same type.
41     ///
42     /// \note Calling this version of \a wait_some where first == last, returns
43     ///       a future with an empty vector that is immediately ready.
44     ///       Each future and shared_future is waited upon and then copied into
45     ///       the collection of the output (returned) future, maintaining the
46     ///       order of the futures in the input collection.
47     ///       The future returned by \a wait_some will not throw an exception,
48     ///       but the futures held in the output collection may.
49     template <typename InputIter>
50     future<vector<future<typename std::iterator_traits<InputIter>::value_type>>>
51     wait_some(std::size_t n, Iterator first, Iterator last, error_code& ec = throws);
52 
53     /// The function \a wait_some is an operator allowing to join on the result
54     /// of all given futures. It AND-composes all future objects given and
55     /// returns a new future object representing the same list of futures
56     /// after n of them finished executing.
57     ///
58     /// \param n        [in] The number of futures out of the arguments which
59     ///                 have to become ready in order for the returned future
60     ///                 to get ready.
61     /// \param futures  [in] A vector holding an arbitrary amount of \a future
62     ///                 or \a shared_future objects for which \a wait_some
63     ///                 should wait.
64     /// \param ec       [in,out] this represents the error status on exit, if
65     ///                 this is pre-initialized to \a hpx#throws the function
66     ///                 will throw on error instead.
67     ///
68     /// \note The function \a wait_all returns after \a n futures have become
69     ///       ready. All input futures are still valid after \a wait_all
70     ///       returns.
71     ///
72     /// \note Each future and shared_future is waited upon and then copied into
73     ///       the collection of the output (returned) future, maintaining the
74     ///       order of the futures in the input collection.
75     ///       The future returned by \a wait_some will not throw an exception,
76     ///       but the futures held in the output collection may.
77     template <typename R>
78     void wait_some(std::size_t n, std::vector<future<R>>&& futures,
79         error_code& ec = throws);
80 
81     /// The function \a wait_some is an operator allowing to join on the result
82     /// of all given futures. It AND-composes all future objects given and
83     /// returns a new future object representing the same list of futures
84     /// after n of them finished executing.
85     ///
86     /// \param n        [in] The number of futures out of the arguments which
87     ///                 have to become ready in order for the returned future
88     ///                 to get ready.
89     /// \param futures  [in] An array holding an arbitrary amount of \a future
90     ///                 or \a shared_future objects for which \a wait_some
91     ///                 should wait.
92     /// \param ec       [in,out] this represents the error status on exit, if
93     ///                 this is pre-initialized to \a hpx#throws the function
94     ///                 will throw on error instead.
95     ///
96     /// \note The function \a wait_all returns after \a n futures have become
97     ///       ready. All input futures are still valid after \a wait_all
98     ///       returns.
99     ///
100     /// \note Each future and shared_future is waited upon and then copied into
101     ///       the collection of the output (returned) future, maintaining the
102     ///       order of the futures in the input collection.
103     ///       The future returned by \a wait_some will not throw an exception,
104     ///       but the futures held in the output collection may.
105     template <typename R, std::size_t N>
106     void wait_some(std::size_t n, std::array<future<R>, N>&& futures,
107         error_code& ec = throws);
108 
109     /// The function \a wait_some is an operator allowing to join on the result
110     /// of all given futures. It AND-composes all future objects given and
111     /// returns a new future object representing the same list of futures
112     /// after n of them finished executing.
113     ///
114     /// \param n        [in] The number of futures out of the arguments which
115     ///                 have to become ready in order for the returned future
116     ///                 to get ready.
117     /// \param futures  [in] An arbitrary number of \a future or \a shared_future
118     ///                 objects, possibly holding different types for which
119     ///                 \a wait_some should wait.
120     /// \param ec       [in,out] this represents the error status on exit, if
121     ///                 this is pre-initialized to \a hpx#throws the function
122     ///                 will throw on error instead.
123     ///
124     /// \note The function \a wait_all returns after \a n futures have become
125     ///       ready. All input futures are still valid after \a wait_all
126     ///       returns.
127     ///
128     /// \note Calling this version of \a wait_some where first == last, returns
129     ///       a future with an empty vector that is immediately ready.
130     ///       Each future and shared_future is waited upon and then copied into
131     ///       the collection of the output (returned) future, maintaining the
132     ///       order of the futures in the input collection.
133     ///       The future returned by \a wait_some will not throw an exception,
134     ///       but the futures held in the output collection may.
135     template <typename ...T>
136     void wait_some(std::size_t n, T &&... futures, error_code& ec = throws);
137 
138     /// The function \a wait_some_n is an operator allowing to join on the result
139     /// of all given futures. It AND-composes all future objects given and
140     /// returns a new future object representing the same list of futures
141     /// after n of them finished executing.
142     ///
143     /// \param n        [in] The number of futures out of the arguments which
144     ///                 have to become ready in order for the returned future
145     ///                 to get ready.
146     /// \param first    [in] The iterator pointing to the first element of a
147     ///                 sequence of \a future or \a shared_future objects for
148     ///                 which \a when_all should wait.
149     /// \param count    [in] The number of elements in the sequence starting at
150     ///                 \a first.
151     /// \param ec       [in,out] this represents the error status on exit, if
152     ///                 this is pre-initialized to \a hpx#throws the function
153     ///                 will throw on error instead.
154     ///
155     /// \note The function \a wait_all returns after \a n futures have become
156     ///       ready. All input futures are still valid after \a wait_all
157     ///       returns.
158     ///
159     /// \return This function returns an Iterator referring to the first
160     ///         element after the last processed input element.
161     ///
162     /// \note Calling this version of \a wait_some_n where count == 0, returns
163     ///       a future with the same elements as the arguments that is
164     ///       immediately ready. Possibly none of the futures in that vector
165     ///       are ready.
166     ///       Each future and shared_future is waited upon and then copied into
167     ///       the collection of the output (returned) future, maintaining the
168     ///       order of the futures in the input collection.
169     ///       The future returned by \a wait_some_n will not throw an exception,
170     ///       but the futures held in the output collection may.
171     template <typename InputIter>
172     InputIter wait_some_n(std::size_t n, Iterator first,
173         std::size_t count, error_code& ec = throws);
174 }
175 #else
176 
177 #include <hpx/config.hpp>
178 #include <hpx/lcos/future.hpp>
179 #include <hpx/runtime/threads/thread.hpp>
180 #include <hpx/throw_exception.hpp>
181 #include <hpx/traits/acquire_shared_state.hpp>
182 #include <hpx/traits/future_access.hpp>
183 #include <hpx/traits/is_future.hpp>
184 #include <hpx/util/always_void.hpp>
185 #include <hpx/util/assert.hpp>
186 #include <hpx/util/deferred_call.hpp>
187 #include <hpx/util/detail/pack.hpp>
188 #include <hpx/util/detail/pp/strip_parens.hpp>
189 #include <hpx/util/tuple.hpp>
190 
191 #include <algorithm>
192 #include <array>
193 #include <atomic>
194 #include <cstddef>
195 #include <iterator>
196 #include <memory>
197 #include <type_traits>
198 #include <utility>
199 #include <vector>
200 
201 ///////////////////////////////////////////////////////////////////////////////
202 namespace hpx { namespace lcos
203 {
204     namespace detail
205     {
206         ///////////////////////////////////////////////////////////////////////
207         template <typename Sequence>
208         struct wait_some;
209 
210         template <typename Sequence>
211         struct set_wait_some_callback_impl
212         {
set_wait_some_callback_implhpx::lcos::detail::set_wait_some_callback_impl213             explicit set_wait_some_callback_impl(wait_some<Sequence>& wait)
214               : wait_(wait)
215             {}
216 
217             template <typename SharedState>
operator ()hpx::lcos::detail::set_wait_some_callback_impl218             void operator()(SharedState& shared_state,
219                 typename std::enable_if<
220                     traits::is_shared_state<SharedState>::value
221                 >::type* = nullptr) const
222             {
223                 std::size_t counter =
224                     wait_.count_.load(std::memory_order_seq_cst);
225                 if (counter < wait_.needed_count_ &&
226                     shared_state.get() != nullptr && !shared_state->is_ready())
227                 {
228                     // handle future only if not enough futures are ready yet
229                     // also, do not touch any futures which are already ready
230 
231                     shared_state->execute_deferred();
232 
233                     // execute_deferred might have made the future ready
234                     if (!shared_state->is_ready())
235                     {
236                         shared_state->set_on_completed(
237                             util::deferred_call(
238                                 &wait_some<Sequence>::on_future_ready,
239                                 wait_.shared_from_this(),
240                                 threads::get_self_id()));
241                         return;
242                     }
243                 }
244                 if (wait_.count_.fetch_add(1) + 1 == wait_.needed_count_)
245                 {
246                     wait_.goal_reached_on_calling_thread_ = true;
247                 }
248             }
249 
250             template <typename Sequence_>
251             HPX_FORCEINLINE
operator ()hpx::lcos::detail::set_wait_some_callback_impl252             void operator()(Sequence_& sequence,
253                 typename std::enable_if<
254                     !traits::is_shared_state<Sequence_>::value
255                 >::type* = nullptr) const
256             {
257                 apply(sequence);
258             }
259 
260             template <typename Tuple, std::size_t ...Is>
261             HPX_FORCEINLINE
applyhpx::lcos::detail::set_wait_some_callback_impl262             void apply(Tuple& tuple, util::detail::pack_c<std::size_t, Is...>) const
263             {
264                 int const _sequencer[]= {
265                     (((*this)(util::get<Is>(tuple))), 0)...
266                 };
267                 (void)_sequencer;
268             }
269 
270             template <typename ...Ts>
271             HPX_FORCEINLINE
applyhpx::lcos::detail::set_wait_some_callback_impl272             void apply(util::tuple<Ts...>& sequence) const
273             {
274                 apply(sequence,
275                     typename util::detail::make_index_pack<sizeof...(Ts)>::type());
276             }
277 
278             template <typename Sequence_>
279             HPX_FORCEINLINE
applyhpx::lcos::detail::set_wait_some_callback_impl280             void apply(Sequence_& sequence) const
281             {
282                 std::for_each(sequence.begin(), sequence.end(), *this);
283             }
284 
285             wait_some<Sequence>& wait_;
286         };
287 
288         template <typename Sequence>
set_on_completed_callback(wait_some<Sequence> & wait)289         void set_on_completed_callback(wait_some<Sequence>& wait)
290         {
291             set_wait_some_callback_impl<Sequence> callback(wait);
292             callback.apply(wait.lazy_values_);
293         }
294 
295         template <typename Sequence>
296         struct wait_some : std::enable_shared_from_this<wait_some<Sequence> > //-V690
297         {
298         public:
on_future_readyhpx::lcos::detail::wait_some299             void on_future_ready(threads::thread_id_type const& id)
300             {
301                 if (count_.fetch_add(1) + 1 == needed_count_)
302                 {
303                     // reactivate waiting thread only if it's not us
304                     if (id != threads::get_self_id())
305                         threads::set_thread_state(id, threads::pending);
306                     else
307                         goal_reached_on_calling_thread_ = true;
308                 }
309             }
310 
311         private:
312             // workaround gcc regression wrongly instantiating constructors
313             wait_some();
314             wait_some(wait_some const&);
315 
316         public:
317             typedef Sequence argument_type;
318 
wait_somehpx::lcos::detail::wait_some319             wait_some(argument_type && lazy_values, std::size_t n)
320               : lazy_values_(std::move(lazy_values))
321               , count_(0)
322               , needed_count_(n)
323               , goal_reached_on_calling_thread_(false)
324             {}
325 
operator ()hpx::lcos::detail::wait_some326             void operator()()
327             {
328                 // set callback functions to executed wait future is ready
329                 set_on_completed_callback(*this);
330 
331                 // if all of the requested futures are already set, our
332                 // callback above has already been called often enough, otherwise
333                 // we suspend ourselves
334                 if (!goal_reached_on_calling_thread_)
335                 {
336                     // wait for any of the futures to return to become ready
337                     this_thread::suspend(threads::suspended,
338                         "hpx::detail::wait_some::operator()");
339                 }
340 
341                 // at least N futures should be ready
342                 HPX_ASSERT(count_.load(std::memory_order_seq_cst) >= needed_count_);
343             }
344 
345             argument_type lazy_values_;
346             std::atomic<std::size_t> count_;
347             std::size_t const needed_count_;
348             bool goal_reached_on_calling_thread_;
349         };
350     }
351 
352     ///////////////////////////////////////////////////////////////////////////
353     template <typename Future>
wait_some(std::size_t n,std::vector<Future> const & lazy_values,error_code & ec=throws)354     void wait_some(std::size_t n,
355         std::vector<Future> const& lazy_values,
356         error_code& ec = throws)
357     {
358         static_assert(
359             traits::is_future<Future>::value, "invalid use of wait_some");
360 
361         typedef
362             typename traits::detail::shared_state_ptr_for<Future>::type
363             shared_state_ptr;
364         typedef std::vector<shared_state_ptr> result_type;
365 
366         if (n == 0)
367         {
368             return;
369         }
370 
371         if (n > lazy_values.size())
372         {
373             HPX_THROWS_IF(ec, hpx::bad_parameter,
374                 "hpx::lcos::wait_some",
375                 "number of results to wait for is out of bounds");
376             return;
377         }
378 
379         result_type lazy_values_;
380         std::transform(lazy_values.begin(), lazy_values.end(),
381             std::back_inserter(lazy_values_),
382             traits::detail::wait_get_shared_state<Future>());
383 
384         std::shared_ptr<detail::wait_some<result_type> > f =
385             std::make_shared<detail::wait_some<result_type> >(
386                 std::move(lazy_values_), n);
387 
388         return (*f.get())();
389     }
390 
391     template <typename Future>
wait_some(std::size_t n,std::vector<Future> & lazy_values,error_code & ec=throws)392     void wait_some(std::size_t n,
393         std::vector<Future>& lazy_values,
394         error_code& ec = throws)
395     {
396         return lcos::wait_some(
397             n, const_cast<std::vector<Future> const&>(lazy_values), ec);
398     }
399 
400     template <typename Future>
wait_some(std::size_t n,std::vector<Future> && lazy_values,error_code & ec=throws)401     void wait_some(std::size_t n,
402         std::vector<Future> && lazy_values,
403         error_code& ec = throws)
404     {
405         return lcos::wait_some(
406             n, const_cast<std::vector<Future> const&>(lazy_values), ec);
407     }
408 
409     ///////////////////////////////////////////////////////////////////////////
410     template <typename Future, std::size_t N>
wait_some(std::size_t n,std::array<Future,N> const & lazy_values,error_code & ec=throws)411     void wait_some(std::size_t n,
412         std::array<Future, N> const& lazy_values,
413         error_code& ec = throws)
414     {
415         static_assert(
416             traits::is_future<Future>::value, "invalid use of wait_some");
417 
418         typedef
419             typename traits::detail::shared_state_ptr_for<Future>::type
420             shared_state_ptr;
421         typedef std::array<shared_state_ptr, N> result_type;
422 
423         if (n == 0)
424         {
425             return;
426         }
427 
428         if (n > lazy_values.size())
429         {
430             HPX_THROWS_IF(ec, hpx::bad_parameter,
431                 "hpx::lcos::wait_some",
432                 "number of results to wait for is out of bounds");
433             return;
434         }
435 
436         result_type lazy_values_;
437         std::transform(lazy_values.begin(), lazy_values.end(),
438             lazy_values_.begin(),
439             traits::detail::wait_get_shared_state<Future>());
440 
441         std::shared_ptr<detail::wait_some<result_type> > f =
442             std::make_shared<detail::wait_some<result_type> >(
443                 std::move(lazy_values_), n);
444 
445         return (*f.get())();
446     }
447 
448     template <typename Future, std::size_t N>
wait_some(std::size_t n,std::array<Future,N> & lazy_values,error_code & ec=throws)449     void wait_some(std::size_t n,
450         std::array<Future, N>& lazy_values,
451         error_code& ec = throws)
452     {
453         return lcos::wait_some(
454             n, const_cast<std::array<Future, N> const&>(lazy_values), ec);
455     }
456 
457     template <typename Future, std::size_t N>
wait_some(std::size_t n,std::array<Future,N> && lazy_values,error_code & ec=throws)458     void wait_some(std::size_t n,
459         std::array<Future, N> && lazy_values,
460         error_code& ec = throws)
461     {
462         return lcos::wait_some(
463             n, const_cast<std::array<Future, N> const&>(lazy_values), ec);
464     }
465 
466     ///////////////////////////////////////////////////////////////////////////
467     template <typename Iterator>
468     typename util::always_void<
469         typename lcos::detail::future_iterator_traits<Iterator>::type
470     >::type
wait_some(std::size_t n,Iterator begin,Iterator end,error_code & ec=throws)471     wait_some(std::size_t n, Iterator begin, Iterator end,
472         error_code& ec = throws)
473     {
474         typedef
475             typename lcos::detail::future_iterator_traits<Iterator>::type
476             future_type;
477         typedef
478             typename traits::detail::shared_state_ptr_for<future_type>::type
479             shared_state_ptr;
480         typedef std::vector<shared_state_ptr> result_type;
481 
482         result_type lazy_values_;
483         std::transform(begin, end, std::back_inserter(lazy_values_),
484             traits::detail::wait_get_shared_state<future_type>());
485 
486         std::shared_ptr<detail::wait_some<result_type> > f =
487             std::make_shared<detail::wait_some<result_type> >(
488                 std::move(lazy_values_), n);
489 
490         return (*f.get())();
491     }
492 
493     template <typename Iterator>
494     Iterator
wait_some_n(std::size_t n,Iterator begin,std::size_t count,error_code & ec=throws)495     wait_some_n(std::size_t n, Iterator begin,
496         std::size_t count, error_code& ec = throws)
497     {
498         typedef
499             typename lcos::detail::future_iterator_traits<Iterator>::type
500             future_type;
501         typedef
502             typename traits::detail::shared_state_ptr_for<future_type>::type
503             shared_state_ptr;
504         typedef std::vector<shared_state_ptr> result_type;
505 
506         result_type lazy_values_;
507         lazy_values_.resize(count);
508         traits::detail::wait_get_shared_state<future_type> func;
509         for (std::size_t i = 0; i != count; ++i)
510             lazy_values_.push_back(func(*begin++));
511 
512         std::shared_ptr<detail::wait_some<result_type> > f =
513             std::make_shared<detail::wait_some<result_type> >(
514                 std::move(lazy_values_), n);
515 
516         (*f.get())();
517 
518         return begin;
519     }
520 
wait_some(std::size_t n,error_code & ec=throws)521     inline void wait_some(std::size_t n, error_code& ec = throws)
522     {
523         if (n == 0)
524         {
525             return;
526         }
527 
528         HPX_THROWS_IF(ec, hpx::bad_parameter,
529             "hpx::lcos::wait_some",
530             "number of results to wait for is out of bounds");
531     }
532 
533     ///////////////////////////////////////////////////////////////////////////
534     template <typename T>
wait_some(std::size_t n,hpx::future<T> && f,error_code & ec=throws)535     void wait_some(std::size_t n, hpx::future<T> && f, error_code& ec = throws)
536     {
537         if (n != 1)
538         {
539             HPX_THROWS_IF(ec, hpx::bad_parameter,
540                 "hpx::lcos::wait_some",
541                 "number of results to wait for is out of bounds");
542             return;
543         }
544 
545         f.wait();
546     }
547 
548     template <typename T>
wait_some(std::size_t n,hpx::shared_future<T> && f,error_code & ec=throws)549     void wait_some(std::size_t n, hpx::shared_future<T> && f, error_code& ec = throws)
550     {
551         if (n != 1)
552         {
553             HPX_THROWS_IF(ec, hpx::bad_parameter,
554                 "hpx::lcos::wait_some",
555                 "number of results to wait for is out of bounds");
556             return;
557         }
558 
559         f.wait();
560     }
561 
562     ///////////////////////////////////////////////////////////////////////////
563     template <typename... Ts>
wait_some(std::size_t n,error_code & ec,Ts &&...ts)564     void wait_some(std::size_t n, error_code& ec, Ts&&...ts)
565     {
566         typedef util::tuple<
567                 typename traits::detail::shared_state_ptr_for<Ts>::type...
568             > result_type;
569 
570         result_type lazy_values_ =
571             result_type(traits::detail::get_shared_state(ts)...);
572 
573         if (n == 0)
574         {
575             return;
576         }
577 
578         if (n > sizeof...(Ts))
579         {
580             HPX_THROWS_IF(ec, hpx::bad_parameter,
581                 "hpx::lcos::wait_some",
582                 "number of results to wait for is out of bounds");
583             return;
584         }
585 
586         std::shared_ptr<detail::wait_some<result_type> > f =
587             std::make_shared<detail::wait_some<result_type> >(
588                 std::move(lazy_values_), n);
589 
590         return (*f.get())();
591     }
592 
593     template <typename... Ts>
wait_some(std::size_t n,Ts &&...ts)594     void wait_some(std::size_t n, Ts&&...ts)
595     {
596         typedef util::tuple<
597                 typename traits::detail::shared_state_ptr_for<Ts>::type...
598             > result_type;
599 
600         result_type lazy_values_ =
601             result_type(traits::detail::get_shared_state(ts)...);
602 
603         if (n == 0)
604         {
605             return;
606         }
607 
608         if (n > sizeof...(Ts))
609         {
610             HPX_THROW_EXCEPTION(hpx::bad_parameter,
611                 "hpx::lcos::wait_some",
612                 "number of results to wait for is out of bounds");
613             return;
614         }
615 
616         std::shared_ptr<detail::wait_some<result_type> > f =
617             std::make_shared<detail::wait_some<result_type> >(
618                 std::move(lazy_values_), n);
619 
620         return (*f.get())();
621     }
622 }}
623 
624 namespace hpx
625 {
626     using lcos::wait_some;
627     using lcos::wait_some_n;
628 }
629 
630 #endif // DOXYGEN
631 #endif
632