1 // Copyright (c) 2015-2016 John Biddiscombe 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 6 #include <plugins/parcelport/libfabric/parcelport_libfabric.hpp> 7 8 // TODO: cleanup includes 9 10 // config 11 #include <hpx/config.hpp> 12 // util 13 #include <hpx/lcos/local/condition_variable.hpp> 14 #include <hpx/runtime/threads/thread_data.hpp> 15 #include <hpx/util/command_line_handling.hpp> 16 #include <hpx/util/high_resolution_timer.hpp> 17 #include <hpx/util/runtime_configuration.hpp> 18 19 // The memory pool specialization need to be pulled in before encode_parcels 20 #include <hpx/plugins/parcelport_factory.hpp> 21 #include <hpx/runtime.hpp> 22 #include <hpx/runtime/parcelset/decode_parcels.hpp> 23 #include <hpx/runtime/parcelset/encode_parcels.hpp> 24 #include <hpx/runtime/parcelset/parcel_buffer.hpp> 25 #include <hpx/runtime/parcelset/parcelport.hpp> 26 #include <hpx/runtime/parcelset/parcelport_impl.hpp> 27 // 28 #include <hpx/util/assert.hpp> 29 #include <hpx/util/debug/thread_stacktrace.hpp> 30 // 31 #include <boost/asio/ip/host_name.hpp> 32 // 33 // This header is generated by CMake and contains a number of configurable 34 // setting that affect the parcelport. It needs to be #included before 35 // other files that are part of the parcelport 36 #include <hpx/config/parcelport_defines.hpp> 37 38 // -------------------------------------------------------------------- 39 // Controls whether we are allowed to suspend threads that are sending 40 // when we have maxed out the number of sends we can handle 41 #define HPX_PARCELPORT_LIBFABRIC_SUSPEND_WAKE (HPX_PARCELPORT_LIBFABRIC_THROTTLE_SENDS/2) 42 43 // -------------------------------------------------------------------- 44 // Enable the use of boost small_vector for certain short lived storage 45 // elements within the parcelport. 
This can reduce some memory allocations 46 #define HPX_PARCELPORT_LIBFABRIC_USE_SMALL_VECTOR true 47 48 // -------------------------------------------------------------------- 49 #include <plugins/parcelport/unordered_map.hpp> 50 #include <plugins/parcelport/libfabric/header.hpp> 51 #include <plugins/parcelport/libfabric/locality.hpp> 52 53 #include <plugins/parcelport/libfabric/libfabric_region_provider.hpp> 54 #include <plugins/parcelport/performance_counter.hpp> 55 #include <plugins/parcelport/rma_memory_pool.hpp> 56 #include <plugins/parcelport/libfabric/connection_handler.hpp> 57 #include <plugins/parcelport/parcelport_logging.hpp> 58 #include <plugins/parcelport/libfabric/rdma_locks.hpp> 59 #include <plugins/parcelport/libfabric/libfabric_controller.hpp> 60 61 // 62 #if HPX_PARCELPORT_LIBFABRIC_USE_SMALL_VECTOR 63 # include <boost/container/small_vector.hpp> 64 #endif 65 // 66 #include <unordered_map> 67 #include <memory> 68 #include <mutex> 69 #include <sstream> 70 #include <cstddef> 71 #include <cstdint> 72 #include <cstring> 73 #include <iostream> 74 #include <list> 75 #include <string> 76 #include <utility> 77 #include <vector> 78 79 using namespace hpx::parcelset::policies; 80 81 namespace hpx { 82 namespace parcelset { 83 namespace policies { 84 namespace libfabric 85 { 86 // -------------------------------------------------------------------- 87 // parcelport, the implementation of the parcelport itself 88 // -------------------------------------------------------------------- 89 90 // -------------------------------------------------------------------- 91 // Constructor : mostly just initializes the superclass with 'here' 92 // -------------------------------------------------------------------- parcelport(util::runtime_configuration const & ini,util::function_nonser<void (std::size_t,char const *)> const & on_start_thread,util::function_nonser<void (std::size_t,char const *)> const & on_stop_thread)93 
/// Construct the libfabric parcelport.
///
/// \param ini              runtime configuration; the "hpx.parcel.libfabric.*"
///                         and "hpx.parcel.bootstrap" entries are read from it
/// \param on_start_thread  forwarded unchanged to the base class
/// \param on_stop_thread   forwarded unchanged to the base class
///
/// When "hpx.parcel.libfabric.enable" is false the constructor returns right
/// after reading the two flags, i.e. without creating the libfabric
/// controller and without setting here_ / ip_addr_.
parcelport::parcelport(util::runtime_configuration const& ini,
    util::function_nonser<void(std::size_t, char const*)> const& on_start_thread,
    util::function_nonser<void(std::size_t, char const*)> const& on_stop_thread)
  : base_type(ini, locality(), on_start_thread, on_stop_thread)
  , stopped_(false)
  , completions_handled_(0)
  , senders_in_use_(0)
{
    FUNC_START_DEBUG_MSG;

    // is this parcelport switched on at all?
    parcelport_enabled_ = hpx::util::get_entry_as<bool>(
        ini, "hpx.parcel.libfabric.enable", 0);
    LOG_DEBUG_MSG("Got enabled " << parcelport_enabled_);

    // bootstrapping is ours when the configured bootstrap parcelport
    // is named "libfabric"
    std::string const bootstrap_name =
        hpx::util::get_entry_as<std::string>(ini, "hpx.parcel.bootstrap", "");
    bootstrap_enabled_ = ("libfabric" == bootstrap_name);
    LOG_DEBUG_MSG("Got bootstrap " << bootstrap_enabled_);

    // a disabled parcelport does not allocate any fabric resources
    if (!parcelport_enabled_)
    {
        return;
    }

    // the three settings that select the fabric; each one falls back to
    // the value configured at build time
    std::string provider_name = ini.get_entry(
        "hpx.parcel.libfabric.provider", HPX_PARCELPORT_LIBFABRIC_PROVIDER);
    std::string domain_name = ini.get_entry(
        "hpx.parcel.libfabric.domain", HPX_PARCELPORT_LIBFABRIC_DOMAIN);
    std::string endpoint_type = ini.get_entry(
        "hpx.parcel.libfabric.endpoint", HPX_PARCELPORT_LIBFABRIC_ENDPOINT);

    LOG_DEBUG_MSG("libfabric parcelport function using attributes "
        << provider_name << " " << domain_name << " " << endpoint_type);

    // the controller is our main fabric control structure
    libfabric_controller_ = std::make_shared<libfabric_controller>(
        provider_name, domain_name, endpoint_type);

    // ask the controller who we are, and remember both the generic
    // locality and the raw ip address
    LOG_DEBUG_MSG("Getting local locality object");
    locality const& self_locality = libfabric_controller_->here();
    here_ = parcelset::locality(self_locality);
    ip_addr_ = self_locality.ip_address();

    FUNC_END_DEBUG_MSG;
}

// --------------------------------------------------------------------
// during bootup, this is used by the service threads
io_service_work()141 void parcelport::io_service_work() 142 { 143 while (hpx::is_starting()) 144 { 145 background_work(0); 146 } 147 LOG_DEBUG_MSG("io service task completed"); 148 } 149 150 // -------------------------------------------------------------------- 151 // Start the handling of communication. do_run()152 bool parcelport::do_run() 153 { 154 if (!parcelport_enabled_) return false; 155 156 #ifndef HPX_PARCELPORT_LIBFABRIC_HAVE_PMI 157 auto &as = this->applier_->get_agas_client(); 158 libfabric_controller_->initialize_localities(as); 159 #endif 160 161 FUNC_START_DEBUG_MSG; 162 libfabric_controller_->startup(this); 163 164 LOG_DEBUG_MSG("Fetching memory pool"); 165 chunk_pool_ = &libfabric_controller_->get_memory_pool(); 166 167 for (std::size_t i = 0; i < HPX_PARCELPORT_LIBFABRIC_THROTTLE_SENDS; ++i) 168 { 169 sender *snd = 170 new sender(this, 171 libfabric_controller_->ep_active_, 172 libfabric_controller_->get_domain(), 173 chunk_pool_); 174 snd->postprocess_handler_ = [this](sender* s) 175 { 176 --senders_in_use_; 177 senders_.push(s); 178 trigger_pending_work(); 179 }; 180 senders_.push(snd); 181 } 182 183 if (bootstrap_enabled_) 184 { 185 for (std::size_t i = 0; i != io_service_pool_.size(); ++i) 186 { 187 io_service_pool_.get_io_service(int(i)).post( 188 hpx::util::bind( 189 &parcelport::io_service_work, this)); 190 } 191 } 192 return true; 193 } 194 195 // -------------------------------------------------------------------- 196 // return a sender object back to the parcelport_impl 197 // this is used by the send_immediate version of parcelport_impl 198 // -------------------------------------------------------------------- get_connection(parcelset::locality const & dest,fi_addr_t & fi_addr)199 sender* parcelport::get_connection( 200 parcelset::locality const& dest, fi_addr_t &fi_addr) 201 { 202 sender* snd = nullptr; 203 if (senders_.pop(snd)) 204 { 205 FUNC_START_DEBUG_MSG; 206 const locality &fabric_locality = dest.get<locality>(); 207 
LOG_DEBUG_MSG("get_fabric_address from " 208 << ipaddress(here_.get<locality>().ip_address()) << "to " 209 << ipaddress(fabric_locality.ip_address())); 210 ++senders_in_use_; 211 fi_addr = libfabric_controller_->get_fabric_address(fabric_locality); 212 FUNC_END_DEBUG_MSG; 213 return snd; 214 } 215 // else if(threads::get_self_ptr()) 216 // // else if(this_thread::has_sufficient_stack_space()) 217 // { 218 // // background_work_OS_thread(); 219 // hpx::this_thread::suspend(hpx::threads::pending_boost, 220 // "libfabric::parcelport::async_write"); 221 // } 222 223 // if no senders are available shutdown 224 FUNC_END_DEBUG_MSG; 225 return nullptr; 226 } 227 reclaim_connection(sender * s)228 void parcelport::reclaim_connection(sender* s) 229 { 230 --senders_in_use_; 231 senders_.push(s); 232 } 233 234 // -------------------------------------------------------------------- 235 // return a sender object back to the parcelport_impl 236 // this is for compatibility with non send_immediate operation 237 // -------------------------------------------------------------------- create_connection(parcelset::locality const & dest,error_code & ec)238 std::shared_ptr<sender> parcelport::create_connection( 239 parcelset::locality const& dest, error_code& ec) 240 { 241 LOG_DEBUG_MSG("Creating new sender"); 242 return std::shared_ptr<sender>(); 243 } 244 245 // -------------------------------------------------------------------- 246 // cleanup ~parcelport()247 parcelport::~parcelport() { 248 FUNC_START_DEBUG_MSG; 249 scoped_lock lk(stop_mutex); 250 sender *snd = nullptr; 251 252 unsigned int sends_posted = 0; 253 unsigned int sends_deleted = 0; 254 unsigned int acks_received = 0; 255 // 256 while (senders_.pop(snd)) { 257 LOG_DEBUG_MSG("Popped a sender for delete " << hexpointer(snd)); 258 sends_posted += snd->sends_posted_; 259 sends_deleted += snd->sends_deleted_; 260 acks_received += snd->acks_received_; 261 delete snd; 262 } 263 LOG_DEBUG_MSG( 264 "sends_posted " << 
decnumber(sends_posted) 265 << "sends_deleted " << decnumber(sends_deleted) 266 << "acks_received " << decnumber(acks_received) 267 << "non_rma-send " << decnumber(sends_posted-acks_received)); 268 // 269 libfabric_controller_ = nullptr; 270 FUNC_END_DEBUG_MSG; 271 } 272 273 // -------------------------------------------------------------------- 274 /// Should not be used any more as parcelport_impl handles this? can_bootstrap() const275 bool parcelport::can_bootstrap() const { 276 FUNC_START_DEBUG_MSG; 277 bool can_boot = HPX_PARCELPORT_LIBFABRIC_HAVE_BOOTSTRAPPING(); 278 LOG_TRACE_MSG("Returning " << can_boot << " from can_bootstrap") 279 FUNC_END_DEBUG_MSG; 280 return can_boot; 281 } 282 283 // -------------------------------------------------------------------- 284 /// return a string form of the locality name get_locality_name() const285 std::string parcelport::get_locality_name() const 286 { 287 FUNC_START_DEBUG_MSG; 288 // return hostname:iblibfabric ip address 289 std::stringstream temp; 290 temp << boost::asio::ip::host_name() << ":" << ipaddress(ip_addr_); 291 std::string tstr = temp.str(); 292 FUNC_END_DEBUG_MSG; 293 return tstr.substr(0, tstr.size()-1); 294 } 295 296 // -------------------------------------------------------------------- 297 // the root node has spacial handlig, this returns its Id 298 parcelset::locality parcelport:: agas_locality(util::runtime_configuration const & ini) const299 agas_locality(util::runtime_configuration const & ini) const 300 { 301 FUNC_START_DEBUG_MSG; 302 // load all components as described in the configuration information 303 if (!bootstrap_enabled_) 304 { 305 LOG_ERROR_MSG("Should only return agas locality when bootstrapping"); 306 } 307 FUNC_END_DEBUG_MSG; 308 return libfabric_controller_->agas_; 309 } 310 311 // -------------------------------------------------------------------- create_locality() const312 parcelset::locality parcelport::create_locality() const { 313 FUNC_START_DEBUG_MSG; 314 FUNC_END_DEBUG_MSG; 
315 return parcelset::locality(locality()); 316 } 317 318 // -------------------------------------------------------------------- 319 /// for debugging suspended_task_debug(const std::string & match)320 void parcelport::suspended_task_debug(const std::string &match) 321 { 322 std::string temp = hpx::util::debug::suspended_task_backtraces(); 323 if (match.size()==0 || 324 temp.find(match)!=std::string::npos) 325 { 326 LOG_DEBUG_MSG("Suspended threads " << temp); 327 } 328 } 329 330 // -------------------------------------------------------------------- 331 /// stop the parcelport, prior to shutdown do_stop()332 void parcelport::do_stop() { 333 LOG_DEBUG_MSG("Entering libfabric stop "); 334 FUNC_START_DEBUG_MSG; 335 if (!stopped_) { 336 // we don't want multiple threads trying to stop the clients 337 scoped_lock lock(stop_mutex); 338 339 LOG_DEBUG_MSG("Removing all initiated connections"); 340 libfabric_controller_->disconnect_all(); 341 342 // wait for all clients initiated elsewhere to be disconnected 343 while (libfabric_controller_->active() /*&& !hpx::is_stopped()*/) { 344 completions_handled_ += libfabric_controller_->poll_endpoints(true); 345 LOG_TIMED_INIT(disconnect_poll); 346 LOG_TIMED_BLOCK(disconnect_poll, DEVEL, 5.0, 347 { 348 LOG_DEBUG_MSG("Polling before shutdown"); 349 } 350 ) 351 } 352 LOG_DEBUG_MSG("stopped removing clients and terminating"); 353 } 354 stopped_ = true; 355 // Stop receiving and sending of parcels 356 } 357 358 // -------------------------------------------------------------------- can_send_immediate()359 bool parcelport::can_send_immediate() 360 { 361 // hpx::util::yield_while([this]() 362 // { 363 // this->background_work(0); 364 // return this->senders_.empty(); 365 // }, "libfabric::can_send_immediate"); 366 367 return true; 368 } 369 370 // -------------------------------------------------------------------- 371 template <typename Handler> async_write(Handler && handler,sender * snd,fi_addr_t addr,snd_buffer_type & buffer)372 
bool parcelport::async_write(Handler && handler, 373 sender *snd, fi_addr_t addr, 374 snd_buffer_type &buffer) 375 { 376 LOG_DEBUG_MSG("parcelport::async_write using sender " << hexpointer(snd)); 377 snd->dst_addr_ = addr; 378 snd->buffer_ = std::move(buffer); 379 HPX_ASSERT(!snd->handler_); 380 snd->handler_ = std::forward<Handler>(handler); 381 snd->async_write_impl(); 382 // after a send poll to make progress on the network and 383 // reduce latencies for receives coming in 384 // background_work_OS_thread(); 385 // if (hpx::threads::get_self_ptr()) 386 // hpx::this_thread::suspend(hpx::threads::pending_boost, 387 // "libfabric::parcelport::async_write"); 388 return true; 389 } 390 391 // -------------------------------------------------------------------- 392 // This is called to poll for completions and handle all incoming messages 393 // as well as complete outgoing messages. 394 // 395 // Since the parcelport can be serviced by hpx threads or by OS threads, 396 // we must use extra care when dealing with mutexes and condition_variables 397 // since we do not want to suspend an OS thread, but we do want to suspend 398 // hpx threads when necessary. 
399 // 400 // NB: There is no difference any more between background polling work 401 // on OS or HPX as all has been tested thoroughly 402 // -------------------------------------------------------------------- background_work_OS_thread()403 inline bool parcelport::background_work_OS_thread() { 404 LOG_TIMED_INIT(background); 405 bool done = false; 406 do { 407 LOG_TIMED_BLOCK(background, DEVEL, 5.0, { 408 LOG_DEBUG_MSG("number of senders in use " 409 << decnumber(senders_in_use_)); 410 }); 411 // if an event comes in, we may spend time processing/handling it 412 // and another may arrive during this handling, 413 // so keep checking until none are received 414 // libfabric_controller_->refill_client_receives(false); 415 int numc = libfabric_controller_->poll_endpoints(); 416 completions_handled_ += numc; 417 done = (numc==0); 418 } while (!done); 419 return (done!=0); 420 } 421 422 // -------------------------------------------------------------------- 423 // Background work 424 // 425 // This is called whenever the main thread scheduler is idling, 426 // is used to poll for events, messages on the libfabric connection 427 // -------------------------------------------------------------------- background_work(std::size_t num_thread)428 bool parcelport::background_work(std::size_t num_thread) { 429 if (stopped_ || hpx::is_stopped()) { 430 return false; 431 } 432 return background_work_OS_thread(); 433 } 434 }}}} 435 436 HPX_REGISTER_PARCELPORT(hpx::parcelset::policies::libfabric::parcelport, libfabric); 437