//  Copyright (c) 2013-2015 Thomas Heller
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>

#if defined(HPX_HAVE_NETWORKING)

#if defined(HPX_HAVE_PARCELPORT_MPI)
#include <hpx/plugins/parcelport/mpi/mpi.hpp>
#endif

#include <hpx/util/runtime_configuration.hpp>
#include <hpx/util/command_line_handling.hpp>
#include <hpx/plugins/parcelport/mpi/mpi_environment.hpp>

#include <boost/assign/std/vector.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/tokenizer.hpp>

#include <cstddef>
#include <cstdlib>
#include <stdexcept>
#include <string>

namespace hpx { namespace util
{
    namespace detail
    {
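        // Detect whether the application was started through an MPI launcher
        // by checking the environment variables listed in "hpx.parcel.mpi.env"
        // (separated by ';', ',', ':' or spaces); default_env supplies the
        // build-time default list. On BG/Q an MPI environment is always assumed.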
        bool detect_mpi_environment(util::runtime_configuration const& cfg,
            char const* default_env)
        {
#if defined(__bgq__)
            // If running on BG/Q, we can safely assume to always run in an
            // MPI environment
            return true;
#else
            std::string mpi_environment_strings = cfg.get_entry(
                "hpx.parcel.mpi.env", default_env);

            typedef
                boost::tokenizer<boost::char_separator<char> >
                tokenizer;
            boost::char_separator<char> sep(";,: ");
            tokenizer tokens(mpi_environment_strings, sep);
            for(tokenizer::iterator it = tokens.begin(); it != tokens.end(); ++it)
            {
                char *env = std::getenv(it->c_str());
                if(env) return true;
            }
            return false;
#endif
        }

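        // Read the configuration entry 'str' as an integer, falling back to
        // 'dflt' if the entry is missing or cannot be converted.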
        int get_cfg_entry(runtime_configuration const& cfg,
            std::string const& str, int dflt)
        {
            try {
                return boost::lexical_cast<int>(
                    cfg.get_entry(str, dflt));
            }
            catch (boost::bad_lexical_cast const&) {
                // ignore the conversion error and fall back to the default
            }
            return dflt;
        }

        int get_cfg_entry(command_line_handling& cfg, std::string const& str,
            int dflt)
        {
            return get_cfg_entry(cfg.rtcfg_, str, dflt);
        }
    }
}}

namespace hpx { namespace util
{
    mpi_environment::mutex_type mpi_environment::mtx_;
    bool mpi_environment::enabled_ = false;
    bool mpi_environment::has_called_init_ = false;
    int mpi_environment::provided_threading_flag_ = MPI_THREAD_SINGLE;
    MPI_Comm mpi_environment::communicator_ = MPI_COMM_NULL;

    ///////////////////////////////////////////////////////////////////////////
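    // Decide whether the MPI parcelport should be used for this run: it is
    // disabled if hpx.parcel.mpi.enable is 0, or if no MPI environment was
    // detected while the TCP parcelport is still enabled.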
    bool mpi_environment::check_mpi_environment(runtime_configuration const& cfg)
    {
        if (detail::get_cfg_entry(cfg, "hpx.parcel.mpi.enable", 1) == 0)
            return false;

        // We disable the MPI parcelport if the application is not run using
        // mpirun and the tcp/ip parcelport is not explicitly disabled
        //
        // The bottom line is that we use the MPI parcelport either when the
        // application was executed using mpirun or if the tcp/ip parcelport
        // was disabled.
        if (!detail::detect_mpi_environment(cfg, HPX_HAVE_PARCELPORT_MPI_ENV) &&
            detail::get_cfg_entry(cfg, "hpx.parcel.tcp.enable", 1))
        {
            return false;
        }

        return true;
    }

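    // Initialize the MPI environment: initialize MPI (unless somebody already
    // did), duplicate MPI_COMM_WORLD as the parcelport communicator, and derive
    // the number of localities, this locality's rank, and the runtime mode
    // (console/worker) from MPI.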
    void mpi_environment::init(int *argc, char ***argv, command_line_handling& cfg)
    {
        using namespace boost::assign;

        int this_rank = -1;
        has_called_init_ = false;

        // We assume to use the MPI parcelport if it is not explicitly disabled
        enabled_ = check_mpi_environment(cfg.rtcfg_);
        if (!enabled_)
        {
            cfg.ini_config_.push_back("hpx.parcel.mpi.enable = 0");
            return;
        }

        cfg.ini_config_ += "hpx.parcel.bootstrap!=mpi";

#if defined(HPX_HAVE_PARCELPORT_MPI_MULTITHREADED)
        int flag = (detail::get_cfg_entry(
            cfg, "hpx.parcel.mpi.multithreaded", 1) != 0) ?
                MPI_THREAD_MULTIPLE : MPI_THREAD_SINGLE;

#if defined(MVAPICH2_VERSION) && defined(_POSIX_SOURCE)
        // This enables multi threading support in MVAPICH2 if requested.
        if(flag == MPI_THREAD_MULTIPLE)
            setenv("MV2_ENABLE_AFFINITY", "0", 1);
#endif

        int retval = MPI_Init_thread(argc, argv, flag, &provided_threading_flag_);
#else
        int retval = MPI_Init(argc, argv);
        provided_threading_flag_ = MPI_THREAD_SINGLE;
#endif
        if (MPI_SUCCESS != retval)
        {
            if (MPI_ERR_OTHER != retval)
            {
                // explicitly disable the MPI parcelport if MPI failed to initialize
                cfg.ini_config_.push_back("hpx.parcel.mpi.enable = 0");

                enabled_ = false;

                int msglen = 0;
                char message[MPI_MAX_ERROR_STRING+1];
                MPI_Error_string(retval, message, &msglen);
                message[msglen] = '\0';

                std::string msg("mpi_environment::init: MPI_Init_thread failed: ");
                msg = msg + message + ".";
                throw std::runtime_error(msg.c_str());
            }

            // somebody has already called MPI_Init before, we should be fine
            has_called_init_ = false;
        }
        else
        {
            has_called_init_ = true;
        }

        MPI_Comm_dup(MPI_COMM_WORLD, &communicator_);

        if (provided_threading_flag_ < MPI_THREAD_SERIALIZED)
        {
            // explicitly disable MPI multi-threading if the implementation
            // does not provide at least MPI_THREAD_SERIALIZED
            cfg.ini_config_.push_back("hpx.parcel.mpi.multithreaded = 0");
        }

        if(provided_threading_flag_ == MPI_THREAD_FUNNELED)
        {
            enabled_ = false;
            has_called_init_ = false;
            throw std::runtime_error("mpi_environment::init: MPI_Init_thread: "
                "The underlying MPI implementation only supports "
                "MPI_THREAD_FUNNELED. This mode is not supported by HPX. Please "
                "pass -Ihpx.parcel.mpi.multithreaded=0 to explicitly disable MPI"
                " multithreading.");
        }

        this_rank = rank();
        cfg.num_localities_ = static_cast<std::size_t>(size());

        if(this_rank == 0)
        {
            cfg.rtcfg_.mode_ = hpx::runtime_mode_console;
        }
        else
        {
            cfg.rtcfg_.mode_ = hpx::runtime_mode_worker;
        }

        cfg.ini_config_ += std::string("hpx.parcel.mpi.rank!=") +
            std::to_string(this_rank);
        cfg.ini_config_ += std::string("hpx.parcel.mpi.processorname!=") +
            get_processor_name();

        cfg.node_ = std::size_t(this_rank);
    }

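    // Return the name MPI reports for the processor this rank is running on.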
    std::string mpi_environment::get_processor_name()
    {
        char name[MPI_MAX_PROCESSOR_NAME + 1] = { '\0' };
        int len = 0;
        MPI_Get_processor_name(name, &len);

        return name;
    }

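    // Finalize MPI, but only if this component was the one that initialized it.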
    void mpi_environment::finalize()
    {
        if(enabled() && has_called_init())
        {
            MPI_Finalize();
        }
    }

    bool mpi_environment::enabled()
    {
        return enabled_;
    }

    bool mpi_environment::multi_threaded()
    {
        return provided_threading_flag_ >= MPI_THREAD_SERIALIZED;
    }

    bool mpi_environment::has_called_init()
    {
        return has_called_init_;
    }

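    // Number of MPI ranks and the rank of this process; both return -1 while
    // the MPI parcelport is disabled.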
    int mpi_environment::size()
    {
        int res(-1);
        if(enabled())
            MPI_Comm_size(communicator(), &res);
        return res;
    }

    int mpi_environment::rank()
    {
        int res(-1);
        if(enabled())
            MPI_Comm_rank(communicator(), &res);
        return res;
    }

    MPI_Comm& mpi_environment::communicator()
    {
        return communicator_;
    }

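    // scoped_lock serializes MPI calls whenever multi_threaded() reports that
    // the MPI library was not initialized with sufficient threading support;
    // otherwise locking is a no-op.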
    mpi_environment::scoped_lock::scoped_lock()
    {
        if(!multi_threaded())
            mtx_.lock();
    }

    mpi_environment::scoped_lock::~scoped_lock()
    {
        if(!multi_threaded())
            mtx_.unlock();
    }

    void mpi_environment::scoped_lock::unlock()
    {
        if(!multi_threaded())
            mtx_.unlock();
    }

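    // Same as scoped_lock, but uses try_lock; 'locked' reports whether the
    // mutex was acquired (it stays true when no locking is required).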
    mpi_environment::scoped_try_lock::scoped_try_lock()
      : locked(true)
    {
        if(!multi_threaded())
        {
            locked = mtx_.try_lock();
        }
    }

    mpi_environment::scoped_try_lock::~scoped_try_lock()
    {
        if(!multi_threaded() && locked)
            mtx_.unlock();
    }

    void mpi_environment::scoped_try_lock::unlock()
    {
        if(!multi_threaded() && locked)
        {
            locked = false;
            mtx_.unlock();
        }
    }
}}

#endif