1 //  Copyright (c) 2016 Thomas Heller
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #include <hpx/state.hpp>
7 #include <hpx/lcos/barrier.hpp>
8 #include <hpx/lcos/detail/barrier_node.hpp>
9 #include <hpx/lcos/when_all.hpp>
10 #include <hpx/runtime.hpp>
11 #include <hpx/runtime/basename_registration.hpp>
12 #include <hpx/runtime/components/server/component_heap.hpp>
13 #include <hpx/runtime/launch_policy.hpp>
14 #include <hpx/runtime/threads/run_as_hpx_thread.hpp>
15 #include <hpx/util/assert.hpp>
16 #include <hpx/util/runtime_configuration.hpp>
17 #include <hpx/util/unused.hpp>
18 
19 #include <boost/intrusive_ptr.hpp>
20 
21 #include <cstddef>
22 #include <string>
23 #include <utility>
24 
25 ///////////////////////////////////////////////////////////////////////////////
26 namespace hpx
27 {
28     bool is_stopped_or_shutting_down();
29 }
30 
31 namespace hpx { namespace lcos {
barrier(std::string const & base_name)32     barrier::barrier(std::string const& base_name)
33       : node_(new (hpx::components::component_heap<wrapping_type>().alloc())
34             wrapping_type(new wrapped_type(base_name,
35                 hpx::get_num_localities(hpx::launch::sync),
36                 hpx::get_locality_id())))
37     {
38         if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0)
39             register_with_basename(
40                 base_name, node_->get_unmanaged_id(), (*node_)->rank_).get();
41     }
42 
barrier(std::string const & base_name,std::size_t num)43     barrier::barrier(std::string const& base_name, std::size_t num)
44       : node_(new (hpx::components::component_heap<wrapping_type>().alloc())
45             wrapping_type(new wrapped_type(base_name, num, hpx::get_locality_id()
46         )))
47     {
48         if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0)
49             register_with_basename(
50                 base_name, node_->get_unmanaged_id(), (*node_)->rank_).get();
51     }
52 
barrier(std::string const & base_name,std::size_t num,std::size_t rank)53     barrier::barrier(std::string const& base_name, std::size_t num, std::size_t rank)
54       : node_(new (hpx::components::component_heap<wrapping_type>().alloc())
55             wrapping_type(new wrapped_type(base_name, num, rank)))
56     {
57         if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0)
58             register_with_basename(
59                 base_name, node_->get_unmanaged_id(), (*node_)->rank_).get();
60     }
61 
barrier()62     barrier::barrier()
63     {}
64 
barrier(barrier && other)65     barrier::barrier(barrier&& other)
66       : node_(std::move(other.node_))
67     {
68         other.node_.reset();
69     }
70 
operator =(barrier && other)71     barrier& barrier::operator=(barrier&& other)
72     {
73         release();
74         node_ = std::move(other.node_);
75         other.node_.reset();
76 
77         return *this;
78     }
79 
~barrier()80     barrier::~barrier()
81     {
82         release();
83     }
84 
wait()85     void barrier::wait()
86     {
87         (*node_)->wait(false).get();
88     }
89 
wait(hpx::launch::async_policy)90     future<void> barrier::wait(hpx::launch::async_policy)
91     {
92         return (*node_)->wait(true);
93     }
94 
release()95     void barrier::release()
96     {
97         if (node_)
98         {
99             if (hpx::get_runtime_ptr() != nullptr &&
100                 hpx::threads::threadmanager_is(state_running) &&
101                 !hpx::is_stopped_or_shutting_down())
102             {
103                 // make sure this runs as an HPX thread
104                 if (hpx::threads::get_self_ptr() == nullptr)
105                 {
106                     return hpx::threads::run_as_hpx_thread(
107                         &barrier::release, this);
108                 }
109 
110                 hpx::future<void> f;
111                 if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0)
112                 {
113                     f = hpx::unregister_with_basename(
114                         (*node_)->base_name_, (*node_)->rank_);
115                 }
116 
117                 // we need to wait on everyone to have its name unregistered,
118                 // and hold on to our node long enough...
119                 boost::intrusive_ptr<wrapping_type> node = node_;
120                 hpx::when_all(f, wait(hpx::launch::async)).then(
121                     hpx::launch::sync,
122                     [HPX_CAPTURE_MOVE(node)](hpx::future<void> f)
123                     {
124                         HPX_UNUSED(node);
125                         f.get();
126                     }
127                 ).get();
128             }
129             node_.reset();
130         }
131     }
132 
detach()133     void barrier::detach()
134     {
135         if (node_)
136         {
137             if (hpx::get_runtime_ptr() != nullptr &&
138                 hpx::threads::threadmanager_is(state_running) &&
139                 !hpx::is_stopped_or_shutting_down())
140             {
141                 if ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0)
142                     hpx::unregister_with_basename(
143                         (*node_)->base_name_, (*node_)->rank_);
144             }
145             node_.reset();
146         }
147     }
148 
create_global_barrier()149     barrier barrier::create_global_barrier()
150     {
151         runtime& rt = get_runtime();
152         util::runtime_configuration const& cfg = rt.get_config();
153         return barrier("/0/hpx/global_barrier", cfg.get_num_localities());
154     }
155 
get_global_barrier()156     barrier& barrier::get_global_barrier()
157     {
158         static barrier b;
159         return b;
160     }
161 
synchronize()162     void barrier::synchronize()
163     {
164         static barrier& b = get_global_barrier();
165         HPX_ASSERT(b.node_);
166         b.wait();
167     }
168 }}
169