1 // Copyright (c) 2007-2018 Hartmut Kaiser 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 // hpxinspect:nodeprecatedinclude:boost/ref.hpp 7 // hpxinspect:nodeprecatedname:boost::reference_wrapper 8 9 #include <hpx/config.hpp> 10 11 // Intentionally #include future.hpp outside of the guards as it may #include 12 // dataflow.hpp itself 13 #include <hpx/lcos/future.hpp> 14 15 #ifndef HPX_LCOS_DATAFLOW_HPP 16 #define HPX_LCOS_DATAFLOW_HPP 17 18 #include <hpx/lcos/detail/future_transforms.hpp> 19 #include <hpx/runtime/get_worker_thread_num.hpp> 20 #include <hpx/runtime/launch_policy.hpp> 21 #include <hpx/runtime/threads/coroutines/detail/get_stack_pointer.hpp> 22 #include <hpx/traits/acquire_future.hpp> 23 #include <hpx/traits/extract_action.hpp> 24 #include <hpx/traits/future_access.hpp> 25 #include <hpx/traits/is_action.hpp> 26 #include <hpx/traits/is_executor.hpp> 27 #include <hpx/traits/is_future.hpp> 28 #include <hpx/traits/is_launch_policy.hpp> 29 #include <hpx/traits/promise_local_result.hpp> 30 #include <hpx/util/always_void.hpp> 31 #include <hpx/util/annotated_function.hpp> 32 #include <hpx/util/deferred_call.hpp> 33 #include <hpx/util/internal_allocator.hpp> 34 #include <hpx/util/invoke_fused.hpp> 35 #include <hpx/util/pack_traversal_async.hpp> 36 #include <hpx/util/thread_description.hpp> 37 #include <hpx/util/tuple.hpp> 38 39 #include <hpx/parallel/executors/execution.hpp> 40 #include <hpx/parallel/executors/parallel_executor.hpp> 41 42 #include <boost/intrusive_ptr.hpp> 43 #include <boost/ref.hpp> 44 45 #include <atomic> 46 #include <cstddef> 47 #include <exception> 48 #include <functional> 49 #include <iterator> 50 #include <memory> 51 #include <type_traits> 52 #include <utility> 53 54 /////////////////////////////////////////////////////////////////////////////// 55 namespace hpx { namespace lcos { namespace detail 56 { 57 template <typename F, typename Args> 58 struct dataflow_not_callable 59 { 60 #if defined(HPX_HAVE_CXX14_RETURN_TYPE_DEDUCTION) errorhpx::lcos::detail::dataflow_not_callable61 static auto error(F f, Args args) 62 { 63 hpx::util::invoke_fused(std::move(f), std::move(args)); 64 } 65 #else 66 static auto error(F f, Args args) 67 -> decltype(hpx::util::invoke_fused(std::move(f), std::move(args))); 68 #endif 69 70 using type = decltype( 71 error(std::declval<F>(), std::declval<Args>())); 72 }; 73 74 /////////////////////////////////////////////////////////////////////// 75 template <bool IsAction, typename F, typename Args, typename Enable = void> 76 struct dataflow_return_impl 77 { 78 typedef typename dataflow_not_callable<F, Args>::type type; 79 }; 80 81 template <typename Action, typename Args> 82 struct dataflow_return_impl</*IsAction=*/true, Action, Args> 83 { 84 typedef typename Action::result_type type; 85 }; 86 87 template <typename F, typename Args> 88 struct dataflow_return_impl< 89 /*IsAction=*/false, F, Args, 90 typename hpx::util::always_void< 91 typename hpx::util::detail::invoke_fused_result<F, Args>::type 92 >::type 93 > : util::detail::invoke_fused_result<F, Args> 94 {}; 95 96 template <typename F, typename Args> 97 struct dataflow_return 98 : detail::dataflow_return_impl<traits::is_action<F>::value, F, Args> 99 {}; 100 101 /////////////////////////////////////////////////////////////////////////// 102 template <typename Policy, typename Func, typename Futures> 103 struct dataflow_frame //-V690 104 : hpx::lcos::detail::future_data< 105 typename detail::dataflow_return<Func, Futures>::type> 106 { 107 typedef 108 typename detail::dataflow_return<Func, Futures>::type 109 result_type; 110 typedef hpx::lcos::detail::future_data<result_type> base_type; 111 112 typedef hpx::lcos::future<result_type> type; 113 114 typedef std::is_void<result_type> is_void; 115 116 private: 117 // workaround gcc regression wrongly instantiating constructors 118 dataflow_frame(); 119 dataflow_frame(dataflow_frame const&); 120 121 public: 122 typedef typename base_type::init_no_addref init_no_addref; 123 124 /// A struct to construct the dataflow_frame in-place 125 struct construction_data 126 { 127 Policy policy_; 128 Func func_; 129 }; 130 131 /// Construct the dataflow_frame from the given policy 132 /// and callable object. construct_fromhpx::lcos::detail::dataflow_frame133 static construction_data construct_from(Policy policy, Func func) 134 { 135 return construction_data{std::move(policy), std::move(func)}; 136 } 137 dataflow_framehpx::lcos::detail::dataflow_frame138 explicit dataflow_frame(construction_data data) 139 : base_type(init_no_addref{}) 140 , policy_(std::move(data.policy_)) 141 , func_(std::move(data.func_)) 142 { 143 } 144 145 private: 146 /////////////////////////////////////////////////////////////////////// 147 /// Passes the futures into the evaluation function and 148 /// sets the result future. 149 HPX_FORCEINLINE executehpx::lcos::detail::dataflow_frame150 void execute(std::false_type, Futures&& futures) 151 { 152 try { 153 Func func = std::move(func_); 154 155 result_type res = 156 util::invoke_fused(std::move(func), std::move(futures)); 157 158 this->set_data(std::move(res)); 159 } 160 catch(...) { 161 this->set_exception(std::current_exception()); 162 } 163 } 164 165 /// Passes the futures into the evaluation function and 166 /// sets the result future. 167 HPX_FORCEINLINE executehpx::lcos::detail::dataflow_frame168 void execute(std::true_type, Futures&& futures) 169 { 170 try { 171 Func func = std::move(func_); 172 173 util::invoke_fused(std::move(func), std::move(futures)); 174 175 this->set_data(util::unused_type()); 176 } 177 catch(...) { 178 this->set_exception(std::current_exception()); 179 } 180 } 181 donehpx::lcos::detail::dataflow_frame182 HPX_FORCEINLINE void done(Futures futures) 183 { 184 hpx::util::annotate_function annotate(func_); 185 186 execute(is_void{}, std::move(futures)); 187 } 188 189 /////////////////////////////////////////////////////////////////////// finalizehpx::lcos::detail::dataflow_frame190 void finalize(hpx::detail::async_policy policy, Futures&& futures) 191 { 192 // schedule the final function invocation with high priority 193 boost::intrusive_ptr<dataflow_frame> this_(this); 194 195 // simply schedule new thread 196 parallel::execution::parallel_policy_executor<launch::async_policy> 197 exec{policy}; 198 parallel::execution::post(exec, &dataflow_frame::done, 199 std::move(this_), std::move(futures)); 200 } 201 202 HPX_FORCEINLINE finalizehpx::lcos::detail::dataflow_frame203 void finalize(hpx::detail::sync_policy, Futures&& futures) 204 { 205 // We need to run the completion on a new thread if we are on a 206 // non HPX thread. 207 bool recurse_asynchronously = hpx::threads::get_self_ptr() == nullptr; 208 #if defined(HPX_HAVE_THREADS_GET_STACK_POINTER) 209 recurse_asynchronously = 210 !this_thread::has_sufficient_stack_space(); 211 #else 212 struct handle_continuation_recursion_count 213 { 214 handle_continuation_recursion_count() 215 : count_(threads::get_continuation_recursion_count()) 216 { 217 ++count_; 218 } 219 ~handle_continuation_recursion_count() 220 { 221 --count_; 222 } 223 224 std::size_t& count_; 225 } cnt; 226 recurse_asynchronously = recurse_asynchronously || 227 cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH; 228 #endif 229 if (!recurse_asynchronously) 230 { 231 done(std::move(futures)); 232 } 233 else 234 { 235 finalize(hpx::launch::async, std::move(futures)); 236 } 237 } 238 finalizehpx::lcos::detail::dataflow_frame239 void finalize(hpx::detail::fork_policy policy, Futures&& futures) 240 { 241 // schedule the final function invocation with high priority 242 boost::intrusive_ptr<dataflow_frame> this_(this); 243 244 parallel::execution::parallel_policy_executor<launch::fork_policy> 245 exec{policy}; 246 parallel::execution::post(exec, &dataflow_frame::done, 247 std::move(this_), std::move(futures)); 248 } 249 finalizehpx::lcos::detail::dataflow_frame250 void finalize(launch policy, Futures&& futures) 251 { 252 if (policy == launch::sync) 253 { 254 finalize(launch::sync, std::move(futures)); 255 } 256 else if (policy == launch::fork) 257 { 258 finalize(launch::fork, std::move(futures)); 259 } 260 else 261 { 262 finalize(launch::async, std::move(futures)); 263 } 264 } 265 266 // The overload for hpx::dataflow taking an executor simply forwards 267 // to the corresponding executor customization point. 268 // 269 // parallel::execution::executor 270 // threads::executor 271 template <typename Executor> 272 HPX_FORCEINLINE 273 typename std::enable_if< 274 traits::is_one_way_executor<Executor>::value || 275 traits::is_two_way_executor<Executor>::value || 276 traits::is_threads_executor<Executor>::value 277 >::type finalizehpx::lcos::detail::dataflow_frame278 finalize(Executor&& exec, Futures&& futures) 279 { 280 using execute_function_type = 281 typename std::conditional< 282 is_void::value, 283 void (dataflow_frame::*)(std::true_type, Futures&&), 284 void (dataflow_frame::*)(std::false_type, Futures&&) 285 >::type; 286 287 execute_function_type f = &dataflow_frame::execute; 288 boost::intrusive_ptr<dataflow_frame> this_(this); 289 290 parallel::execution::post(std::forward<Executor>(exec), 291 f, std::move(this_), is_void{}, std::move(futures)); 292 } 293 294 public: 295 /// Check whether the current future is ready 296 template <typename T> operator ()hpx::lcos::detail::dataflow_frame297 auto operator()(util::async_traverse_visit_tag, T&& current) 298 -> decltype(async_visit_future(std::forward<T>(current))) 299 { 300 return async_visit_future(std::forward<T>(current)); 301 } 302 303 /// Detach the current execution context and continue when the 304 /// current future was set to be ready. 305 template <typename T, typename N> operator ()hpx::lcos::detail::dataflow_frame306 auto operator()(util::async_traverse_detach_tag, T&& current, N&& next) 307 -> decltype(async_detach_future( 308 std::forward<T>(current), std::forward<N>(next))) 309 { 310 return async_detach_future( 311 std::forward<T>(current), std::forward<N>(next)); 312 } 313 314 /// Finish the dataflow when the traversal has finished operator ()hpx::lcos::detail::dataflow_frame315 HPX_FORCEINLINE void operator()( 316 util::async_traverse_complete_tag, Futures futures) 317 { 318 finalize(policy_, std::move(futures)); 319 } 320 321 private: 322 Policy policy_; 323 Func func_; 324 }; 325 326 /////////////////////////////////////////////////////////////////////////// 327 template < 328 typename Policy, typename Func, typename ...Ts, 329 typename Frame = dataflow_frame< 330 typename std::decay<Policy>::type, 331 typename std::decay<Func>::type, 332 util::tuple<typename std::decay<Ts>::type...>>> create_dataflow(Policy && policy,Func && func,Ts &&...ts)333 typename Frame::type create_dataflow( 334 Policy && policy, Func && func, Ts &&... ts) 335 { 336 // Create the data which is used to construct the dataflow_frame 337 auto data = Frame::construct_from( 338 std::forward<Policy>(policy), std::forward<Func>(func)); 339 340 // Construct the dataflow_frame and traverse 341 // the arguments asynchronously 342 boost::intrusive_ptr<Frame> p = util::traverse_pack_async( 343 util::async_traverse_in_place_tag<Frame>{}, 344 std::move(data), std::forward<Ts>(ts)...); 345 346 using traits::future_access; 347 return future_access<typename Frame::type>::create(std::move(p)); 348 } 349 350 /////////////////////////////////////////////////////////////////////////// 351 template < 352 typename Allocator, typename Policy, typename Func, typename ...Ts, 353 typename Frame = dataflow_frame< 354 typename std::decay<Policy>::type, 355 typename std::decay<Func>::type, 356 util::tuple<typename std::decay<Ts>::type...>>> create_dataflow_alloc(Allocator const & alloc,Policy && policy,Func && func,Ts &&...ts)357 typename Frame::type create_dataflow_alloc( 358 Allocator const& alloc, Policy && policy, Func && func, Ts &&... ts) 359 { 360 // Create the data which is used to construct the dataflow_frame 361 auto data = Frame::construct_from( 362 std::forward<Policy>(policy), std::forward<Func>(func)); 363 364 // Construct the dataflow_frame and traverse 365 // the arguments asynchronously 366 boost::intrusive_ptr<Frame> p = util::traverse_pack_async_allocator( 367 alloc, util::async_traverse_in_place_tag<Frame>{}, 368 std::move(data), std::forward<Ts>(ts)...); 369 370 using traits::future_access; 371 return future_access<typename Frame::type>::create(std::move(p)); 372 } 373 374 /////////////////////////////////////////////////////////////////////////// 375 template <typename FD, typename Enable = void> 376 struct dataflow_dispatch; 377 378 // launch 379 template <typename Policy> 380 struct dataflow_dispatch<Policy, typename std::enable_if< 381 traits::is_launch_policy<Policy>::value 382 >::type> 383 { 384 template < 385 typename Allocator, typename Policy_, 386 typename Component, typename Signature, typename Derived, 387 typename ...Ts> 388 HPX_FORCEINLINE static lcos::future< 389 typename traits::promise_local_result< 390 typename hpx::actions::basic_action< 391 Component, Signature, Derived>::remote_result_type 392 >::type> callhpx::lcos::detail::dataflow_dispatch393 call(Allocator const& alloc, Policy_ && policy, 394 hpx::actions::basic_action<Component, Signature, Derived> const& act, 395 naming::id_type const& id, Ts &&... ts) 396 { 397 return detail::create_dataflow_alloc(alloc, 398 std::forward<Policy_>(policy), Derived{}, 399 id, traits::acquire_future_disp()(std::forward<Ts>(ts))...); 400 } 401 402 template <typename Allocator, typename Policy_, typename F, 403 typename ...Ts> 404 HPX_FORCEINLINE static typename std::enable_if< 405 !traits::is_action<typename std::decay<F>::type>::value, 406 lcos::future< 407 typename detail::dataflow_return< 408 typename std::decay<F>::type, 409 util::tuple<typename traits::acquire_future<Ts>::type...> 410 >::type> 411 >::type callhpx::lcos::detail::dataflow_dispatch412 call(Allocator const& alloc, Policy_ && policy, F && f, 413 Ts &&... ts) 414 { 415 return detail::create_dataflow_alloc(alloc, 416 std::forward<Policy_>(policy), std::forward<F>(f), 417 traits::acquire_future_disp()(std::forward<Ts>(ts))...); 418 } 419 }; 420 421 // parallel executors 422 // threads::executor 423 template <typename Executor> 424 struct dataflow_dispatch<Executor, typename std::enable_if< 425 traits::is_one_way_executor<Executor>::value || 426 traits::is_two_way_executor<Executor>::value || 427 traits::is_threads_executor<Executor>::value 428 >::type> 429 { 430 template <typename Allocator, typename Executor_, typename F, 431 typename ...Ts> 432 HPX_FORCEINLINE static typename std::enable_if< 433 !traits::is_action<typename std::decay<F>::type>::value, 434 lcos::future< 435 typename detail::dataflow_return< 436 typename std::decay<F>::type, 437 util::tuple<typename traits::acquire_future<Ts>::type...> 438 >::type> 439 >::type callhpx::lcos::detail::dataflow_dispatch440 call(Allocator const& alloc, Executor_ && exec, F && f, Ts &&... ts) 441 { 442 return detail::create_dataflow_alloc(alloc, 443 std::forward<Executor_>(exec), std::forward<F>(f), 444 traits::acquire_future_disp()(std::forward<Ts>(ts))...); 445 } 446 }; 447 448 // any action, plain function, or function object 449 template <typename FD> 450 struct dataflow_dispatch<FD, typename std::enable_if< 451 !traits::is_launch_policy<FD>::value && 452 !( 453 traits::is_one_way_executor<FD>::value || 454 traits::is_two_way_executor<FD>::value || 455 traits::is_threads_executor<FD>::value) 456 >::type> 457 { 458 template < 459 typename Allocator, typename Component, typename Signature, 460 typename Derived, typename ...Ts> 461 HPX_FORCEINLINE static auto callhpx::lcos::detail::dataflow_dispatch462 call(Allocator const& alloc, 463 hpx::actions::basic_action<Component, Signature, Derived> const& act, 464 naming::id_type const& id, Ts &&... ts) 465 -> decltype(dataflow_dispatch<launch>::call( 466 alloc, launch::async, act, id, std::forward<Ts>(ts)...)) 467 { 468 return dataflow_dispatch<launch>::call( 469 alloc, launch::async, act, id, std::forward<Ts>(ts)...); 470 } 471 472 template < 473 typename Allocator, typename F, typename ...Ts, 474 typename Enable = typename std::enable_if< 475 !traits::is_action<typename std::decay<F>::type>::value 476 >::type> 477 HPX_FORCEINLINE static auto callhpx::lcos::detail::dataflow_dispatch478 call(Allocator const& alloc, F && f, Ts &&... ts) 479 -> decltype(dataflow_dispatch<launch>::call( 480 alloc, launch::async, std::forward<F>(f), 481 std::forward<Ts>(ts)...)) 482 { 483 return dataflow_dispatch<launch>::call( 484 alloc, launch::async, std::forward<F>(f), 485 std::forward<Ts>(ts)...); 486 } 487 }; 488 489 /////////////////////////////////////////////////////////////////////////// 490 template <typename Action, typename T0, typename Enable = void> 491 struct dataflow_action_dispatch 492 { 493 template <typename Allocator, typename ...Ts> 494 HPX_FORCEINLINE static lcos::future< 495 typename traits::promise_local_result< 496 typename hpx::traits::extract_action<Action>::remote_result_type 497 >::type> callhpx::lcos::detail::dataflow_action_dispatch498 call(Allocator const& alloc, naming::id_type const& id, 499 Ts &&... ts) 500 { 501 return dataflow_dispatch<Action>::call(alloc, 502 Action(), id, std::forward<Ts>(ts)...); 503 } 504 }; 505 506 template <typename Action, typename Policy> 507 struct dataflow_action_dispatch<Action, Policy, typename std::enable_if< 508 traits::is_launch_policy<typename std::decay<Policy>::type>::value 509 >::type> 510 { 511 template <typename Allocator, typename ...Ts> 512 HPX_FORCEINLINE static lcos::future< 513 typename traits::promise_local_result< 514 typename hpx::traits::extract_action<Action>::remote_result_type 515 >::type> callhpx::lcos::detail::dataflow_action_dispatch516 call(Allocator const& alloc, Policy && policy, 517 naming::id_type const& id, Ts &&... ts) 518 { 519 return dataflow_dispatch<typename std::decay<Policy>::type>:: 520 call(alloc, std::forward<Policy>(policy), Action(), id, 521 std::forward<Ts>(ts)...); 522 } 523 }; 524 }}} 525 526 /////////////////////////////////////////////////////////////////////////////// 527 namespace hpx 528 { 529 template <typename F, typename ...Ts> 530 HPX_FORCEINLINE dataflow(F && f,Ts &&...ts)531 auto dataflow(F && f, Ts &&... ts) 532 -> decltype( 533 lcos::detail::dataflow_dispatch<typename std::decay<F>::type>::call( 534 hpx::util::internal_allocator<>{}, std::forward<F>(f), 535 std::forward<Ts>(ts)... 536 )) 537 { 538 return lcos::detail::dataflow_dispatch<typename std::decay<F>::type>:: 539 call(hpx::util::internal_allocator<>{}, std::forward<F>(f), 540 std::forward<Ts>(ts)...); 541 } 542 543 template <typename Allocator, typename F, typename ...Ts> 544 HPX_FORCEINLINE dataflow_alloc(Allocator const & alloc,F && f,Ts &&...ts)545 auto dataflow_alloc(Allocator const& alloc, F && f, Ts &&... ts) 546 -> decltype( 547 lcos::detail::dataflow_dispatch<typename std::decay<F>::type>:: 548 call(alloc, std::forward<F>(f), std::forward<Ts>(ts)... 549 )) 550 { 551 return lcos::detail::dataflow_dispatch<typename std::decay<F>::type>:: 552 call(alloc, std::forward<F>(f), std::forward<Ts>(ts)...); 553 } 554 555 template < 556 typename Action, typename T0, typename ...Ts, 557 typename Enable = typename std::enable_if< 558 traits::is_action<Action>::value>::type> 559 HPX_FORCEINLINE dataflow(T0 && t0,Ts &&...ts)560 auto dataflow(T0 && t0, Ts &&... ts) 561 -> decltype(lcos::detail::dataflow_action_dispatch<Action, T0>::call( 562 hpx::util::internal_allocator<>{}, std::forward<T0>(t0), 563 std::forward<Ts>(ts)...)) 564 { 565 return lcos::detail::dataflow_action_dispatch<Action, T0>::call( 566 hpx::util::internal_allocator<>{}, std::forward<T0>(t0), 567 std::forward<Ts>(ts)...); 568 } 569 570 template < 571 typename Action, typename Allocator, typename T0, typename ...Ts, 572 typename Enable = typename std::enable_if< 573 traits::is_action<Action>::value>::type> 574 HPX_FORCEINLINE dataflow_alloc(Allocator const & alloc,T0 && t0,Ts &&...ts)575 auto dataflow_alloc(Allocator const& alloc, T0 && t0, Ts &&... ts) 576 -> decltype(lcos::detail::dataflow_action_dispatch<Action, T0>::call( 577 alloc, std::forward<T0>(t0), std::forward<Ts>(ts)...)) 578 { 579 return lcos::detail::dataflow_action_dispatch<Action, T0>::call( 580 alloc, std::forward<T0>(t0), std::forward<Ts>(ts)...); 581 } 582 } 583 584 #endif /*HPX_LCOS_DATAFLOW_HPP*/ 585