1 //  Copyright (c) 2007-2018 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #if !defined(HPX_COMPONENTS_SERVER_MIGRATION_SUPPORT_FEB_03_2014_0230PM)
7 #define HPX_COMPONENTS_SERVER_MIGRATION_SUPPORT_FEB_03_2014_0230PM
8 
9 #include <hpx/config.hpp>
10 #include <hpx/lcos/future.hpp>
11 #include <hpx/lcos/local/promise.hpp>
12 #include <hpx/lcos/local/spinlock.hpp>
13 #include <hpx/runtime/agas/interface.hpp>
14 #include <hpx/runtime/components/pinned_ptr.hpp>
15 #include <hpx/runtime/naming/id_type.hpp>
16 #include <hpx/runtime/threads_fwd.hpp>
17 #include <hpx/traits/action_decorate_function.hpp>
18 #include <hpx/util/assert.hpp>
19 #include <hpx/util/bind.hpp>
20 
21 #include <cstdint>
22 #include <mutex>
23 #include <type_traits>
24 #include <utility>
25 
26 namespace hpx { namespace components
27 {
28     /// This hook has to be inserted into the derivation chain of any component
29     /// for it to support migration.
30     template <typename BaseComponent, typename Mutex = lcos::local::spinlock>
31     struct migration_support : BaseComponent
32     {
33     private:
34         typedef Mutex mutex_type;
35         typedef BaseComponent base_type;
36         typedef typename base_type::this_component_type this_component_type;
37 
38     public:
39         template <typename ...Arg>
migration_supporthpx::components::migration_support40         migration_support(Arg &&... arg)
41           : base_type(std::forward<Arg>(arg)...)
42           , pin_count_(0)
43           , was_marked_for_migration_(false)
44         {}
45 
~migration_supporthpx::components::migration_support46         ~migration_support()
47         {
48             // prevent base destructor from unregistering the gid if this
49             // instance has been migrated
50             if (pin_count_ == ~0x0u)
51                 this->gid_ = naming::invalid_gid;
52         }
53 
get_base_gidhpx::components::migration_support54         naming::gid_type get_base_gid(
55             naming::gid_type const& assign_gid = naming::invalid_gid) const
56         {
57             naming::gid_type result =
58                 this->BaseComponent::get_base_gid_dynamic(assign_gid,
59                     static_cast<this_component_type const&>(*this)
60                         .get_current_address(),
61                     [](naming::gid_type gid) -> naming::gid_type
62                     {
63                         // we don't store migrating objects in the AGAS cache
64                         naming::detail::set_dont_store_in_cache(gid);
65                         // also mark gid as migratable
66                         naming::detail::set_is_migratable(gid);
67                         return gid;
68                     });
69             return result;
70         }
71 
72         // This component type supports migration.
supports_migrationhpx::components::migration_support73         HPX_CONSTEXPR static bool supports_migration() { return true; }
74 
75         // Pinning functionality
pinhpx::components::migration_support76         void pin()
77         {
78             std::lock_guard<mutex_type> l(mtx_);
79             HPX_ASSERT(pin_count_ != ~0x0u);
80             if (pin_count_ != ~0x0u)
81                 ++pin_count_;
82         }
unpinhpx::components::migration_support83         bool unpin()
84         {
85             // no need to go through AGAS if the object is currently pinned
86             // more than once
87             {
88                 std::unique_lock<mutex_type> l(mtx_);
89                 if (this->pin_count_ != ~0x0u && this->pin_count_ > 1)
90                 {
91                     --this->pin_count_;
92                     return false;
93                 }
94             }
95 
96             // make sure to always grab the AGAS lock first
97             bool was_migrated = false;
98             agas::mark_as_migrated(this->gid_,
99                 [this, &was_migrated]() mutable -> std::pair<bool, hpx::future<void> >
100                 {
101                     std::unique_lock<mutex_type> l(mtx_);
102                     was_migrated = this->pin_count_ == ~0x0u;
103                     HPX_ASSERT(this->pin_count_ != 0);
104                     if (this->pin_count_ != ~0x0u)
105                     {
106                         if (--this->pin_count_ == 0)
107                         {
108                             // trigger pending migration if this was the last
109                             // unpin and a migration operation is pending
110                             HPX_ASSERT(trigger_migration_.valid());
111                             if (was_marked_for_migration_)
112                             {
113                                 was_marked_for_migration_ = false;
114 
115                                 hpx::lcos::local::promise<void> p;
116                                 std::swap(p, trigger_migration_);
117 
118                                 l.unlock();
119 
120                                 p.set_value();
121                                 return std::make_pair(true, make_ready_future());
122                             }
123                         }
124                     }
125                     return std::make_pair(false, make_ready_future());
126                 }).get();
127 
128             return was_migrated;
129         }
130 
pin_counthpx::components::migration_support131         std::uint32_t pin_count() const
132         {
133             std::lock_guard<mutex_type> l(mtx_);
134             return pin_count_;
135         }
mark_as_migratedhpx::components::migration_support136         void mark_as_migrated()
137         {
138             std::lock_guard<mutex_type> l(mtx_);
139             HPX_ASSERT(1 == pin_count_);
140             pin_count_ = ~0x0u;
141         }
142 
mark_as_migratedhpx::components::migration_support143         hpx::future<void> mark_as_migrated(hpx::id_type const& to_migrate)
144         {
145             // we need to first lock the AGAS migrated objects table, only then
146             // access (lock) the object
147             return agas::mark_as_migrated(to_migrate.get_gid(),
148                 [this]() mutable -> std::pair<bool, hpx::future<void> >
149                 {
150                     std::unique_lock<mutex_type> l(mtx_);
151 
152                     // make sure that no migration is currently in flight
153                     if (was_marked_for_migration_)
154                     {
155                         l.unlock();
156                         return std::make_pair(false,
157                             hpx::make_exceptional_future<void>(
158                                 HPX_GET_EXCEPTION(invalid_status,
159                                     "migration_support::mark_as_migrated",
160                                     "migration operation is already in flight")
161                             ));
162                     }
163 
164                     if (1 == pin_count_)
165                     {
166                         // all is well, migration can be triggered now
167                         return std::make_pair(true, make_ready_future());
168                     }
169 
170                     // delay migrate operation until pin count goes to zero
171                     was_marked_for_migration_ = true;
172                     hpx::future<void> f = trigger_migration_.get_future();
173 
174                     l.unlock();
175                     return std::make_pair(true, std::move(f));
176                 });
177         }
178 
179         /// This hook is invoked on the newly created object after the migration
180         /// has been finished
on_migratedhpx::components::migration_support181         HPX_CXX14_CONSTEXPR void on_migrated() {}
182 
183         typedef void decorates_action;
184 
185         /// This is the hook implementation for decorate_action which makes
186         /// sure that the object becomes pinned during the execution of an
187         /// action.
188         template <typename F>
189         static threads::thread_function_type
decorate_actionhpx::components::migration_support190         decorate_action(naming::address::address_type lva, F && f)
191         {
192             // Make sure we pin the component at construction of the bound object
193             // which will also unpin it once the thread runs to completion (the
194             // bound object goes out of scope).
195             return util::bind(
196                 util::one_shot(&migration_support::thread_function),
197                 get_lva<this_component_type>::call(lva),
198                 util::placeholders::_1,
199                 traits::component_decorate_function<base_type>::call(
200                     lva, std::forward<F>(f)),
201                 components::pinned_ptr::create<this_component_type>(lva));
202         }
203 
204         // Return whether the given object was migrated, if it was not
205         // migrated, it also returns a pinned pointer.
206         static std::pair<bool, components::pinned_ptr>
was_object_migratedhpx::components::migration_support207         was_object_migrated(hpx::naming::gid_type const& id,
208             naming::address::address_type lva)
209         {
210             return agas::was_object_migrated(id,
211                 [lva]() -> components::pinned_ptr
212                 {
213                     return components::pinned_ptr::create<this_component_type>(lva);
214                 });
215         }
216 
217     protected:
218         // Execute the wrapped action. This function is bound in decorate_action
219         // above. The bound object performs the pinning/unpinning.
thread_functionhpx::components::migration_support220         threads::thread_result_type thread_function(
221             threads::thread_state_ex_enum state,
222             threads::thread_function_type && f,
223             components::pinned_ptr)
224         {
225             return f(state);
226         }
227 
228     private:
229         mutable mutex_type mtx_;
230         std::uint32_t pin_count_;
231         hpx::lcos::local::promise<void> trigger_migration_;
232         bool was_marked_for_migration_;
233     };
234 }}
235 
236 #endif
237