1 // Copyright (c) 2016 Thomas Heller 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 #include <hpx/state.hpp> 7 #include <hpx/lcos/barrier.hpp> 8 #include <hpx/lcos/detail/barrier_node.hpp> 9 #include <hpx/lcos/when_all.hpp> 10 #include <hpx/runtime.hpp> 11 #include <hpx/runtime/basename_registration.hpp> 12 #include <hpx/runtime/components/server/component_heap.hpp> 13 #include <hpx/runtime/launch_policy.hpp> 14 #include <hpx/runtime/threads/run_as_hpx_thread.hpp> 15 #include <hpx/util/assert.hpp> 16 #include <hpx/util/runtime_configuration.hpp> 17 #include <hpx/util/unused.hpp> 18 19 #include <boost/intrusive_ptr.hpp> 20 21 #include <cstddef> 22 #include <string> 23 #include <utility> 24 25 /////////////////////////////////////////////////////////////////////////////// 26 namespace hpx 27 { 28 bool is_stopped_or_shutting_down(); 29 } 30 31 namespace hpx { namespace lcos { barrier(std::string const & base_name)32 barrier::barrier(std::string const& base_name) 33 : node_(new (hpx::components::component_heap<wrapping_type>().alloc()) 34 wrapping_type(new wrapped_type(base_name, 35 hpx::get_num_localities(hpx::launch::sync), 36 hpx::get_locality_id()))) 37 { 38 if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0) 39 register_with_basename( 40 base_name, node_->get_unmanaged_id(), (*node_)->rank_).get(); 41 } 42 barrier(std::string const & base_name,std::size_t num)43 barrier::barrier(std::string const& base_name, std::size_t num) 44 : node_(new (hpx::components::component_heap<wrapping_type>().alloc()) 45 wrapping_type(new wrapped_type(base_name, num, hpx::get_locality_id() 46 ))) 47 { 48 if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0) 49 register_with_basename( 50 base_name, node_->get_unmanaged_id(), (*node_)->rank_).get(); 51 } 52 barrier(std::string const & base_name,std::size_t num,std::size_t rank)53 barrier::barrier(std::string const& base_name, std::size_t num, std::size_t rank) 54 : node_(new (hpx::components::component_heap<wrapping_type>().alloc()) 55 wrapping_type(new wrapped_type(base_name, num, rank))) 56 { 57 if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0) 58 register_with_basename( 59 base_name, node_->get_unmanaged_id(), (*node_)->rank_).get(); 60 } 61 barrier()62 barrier::barrier() 63 {} 64 barrier(barrier && other)65 barrier::barrier(barrier&& other) 66 : node_(std::move(other.node_)) 67 { 68 other.node_.reset(); 69 } 70 operator =(barrier && other)71 barrier& barrier::operator=(barrier&& other) 72 { 73 release(); 74 node_ = std::move(other.node_); 75 other.node_.reset(); 76 77 return *this; 78 } 79 ~barrier()80 barrier::~barrier() 81 { 82 release(); 83 } 84 wait()85 void barrier::wait() 86 { 87 (*node_)->wait(false).get(); 88 } 89 wait(hpx::launch::async_policy)90 future<void> barrier::wait(hpx::launch::async_policy) 91 { 92 return (*node_)->wait(true); 93 } 94 release()95 void barrier::release() 96 { 97 if (node_) 98 { 99 if (hpx::get_runtime_ptr() != nullptr && 100 hpx::threads::threadmanager_is(state_running) && 101 !hpx::is_stopped_or_shutting_down()) 102 { 103 // make sure this runs as an HPX thread 104 if (hpx::threads::get_self_ptr() == nullptr) 105 { 106 return hpx::threads::run_as_hpx_thread( 107 &barrier::release, this); 108 } 109 110 hpx::future<void> f; 111 if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0) 112 { 113 f = hpx::unregister_with_basename( 114 (*node_)->base_name_, (*node_)->rank_); 115 } 116 117 // we need to wait on everyone to have its name unregistered, 118 // and hold on to our node long enough... 119 boost::intrusive_ptr<wrapping_type> node = node_; 120 hpx::when_all(f, wait(hpx::launch::async)).then( 121 hpx::launch::sync, 122 [HPX_CAPTURE_MOVE(node)](hpx::future<void> f) 123 { 124 HPX_UNUSED(node); 125 f.get(); 126 } 127 ).get(); 128 } 129 node_.reset(); 130 } 131 } 132 detach()133 void barrier::detach() 134 { 135 if (node_) 136 { 137 if (hpx::get_runtime_ptr() != nullptr && 138 hpx::threads::threadmanager_is(state_running) && 139 !hpx::is_stopped_or_shutting_down()) 140 { 141 if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0) 142 hpx::unregister_with_basename( 143 (*node_)->base_name_, (*node_)->rank_); 144 } 145 node_.reset(); 146 } 147 } 148 create_global_barrier()149 barrier barrier::create_global_barrier() 150 { 151 runtime& rt = get_runtime(); 152 util::runtime_configuration const& cfg = rt.get_config(); 153 return barrier("/0/hpx/global_barrier", cfg.get_num_localities()); 154 } 155 get_global_barrier()156 barrier& barrier::get_global_barrier() 157 { 158 static barrier b; 159 return b; 160 } 161 synchronize()162 void barrier::synchronize() 163 { 164 static barrier& b = get_global_barrier(); 165 HPX_ASSERT(b.node_); 166 b.wait(); 167 } 168 }} 169