1 // Copyright (c) 2016 John Biddiscombe 2 // Copyright (c) 2017 Thomas Heller 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 7 #ifndef HPX_PARCELSET_POLICIES_LIBFABRIC_CONTROLLER_HPP 8 #define HPX_PARCELSET_POLICIES_LIBFABRIC_CONTROLLER_HPP 9 10 // config 11 #include <hpx/config/defines.hpp> 12 // 13 #include <hpx/lcos/local/shared_mutex.hpp> 14 #include <hpx/lcos/promise.hpp> 15 #include <hpx/lcos/future.hpp> 16 // 17 #include <hpx/runtime/parcelset/parcelport_impl.hpp> 18 #include <hpx/runtime/agas/addressing_service.hpp> 19 // 20 #include <plugins/parcelport/parcelport_logging.hpp> 21 #include <plugins/parcelport/libfabric/rdma_locks.hpp> 22 #include <plugins/parcelport/libfabric/receiver.hpp> 23 #include <plugins/parcelport/libfabric/sender.hpp> 24 #include <plugins/parcelport/libfabric/rma_receiver.hpp> 25 #include <plugins/parcelport/libfabric/locality.hpp> 26 // 27 #include <plugins/parcelport/unordered_map.hpp> 28 // 29 #include <memory> 30 #include <deque> 31 #include <chrono> 32 #include <iostream> 33 #include <functional> 34 #include <map> 35 #include <atomic> 36 #include <string> 37 #include <utility> 38 #include <array> 39 #include <vector> 40 #include <unordered_map> 41 #include <sstream> 42 #include <cstdint> 43 #include <cstddef> 44 #include <cstring> 45 // 46 #include <rdma/fabric.h> 47 #include <rdma/fi_cm.h> 48 #include <rdma/fi_domain.h> 49 #include <rdma/fi_endpoint.h> 50 #include <rdma/fi_eq.h> 51 #include <rdma/fi_errno.h> 52 #include <rdma/fi_rma.h> 53 #include "fabric_error.hpp" 54 55 #if defined(HPX_PARCELPORT_LIBFABRIC_GNI) || \ 56 defined(HPX_PARCELPORT_LIBFABRIC_SOCKETS) || \ 57 defined(HPX_PARCELPORT_LIBFABRIC_PSM2) 58 # define HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM 59 #else 60 # define HPX_PARCELPORT_LIBFABRIC_ENDPOINT_MSG 61 #endif 62 63 #ifdef HPX_PARCELPORT_LIBFABRIC_GNI 64 # include "rdma/fi_ext_gni.h" 65 #endif 66 
#ifdef HPX_PARCELPORT_LIBFABRIC_HAVE_PMI
// PMI2 is used to exchange fabric addresses between ranks at bootstrap time
# include <pmi2.h>
//
# include <boost/archive/iterators/base64_from_binary.hpp>
# include <boost/archive/iterators/binary_from_base64.hpp>
# include <boost/archive/iterators/transform_width.hpp>

using namespace boost::archive::iterators;

// iterator adaptor: raw bytes -> base64 text (reads 8 bits, emits 6)
typedef
    base64_from_binary<
        transform_width<std::string::const_iterator, 6, 8>
    > base64_t;

// iterator adaptor: base64 text -> raw bytes (reads 6 bits, emits 8)
typedef
    transform_width<
        binary_from_base64<std::string::const_iterator>, 8, 6
    > binary_t;
#endif

namespace hpx {
namespace parcelset {
namespace policies {
namespace libfabric
{

// Owns and manages the libfabric objects (fabric, domain, endpoints,
// completion/event queues, address vector) used by the parcelport.
class libfabric_controller
{
public:
    typedef hpx::lcos::local::spinlock mutex_type;
    typedef hpx::parcelset::policies::libfabric::unique_lock<mutex_type> unique_lock;
    typedef hpx::parcelset::policies::libfabric::scoped_lock<mutex_type> scoped_lock;

    // NOTE: Connection maps are not used for endpoint type RDM
    // when a new connection is requested, it will be completed asynchronously
    // we need a promise/future for each endpoint so that we can set the new
    // endpoint when the connection completes and is ready
    // Note - only used during connection, then deleted
    typedef std::tuple<
        hpx::promise<fid_ep *>,
        hpx::shared_future<fid_ep *>
    > promise_tuple_type;

    // lock types for maps
    typedef hpx::concurrent::unordered_map<uint32_t, promise_tuple_type>
        ::map_read_lock_type map_read_lock_type;
    typedef hpx::concurrent::unordered_map<uint32_t, promise_tuple_type>
        ::map_write_lock_type map_write_lock_type;

    // Map of connections started, needed until connection is completed
    hpx::concurrent::unordered_map<uint32_t, promise_tuple_type> endpoint_tmp_;
    // (ip << 32 | port) key -> resolved fabric address (see get_fabric_address)
    std::unordered_map<uint64_t, fi_addr_t> endpoint_av_;

    // our own locality, and the locality of the AGAS (rank 0) node
    locality here_;
    locality agas_;

    // core libfabric objects, created in open_fabric()
    struct fi_info *fabric_info_;
    struct fid_fabric *fabric_;
    struct fid_domain *fabric_domain_;
// Server/Listener for RDMA connections.
struct fid_pep *ep_passive_;
struct fid_ep *ep_active_;
struct fid_ep *ep_shared_rx_cxt_;

// we will use just one event queue for all connections
struct fid_eq *event_queue_;
struct fid_cq *txcq_, *rxcq_;
struct fid_av *av_;

// true when the provider delivers immediate data (FI_RX_CQ_DATA)
bool immediate_;

// --------------------------------------------------------------------
// constructor gets info from device and sets up all necessary
// maps, queues and server endpoint etc
// NOTE(review): the 'port' parameter is not referenced in this view -
// presumably reserved for the MSG/listener configuration; confirm.
libfabric_controller(
    std::string provider,
    std::string domain,
    std::string endpoint, int port=7910)
  : fabric_info_(nullptr)
  , fabric_(nullptr)
  , fabric_domain_(nullptr)
  , ep_passive_(nullptr)
  , ep_active_(nullptr)
  , ep_shared_rx_cxt_(nullptr)
  , event_queue_(nullptr)
  //
  , txcq_(nullptr)
  , rxcq_(nullptr)
  , av_(nullptr)
  //
  , immediate_(false)
{
    FUNC_START_DEBUG_MSG;
    open_fabric(provider, domain, endpoint);

    // Create a memory pool for pinned buffers
    memory_pool_.reset(
        new rma_memory_pool<libfabric_region_provider>(fabric_domain_));

    // setup a passive listener, or an active RDM endpoint
    here_ = create_local_endpoint();
#ifndef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM
    create_event_queue();
#endif

    LOG_DEBUG_MSG("Calling boot PMI");
    boot_PMI();
    FUNC_END_DEBUG_MSG;
}

// --------------------------------------------------------------------
// Bootstrap address exchange via the PMI2 key-value store:
// publish our base64-encoded fabric address, fence (barrier), then
// read every rank's address and insert it into the address vector.
// Rank 0 is recorded as the AGAS locality.
// No-op unless HPX_PARCELPORT_LIBFABRIC_HAVE_PMI is defined.
void boot_PMI()
{
#ifdef HPX_PARCELPORT_LIBFABRIC_HAVE_PMI
    int spawned;
    int size;
    int rank;
    int appnum;

    LOG_DEBUG_MSG("Calling PMI init");
    PMI2_Init(&spawned, &size, &rank, &appnum);
    LOG_DEBUG_MSG("Called PMI init on rank" << decnumber(rank));

    // create address vector and queues we need if bootstrapping
    create_completion_queues(fabric_info_, size);

    // we must pass out libfabric data to other nodes
    // encode it as a string to put into the PMI KV store
    std::string encoded_locality(
        base64_t((const char*)(here_.fabric_data())),
        base64_t((const char*)(here_.fabric_data()) + locality::array_size));
    int encoded_length = encoded_locality.size();
    LOG_DEBUG_MSG("Encoded locality as " << encoded_locality
        << " with length " << decnumber(encoded_length));

    // Key name for PMI
    std::string pmi_key = "hpx_libfabric_" + std::to_string(rank);
    // insert out data in the KV store
    LOG_DEBUG_MSG("Calling PMI2_KVS_Put on rank " << decnumber(rank));
    PMI2_KVS_Put(pmi_key.data(), encoded_locality.data());

    // Wait for all to do the same
    LOG_DEBUG_MSG("Calling PMI2_KVS_Fence on rank " << decnumber(rank));
    PMI2_KVS_Fence();

    // read libfabric data for all nodes and insert into our Address vector
    for (int i = 0; i < size; ++i)
    {
        // read one locality key (shadows the outer pmi_key - same scheme)
        std::string pmi_key = "hpx_libfabric_" + std::to_string(i);
        // base64 of array_size bytes is < array_size*2, so this is large enough
        char encoded_data[locality::array_size*2];
        int length = 0;
        PMI2_KVS_Get(0, i, pmi_key.data(), encoded_data,
            encoded_length + 1, &length);
        if (length != encoded_length)
        {
            LOG_ERROR_MSG("PMI value length mismatch, expected "
                << decnumber(encoded_length) << "got " << decnumber(length));
        }

        // decode the string back to raw locality data
        LOG_DEBUG_MSG("Calling decode for " << decnumber(i)
            << " locality data on rank " << decnumber(rank));
        locality new_locality;
        std::copy(binary_t(encoded_data),
            binary_t(encoded_data + encoded_length),
            (new_locality.fabric_data_writable()));

        // insert locality into address vector
        LOG_DEBUG_MSG("Calling insert_address for " << decnumber(i)
            << "on rank " << decnumber(rank));
        insert_address(new_locality);
        LOG_DEBUG_MSG("rank " << decnumber(i)
            << "added to address vector");
        // rank 0 hosts AGAS
        if (i == 0)
        {
            agas_ = new_locality;
        }
    }

    PMI2_Finalize();
#endif
}
// --------------------------------------------------------------------
// clean up all resources
// Aggregates per-receiver statistics for a final debug report, then
// closes every libfabric object that was opened.
~libfabric_controller()
{
    unsigned int messages_handled = 0;
    unsigned int acks_received = 0;
    unsigned int msg_plain = 0;
    unsigned int msg_rma = 0;
    unsigned int sent_ack = 0;
    unsigned int rma_reads = 0;
    unsigned int recv_deletes = 0;
    //
    for (auto &r : receivers_) {
        r.cleanup();
        // from receiver
        messages_handled += r.messages_handled_;
        acks_received += r.acks_received_;
        // from rma_receivers
        msg_plain += r.msg_plain_;
        msg_rma += r.msg_rma_;
        sent_ack += r.sent_ack_;
        rma_reads += r.rma_reads_;
        recv_deletes += r.recv_deletes_;
    }

    LOG_DEBUG_MSG(
        "Received messages " << decnumber(messages_handled)
        << "Received acks " << decnumber(acks_received)
        << "Sent acks " << decnumber(sent_ack)
        << "Total reads " << decnumber(rma_reads)
        << "Total deletes " << decnumber(recv_deletes)
        << "deletes error " << decnumber(messages_handled - recv_deletes));

    // Cleaning up receivers to avoid memory leak errors.
    receivers_.clear();

    // NOTE(review): the fabric object is closed before its child
    // endpoints/domain; libfabric normally expects children to be
    // closed first - confirm whether this ordering is intentional.
    LOG_DEBUG_MSG("closing fabric_->fid");
    if (fabric_)
        fi_close(&fabric_->fid);
#ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM
    LOG_DEBUG_MSG("closing ep_active_->fid");
    if (ep_active_)
        fi_close(&ep_active_->fid);
#else
    LOG_DEBUG_MSG("closing ep_passive_->fid");
    if (ep_passive_)
        fi_close(&ep_passive_->fid);
#endif
    LOG_DEBUG_MSG("closing event_queue_->fid");
    if (event_queue_)
        fi_close(&event_queue_->fid);
    LOG_DEBUG_MSG("closing fabric_domain_->fid");
    if (fabric_domain_)
        fi_close(&fabric_domain_->fid);
    LOG_DEBUG_MSG("closing ep_shared_rx_cxt_->fid");
    if (ep_shared_rx_cxt_)
        fi_close(&ep_shared_rx_cxt_->fid);
    // clean up
    LOG_DEBUG_MSG("freeing fabric_info");
    fi_freeinfo(fabric_info_);
}

// --------------------------------------------------------------------
// initialize the basic fabric/domain/name
// Builds an fi_info hints structure describing the capabilities we
// need (MSG + RMA + source addressing), queries the provider list,
// then creates the fabric and domain objects.
void open_fabric(
    std::string provider, std::string domain, std::string endpoint_type)
{
    FUNC_START_DEBUG_MSG;
    struct fi_info *fabric_hints_ = fi_allocinfo();
    if (!fabric_hints_) {
        throw fabric_error(-1, "Failed to allocate fabric hints");
    }
    // we require message and RMA support, so ask for them
    // we also want receives to carry source address info
    fabric_hints_->caps = FI_MSG | FI_RMA | FI_SOURCE |
        FI_WRITE | FI_READ | FI_REMOTE_READ | FI_REMOTE_WRITE | FI_RMA_EVENT;
    fabric_hints_->mode = FI_CONTEXT | FI_LOCAL_MR;
    // strdup'd strings become owned by the hints and are released
    // by fi_freeinfo
    fabric_hints_->fabric_attr->prov_name = strdup(provider.c_str());
    LOG_DEBUG_MSG("fabric provider " << fabric_hints_->fabric_attr->prov_name);
    if (domain.size()>0) {
        fabric_hints_->domain_attr->name = strdup(domain.c_str());
        LOG_DEBUG_MSG("fabric domain " << fabric_hints_->domain_attr->name);
    }

    // use infiniband type basic registration for now
fabric_hints_->domain_attr->mr_mode = FI_MR_BASIC; 337 338 // Disable the use of progress threads 339 fabric_hints_->domain_attr->control_progress = FI_PROGRESS_MANUAL; 340 fabric_hints_->domain_attr->data_progress = FI_PROGRESS_MANUAL; 341 342 // Enable thread safe mode Does not work with psm2 provider 343 fabric_hints_->domain_attr->threading = FI_THREAD_SAFE; 344 345 // Enable resource management 346 fabric_hints_->domain_attr->resource_mgmt = FI_RM_ENABLED; 347 348 #ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM 349 LOG_DEBUG_MSG("Selecting endpoint type RDM"); 350 fabric_hints_->ep_attr->type = FI_EP_RDM; 351 #else 352 // we will use a shared receive context for active endpoints 353 fabric_hints_->ep_attr->rx_ctx_cnt = FI_SHARED_CONTEXT; 354 355 if (endpoint_type=="msg") { 356 fabric_hints_->ep_attr->type = FI_EP_MSG; 357 } else if (endpoint_type=="rdm") { 358 fabric_hints_->ep_attr->type = FI_EP_RDM; 359 } else if (endpoint_type=="dgram") { 360 fabric_hints_->ep_attr->type = FI_EP_DGRAM; 361 } 362 else { 363 LOG_DEBUG_MSG("endpoint type not set, using RDM"); 364 fabric_hints_->ep_attr->type = FI_EP_RDM; 365 } 366 #endif 367 368 // by default, we will always want completions on both tx/rx events 369 fabric_hints_->tx_attr->op_flags = FI_COMPLETION; 370 fabric_hints_->rx_attr->op_flags = FI_COMPLETION; 371 372 uint64_t flags = 0; 373 LOG_DEBUG_MSG("Getting initial info about fabric"); 374 int ret = fi_getinfo(FI_VERSION(1,4), nullptr, nullptr, 375 flags, fabric_hints_, &fabric_info_); 376 if (ret) { 377 throw fabric_error(ret, "Failed to get fabric info"); 378 } 379 LOG_DEBUG_MSG("Fabric info " << fi_tostr(fabric_info_, FI_TYPE_INFO)); 380 381 immediate_ = (fabric_info_->rx_attr->mode & FI_RX_CQ_DATA)!=0; 382 LOG_DEBUG_MSG("Fabric supports immediate data " << immediate_); 383 LOG_EXCLUSIVE(bool context = (fabric_hints_->mode & FI_CONTEXT)!=0); 384 LOG_DEBUG_MSG("Fabric requires FI_CONTEXT " << context); 385 386 LOG_DEBUG_MSG("Creating fabric object"); 387 ret = 
fi_fabric(fabric_info_->fabric_attr, &fabric_, nullptr); 388 if (ret) { 389 throw fabric_error(ret, "Failed to get fi_fabric"); 390 } 391 392 // Allocate a domain. 393 LOG_DEBUG_MSG("Allocating domain "); 394 ret = fi_domain(fabric_, fabric_info_, &fabric_domain_, nullptr); 395 if (ret) throw fabric_error(ret, "fi_domain"); 396 397 // Cray specific. Disable memory registration cache 398 _set_disable_registration(); 399 400 fi_freeinfo(fabric_hints_); 401 FUNC_END_DEBUG_MSG; 402 } 403 404 // ------------------------------------------------------------------- 405 // create endpoint and get ready for possible communications startup(parcelport * pp)406 void startup(parcelport *pp) 407 { 408 FUNC_START_DEBUG_MSG; 409 // 410 #ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM 411 bind_endpoint_to_queues(ep_active_); 412 #else 413 bind_endpoint_to_queues(ep_passive_); 414 fabric_info_->handle = &(ep_passive->fid); 415 416 LOG_DEBUG_MSG("Creating active endpoint"); 417 new_endpoint_active(fabric_info_, &ep_active_); 418 LOG_DEBUG_MSG("active endpoint " << hexpointer(ep_active_); 419 420 bind_endpoint_to_queues(ep_active_); 421 #endif 422 423 // filling our vector of receivers... 
424 std::size_t num_receivers = HPX_PARCELPORT_LIBFABRIC_MAX_PREPOSTS; 425 receivers_.reserve(num_receivers); 426 for(std::size_t i = 0; i != num_receivers; ++i) 427 { 428 receivers_.emplace_back(pp, ep_active_, *memory_pool_); 429 } 430 } 431 432 // -------------------------------------------------------------------- 433 // Special GNI extensions to disable memory registration cache 434 435 // this helper function only works for string ops 436 void _set_check_domain_op_value(int op, const char *value) 437 { 438 #ifdef HPX_PARCELPORT_LIBFABRIC_GNI 439 int ret; 440 struct fi_gni_ops_domain *gni_domain_ops; 441 char *get_val; 442 443 ret = fi_open_ops(&fabric_domain_->fid, FI_GNI_DOMAIN_OPS_1, 444 0, (void **) &gni_domain_ops, nullptr); 445 if (ret) throw fabric_error(ret, "fi_open_ops"); 446 LOG_DEBUG_MSG("domain ops returned " << hexpointer(gni_domain_ops)); 447 448 ret = gni_domain_ops->set_val(&fabric_domain_->fid, 449 (dom_ops_val_t)(op), &value); 450 if (ret) throw fabric_error(ret, "set val (ops)"); 451 452 ret = gni_domain_ops->get_val(&fabric_domain_->fid, 453 (dom_ops_val_t)(op), &get_val); 454 LOG_DEBUG_MSG("Cache mode set to " << get_val); 455 if (std::string(value) != std::string(get_val)) 456 throw fabric_error(ret, "get val"); 457 #endif 458 } 459 460 void _set_disable_registration() 461 { 462 #ifdef HPX_PARCELPORT_LIBFABRIC_GNI 463 _set_check_domain_op_value(GNI_MR_CACHE, "none"); 464 #endif 465 } 466 467 // ------------------------------------------------------------------- 468 void create_event_queue() 469 { 470 LOG_DEBUG_MSG("Creating event queue"); 471 fi_eq_attr eq_attr = {}; 472 eq_attr.wait_obj = FI_WAIT_NONE; 473 int ret = fi_eq_open(fabric_, &eq_attr, &event_queue_, nullptr); 474 if (ret) throw fabric_error(ret, "fi_eq_open"); 475 476 if (fabric_info_->ep_attr->type == FI_EP_MSG) { 477 LOG_DEBUG_MSG("Binding event queue to passive endpoint"); 478 ret = fi_pep_bind(ep_passive_, &event_queue_->fid, 0); 479 if (ret) throw fabric_error(ret, 
"fi_pep_bind"); 480 481 LOG_DEBUG_MSG("Passive endpoint : listen"); 482 ret = fi_listen(ep_passive_); 483 if (ret) throw fabric_error(ret, "fi_listen"); 484 485 LOG_DEBUG_MSG("Allocating shared receive context"); 486 ret = fi_srx_context(fabric_domain_, fabric_info_->rx_attr, 487 &ep_shared_rx_cxt_, nullptr); 488 if (ret) throw fabric_error(ret, "fi_srx_context"); 489 } 490 FUNC_END_DEBUG_MSG; 491 } 492 493 // -------------------------------------------------------------------- 494 locality create_local_endpoint() 495 { 496 struct fid *id; 497 int ret; 498 #ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM 499 LOG_DEBUG_MSG("Creating active endpoint"); 500 new_endpoint_active(fabric_info_, &ep_active_); 501 LOG_DEBUG_MSG("active endpoint " << hexpointer(ep_active_)); 502 id = &ep_active_->fid; 503 #else 504 LOG_DEBUG_MSG("Creating passive endpoint"); 505 ret = fi_passive_ep(fabric_, fabric_info_, &ep_passive_, nullptr); 506 if (ret) { 507 throw fabric_error(ret, "Failed to create fi_passive_ep"); 508 } 509 LOG_DEBUG_MSG("passive endpoint " << hexpointer(ep_passive_)); 510 id = &ep_passive_->fid; 511 #endif 512 513 #ifdef HPX_HAVE_PARCELPORT_TCP 514 // with tcp we do not use PMI boot, so enable the endpoint now 515 LOG_DEBUG_MSG("Enabling endpoint (TCP) " << hexpointer(ep_active_)); 516 ret = fi_enable(ep_active_); 517 if (ret) throw fabric_error(ret, "fi_enable"); 518 #endif 519 520 locality::locality_data local_addr; 521 std::size_t addrlen = locality::array_size; 522 LOG_DEBUG_MSG("Fetching local address using size " << decnumber(addrlen)); 523 ret = fi_getname(id, local_addr.data(), &addrlen); 524 if (ret || (addrlen>locality::array_size)) { 525 fabric_error(ret, "fi_getname - size error or other problem"); 526 } 527 528 LOG_EXCLUSIVE( 529 { 530 std::stringstream temp1; 531 for (std::size_t i=0; i<locality::array_length; ++i) { 532 temp1 << ipaddress(local_addr[i]); 533 } 534 LOG_DEBUG_MSG("address info is " << temp1.str().c_str()); 535 std::stringstream temp2; 536 
for (std::size_t i=0; i<locality::array_length; ++i) { 537 temp2 << hexuint32(local_addr[i]); 538 } 539 LOG_DEBUG_MSG("address info is " << temp2.str().c_str()); 540 }); 541 FUNC_END_DEBUG_MSG; 542 return locality(local_addr); 543 } 544 545 // -------------------------------------------------------------------- 546 void new_endpoint_active(struct fi_info *info, struct fid_ep **new_endpoint) 547 { 548 FUNC_START_DEBUG_MSG; 549 // create an 'active' endpoint that can be used for sending/receiving 550 LOG_DEBUG_MSG("Creating active endpoint"); 551 LOG_DEBUG_MSG("Got info mode " << (info->mode & FI_NOTIFY_FLAGS_ONLY)); 552 int ret = fi_endpoint(fabric_domain_, info, new_endpoint, nullptr); 553 if (ret) throw fabric_error(ret, "fi_endpoint"); 554 555 if (info->ep_attr->type == FI_EP_MSG) { 556 if (event_queue_) { 557 LOG_DEBUG_MSG("Binding endpoint to EQ"); 558 ret = fi_ep_bind(*new_endpoint, &event_queue_->fid, 0); 559 if (ret) throw fabric_error(ret, "bind event_queue_"); 560 } 561 } 562 } 563 564 // -------------------------------------------------------------------- 565 void bind_endpoint_to_queues(struct fid_ep *endpoint) 566 { 567 int ret; 568 if (av_) { 569 LOG_DEBUG_MSG("Binding endpoint to AV"); 570 ret = fi_ep_bind(endpoint, &av_->fid, 0); 571 if (ret) throw fabric_error(ret, "bind event_queue_"); 572 } 573 574 if (txcq_) { 575 LOG_DEBUG_MSG("Binding endpoint to TX CQ"); 576 ret = fi_ep_bind(endpoint, &txcq_->fid, FI_TRANSMIT); 577 if (ret) throw fabric_error(ret, "bind txcq"); 578 } 579 580 if (rxcq_) { 581 LOG_DEBUG_MSG("Binding endpoint to RX CQ"); 582 ret = fi_ep_bind(endpoint, &rxcq_->fid, FI_RECV); 583 if (ret) throw fabric_error(ret, "rxcq"); 584 } 585 586 if (ep_shared_rx_cxt_) { 587 LOG_DEBUG_MSG("Binding endpoint to shared receive context"); 588 ret = fi_ep_bind(endpoint, &ep_shared_rx_cxt_->fid, 0); 589 if (ret) throw fabric_error(ret, "ep_shared_rx_cxt_"); 590 } 591 592 LOG_DEBUG_MSG("Enabling endpoint " << hexpointer(endpoint)); 593 ret = 
fi_enable(endpoint);
    if (ret) throw fabric_error(ret, "fi_enable");

    FUNC_END_DEBUG_MSG;
}

// --------------------------------------------------------------------
// Resolve the libfabric address of every locality through AGAS and
// insert it into the local address vector.
// Only used when PMI bootstrapping is not available.
void initialize_localities(hpx::agas::addressing_service &as)
{
    FUNC_START_DEBUG_MSG;
#ifndef HPX_PARCELPORT_LIBFABRIC_HAVE_PMI
    std::uint32_t N = hpx::get_config().get_num_localities();
    LOG_DEBUG_MSG("Parcelport initialize_localities with " << N << " localities");

    // make sure address vector is created
    create_completion_queues(fabric_info_, N);

    for (std::uint32_t i=0; i<N; ++i) {
        hpx::naming::gid_type l = hpx::naming::get_gid_from_locality_id(i);
        LOG_DEBUG_MSG("Resolving locality" << l);
        // each locality may be reachable by multiple parcelports
        const parcelset::endpoints_type &res = as.resolve_locality(l);
        // get the fabric related data
        // NOTE(review): it is not checked against res.end() - presumably
        // every locality exposes a libfabric endpoint; confirm
        auto it = res.find("libfabric");
        LOG_DEBUG_MSG("locality resolution " << it->first << " => " <<it->second);
        const hpx::parcelset::locality &fabric_locality = it->second;
        const locality &loc = fabric_locality.get<locality>();
        // put the provider specific data into the address vector
        // so that we can look it up later
        /*fi_addr_t dummy =*/ insert_address(loc);
    }
#endif
    LOG_DEBUG_MSG("Done getting localities ");
    FUNC_END_DEBUG_MSG;
}

// --------------------------------------------------------------------
// Look up the fabric address previously stored for a locality,
// keyed on (ip << 32 | port).
// NOTE(review): no check that the key exists - dereferencing the
// result of find() is undefined behaviour if the locality was never
// inserted; confirm callers only pass resolved localities.
fi_addr_t get_fabric_address(const locality &dest_fabric)
{
    uint64_t key = (uint64_t)dest_fabric.ip_address() << 32 | dest_fabric.port();
    return endpoint_av_.find(key)->second;
}

// --------------------------------------------------------------------
// the locality of this node's endpoint
const locality & here() const { return here_; }

// --------------------------------------------------------------------
// whether the provider supports immediate data
// (sic: 'immedate' spelling is part of the public interface, kept as-is)
const bool & immedate_data_supported() const { return immediate_; }
// --------------------------------------------------------------------
// returns true when all connections have been disconnected and none are active
// NOTE(review): currently hard-wired to false - the map-based check is
// commented out, so termination is decided elsewhere
bool isTerminated() {
    return false;
    //return (qp_endpoint_map_.size() == 0);
}

// types we need for connection and disconnection callback functions
// into the main parcelport code.
typedef std::function<void(fid_ep *endpoint, uint32_t ipaddr)>
    ConnectionFunction;
typedef std::function<void(fid_ep *endpoint, uint32_t ipaddr)>
    DisconnectionFunction;

// --------------------------------------------------------------------
// Set a callback which will be called immediately after
// RDMA_CM_EVENT_ESTABLISHED has been received.
// This should be used to initialize all structures for handling a new connection
void setConnectionFunction(ConnectionFunction f) {
    connection_function_ = f;
}

// --------------------------------------------------------------------
// currently not used.
void setDisconnectionFunction(DisconnectionFunction f) {
    disconnection_function_ = f;
}

// --------------------------------------------------------------------
// This is the main polling function that checks for work completions
// and connection manager events, if stopped is true, then completions
// are thrown away, otherwise the completion callback is triggered
int poll_endpoints(bool stopped=false)
{
    int work = poll_for_work_completions();

#ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_MSG
    work += poll_event_queue(stopped);
#endif
    return work;
}

// --------------------------------------------------------------------
// Poll both completion queues; returns the number of completions handled.
int poll_for_work_completions()
{
    // @TODO, disable polling until queues are initialized to avoid this check
    // if queues are not setup, don't poll
    if (HPX_UNLIKELY(!rxcq_)) return 0;
    //
    return poll_send_queue() + poll_recv_queue();
}

// --------------------------------------------------------------------
// Drain one entry from the transmit CQ and dispatch it to the sender
// or rma_receiver stored in the operation context.
// Returns 1 if a completion was handled, 0 otherwise.
int poll_send_queue()
{
    LOG_TIMED_INIT(poll);
    LOG_TIMED_BLOCK(poll, DEVEL, 5.0, { LOG_DEBUG_MSG("poll_send_queue"); });

    fi_cq_msg_entry entry;
    int ret = 0;
    {
        // only one thread at a time may read the CQ (try-lock: if another
        // thread is already polling we simply skip this round)
        std::unique_lock<mutex_type> l(polling_mutex_, std::try_to_lock);
        if (l)
            ret = fi_cq_read(txcq_, &entry, 1);
    }
    if (ret>0) {
        LOG_DEBUG_MSG("Completion txcq wr_id "
            << fi_tostr(&entry.flags, FI_TYPE_OP_FLAGS)
            << " (" << decnumber(entry.flags) << ") "
            << "context " << hexpointer(entry.op_context)
            << "length " << hexuint32(entry.len));
        if (entry.flags & FI_RMA) {
            // an RMA read we issued has completed
            LOG_DEBUG_MSG("Received a txcq RMA completion "
                << "Context " << hexpointer(entry.op_context));
            rma_receiver* rcv = reinterpret_cast<rma_receiver*>(entry.op_context);
            rcv->handle_rma_read_completion();
        }
        else if (entry.flags == (FI_MSG | FI_SEND)) {
            // a plain message send has completed
            LOG_DEBUG_MSG("Received a txcq RMA send completion");
            sender* handler =
                reinterpret_cast<sender*>(entry.op_context);
            handler->handle_send_completion();
        }
        else {
            LOG_DEBUG_MSG("$$$$$ Received an unknown txcq completion ***** "
                << decnumber(entry.flags));
            std::terminate();
        }
        return 1;
    }
    else if (ret==0 || ret==-FI_EAGAIN) {
        // do nothing, we will try again on the next check
        LOG_TIMED_MSG(poll, DEVEL, 10, "txcq FI_EAGAIN");
    }
    else if (ret == -FI_EAVAIL) {
        // an error completion is available - retrieve and dispatch it
        struct fi_cq_err_entry e = {};
        int err_sz = fi_cq_readerr(txcq_, &e ,0);
        // from the manpage 'man 3 fi_cq_readerr'
        //
        // On error, a negative value corresponding to
        // 'fabric errno' is returned
        //
        // NOTE(review): comparing e.err against the fi_cq_readerr return
        // value looks suspicious - confirm this is the intended check
        if(e.err == err_sz) {
            LOG_ERROR_MSG("txcq_ Error with len " << hexlength(e.len)
                << "context " << hexpointer(e.op_context));
        }
        // flags might not be set correctly
        if (e.flags == (FI_MSG | FI_SEND)) {
            LOG_ERROR_MSG("txcq Error for FI_SEND with len " << hexlength(e.len)
                << "context " << hexpointer(e.op_context));
        }
        if (e.flags & FI_RMA) {
            LOG_ERROR_MSG("txcq Error for FI_RMA with len " << hexlength(e.len)
                << "context " << hexpointer(e.op_context));
        }
        rma_base *base = reinterpret_cast<rma_base*>(e.op_context);
        base->handle_error(e);
    }
    else {
        LOG_ERROR_MSG("unknown error in completion txcq read");
    }
    return 0;
}

// --------------------------------------------------------------------
// Drain one entry from the receive CQ (with source address) and hand
// it to the receiver stored in the operation context.
// Returns 1 if a completion was handled, 0 otherwise.
int poll_recv_queue()
{
    LOG_TIMED_INIT(poll);
    LOG_TIMED_BLOCK(poll, DEVEL, 5.0, { LOG_DEBUG_MSG("poll_recv_queue"); });

    int result = 0;
    fi_addr_t src_addr;
    fi_cq_msg_entry entry;

    // receives will use fi_cq_readfrom as we want the source address
    int ret = 0;
    {
        std::unique_lock<mutex_type> l(polling_mutex_, std::try_to_lock);
        if (l)
            ret = fi_cq_readfrom(rxcq_, &entry, 1, &src_addr);
    }
    if (ret>0) {
        LOG_DEBUG_MSG("Completion rxcq wr_id "
            << fi_tostr(&entry.flags, FI_TYPE_OP_FLAGS)
            << " (" << decnumber(entry.flags) << ") "
            << "source " << hexpointer(src_addr)
            << "context " << hexpointer(entry.op_context)
            << "length " << hexuint32(entry.len));
        if (src_addr == FI_ADDR_NOTAVAIL)
        {
            LOG_DEBUG_MSG("Source address not available...\n");
            std::terminate();
        }
        // if ((entry.flags & FI_RMA) == FI_RMA) {
        //     LOG_DEBUG_MSG("Received an rxcq RMA completion");
        // }
        else if (entry.flags == (FI_MSG | FI_RECV)) {
            LOG_DEBUG_MSG("Received an rxcq recv completion "
                << hexpointer(entry.op_context));
            reinterpret_cast<receiver *>(entry.op_context)->
                handle_recv(src_addr, entry.len);
        }
        else {
            LOG_DEBUG_MSG("Received an unknown rxcq completion "
                << decnumber(entry.flags));
            std::terminate();
        }
        result = 1;
    }
    else if (ret==0 || ret==-FI_EAGAIN) {
        // do nothing, we will try again on the next check
        LOG_TIMED_MSG(poll, DEVEL, 10, "rxcq FI_EAGAIN");
    }
    else if (ret == -FI_EAVAIL) {
        struct fi_cq_err_entry e = {};
        int err_sz = fi_cq_readerr(rxcq_, &e ,0);
        // from the manpage 'man 3 fi_cq_readerr'
        //
        // On error, a negative value corresponding to
        // 'fabric errno' is returned
        //
        if(e.err == err_sz) {
            LOG_ERROR_MSG("txcq_ Error with len " << hexlength(e.len)
                << "context " << hexpointer(e.op_context));
        }
        LOG_ERROR_MSG("rxcq Error with flags " << hexlength(e.flags)
            << "len " << hexlength(e.len));
    }
    else {
        LOG_ERROR_MSG("unknown error in completion rxcq read");
    }
    return result;
}

// --------------------------------------------------------------------
// Poll the connection-manager event queue (MSG endpoints only) and
// handle connection request / established / shutdown notifications.
int poll_event_queue(bool stopped=false)
{
    LOG_TIMED_INIT(poll);
    LOG_TIMED_BLOCK(poll, DEVEL, 5.0,
        {
            LOG_DEBUG_MSG("Polling event completion channel");
        }
    )
    struct fi_eq_cm_entry *cm_entry;
    // struct fi_eq_entry *entry;
    struct fid_ep *new_ep;
    // uint32_t *addr;
    uint32_t
event;
    std::array<char, 256> buffer;
    ssize_t rd = fi_eq_read(event_queue_, &event,
        buffer.data(), sizeof(buffer), 0);
    if (rd > 0) {
        LOG_DEBUG_MSG("fi_eq_cm_entry " << decnumber(sizeof(fi_eq_cm_entry))
            << " fi_eq_entry " << decnumber(sizeof(fi_eq_entry)));
        LOG_DEBUG_MSG("got event " << event << " with bytes = " << decnumber(rd));
        switch (event) {
        case FI_CONNREQ:
        {
            // a remote node is asking to connect to us
            cm_entry = reinterpret_cast<struct fi_eq_cm_entry*>(buffer.data());
            locality::locality_data addressinfo;
            std::memcpy(addressinfo.data(), cm_entry->info->dest_addr,
                locality::array_size);
            locality loc(addressinfo);
            LOG_DEBUG_MSG("FI_CONNREQ from "
                << ipaddress(loc.ip_address()) << "-> "
                << ipaddress(here_.ip_address())
                << "( " << ipaddress(here_.ip_address()) << " )");
            {
                auto result = insert_new_future(loc.ip_address());
                // if the insert fails, it means we have a connection
                // already in progress, reject if we are a lower ip address
                // (tie-break so exactly one side of a simultaneous
                // connect wins)
                if (!result.first && loc.ip_address()>here_.ip_address()) {
                    LOG_DEBUG_MSG("FI_CONNREQ priority fi_reject "
                        << ipaddress(loc.ip_address()) << "-> "
                        << ipaddress(here_.ip_address())
                        << "( " << ipaddress(here_.ip_address()) << " )");
                    // int ret = fi_reject(ep_passive_, cm_entry->info->handle,
                    //     nullptr, 0);
                    // if (ret) throw fabric_error(ret, "new_ep fi_reject failed");
                    fi_freeinfo(cm_entry->info);
                    return 0;
                }
                // create a new endpoint for this request and accept it
                new_endpoint_active(cm_entry->info, &new_ep);
                LOG_DEBUG_MSG("Calling fi_accept "
                    << ipaddress(loc.ip_address()) << "-> "
                    << ipaddress(here_.ip_address())
                    << "( " << ipaddress(here_.ip_address()) << " )");
                // send our ip address back as the private connection data
                int ret = fi_accept(new_ep, &here_.ip_address(),
                    sizeof(uint32_t));
                if (ret) throw fabric_error(ret, "new_ep fi_accept failed");
            }
            fi_freeinfo(cm_entry->info);
            break;
        }
        case FI_CONNECTED:
        {
            // a connection (incoming or locally started) is now ready
            cm_entry = reinterpret_cast<struct fi_eq_cm_entry*>(buffer.data());
            new_ep = container_of(cm_entry->fid, struct fid_ep, fid);
            locality::locality_data address;
            std::size_t len = sizeof(locality::locality_data);
            fi_getpeer(new_ep, address.data(), &len);
            //
            // NOTE(review): address[1] is used as the peer ip key -
            // presumably the ip field of locality_data; confirm layout
            auto present1 = endpoint_tmp_.is_in_map(address[1]);
            if (!present1.second) {
                throw fabric_error(0, "FI_CONNECTED, endpoint map error");
            }
            LOG_DEBUG_MSG("FI_CONNECTED " << hexpointer(new_ep)
                << ipaddress(locality::ip_address(address)) << "<> "
                << ipaddress(here_.ip_address())
                << "( " << ipaddress(here_.ip_address()) << " )");

            // call parcelport connection function before setting future
            connection_function_(new_ep, address[1]);

            // if there is an entry for a locally started connection on this IP
            // then set the future ready with the verbs endpoint
            LOG_DEBUG_MSG("FI_CONNECTED setting future "
                << ipaddress(locality::ip_address(address)) << "<> "
                << ipaddress(here_.ip_address())
                << "( " << ipaddress(here_.ip_address()) << " )");

            std::get<0>(endpoint_tmp_.find(address[1])->second).
                set_value(new_ep);

            // once the future is set, the entry can be removed?
            // endpoint_tmp_.erase(present1.first);
        }
        break;
    case FI_NOTIFY:
        LOG_DEBUG_MSG("Got FI_NOTIFY");
        break;
    case FI_SHUTDOWN:
        LOG_DEBUG_MSG("Got FI_SHUTDOWN");
        break;
    case FI_MR_COMPLETE:
        LOG_DEBUG_MSG("Got FI_MR_COMPLETE");
        break;
    case FI_AV_COMPLETE:
        LOG_DEBUG_MSG("Got FI_AV_COMPLETE");
        break;
    case FI_JOIN_COMPLETE:
        LOG_DEBUG_MSG("Got FI_JOIN_COMPLETE");
        break;
    }
    // HPX_ASSERT(rd == sizeof(struct fi_eq_cm_entry));
    // HPX_ASSERT(cm_entry->fid == event_queue_->fid);
}
else {
    LOG_TIMED_MSG(poll, DEVEL, 5, "We did not get an event completion")
}
return 0;
}

// --------------------------------------------------------------------
// Accessor: the fabric domain opened by this controller.
inline struct fid_domain * get_domain() {
    return fabric_domain_;
}

// --------------------------------------------------------------------
// Accessor: pinned-memory pool used for allocating rma buffers.
inline rma_memory_pool<libfabric_region_provider>& get_memory_pool() {
    return *memory_pool_;
}

// --------------------------------------------------------------------
// Create the tx/rx completion queues and (for RDM/DGRAM endpoint types)
// the address vector. Thread safe and idempotent: guarded by
// initialization_mutex_ and skipped when the queues already exist.
// N is the requested address-vector capacity.
void create_completion_queues(struct fi_info *info, int N)
{
    FUNC_START_DEBUG_MSG;

    // only one thread must be allowed to create queues,
    // and it is only required once
    scoped_lock lock(initialization_mutex_);
    if (txcq_!=nullptr || rxcq_!=nullptr || av_!=nullptr) {
        return;
    }

    int ret;

    fi_cq_attr cq_attr = {};
    // @TODO - why do we check this
    // if (cq_attr.format == FI_CQ_FORMAT_UNSPEC) {
        LOG_DEBUG_MSG("Setting CQ attribute to FI_CQ_FORMAT_MSG");
        cq_attr.format = FI_CQ_FORMAT_MSG;
    // }

    // open completion queue on fabric domain and set context ptr to tx queue
    cq_attr.wait_obj = FI_WAIT_NONE;
    cq_attr.size = info->tx_attr->size;
    info->tx_attr->op_flags |= FI_COMPLETION;
    cq_attr.flags = 0;//|= FI_COMPLETION;
    LOG_DEBUG_MSG("Creating CQ with tx size " << decnumber(info->tx_attr->size));
    ret = fi_cq_open(fabric_domain_, &cq_attr, &txcq_, &txcq_);
    if (ret) throw fabric_error(ret, "fi_cq_open");

    // open completion queue on fabric domain and set context ptr to rx queue
    cq_attr.size = info->rx_attr->size;
    LOG_DEBUG_MSG("Creating CQ with rx size " << decnumber(info->rx_attr->size));
    ret = fi_cq_open(fabric_domain_, &cq_attr, &rxcq_, &rxcq_);
    if (ret) throw fabric_error(ret, "fi_cq_open");

    fi_av_attr av_attr = {};
    if (info->ep_attr->type == FI_EP_RDM || info->ep_attr->type == FI_EP_DGRAM) {
        if (info->domain_attr->av_type != FI_AV_UNSPEC)
            av_attr.type = info->domain_attr->av_type;
        else {
            LOG_DEBUG_MSG("Setting map type to FI_AV_MAP");
            av_attr.type = FI_AV_MAP;
            // NOTE(review): count is only set on this branch; when the
            // domain dictates an av_type, N is ignored - confirm intended
            av_attr.count = N;
        }

        LOG_DEBUG_MSG("Creating address vector ");
        ret = fi_av_open(fabric_domain_, &av_attr, &av_, nullptr);
        if (ret) throw fabric_error(ret, "fi_av_open");
    }
    FUNC_END_DEBUG_MSG;
}

// --------------------------------------------------------------------
// Insert a promise/future pair for a connection to remote_ip.
// Returns {true, future} when a new entry was created, or
// {false, existing future} when a connection is already in progress.
// NOTE: obsolete in RDM mode - std::terminate()s immediately if called.
std::pair<bool, hpx::shared_future<struct fid_ep*>> insert_new_future(
    uint32_t remote_ip)
{

    LOG_DEBUG_MSG("insert_new_future : Obsolete in RDM mode");
    std::terminate();

    LOG_DEBUG_MSG("Inserting future in map "
        << ipaddress(here_.ip_address()) << "-> "
        << ipaddress(remote_ip)
        << "( " << ipaddress(here_.ip_address()) << " )");

    // promise/future pair tracking the pending endpoint
    hpx::promise<struct fid_ep*> new_endpoint_promise;
    hpx::future<struct fid_ep*> new_endpoint_future =
        new_endpoint_promise.get_future();
    // map entry: ip -> (promise, future)
    auto fp_pair = std::make_pair(
        remote_ip,
        std::make_tuple(
            std::move(new_endpoint_promise),
            std::move(new_endpoint_future)));
    // concurrent-map insert; it.second is false on duplicate key
    auto it = endpoint_tmp_.insert(std::move(fp_pair));
    // if the insert failed, we must safely delete the future/promise
    if (!it.second) {
        LOG_DEBUG_MSG("Must safely delete promise");
    }

// get the future that was inserted or already present 1050 // the future will become ready when remote end accepts/rejects connection 1051 // or we accept a connection from a remote 1052 hpx::shared_future<struct fid_ep*> result = std::get<1>(it.first->second); 1053 1054 // if the insert fails due to a duplicate value, return the duplicate 1055 if (!it.second) { 1056 return std::make_pair(false, result); 1057 } 1058 return std::make_pair(true, result); 1059 } 1060 1061 // -------------------------------------------------------------------- 1062 fi_addr_t insert_address(const locality &remote) 1063 { 1064 FUNC_START_DEBUG_MSG; 1065 LOG_DEBUG_MSG("inserting address in vector " 1066 << ipaddress(remote.ip_address())); 1067 fi_addr_t result = 0xffffffff; 1068 int ret = fi_av_insert(av_, remote.fabric_data(), 1, &result, 0, nullptr); 1069 if (ret < 0) { 1070 fabric_error(ret, "fi_av_insert"); 1071 } 1072 else if (ret != 1) { 1073 fabric_error(ret, "fi_av_insert did not return 1"); 1074 } 1075 uint64_t key = (uint64_t)remote.ip_address() << 32 | remote.port(); 1076 endpoint_av_.insert(std::make_pair(key, result)); 1077 LOG_DEBUG_MSG("Address inserted in vector " 1078 << ipaddress(remote.ip_address()) << ":" 1079 << remote.port() << hexuint64(result)); 1080 FUNC_END_DEBUG_MSG; 1081 return result; 1082 } 1083 1084 // -------------------------------------------------------------------- 1085 hpx::shared_future<struct fid_ep*> connect_to_server(const locality &remote) 1086 { 1087 1088 LOG_DEBUG_MSG("connect_to_server : Obsolete in RDM mode"); 1089 std::terminate(); 1090 1091 const uint32_t &remote_ip = remote.ip_address(); 1092 1093 // Has a connection been started from here already? 
    // Note: The future must be created before we call fi_connect
    // otherwise a connection may complete before the future is setup
    auto connection = insert_new_future(remote_ip);

    // if a connection is already underway, just return the future
    if (!connection.first) {
        LOG_DEBUG_MSG("connect to server : returning existing future");
        // the future will become ready when the remote end accepts/rejects
        // our connection - or we accept a connection from a remote
        return connection.second;
    }

    // for thread safety, make a copy of the fi_info before setting
    // the address in it. fi_freeinfo will free the dest_addr field.
    struct fi_info *new_info = fi_dupinfo(fabric_info_);
    new_info->dest_addrlen = locality::array_size;
    new_info->dest_addr = malloc(locality::array_size);
    std::memcpy(new_info->dest_addr, remote.fabric_data(), locality::array_size);

    // resolve active-endpoint info that matches the remote address
    uint64_t flags = 0;
    struct fi_info *fabric_info_active_;
    int ret = fi_getinfo(FI_VERSION(1,4), nullptr, nullptr,
        flags, new_info, &fabric_info_active_);
    if (ret) throw fabric_error(ret, "fi_getinfo");

    LOG_DEBUG_MSG("New connection for IP address "
        << ipaddress(remote.ip_address())
        << "Fabric info " << fi_tostr(fabric_info_active_, FI_TYPE_INFO));
    create_completion_queues(fabric_info_active_, 0);

    fid_ep *new_endpoint;
    new_endpoint_active(fabric_info_active_, &new_endpoint);

    // now it is safe to call connect
    LOG_DEBUG_MSG("Calling fi_connect from "
        << ipaddress(here_.ip_address()) << "-> "
        << ipaddress(remote.ip_address())
        << "( " << ipaddress(here_.ip_address()) << " )");

    ret = fi_connect(new_endpoint, remote.fabric_data(), nullptr, 0);
    if (ret) throw fabric_error(ret, "fi_connect");

    LOG_DEBUG_MSG("Deleting new endpoint info structure");
    fi_freeinfo(fabric_info_active_);
    fi_freeinfo(new_info);

    return connection.second;
}

// no-op: connection teardown is not implemented for this controller
void disconnect_all() {}

// reports whether the controller has active connections (always false)
bool active() { return false; }

private:
// store info about local device
std::string device_;
std::string interface_;
sockaddr_in local_addr_;

// callback functions used for connection event handling
ConnectionFunction connection_function_;
DisconnectionFunction disconnection_function_;

// Pinned memory pool used for allocating buffers
std::unique_ptr<rma_memory_pool<libfabric_region_provider>> memory_pool_;

// Shared completion queue for all endpoints
// Count outstanding receives posted to SRQ + Completion queue
std::vector<receiver> receivers_;

// only allow one thread to handle connect/disconnect events etc
mutex_type initialization_mutex_;
mutex_type endpoint_map_mutex_;
mutex_type polling_mutex_;

// used to skip polling event channel too frequently
typedef std::chrono::time_point<std::chrono::system_clock> time_type;
time_type event_check_time_;
uint32_t event_pause_;

};

// Smart pointer for libfabric_controller object
typedef std::shared_ptr<libfabric_controller> libfabric_controller_ptr;

}}}}

#endif