1 // Copyright (c) 2007-2018 Hartmut Kaiser 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 #if !defined(HPX_COMPONENTS_SERVER_MIGRATION_SUPPORT_FEB_03_2014_0230PM) 7 #define HPX_COMPONENTS_SERVER_MIGRATION_SUPPORT_FEB_03_2014_0230PM 8 9 #include <hpx/config.hpp> 10 #include <hpx/lcos/future.hpp> 11 #include <hpx/lcos/local/promise.hpp> 12 #include <hpx/lcos/local/spinlock.hpp> 13 #include <hpx/runtime/agas/interface.hpp> 14 #include <hpx/runtime/components/pinned_ptr.hpp> 15 #include <hpx/runtime/naming/id_type.hpp> 16 #include <hpx/runtime/threads_fwd.hpp> 17 #include <hpx/traits/action_decorate_function.hpp> 18 #include <hpx/util/assert.hpp> 19 #include <hpx/util/bind.hpp> 20 21 #include <cstdint> 22 #include <mutex> 23 #include <type_traits> 24 #include <utility> 25 26 namespace hpx { namespace components 27 { 28 /// This hook has to be inserted into the derivation chain of any component 29 /// for it to support migration. 30 template <typename BaseComponent, typename Mutex = lcos::local::spinlock> 31 struct migration_support : BaseComponent 32 { 33 private: 34 typedef Mutex mutex_type; 35 typedef BaseComponent base_type; 36 typedef typename base_type::this_component_type this_component_type; 37 38 public: 39 template <typename ...Arg> migration_supporthpx::components::migration_support40 migration_support(Arg &&... arg) 41 : base_type(std::forward<Arg>(arg)...) 42 , pin_count_(0) 43 , was_marked_for_migration_(false) 44 {} 45 ~migration_supporthpx::components::migration_support46 ~migration_support() 47 { 48 // prevent base destructor from unregistering the gid if this 49 // instance has been migrated 50 if (pin_count_ == ~0x0u) 51 this->gid_ = naming::invalid_gid; 52 } 53 get_base_gidhpx::components::migration_support54 naming::gid_type get_base_gid( 55 naming::gid_type const& assign_gid = naming::invalid_gid) const 56 { 57 naming::gid_type result = 58 this->BaseComponent::get_base_gid_dynamic(assign_gid, 59 static_cast<this_component_type const&>(*this) 60 .get_current_address(), 61 [](naming::gid_type gid) -> naming::gid_type 62 { 63 // we don't store migrating objects in the AGAS cache 64 naming::detail::set_dont_store_in_cache(gid); 65 // also mark gid as migratable 66 naming::detail::set_is_migratable(gid); 67 return gid; 68 }); 69 return result; 70 } 71 72 // This component type supports migration. supports_migrationhpx::components::migration_support73 HPX_CONSTEXPR static bool supports_migration() { return true; } 74 75 // Pinning functionality pinhpx::components::migration_support76 void pin() 77 { 78 std::lock_guard<mutex_type> l(mtx_); 79 HPX_ASSERT(pin_count_ != ~0x0u); 80 if (pin_count_ != ~0x0u) 81 ++pin_count_; 82 } unpinhpx::components::migration_support83 bool unpin() 84 { 85 // no need to go through AGAS if the object is currently pinned 86 // more than once 87 { 88 std::unique_lock<mutex_type> l(mtx_); 89 if (this->pin_count_ != ~0x0u && this->pin_count_ > 1) 90 { 91 --this->pin_count_; 92 return false; 93 } 94 } 95 96 // make sure to always grab the AGAS lock first 97 bool was_migrated = false; 98 agas::mark_as_migrated(this->gid_, 99 [this, &was_migrated]() mutable -> std::pair<bool, hpx::future<void> > 100 { 101 std::unique_lock<mutex_type> l(mtx_); 102 was_migrated = this->pin_count_ == ~0x0u; 103 HPX_ASSERT(this->pin_count_ != 0); 104 if (this->pin_count_ != ~0x0u) 105 { 106 if (--this->pin_count_ == 0) 107 { 108 // trigger pending migration if this was the last 109 // unpin and a migration operation is pending 110 HPX_ASSERT(trigger_migration_.valid()); 111 if (was_marked_for_migration_) 112 { 113 was_marked_for_migration_ = false; 114 115 hpx::lcos::local::promise<void> p; 116 std::swap(p, trigger_migration_); 117 118 l.unlock(); 119 120 p.set_value(); 121 return std::make_pair(true, make_ready_future()); 122 } 123 } 124 } 125 return std::make_pair(false, make_ready_future()); 126 }).get(); 127 128 return was_migrated; 129 } 130 pin_counthpx::components::migration_support131 std::uint32_t pin_count() const 132 { 133 std::lock_guard<mutex_type> l(mtx_); 134 return pin_count_; 135 } mark_as_migratedhpx::components::migration_support136 void mark_as_migrated() 137 { 138 std::lock_guard<mutex_type> l(mtx_); 139 HPX_ASSERT(1 == pin_count_); 140 pin_count_ = ~0x0u; 141 } 142 mark_as_migratedhpx::components::migration_support143 hpx::future<void> mark_as_migrated(hpx::id_type const& to_migrate) 144 { 145 // we need to first lock the AGAS migrated objects table, only then 146 // access (lock) the object 147 return agas::mark_as_migrated(to_migrate.get_gid(), 148 [this]() mutable -> std::pair<bool, hpx::future<void> > 149 { 150 std::unique_lock<mutex_type> l(mtx_); 151 152 // make sure that no migration is currently in flight 153 if (was_marked_for_migration_) 154 { 155 l.unlock(); 156 return std::make_pair(false, 157 hpx::make_exceptional_future<void>( 158 HPX_GET_EXCEPTION(invalid_status, 159 "migration_support::mark_as_migrated", 160 "migration operation is already in flight") 161 )); 162 } 163 164 if (1 == pin_count_) 165 { 166 // all is well, migration can be triggered now 167 return std::make_pair(true, make_ready_future()); 168 } 169 170 // delay migrate operation until pin count goes to zero 171 was_marked_for_migration_ = true; 172 hpx::future<void> f = trigger_migration_.get_future(); 173 174 l.unlock(); 175 return std::make_pair(true, std::move(f)); 176 }); 177 } 178 179 /// This hook is invoked on the newly created object after the migration 180 /// has been finished on_migratedhpx::components::migration_support181 HPX_CXX14_CONSTEXPR void on_migrated() {} 182 183 typedef void decorates_action; 184 185 /// This is the hook implementation for decorate_action which makes 186 /// sure that the object becomes pinned during the execution of an 187 /// action. 188 template <typename F> 189 static threads::thread_function_type decorate_actionhpx::components::migration_support190 decorate_action(naming::address::address_type lva, F && f) 191 { 192 // Make sure we pin the component at construction of the bound object 193 // which will also unpin it once the thread runs to completion (the 194 // bound object goes out of scope). 195 return util::bind( 196 util::one_shot(&migration_support::thread_function), 197 get_lva<this_component_type>::call(lva), 198 util::placeholders::_1, 199 traits::component_decorate_function<base_type>::call( 200 lva, std::forward<F>(f)), 201 components::pinned_ptr::create<this_component_type>(lva)); 202 } 203 204 // Return whether the given object was migrated, if it was not 205 // migrated, it also returns a pinned pointer. 206 static std::pair<bool, components::pinned_ptr> was_object_migratedhpx::components::migration_support207 was_object_migrated(hpx::naming::gid_type const& id, 208 naming::address::address_type lva) 209 { 210 return agas::was_object_migrated(id, 211 [lva]() -> components::pinned_ptr 212 { 213 return components::pinned_ptr::create<this_component_type>(lva); 214 }); 215 } 216 217 protected: 218 // Execute the wrapped action. This function is bound in decorate_action 219 // above. The bound object performs the pinning/unpinning. thread_functionhpx::components::migration_support220 threads::thread_result_type thread_function( 221 threads::thread_state_ex_enum state, 222 threads::thread_function_type && f, 223 components::pinned_ptr) 224 { 225 return f(state); 226 } 227 228 private: 229 mutable mutex_type mtx_; 230 std::uint32_t pin_count_; 231 hpx::lcos::local::promise<void> trigger_migration_; 232 bool was_marked_for_migration_; 233 }; 234 }} 235 236 #endif 237