1 // Copyright (c) 2013-2015 Thomas Heller 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 #include <hpx/config.hpp> 7 8 #if defined(HPX_HAVE_NETWORKING) 9 10 #if defined(HPX_HAVE_PARCELPORT_MPI) 11 #include <hpx/plugins/parcelport/mpi/mpi.hpp> 12 #endif 13 14 #include <hpx/util/runtime_configuration.hpp> 15 #include <hpx/util/command_line_handling.hpp> 16 #include <hpx/plugins/parcelport/mpi/mpi_environment.hpp> 17 18 #include <boost/assign/std/vector.hpp> 19 #include <boost/lexical_cast.hpp> 20 #include <boost/tokenizer.hpp> 21 22 #include <cstddef> 23 #include <cstdlib> 24 #include <string> 25 26 namespace hpx { namespace util 27 { 28 namespace detail 29 { detect_mpi_environment(util::runtime_configuration const & cfg,char const * default_env)30 bool detect_mpi_environment(util::runtime_configuration const& cfg, 31 char const* default_env) 32 { 33 #if defined(__bgq__) 34 // If running on BG/Q, we can safely assume to always run in an 35 // MPI environment 36 return true; 37 #else 38 std::string mpi_environment_strings = cfg.get_entry( 39 "hpx.parcel.mpi.env", default_env); 40 41 typedef 42 boost::tokenizer<boost::char_separator<char> > 43 tokenizer; 44 boost::char_separator<char> sep(";,: "); 45 tokenizer tokens(mpi_environment_strings, sep); 46 for(tokenizer::iterator it = tokens.begin(); it != tokens.end(); ++it) 47 { 48 char *env = std::getenv(it->c_str()); 49 if(env) return true; 50 } 51 return false; 52 #endif 53 } 54 get_cfg_entry(runtime_configuration const & cfg,std::string const & str,int dflt)55 int get_cfg_entry(runtime_configuration const& cfg, 56 std::string const& str, int dflt) 57 { 58 try { 59 return boost::lexical_cast<int>( 60 cfg.get_entry(str, dflt)); 61 } 62 catch (boost::bad_lexical_cast const&) { 63 /**/; 64 } 65 return dflt; 66 } 67 get_cfg_entry(command_line_handling & cfg,std::string const & str,int dflt)68 int get_cfg_entry(command_line_handling& cfg, std::string const& str, 69 int dflt) 70 { 71 return get_cfg_entry(cfg.rtcfg_, str, dflt); 72 } 73 } 74 }} 75 76 namespace hpx { namespace util 77 { 78 mpi_environment::mutex_type mpi_environment::mtx_; 79 bool mpi_environment::enabled_ = false; 80 bool mpi_environment::has_called_init_ = false; 81 int mpi_environment::provided_threading_flag_ = MPI_THREAD_SINGLE; 82 MPI_Comm mpi_environment::communicator_ = MPI_COMM_NULL; 83 84 /////////////////////////////////////////////////////////////////////////// check_mpi_environment(runtime_configuration const & cfg)85 bool mpi_environment::check_mpi_environment(runtime_configuration const& cfg) 86 { 87 if (detail::get_cfg_entry(cfg, "hpx.parcel.mpi.enable", 1) == 0) 88 return false; 89 90 // We disable the MPI parcelport if the application is not run using 91 // mpirun and the tcp/ip parcelport is not explicitly disabled 92 // 93 // The bottom line is that we use the MPI parcelport either when the 94 // application was executed using mpirun or if the tcp/ip parcelport 95 // was disabled. 96 if (!detail::detect_mpi_environment(cfg, HPX_HAVE_PARCELPORT_MPI_ENV) && 97 detail::get_cfg_entry(cfg, "hpx.parcel.tcp.enable", 1)) 98 { 99 return false; 100 } 101 102 return true; 103 } 104 init(int * argc,char *** argv,command_line_handling & cfg)105 void mpi_environment::init(int *argc, char ***argv, command_line_handling& cfg) 106 { 107 using namespace boost::assign; 108 109 int this_rank = -1; 110 has_called_init_ = false; 111 112 // We assume to use the MPI parcelport if it is not explicitly disabled 113 enabled_ = check_mpi_environment(cfg.rtcfg_); 114 if (!enabled_) 115 { 116 cfg.ini_config_.push_back("hpx.parcel.mpi.enable = 0"); 117 return; 118 } 119 120 cfg.ini_config_ += "hpx.parcel.bootstrap!=mpi"; 121 122 #if defined(HPX_HAVE_PARCELPORT_MPI_MULTITHREADED) 123 int flag = (detail::get_cfg_entry( 124 cfg, "hpx.parcel.mpi.multithreaded", 1) != 0) ? 125 MPI_THREAD_MULTIPLE : MPI_THREAD_SINGLE; 126 127 #if defined(MVAPICH2_VERSION) && defined(_POSIX_SOURCE) 128 // This enables multi threading support in MVAPICH2 if requested. 129 if(flag == MPI_THREAD_MULTIPLE) 130 setenv("MV2_ENABLE_AFFINITY", "0", 1); 131 #endif 132 133 int retval = MPI_Init_thread(argc, argv, flag, &provided_threading_flag_); 134 #else 135 int retval = MPI_Init(argc, argv); 136 provided_threading_flag_ = MPI_THREAD_SINGLE; 137 #endif 138 if (MPI_SUCCESS != retval) 139 { 140 if (MPI_ERR_OTHER != retval) 141 { 142 // explicitly disable mpi if not run by mpirun 143 cfg.ini_config_.push_back("hpx.parcel.mpi.enable = 0"); 144 145 enabled_ = false; 146 147 int msglen = 0; 148 char message[MPI_MAX_ERROR_STRING+1]; 149 MPI_Error_string(retval, message, &msglen); 150 message[msglen] = '\0'; 151 152 std::string msg("mpi_environment::init: MPI_Init_thread failed: "); 153 msg = msg + message + "."; 154 throw std::runtime_error(msg.c_str()); 155 } 156 157 // somebody has already called MPI_Init before, we should be fine 158 has_called_init_ = false; 159 } 160 else 161 { 162 has_called_init_ = true; 163 } 164 165 MPI_Comm_dup(MPI_COMM_WORLD, &communicator_); 166 167 if (provided_threading_flag_ < MPI_THREAD_SERIALIZED) 168 { 169 // explicitly disable mpi if not run by mpirun 170 cfg.ini_config_.push_back("hpx.parcel.mpi.multithreaded = 0"); 171 } 172 173 if(provided_threading_flag_ == MPI_THREAD_FUNNELED) 174 { 175 enabled_ = false; 176 has_called_init_ = false; 177 throw std::runtime_error("mpi_environment::init: MPI_Init_thread: " 178 "The underlying MPI implementation only supports " 179 "MPI_THREAD_FUNNELED. This mode is not supported by HPX. Please " 180 "pass -Ihpx.parcel.mpi.multithreaded=0 to explicitly disable MPI" 181 " multithreading."); 182 } 183 184 this_rank = rank(); 185 cfg.num_localities_ = static_cast<std::size_t>(size()); 186 187 if(this_rank == 0) 188 { 189 cfg.rtcfg_.mode_ = hpx::runtime_mode_console; 190 } 191 else 192 { 193 cfg.rtcfg_.mode_ = hpx::runtime_mode_worker; 194 } 195 196 cfg.ini_config_ += std::string("hpx.parcel.mpi.rank!=") + 197 std::to_string(this_rank); 198 cfg.ini_config_ += std::string("hpx.parcel.mpi.processorname!=") + 199 get_processor_name(); 200 201 cfg.node_ = std::size_t(this_rank); 202 } 203 get_processor_name()204 std::string mpi_environment::get_processor_name() 205 { 206 char name[MPI_MAX_PROCESSOR_NAME + 1] = { '\0' }; 207 int len = 0; 208 MPI_Get_processor_name(name, &len); 209 210 return name; 211 } 212 finalize()213 void mpi_environment::finalize() 214 { 215 if(enabled() && has_called_init()) 216 { 217 MPI_Finalize(); 218 } 219 } 220 enabled()221 bool mpi_environment::enabled() 222 { 223 return enabled_; 224 } 225 multi_threaded()226 bool mpi_environment::multi_threaded() 227 { 228 return provided_threading_flag_ >= MPI_THREAD_SERIALIZED; 229 } 230 has_called_init()231 bool mpi_environment::has_called_init() 232 { 233 return has_called_init_; 234 } 235 size()236 int mpi_environment::size() 237 { 238 int res(-1); 239 if(enabled()) 240 MPI_Comm_size(communicator(), &res); 241 return res; 242 } 243 rank()244 int mpi_environment::rank() 245 { 246 int res(-1); 247 if(enabled()) 248 MPI_Comm_rank(communicator(), &res); 249 return res; 250 } 251 communicator()252 MPI_Comm& mpi_environment::communicator() 253 { 254 return communicator_; 255 } 256 scoped_lock()257 mpi_environment::scoped_lock::scoped_lock() 258 { 259 if(!multi_threaded()) 260 mtx_.lock(); 261 } 262 ~scoped_lock()263 mpi_environment::scoped_lock::~scoped_lock() 264 { 265 if(!multi_threaded()) 266 mtx_.unlock(); 267 } 268 unlock()269 void mpi_environment::scoped_lock::unlock() 270 { 271 if(!multi_threaded()) 272 mtx_.unlock(); 273 } 274 scoped_try_lock()275 mpi_environment::scoped_try_lock::scoped_try_lock() 276 : locked(true) 277 { 278 if(!multi_threaded()) 279 { 280 locked = mtx_.try_lock(); 281 } 282 } 283 ~scoped_try_lock()284 mpi_environment::scoped_try_lock::~scoped_try_lock() 285 { 286 if(!multi_threaded() && locked) 287 mtx_.unlock(); 288 } 289 unlock()290 void mpi_environment::scoped_try_lock::unlock() 291 { 292 if(!multi_threaded() && locked) 293 { 294 locked = false; 295 mtx_.unlock(); 296 } 297 } 298 }} 299 300 #endif 301