1 //  Copyright (c) 2007-2016 Hartmut Kaiser
2 //  Copyright (c)      2011 Bryce Lelbach
3 //  Copyright (c)      2011 Thomas Heller
4 //
5 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
6 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 
8 /// \file transfer_continuation_action.hpp
9 
10 #ifndef HPX_RUNTIME_ACTIONS_TRANSFER_CONTINUATION_ACTION_HPP
11 #define HPX_RUNTIME_ACTIONS_TRANSFER_CONTINUATION_ACTION_HPP
12 
13 #include <hpx/config.hpp>
14 #include <hpx/runtime/actions/continuation.hpp>
15 #include <hpx/runtime/actions/transfer_base_action.hpp>
16 #include <hpx/runtime/applier/apply_helper.hpp>
17 #include <hpx/runtime/parcelset/detail/per_action_data_counter_registry.hpp>
18 #include <hpx/runtime/serialization/input_archive.hpp>
19 #include <hpx/runtime/serialization/output_archive.hpp>
20 #include <hpx/runtime/serialization/serialization_fwd.hpp>
21 #include <hpx/runtime/threads/thread_helpers.hpp>
22 #include <hpx/runtime/threads/thread_init_data.hpp>
23 #include <hpx/util/detail/pack.hpp>
24 
25 #include <cstddef>
26 #include <cstdint>
27 #include <utility>
28 
29 #include <hpx/config/warnings_prefix.hpp>
30 
31 namespace hpx { namespace actions
32 {
33     /// \cond NOINTERNAL
34 
35     ///////////////////////////////////////////////////////////////////////////
36     template <typename Action>
37     struct transfer_continuation_action : transfer_base_action<Action>
38     {
39     public:
40         HPX_NON_COPYABLE(transfer_continuation_action);
41 
42         typedef transfer_base_action<Action> base_type;
43         typedef typename base_type::continuation_type continuation_type;
44 
45     public:
46         // construct an empty transfer_continuation_action to avoid serialization
47         // overhead
48         transfer_continuation_action() = default;
49 
50         // construct an action from its arguments
51         template <typename ...Ts>
52         explicit transfer_continuation_action(continuation_type&& cont, Ts&&... vs);
53 
54         template <typename ...Ts>
55         transfer_continuation_action(
56             threads::thread_priority priority, continuation_type&& cont,
57             Ts&&... vs);
58 
59         bool has_continuation() const override;
60 
61         /// The \a get_thread_function constructs a proper thread function for
62         /// a \a thread, encapsulating the functionality and the arguments
63         /// of the action it is called for.
64         ///
65         /// \param lva    [in] This is the local virtual address of the
66         ///               component the action has to be invoked on.
67         ///
68         /// \returns      This function returns a proper thread function usable
69         ///               for a \a thread.
70         ///
71         /// \note This \a get_thread_function will be invoked to retrieve the
72         ///       thread function for an action which has to be invoked without
73         ///       continuations.
74         template <std::size_t ...Is>
75         threads::thread_function_type
76         get_thread_function(util::detail::pack_c<std::size_t, Is...>,
77             naming::id_type&& target, naming::address::address_type lva,
78             naming::address::component_type comptype);
79 
80         threads::thread_function_type
81         get_thread_function(naming::id_type&& target,
82             naming::address::address_type lva,
83             naming::address::component_type comptype) override;
84 
85         template <std::size_t... Is>
86         void schedule_thread(util::detail::pack_c<std::size_t, Is...>,
87             naming::gid_type const& target_gid,
88             naming::address::address_type lva,
89             naming::address::component_type comptype, std::size_t num_thread);
90 
91         // schedule a new thread
92         void schedule_thread(naming::gid_type const& target_gid,
93             naming::address::address_type lva,
94             naming::address::component_type comptype,
95             std::size_t num_thread) override;
96 
97         // serialization support
98         // loading ...
99         void load(hpx::serialization::input_archive & ar) override;
100 
101         // saving ...
102         void save(hpx::serialization::output_archive & ar) override;
103 
104         void load_schedule(serialization::input_archive& ar,
105             naming::gid_type&& target, naming::address_type lva,
106             naming::component_type comptype, std::size_t num_thread,
107             bool& deferred_schedule) override;
108 
109     private:
110         continuation_type cont_;
111     };
112     /// \endcond
113 
114     template <typename Action>
115     template <typename ...Ts>
transfer_continuation_action(continuation_type && cont,Ts &&...vs)116     transfer_continuation_action<Action>::transfer_continuation_action(
117         continuation_type&& cont, Ts&&... vs)
118       : base_type(std::forward<Ts>(vs)...)
119       , cont_(std::move(cont))
120     {}
121 
122     template <typename Action>
123     template <typename ...Ts>
transfer_continuation_action(threads::thread_priority priority,continuation_type && cont,Ts &&...vs)124     transfer_continuation_action<Action>::transfer_continuation_action(
125         threads::thread_priority priority, continuation_type&& cont, Ts&&... vs)
126       : base_type(priority, std::forward<Ts>(vs)...)
127       , cont_(std::move(cont))
128     {}
129 
130     template <typename Action>
has_continuation() const131     bool transfer_continuation_action<Action>::has_continuation() const
132     {
133         return true;
134     }
135 
136     template <typename Action>
137     template <std::size_t ...Is>
138     threads::thread_function_type
get_thread_function(util::detail::pack_c<std::size_t,Is...>,naming::id_type && target,naming::address::address_type lva,naming::address::component_type comptype)139     transfer_continuation_action<Action>::get_thread_function(
140         util::detail::pack_c<std::size_t, Is...>,
141         naming::id_type&& target, naming::address::address_type lva,
142         naming::address::component_type comptype)
143     {
144         return base_type::derived_type::construct_thread_function(
145             std::move(target), std::move(cont_), lva, comptype,
146             util::get<Is>(std::move(this->arguments_))...);
147     }
148 
149     template <typename Action>
150     threads::thread_function_type
get_thread_function(naming::id_type && target,naming::address::address_type lva,naming::address::component_type comptype)151     transfer_continuation_action<Action>::get_thread_function(
152         naming::id_type&& target, naming::address::address_type lva,
153         naming::address::component_type comptype)
154     {
155         return get_thread_function(
156             typename util::detail::make_index_pack<Action::arity>::type(),
157             std::move(target), lva, comptype);
158     }
159 
160     template <typename Action>
161     template <std::size_t ...Is>
162     void
schedule_thread(util::detail::pack_c<std::size_t,Is...>,naming::gid_type const & target_gid,naming::address::address_type lva,naming::address::component_type comptype,std::size_t)163     transfer_continuation_action<Action>::schedule_thread(
164         util::detail::pack_c<std::size_t, Is...>,
165         naming::gid_type const& target_gid,
166         naming::address::address_type lva,
167         naming::address::component_type comptype, std::size_t /*num_thread*/)
168     {
169         naming::id_type target;
170         if (naming::detail::has_credits(target_gid))
171         {
172             target = naming::id_type(target_gid, naming::id_type::managed);
173         }
174 
175         threads::thread_init_data data;
176 #if defined(HPX_HAVE_THREAD_PARENT_REFERENCE)
177         data.parent_id = this->parent_id_;
178         data.parent_locality_id = this->parent_locality_;
179 #endif
180         applier::detail::apply_helper<typename base_type::derived_type>::call(
181             std::move(data), std::move(cont_), target, lva, comptype,
182             this->priority_, std::move(util::get<Is>(this->arguments_))...);
183     }
184 
185     template <typename Action>
schedule_thread(naming::gid_type const & target_gid,naming::address::address_type lva,naming::address::component_type comptype,std::size_t num_thread)186     void transfer_continuation_action<Action>::schedule_thread(
187         naming::gid_type const& target_gid,
188         naming::address::address_type lva,
189         naming::address::component_type comptype, std::size_t num_thread)
190     {
191         schedule_thread(
192             typename util::detail::make_index_pack<Action::arity>::type(),
193             target_gid, lva, comptype, num_thread);
194 
195         // keep track of number of invocations
196         this->increment_invocation_count();
197     }
198 
199     template <typename Action>
load(hpx::serialization::input_archive & ar)200     void transfer_continuation_action<Action>::load(
201         hpx::serialization::input_archive & ar)
202     {
203         this->load_base(ar);
204         ar >> cont_;
205     }
206 
207     template <typename Action>
save(hpx::serialization::output_archive & ar)208     void transfer_continuation_action<Action>::save(
209         hpx::serialization::output_archive & ar)
210     {
211         this->save_base(ar);
212         ar << cont_;
213     }
214 
215     template <typename Action>
load_schedule(serialization::input_archive & ar,naming::gid_type && target,naming::address_type lva,naming::component_type comptype,std::size_t num_thread,bool & deferred_schedule)216     void transfer_continuation_action<Action>::load_schedule(
217         serialization::input_archive& ar,
218         naming::gid_type&& target, naming::address_type lva,
219         naming::component_type comptype, std::size_t num_thread,
220         bool& deferred_schedule)
221     {
222         // First, serialize, then schedule
223         load(ar);
224 
225         if (deferred_schedule)
226         {
227             // If this is a direct action and deferred schedule was requested,
228             // that is we are not the last parcel, return immediately
229             if (base_type::direct_execution::value) {
230                 return;
231             } else {
232                 // If this is not a direct action, we can safely set deferred_schedule
233                 // to false
234                 deferred_schedule = false;
235             }
236         }
237 
238         schedule_thread(std::move(target), lva, comptype, num_thread);
239     }
240 }}
241 
242 namespace hpx { namespace traits
243 {
244     /// \cond NOINTERNAL
245     template <typename Action>
246     struct needs_automatic_registration<
247         hpx::actions::transfer_continuation_action<Action> >
248       : needs_automatic_registration<Action>
249     {};
250     /// \endcond
251 }}
252 
253 #include <hpx/config/warnings_suffix.hpp>
254 
255 #endif /*HPX_RUNTIME_ACTIONS_TRANSFER_ACTION_HPP*/
256