1 //  Copyright (c) 2007-2018 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 // hpxinspect:nodeprecatedinclude:boost/ref.hpp
7 // hpxinspect:nodeprecatedname:boost::reference_wrapper
8 
9 #include <hpx/config.hpp>
10 
11 // Intentionally #include future.hpp outside of the guards as it may #include
12 // dataflow.hpp itself
13 #include <hpx/lcos/future.hpp>
14 
15 #ifndef HPX_LCOS_DATAFLOW_HPP
16 #define HPX_LCOS_DATAFLOW_HPP
17 
18 #include <hpx/lcos/detail/future_transforms.hpp>
19 #include <hpx/runtime/get_worker_thread_num.hpp>
20 #include <hpx/runtime/launch_policy.hpp>
21 #include <hpx/runtime/threads/coroutines/detail/get_stack_pointer.hpp>
22 #include <hpx/traits/acquire_future.hpp>
23 #include <hpx/traits/extract_action.hpp>
24 #include <hpx/traits/future_access.hpp>
25 #include <hpx/traits/is_action.hpp>
26 #include <hpx/traits/is_executor.hpp>
27 #include <hpx/traits/is_future.hpp>
28 #include <hpx/traits/is_launch_policy.hpp>
29 #include <hpx/traits/promise_local_result.hpp>
30 #include <hpx/util/always_void.hpp>
31 #include <hpx/util/annotated_function.hpp>
32 #include <hpx/util/deferred_call.hpp>
33 #include <hpx/util/internal_allocator.hpp>
34 #include <hpx/util/invoke_fused.hpp>
35 #include <hpx/util/pack_traversal_async.hpp>
36 #include <hpx/util/thread_description.hpp>
37 #include <hpx/util/tuple.hpp>
38 
39 #include <hpx/parallel/executors/execution.hpp>
40 #include <hpx/parallel/executors/parallel_executor.hpp>
41 
42 #include <boost/intrusive_ptr.hpp>
43 #include <boost/ref.hpp>
44 
45 #include <atomic>
46 #include <cstddef>
47 #include <exception>
48 #include <functional>
49 #include <iterator>
50 #include <memory>
51 #include <type_traits>
52 #include <utility>
53 
54 ///////////////////////////////////////////////////////////////////////////////
55 namespace hpx { namespace lcos { namespace detail
56 {
57     template <typename F, typename Args>
58     struct dataflow_not_callable
59     {
60 #if defined(HPX_HAVE_CXX14_RETURN_TYPE_DEDUCTION)
errorhpx::lcos::detail::dataflow_not_callable61         static auto error(F f, Args args)
62         {
63             hpx::util::invoke_fused(std::move(f), std::move(args));
64         }
65 #else
66         static auto error(F f, Args args)
67          -> decltype(hpx::util::invoke_fused(std::move(f), std::move(args)));
68 #endif
69 
70         using type = decltype(
71             error(std::declval<F>(), std::declval<Args>()));
72     };
73 
74     ///////////////////////////////////////////////////////////////////////
75     template <bool IsAction, typename F, typename Args, typename Enable = void>
76     struct dataflow_return_impl
77     {
78         typedef typename dataflow_not_callable<F, Args>::type type;
79     };
80 
81     template <typename Action, typename Args>
82     struct dataflow_return_impl</*IsAction=*/true, Action, Args>
83     {
84         typedef typename Action::result_type type;
85     };
86 
87     template <typename F, typename Args>
88     struct dataflow_return_impl<
89         /*IsAction=*/false, F, Args,
90         typename hpx::util::always_void<
91             typename hpx::util::detail::invoke_fused_result<F, Args>::type
92         >::type
93     > : util::detail::invoke_fused_result<F, Args>
94     {};
95 
96     template <typename F, typename Args>
97     struct dataflow_return
98       : detail::dataflow_return_impl<traits::is_action<F>::value, F, Args>
99     {};
100 
101     ///////////////////////////////////////////////////////////////////////////
102     template <typename Policy, typename Func, typename Futures>
103     struct dataflow_frame //-V690
104       : hpx::lcos::detail::future_data<
105             typename detail::dataflow_return<Func, Futures>::type>
106     {
107         typedef
108             typename detail::dataflow_return<Func, Futures>::type
109             result_type;
110         typedef hpx::lcos::detail::future_data<result_type> base_type;
111 
112         typedef hpx::lcos::future<result_type> type;
113 
114         typedef std::is_void<result_type> is_void;
115 
116     private:
117         // workaround gcc regression wrongly instantiating constructors
118         dataflow_frame();
119         dataflow_frame(dataflow_frame const&);
120 
121     public:
122         typedef typename base_type::init_no_addref init_no_addref;
123 
124         /// A struct to construct the dataflow_frame in-place
125         struct construction_data
126         {
127             Policy policy_;
128             Func func_;
129         };
130 
131         /// Construct the dataflow_frame from the given policy
132         /// and callable object.
construct_fromhpx::lcos::detail::dataflow_frame133         static construction_data construct_from(Policy policy, Func func)
134         {
135             return construction_data{std::move(policy), std::move(func)};
136         }
137 
dataflow_framehpx::lcos::detail::dataflow_frame138         explicit dataflow_frame(construction_data data)
139           : base_type(init_no_addref{})
140           , policy_(std::move(data.policy_))
141           , func_(std::move(data.func_))
142         {
143         }
144 
145     private:
146         ///////////////////////////////////////////////////////////////////////
147         /// Passes the futures into the evaluation function and
148         /// sets the result future.
149         HPX_FORCEINLINE
executehpx::lcos::detail::dataflow_frame150         void execute(std::false_type, Futures&& futures)
151         {
152             try {
153                 Func func = std::move(func_);
154 
155                 result_type res =
156                     util::invoke_fused(std::move(func), std::move(futures));
157 
158                 this->set_data(std::move(res));
159             }
160             catch(...) {
161                 this->set_exception(std::current_exception());
162             }
163         }
164 
165         /// Passes the futures into the evaluation function and
166         /// sets the result future.
167         HPX_FORCEINLINE
executehpx::lcos::detail::dataflow_frame168         void execute(std::true_type, Futures&& futures)
169         {
170             try {
171                 Func func = std::move(func_);
172 
173                 util::invoke_fused(std::move(func), std::move(futures));
174 
175                 this->set_data(util::unused_type());
176             }
177             catch(...) {
178                 this->set_exception(std::current_exception());
179             }
180         }
181 
donehpx::lcos::detail::dataflow_frame182         HPX_FORCEINLINE void done(Futures futures)
183         {
184             hpx::util::annotate_function annotate(func_);
185 
186             execute(is_void{}, std::move(futures));
187         }
188 
189         ///////////////////////////////////////////////////////////////////////
finalizehpx::lcos::detail::dataflow_frame190         void finalize(hpx::detail::async_policy policy, Futures&& futures)
191         {
192             // schedule the final function invocation with high priority
193             boost::intrusive_ptr<dataflow_frame> this_(this);
194 
195             // simply schedule new thread
196             parallel::execution::parallel_policy_executor<launch::async_policy>
197                 exec{policy};
198             parallel::execution::post(exec, &dataflow_frame::done,
199                 std::move(this_), std::move(futures));
200         }
201 
202         HPX_FORCEINLINE
finalizehpx::lcos::detail::dataflow_frame203         void finalize(hpx::detail::sync_policy, Futures&& futures)
204         {
205             // We need to run the completion on a new thread if we are on a
206             // non HPX thread.
207             bool recurse_asynchronously = hpx::threads::get_self_ptr() == nullptr;
208 #if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
209             recurse_asynchronously =
210                 !this_thread::has_sufficient_stack_space();
211 #else
212             struct handle_continuation_recursion_count
213             {
214                 handle_continuation_recursion_count()
215                   : count_(threads::get_continuation_recursion_count())
216                 {
217                     ++count_;
218                 }
219                 ~handle_continuation_recursion_count()
220                 {
221                     --count_;
222                 }
223 
224                 std::size_t& count_;
225             } cnt;
226             recurse_asynchronously = recurse_asynchronously ||
227                 cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH;
228 #endif
229             if (!recurse_asynchronously)
230             {
231                 done(std::move(futures));
232             }
233             else
234             {
235                 finalize(hpx::launch::async, std::move(futures));
236             }
237         }
238 
finalizehpx::lcos::detail::dataflow_frame239         void finalize(hpx::detail::fork_policy policy, Futures&& futures)
240         {
241             // schedule the final function invocation with high priority
242             boost::intrusive_ptr<dataflow_frame> this_(this);
243 
244             parallel::execution::parallel_policy_executor<launch::fork_policy>
245                 exec{policy};
246             parallel::execution::post(exec, &dataflow_frame::done,
247                 std::move(this_), std::move(futures));
248         }
249 
finalizehpx::lcos::detail::dataflow_frame250         void finalize(launch policy, Futures&& futures)
251         {
252             if (policy == launch::sync)
253             {
254                 finalize(launch::sync, std::move(futures));
255             }
256             else if (policy == launch::fork)
257             {
258                 finalize(launch::fork, std::move(futures));
259             }
260             else
261             {
262                 finalize(launch::async, std::move(futures));
263             }
264         }
265 
266         // The overload for hpx::dataflow taking an executor simply forwards
267         // to the corresponding executor customization point.
268         //
269         // parallel::execution::executor
270         // threads::executor
271         template <typename Executor>
272         HPX_FORCEINLINE
273         typename std::enable_if<
274             traits::is_one_way_executor<Executor>::value ||
275             traits::is_two_way_executor<Executor>::value ||
276             traits::is_threads_executor<Executor>::value
277         >::type
finalizehpx::lcos::detail::dataflow_frame278         finalize(Executor&& exec, Futures&& futures)
279         {
280             using execute_function_type =
281                 typename std::conditional<
282                     is_void::value,
283                     void (dataflow_frame::*)(std::true_type, Futures&&),
284                     void (dataflow_frame::*)(std::false_type, Futures&&)
285                 >::type;
286 
287             execute_function_type f = &dataflow_frame::execute;
288             boost::intrusive_ptr<dataflow_frame> this_(this);
289 
290             parallel::execution::post(std::forward<Executor>(exec),
291                 f, std::move(this_), is_void{}, std::move(futures));
292         }
293 
294     public:
295         /// Check whether the current future is ready
296         template <typename T>
operator ()hpx::lcos::detail::dataflow_frame297         auto operator()(util::async_traverse_visit_tag, T&& current)
298             -> decltype(async_visit_future(std::forward<T>(current)))
299         {
300             return async_visit_future(std::forward<T>(current));
301         }
302 
303         /// Detach the current execution context and continue when the
304         /// current future was set to be ready.
305         template <typename T, typename N>
operator ()hpx::lcos::detail::dataflow_frame306         auto operator()(util::async_traverse_detach_tag, T&& current, N&& next)
307             -> decltype(async_detach_future(
308                 std::forward<T>(current), std::forward<N>(next)))
309         {
310             return async_detach_future(
311                 std::forward<T>(current), std::forward<N>(next));
312         }
313 
314         /// Finish the dataflow when the traversal has finished
operator ()hpx::lcos::detail::dataflow_frame315         HPX_FORCEINLINE void operator()(
316             util::async_traverse_complete_tag, Futures futures)
317         {
318             finalize(policy_, std::move(futures));
319         }
320 
321     private:
322         Policy policy_;
323         Func func_;
324     };
325 
326     ///////////////////////////////////////////////////////////////////////////
327     template <
328         typename Policy, typename Func, typename ...Ts,
329         typename Frame = dataflow_frame<
330             typename std::decay<Policy>::type,
331             typename std::decay<Func>::type,
332             util::tuple<typename std::decay<Ts>::type...>>>
create_dataflow(Policy && policy,Func && func,Ts &&...ts)333     typename Frame::type create_dataflow(
334         Policy && policy, Func && func, Ts &&... ts)
335     {
336         // Create the data which is used to construct the dataflow_frame
337         auto data = Frame::construct_from(
338             std::forward<Policy>(policy), std::forward<Func>(func));
339 
340         // Construct the dataflow_frame and traverse
341         // the arguments asynchronously
342         boost::intrusive_ptr<Frame> p = util::traverse_pack_async(
343             util::async_traverse_in_place_tag<Frame>{},
344             std::move(data), std::forward<Ts>(ts)...);
345 
346         using traits::future_access;
347         return future_access<typename Frame::type>::create(std::move(p));
348     }
349 
350     ///////////////////////////////////////////////////////////////////////////
351     template <
352         typename Allocator, typename Policy, typename Func, typename ...Ts,
353         typename Frame = dataflow_frame<
354             typename std::decay<Policy>::type,
355             typename std::decay<Func>::type,
356             util::tuple<typename std::decay<Ts>::type...>>>
create_dataflow_alloc(Allocator const & alloc,Policy && policy,Func && func,Ts &&...ts)357     typename Frame::type create_dataflow_alloc(
358         Allocator const& alloc, Policy && policy, Func && func, Ts &&... ts)
359     {
360         // Create the data which is used to construct the dataflow_frame
361         auto data = Frame::construct_from(
362             std::forward<Policy>(policy), std::forward<Func>(func));
363 
364         // Construct the dataflow_frame and traverse
365         // the arguments asynchronously
366         boost::intrusive_ptr<Frame> p = util::traverse_pack_async_allocator(
367             alloc, util::async_traverse_in_place_tag<Frame>{},
368             std::move(data), std::forward<Ts>(ts)...);
369 
370         using traits::future_access;
371         return future_access<typename Frame::type>::create(std::move(p));
372     }
373 
374     ///////////////////////////////////////////////////////////////////////////
375     template <typename FD, typename Enable = void>
376     struct dataflow_dispatch;
377 
378     // launch
379     template <typename Policy>
380     struct dataflow_dispatch<Policy, typename std::enable_if<
381             traits::is_launch_policy<Policy>::value
382         >::type>
383     {
384         template <
385             typename Allocator, typename Policy_,
386             typename Component, typename Signature, typename Derived,
387             typename ...Ts>
388         HPX_FORCEINLINE static lcos::future<
389             typename traits::promise_local_result<
390                 typename hpx::actions::basic_action<
391                     Component, Signature, Derived>::remote_result_type
392             >::type>
callhpx::lcos::detail::dataflow_dispatch393         call(Allocator const& alloc, Policy_ && policy,
394             hpx::actions::basic_action<Component, Signature, Derived> const& act,
395             naming::id_type const& id, Ts &&... ts)
396         {
397             return detail::create_dataflow_alloc(alloc,
398                 std::forward<Policy_>(policy), Derived{},
399                 id, traits::acquire_future_disp()(std::forward<Ts>(ts))...);
400         }
401 
402         template <typename Allocator, typename Policy_, typename F,
403             typename ...Ts>
404         HPX_FORCEINLINE static typename std::enable_if<
405             !traits::is_action<typename std::decay<F>::type>::value,
406             lcos::future<
407                 typename detail::dataflow_return<
408                     typename std::decay<F>::type,
409                     util::tuple<typename traits::acquire_future<Ts>::type...>
410                 >::type>
411         >::type
callhpx::lcos::detail::dataflow_dispatch412         call(Allocator const& alloc, Policy_ && policy, F && f,
413             Ts &&... ts)
414         {
415             return detail::create_dataflow_alloc(alloc,
416                 std::forward<Policy_>(policy), std::forward<F>(f),
417                 traits::acquire_future_disp()(std::forward<Ts>(ts))...);
418         }
419     };
420 
421     // parallel executors
422     // threads::executor
423     template <typename Executor>
424     struct dataflow_dispatch<Executor, typename std::enable_if<
425             traits::is_one_way_executor<Executor>::value ||
426             traits::is_two_way_executor<Executor>::value ||
427             traits::is_threads_executor<Executor>::value
428         >::type>
429     {
430         template <typename Allocator, typename Executor_, typename F,
431             typename ...Ts>
432         HPX_FORCEINLINE static typename std::enable_if<
433             !traits::is_action<typename std::decay<F>::type>::value,
434             lcos::future<
435                 typename detail::dataflow_return<
436                     typename std::decay<F>::type,
437                     util::tuple<typename traits::acquire_future<Ts>::type...>
438                 >::type>
439         >::type
callhpx::lcos::detail::dataflow_dispatch440         call(Allocator const& alloc, Executor_ && exec, F && f, Ts &&... ts)
441         {
442             return detail::create_dataflow_alloc(alloc,
443                 std::forward<Executor_>(exec), std::forward<F>(f),
444                 traits::acquire_future_disp()(std::forward<Ts>(ts))...);
445         }
446     };
447 
448     // any action, plain function, or function object
449     template <typename FD>
450     struct dataflow_dispatch<FD, typename std::enable_if<
451         !traits::is_launch_policy<FD>::value &&
452         !(
453             traits::is_one_way_executor<FD>::value ||
454             traits::is_two_way_executor<FD>::value ||
455             traits::is_threads_executor<FD>::value)
456         >::type>
457     {
458         template <
459             typename Allocator, typename Component, typename Signature,
460             typename Derived, typename ...Ts>
461         HPX_FORCEINLINE static auto
callhpx::lcos::detail::dataflow_dispatch462         call(Allocator const& alloc,
463             hpx::actions::basic_action<Component, Signature, Derived> const& act,
464             naming::id_type const& id, Ts &&... ts)
465         ->  decltype(dataflow_dispatch<launch>::call(
466                 alloc, launch::async, act, id, std::forward<Ts>(ts)...))
467         {
468             return dataflow_dispatch<launch>::call(
469                 alloc, launch::async, act, id, std::forward<Ts>(ts)...);
470         }
471 
472         template <
473             typename Allocator, typename F, typename ...Ts,
474             typename Enable = typename std::enable_if<
475                 !traits::is_action<typename std::decay<F>::type>::value
476             >::type>
477         HPX_FORCEINLINE static auto
callhpx::lcos::detail::dataflow_dispatch478         call(Allocator const& alloc, F && f, Ts &&... ts)
479         ->  decltype(dataflow_dispatch<launch>::call(
480                 alloc, launch::async, std::forward<F>(f),
481                 std::forward<Ts>(ts)...))
482         {
483             return dataflow_dispatch<launch>::call(
484                 alloc, launch::async, std::forward<F>(f),
485                 std::forward<Ts>(ts)...);
486         }
487     };
488 
489     ///////////////////////////////////////////////////////////////////////////
490     template <typename Action, typename T0, typename Enable = void>
491     struct dataflow_action_dispatch
492     {
493         template <typename Allocator, typename ...Ts>
494         HPX_FORCEINLINE static lcos::future<
495             typename traits::promise_local_result<
496                 typename hpx::traits::extract_action<Action>::remote_result_type
497             >::type>
callhpx::lcos::detail::dataflow_action_dispatch498         call(Allocator const& alloc, naming::id_type const& id,
499             Ts &&... ts)
500         {
501             return dataflow_dispatch<Action>::call(alloc,
502                 Action(), id, std::forward<Ts>(ts)...);
503         }
504     };
505 
506     template <typename Action, typename Policy>
507     struct dataflow_action_dispatch<Action, Policy, typename std::enable_if<
508             traits::is_launch_policy<typename std::decay<Policy>::type>::value
509         >::type>
510     {
511         template <typename Allocator, typename ...Ts>
512         HPX_FORCEINLINE static lcos::future<
513             typename traits::promise_local_result<
514                 typename hpx::traits::extract_action<Action>::remote_result_type
515             >::type>
callhpx::lcos::detail::dataflow_action_dispatch516         call(Allocator const& alloc, Policy && policy,
517             naming::id_type const& id, Ts &&... ts)
518         {
519             return dataflow_dispatch<typename std::decay<Policy>::type>::
520                 call(alloc, std::forward<Policy>(policy), Action(), id,
521                     std::forward<Ts>(ts)...);
522         }
523     };
524 }}}
525 
526 ///////////////////////////////////////////////////////////////////////////////
527 namespace hpx
528 {
529     template <typename F, typename ...Ts>
530     HPX_FORCEINLINE
dataflow(F && f,Ts &&...ts)531     auto dataflow(F && f, Ts &&... ts)
532     ->  decltype(
533             lcos::detail::dataflow_dispatch<typename std::decay<F>::type>::call(
534                 hpx::util::internal_allocator<>{}, std::forward<F>(f),
535                 std::forward<Ts>(ts)...
536         ))
537     {
538         return lcos::detail::dataflow_dispatch<typename std::decay<F>::type>::
539             call(hpx::util::internal_allocator<>{}, std::forward<F>(f),
540                 std::forward<Ts>(ts)...);
541     }
542 
543     template <typename Allocator, typename F, typename ...Ts>
544     HPX_FORCEINLINE
dataflow_alloc(Allocator const & alloc,F && f,Ts &&...ts)545     auto dataflow_alloc(Allocator const& alloc, F && f, Ts &&... ts)
546     ->  decltype(
547             lcos::detail::dataflow_dispatch<typename std::decay<F>::type>::
548                 call(alloc, std::forward<F>(f), std::forward<Ts>(ts)...
549         ))
550     {
551         return lcos::detail::dataflow_dispatch<typename std::decay<F>::type>::
552             call(alloc, std::forward<F>(f), std::forward<Ts>(ts)...);
553     }
554 
555     template <
556         typename Action, typename T0, typename ...Ts,
557         typename Enable = typename std::enable_if<
558             traits::is_action<Action>::value>::type>
559     HPX_FORCEINLINE
dataflow(T0 && t0,Ts &&...ts)560     auto dataflow(T0 && t0, Ts &&... ts)
561     ->  decltype(lcos::detail::dataflow_action_dispatch<Action, T0>::call(
562             hpx::util::internal_allocator<>{}, std::forward<T0>(t0),
563             std::forward<Ts>(ts)...))
564     {
565         return lcos::detail::dataflow_action_dispatch<Action, T0>::call(
566             hpx::util::internal_allocator<>{}, std::forward<T0>(t0),
567             std::forward<Ts>(ts)...);
568     }
569 
570     template <
571         typename Action, typename Allocator, typename T0, typename ...Ts,
572         typename Enable = typename std::enable_if<
573             traits::is_action<Action>::value>::type>
574     HPX_FORCEINLINE
dataflow_alloc(Allocator const & alloc,T0 && t0,Ts &&...ts)575     auto dataflow_alloc(Allocator const& alloc, T0 && t0, Ts &&... ts)
576     ->  decltype(lcos::detail::dataflow_action_dispatch<Action, T0>::call(
577             alloc, std::forward<T0>(t0), std::forward<Ts>(ts)...))
578     {
579         return lcos::detail::dataflow_action_dispatch<Action, T0>::call(
580             alloc, std::forward<T0>(t0), std::forward<Ts>(ts)...);
581     }
582 }
583 
584 #endif /*HPX_LCOS_DATAFLOW_HPP*/
585