// Copyright (c) 2016 John Biddiscombe
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef HPX_PARCELSET_POLICIES_VERBS_ENDPOINT_HPP
#define HPX_PARCELSET_POLICIES_VERBS_ENDPOINT_HPP

#include <hpx/lcos/promise.hpp>
#include <hpx/lcos/future.hpp>
//
#include <hpx/config/parcelport_defines.hpp>
//
#include <plugins/parcelport/parcelport_logging.hpp>
#include <plugins/parcelport/verbs/rdma/rdma_error.hpp>
#include <plugins/parcelport/verbs/rdma/verbs_event_channel.hpp>
#include <plugins/parcelport/verbs/rdma/verbs_shared_receive_queue.hpp>
#include <plugins/parcelport/verbs/rdma/verbs_sender_receiver.hpp>
#include <plugins/parcelport/verbs/rdma/verbs_completion_queue.hpp>
#include <plugins/parcelport/verbs/rdma/verbs_memory_region.hpp>
#include <plugins/parcelport/verbs/rdma/verbs_protection_domain.hpp>
//
#include <inttypes.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <exception>
#include <iomanip>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>

#define HPX_PARCELPORT_VERBS_MAX_WORK_REQUESTS 1024

namespace hpx {
namespace parcelset {
namespace policies {
namespace verbs
{
// Connection for RDMA operations with a remote partner.
39 class verbs_endpoint : public verbs_sender_receiver 40 { 41 public: 42 // --------------------------------------------------------------------------- 43 // enum definition macro that generates logging string conversion 44 // see rdma_logging.hpp for macro 45 DEFINE_ENUM_WITH_STRING_CONVERSIONS(connection_state, 46 (uninitialized) 47 (resolving_address) 48 (address_resolved) 49 (resolving_route) 50 (route_resolved) 51 (connecting) 52 (accepting) 53 (rejecting) 54 (connected) 55 (disconnecting) 56 (disconnected) 57 (terminated) 58 (aborted) 59 ) 60 61 std::atomic<connection_state> state_; 62 63 // --------------------------------------------------------------------------- 64 // This constructor is used to create a local server endpoint. 65 // It can accept incoming connections, or make outgoing ones. 66 // Every node will create one server endpoint, and then for each connection 67 // made to another node a new client endpoint will be constructed. 68 // The endpoint constructed here will represent the local node. verbs_endpoint(struct sockaddr_in local_address)69 verbs_endpoint( 70 struct sockaddr_in local_address) 71 : verbs_sender_receiver(nullptr) 72 { 73 LOG_DEVEL_MSG("verbs_endpoint Listening Server Constructor"); 74 // 75 init(); 76 create_cm_id(); 77 bind(local_address); 78 } 79 80 // --------------------------------------------------------------------------- 81 // This constructor is used when we have received a connection request and 82 // create a client endpoint to represent the remote end of the link. 83 // NB. we only use one CompletionQ, send/recv share the same one. 
84 // This could be changed if need arises verbs_endpoint(struct sockaddr_in localAddress,struct rdma_cm_id * cmId,verbs_protection_domain_ptr domain,verbs_completion_queue_ptr CompletionQ,rdma_memory_pool_ptr pool,verbs_shared_receive_queue_ptr SRQ,verbs_event_channel_ptr event_channel)85 verbs_endpoint( 86 struct sockaddr_in localAddress, 87 struct rdma_cm_id *cmId, 88 verbs_protection_domain_ptr domain, 89 verbs_completion_queue_ptr CompletionQ, 90 rdma_memory_pool_ptr pool, 91 verbs_shared_receive_queue_ptr SRQ, 92 verbs_event_channel_ptr event_channel) : 93 verbs_sender_receiver(nullptr) 94 { 95 LOG_DEVEL_MSG("verbs_endpoint receive connection constructor"); 96 // 97 event_channel_ = event_channel; 98 init(); 99 100 // Use the input rdma connection management id. 101 cmId_ = cmId; 102 local_address_ = localAddress; 103 remote_address_ = *(sockaddr_in*)(&cmId->route.addr.dst_addr); 104 srq_ = SRQ; 105 completion_queue_ = CompletionQ; 106 memory_pool_ = pool; 107 domain_ = domain; 108 event_channel_ = event_channel; 109 110 LOG_DEVEL_MSG("endpoint created with CQ " 111 << hexpointer(completion_queue_.get())); 112 113 // Create a queue pair. Both send and receive share a completion queue 114 create_queue_pair(domain_, completion_queue_, completion_queue_, 115 HPX_PARCELPORT_VERBS_MAX_WORK_REQUESTS, false); 116 } 117 118 // --------------------------------------------------------------------------- 119 // This constructor is used when we make/start a new connection to a remote 120 // node (as opposed to when they connect to us). 121 // The constructed endpoint will represent the remote node. 
verbs_endpoint(struct sockaddr_in localAddress,struct sockaddr_in remoteAddress,verbs_protection_domain_ptr domain,verbs_completion_queue_ptr CompletionQ,rdma_memory_pool_ptr pool,verbs_shared_receive_queue_ptr SRQ,verbs_event_channel_ptr event_channel)122 verbs_endpoint( 123 struct sockaddr_in localAddress, 124 struct sockaddr_in remoteAddress, 125 verbs_protection_domain_ptr domain, 126 verbs_completion_queue_ptr CompletionQ, 127 rdma_memory_pool_ptr pool, 128 verbs_shared_receive_queue_ptr SRQ, 129 verbs_event_channel_ptr event_channel) : 130 verbs_sender_receiver(nullptr) 131 { 132 LOG_DEVEL_MSG("verbs_endpoint create connection constructor " 133 << sockaddress(&localAddress) << "to " 134 << sockaddress(&remoteAddress)); 135 // 136 event_channel_ = event_channel; 137 init(); 138 create_cm_id(); 139 // 140 srq_ = SRQ; 141 completion_queue_ = CompletionQ; 142 memory_pool_ = pool; 143 domain_ = domain; 144 initiated_connection_ = true; 145 146 // resolve ib addresses 147 resolve_address(&localAddress, &remoteAddress, 148 std::chrono::milliseconds(10000)); 149 } 150 151 // --------------------------------------------------------------------------- ~verbs_endpoint(void)152 ~verbs_endpoint(void) 153 { 154 LOG_DEVEL_MSG("reset domain"); 155 domain_.reset(); 156 157 // Destroy the rdma cm id and queue pair. 
158 if (cmId_ != nullptr) { 159 if (cmId_->qp != nullptr) { 160 rdma_destroy_qp(cmId_); // No return code 161 LOG_DEVEL_MSG("destroyed queue pair"); 162 } 163 164 if (rdma_destroy_id(cmId_) == 0) { 165 LOG_DEVEL_MSG("destroyed rdma cm id " << cmId_); 166 cmId_ = nullptr; 167 } 168 else { 169 int err = errno; 170 LOG_ERROR_MSG( 171 "error destroying rdma cm id " << cmId_ << ": " 172 << rdma_error::error_string(err)); 173 } 174 } 175 176 LOG_DEVEL_MSG("reset CQ "); 177 completion_queue_.reset(); 178 179 LOG_DEBUG_MSG("releasing memory pool reference"); 180 memory_pool_.reset(); 181 182 // event channel is cleaned up by unique ptr destructor 183 LOG_DEBUG_MSG("destroyed connection"); 184 } 185 186 // --------------------------------------------------------------------------- get_completion_queue(void)187 verbs_completion_queue_ptr& get_completion_queue(void) { 188 return completion_queue_; 189 } 190 191 // --------------------------------------------------------------------------- refill_preposts(unsigned int preposts,bool force=true)192 void refill_preposts(unsigned int preposts, bool force=true) { 193 // LOG_DEBUG_MSG("Entering refill size of waiting receives is " 194 // << decnumber(get_receive_count())); 195 while (get_receive_count() < preposts) { 196 // if the pool has spare small blocks (just use 0 size) then 197 // refill the queues, but don't wait, just abort if none are available 198 if (force || this->memory_pool_->can_allocate_unsafe( 199 this->memory_pool_->small_.chunk_size_)) 200 { 201 LOG_TRACE_MSG("Pre-Posting a receive to client size " 202 << hexnumber(this->memory_pool_->small_.chunk_size_)); 203 verbs_memory_region *region = 204 this->get_free_region( 205 this->memory_pool_->small_.chunk_size_); 206 this->post_recv_region_as_id_counted_srq(region, 207 region->get_size(), getsrq()); 208 } 209 else { 210 LOG_DEVEL_MSG("aborting refill can_allocate_unsafe false"); 211 break; // don't block, if there are no free memory blocks 212 } 213 } 214 } 215 216 
// --------------------------------------------------------------------------- set_memory_pool(rdma_memory_pool_ptr pool)217 inline void set_memory_pool(rdma_memory_pool_ptr pool) { 218 this->memory_pool_ = pool; 219 } 220 221 // --------------------------------------------------------------------------- get_free_region(size_t size)222 inline verbs_memory_region *get_free_region(size_t size) 223 { 224 verbs_memory_region* region = this->memory_pool_->allocate_region(size); 225 if (!region) { 226 LOG_ERROR_MSG("Error creating free memory region"); 227 } 228 region->set_message_length(size); 229 230 return region; 231 } 232 233 // --------------------------------------------------------------------------- 234 // Called by server side prior to listening for connections bind(struct sockaddr_in local_address)235 int bind(struct sockaddr_in local_address) 236 { 237 local_address_ = local_address; 238 // Bind address to the listening connection. 239 LOG_DEVEL_MSG("binding " << sockaddress(&local_address_) 240 << "to port " << decnumber(local_address_.sin_port)); 241 // 242 int err = rdma_bind_addr(cmId_, (struct sockaddr *)&local_address_); 243 if (err != 0) { 244 err = abs(err); 245 LOG_ERROR_MSG("error binding to address " 246 << sockaddress(&local_address_) << ": " 247 << rdma_error::error_string(err)); 248 return err; 249 } 250 LOG_DEBUG_MSG("bound rdma cm id to address " << sockaddress(&local_address_)); 251 return 0; 252 } 253 254 // --------------------------------------------------------------------------- 255 // called by server side to enable clients to connect listen(int backlog)256 int listen(int backlog) 257 { 258 // Start listening for connections. 
259 int err = rdma_listen(cmId_, backlog); 260 if (err != 0) { 261 err = abs(err); 262 LOG_ERROR_MSG("error listening for connections: " 263 << rdma_error::error_string(err)); 264 return err; 265 } 266 LOG_DEBUG_MSG("listening for connections with backlog " << backlog); 267 return 0; 268 } 269 270 // --------------------------------------------------------------------------- 271 // this poll for event function is used by the main server endpoint when 272 // it is waiting for connection/disconnection requests etc 273 // ack_event, deletes the cm_event data structure allocated by the CM, 274 // so we do not ack and alow the event handler routine to do it 275 template<typename Func> poll_for_event(Func && f)276 int poll_for_event(Func &&f) 277 { 278 return event_channel_->poll_verbs_event_channel( 279 [this, &f]() 280 { 281 struct rdma_cm_event *cm_event; 282 int err = event_channel_->get_event(verbs_event_channel::no_ack_event, 283 nullptr, cm_event); 284 if (err != 0) return 0; 285 return f(cm_event); 286 } 287 ); 288 } 289 290 // --------------------------------------------------------------------------- get_event(verbs_event_channel::event_ack_type ack,rdma_cm_event_type event,struct rdma_cm_event * & cm_event)291 int get_event(verbs_event_channel::event_ack_type ack, 292 rdma_cm_event_type event, struct rdma_cm_event *&cm_event) 293 { 294 return event_channel_->get_event(ack, &event, cm_event); 295 } 296 297 // --------------------------------------------------------------------------- 298 // resolve_address is called before we wish to make a connection to another node. 299 // an address resolved event will be generated once this completes resolve_address(struct sockaddr_in * localAddr,struct sockaddr_in * remoteAddr,std::chrono::milliseconds timeout)300 int resolve_address( 301 struct sockaddr_in *localAddr, 302 struct sockaddr_in *remoteAddr, 303 std::chrono::milliseconds timeout) 304 { 305 // Resolve the addresses. 
306 LOG_DEVEL_MSG("resolving remote address " 307 << sockaddress(localAddr) << ": " 308 << sockaddress(remoteAddr)); 309 310 state_ = connection_state::resolving_address; 311 312 // set our port to zero and let it find one 313 localAddr->sin_port = 0 ; 314 int rc = rdma_resolve_addr(cmId_, 315 (struct sockaddr *) localAddr, 316 (struct sockaddr *) remoteAddr, 1000); // Configurable timeout? 317 318 if (rc != 0) { 319 rdma_error e(errno, "rdma_resolve_addr() failed"); 320 LOG_ERROR_MSG("error resolving remote address " 321 << sockaddress(localAddr) << ": " 322 << sockaddress(remoteAddr) << ": " 323 << rdma_error::error_string(e.error_code())); 324 throw e; 325 } 326 327 // Save the addresses. 328 memcpy(&remote_address_, remoteAddr, sizeof(struct sockaddr_in)); 329 if (localAddr != nullptr) { 330 memcpy(&local_address_, localAddr, sizeof(struct sockaddr_in)); 331 } 332 333 LOG_DEVEL_MSG("rdma_resolve_addr " 334 << hexnumber(event_channel_->get_file_descriptor()) << "from " 335 << sockaddress(&local_address_) 336 << "to " << sockaddress(&remote_address_) 337 << "( " << sockaddress(&local_address_) << ")"); 338 339 return 0; 340 } 341 342 // --------------------------------------------------------------------------- 343 // called when we receive address resolved event handle_addr_resolved(struct rdma_cm_event * event)344 int handle_addr_resolved(struct rdma_cm_event *event) 345 { 346 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 347 if (state_!=connection_state::resolving_address) { 348 rdma_error e(0, "invalid state in resolving_address"); 349 std::terminate(); 350 return -1; 351 } 352 verbs_event_channel::ack_event(event); 353 354 // set new state 355 state_ = connection_state::address_resolved; 356 357 LOG_DEVEL_MSG("resolved to " << sockaddress(&remote_address_) 358 << "Current state is " << ToString(state_)); 359 360 // after resolving address, we must resolve route 361 resolve_route(); 362 363 return 0; 364 } 365 366 // 
--------------------------------------------------------------------------- 367 // after resolving address and before making connection, we must resolve route resolve_route(void)368 int resolve_route(void) 369 { 370 if (state_!=connection_state::address_resolved) { 371 rdma_error e(0, LOG_FORMAT_MSG("invalid state in resolve_route")); 372 std::terminate(); 373 throw e; 374 } 375 376 LOG_DEBUG_MSG("Calling rdma_resolve_route " 377 << "from " << sockaddress(&local_address_) 378 << "to " << sockaddress(&remote_address_) 379 << "( " << sockaddress(&local_address_) << ")"); 380 381 state_ = connection_state::resolving_route; 382 383 // Resolve a route. 384 int rc = rdma_resolve_route(cmId_, 1000); // Configurable timeout? 385 if (rc != 0) { 386 rdma_error e(rc, LOG_FORMAT_MSG("error resolving route to " 387 << sockaddress(&remote_address_) 388 << "from " << sockaddress(&local_address_))); 389 return rc; 390 } 391 return 0; 392 } 393 394 // --------------------------------------------------------------------------- 395 // Handles route_resolved event and starts a connection to the remote endpoint handle_route_resolved(struct rdma_cm_event * event)396 int handle_route_resolved(struct rdma_cm_event *event) 397 { 398 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 399 if (state_!=connection_state::resolving_route) { 400 rdma_error e(0, "invalid state in handle_route_resolved"); 401 std::terminate(); 402 return -1; 403 } 404 state_ = connection_state::route_resolved; 405 verbs_event_channel::ack_event(event); 406 407 LOG_DEBUG_MSG("resolved route to " << sockaddress(&remote_address_) 408 << "Current state is " << ToString(state_)); 409 410 // after resolving route, we must create a queue pair 411 create_queue_pair(domain_, completion_queue_, completion_queue_, 412 HPX_PARCELPORT_VERBS_MAX_WORK_REQUESTS, false); 413 414 // open the connection between this endpoint and the remote endpoint 415 return connect(); 416 } 417 418 // 
--------------------------------------------------------------------------- 419 // if we start a connection but have to abort it, then this function sets the 420 // aborted state abort()421 int abort() 422 { 423 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 424 if (state_==connection_state::resolving_address || 425 state_==connection_state::address_resolved || 426 state_==connection_state::resolving_route || 427 state_==connection_state::route_resolved || 428 state_==connection_state::connecting || 429 state_==connection_state::terminated) 430 { 431 state_ = connection_state::aborted; 432 } 433 else { 434 rdma_error e(0, "invalid state in abort"); 435 std::terminate(); 436 return -1; 437 } 438 LOG_DEBUG_MSG("Aborted " << sockaddress(&remote_address_) 439 << "Current state is " << ToString(state_)); 440 return 0; 441 } 442 443 // --------------------------------------------------------------------------- accept()444 int accept() 445 { 446 LOG_DEVEL_MSG("Calling rdma_accept from " 447 << sockaddress(&remote_address_) 448 << "to " << sockaddress(&local_address_) 449 << "( " << sockaddress(&local_address_) << ")"); 450 451 // Accept the connection request. 
452 struct rdma_conn_param param; 453 memset(¶m, 0, sizeof(param)); 454 param.responder_resources = 1; 455 param.initiator_depth = 1; 456 param.retry_count = 0; // ignored in accept (connect sets it) 457 param.rnr_retry_count = 7; // 7 = special code for infinite retries 458 // 459 int rc = rdma_accept(cmId_, ¶m); 460 if (rc != 0) { 461 int err = errno; 462 LOG_ERROR_MSG( 463 "error accepting connection: " << rdma_error::error_string(err)); 464 return err; 465 } 466 LOG_DEBUG_MSG("accepted connection from client " 467 << sockaddress(&remote_address_)); 468 469 state_ = connection_state::accepting; 470 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 471 return 0; 472 } 473 474 // --------------------------------------------------------------------------- reject(rdma_cm_id * id)475 int reject(rdma_cm_id *id) 476 { 477 // 478 // Debugging code to get ip address of soure/dest of event 479 // NB: The src and dest fields refer to the message - not the connect request 480 // so we are actually receiving a request from dest (but src of the msg) 481 // 482 struct sockaddr *ip_src = &cmId_->route.addr.src_addr; 483 struct sockaddr_in *addr_src = 484 reinterpret_cast<struct sockaddr_in *>(ip_src); 485 // 486 local_address_ = *addr_src; 487 488 LOG_DEVEL_MSG("Calling rdma_reject from " 489 << sockaddress(&remote_address_) 490 << "to " << sockaddress(&local_address_) 491 << "( " << sockaddress(&local_address_) << ")"); 492 493 // Reject a connection request. 
494 int err = rdma_reject(id, 0, 0); 495 if (err != 0) { 496 LOG_ERROR_MSG("error rejecting connection on cmid " 497 << hexpointer(id) << rdma_error::error_string(errno)); 498 return -1; 499 } 500 501 LOG_DEVEL_MSG("Rejected connection from new client"); 502 return 0; 503 } 504 505 // --------------------------------------------------------------------------- 506 // Initiate a connection to another node's server endpoint connect()507 int connect() 508 { 509 LOG_DEVEL_MSG("rdma_connect " 510 << hexnumber(event_channel_->get_file_descriptor()) << "from " 511 << sockaddress(&local_address_) 512 << "to " << sockaddress(&remote_address_) 513 << "( " << sockaddress(&local_address_) << ")"); 514 515 // Connect to the server. 516 struct rdma_conn_param param; 517 memset(¶m, 0, sizeof(param)); 518 param.responder_resources = 1; 519 param.initiator_depth = 2; 520 // retry_count * 4.096 x 2 ^ qpattr.timeout microseconds 521 param.retry_count = 0; // retries before ack timeout signals error 522 param.rnr_retry_count = 7; // 7 = special code for infinite retries 523 // 524 int rc = rdma_connect(cmId_, ¶m); 525 if (rc != 0) { 526 int err = errno; 527 LOG_ERROR_MSG("error in rdma_connect to " 528 << sockaddress(&remote_address_) 529 << "from " << sockaddress(&local_address_) 530 << ": " << rdma_error::error_string(err)); 531 return err; 532 } 533 state_ = connection_state::connecting; 534 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 535 return 0; 536 } 537 538 // --------------------------------------------------------------------------- handle_establish(struct rdma_cm_event * event)539 int handle_establish(struct rdma_cm_event *event) 540 { 541 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 542 if (event->event == RDMA_CM_EVENT_ESTABLISHED) { 543 if (state_!=connection_state::connecting && 544 state_!=connection_state::accepting) 545 { 546 rdma_error e(0, "invalid state in handle_establish for " 547 "RDMA_CM_EVENT_ESTABLISHED"); 548 std::terminate(); 549 
return -1; 550 } 551 state_ = connection_state::connected; 552 verbs_event_channel::ack_event(event); 553 LOG_DEVEL_MSG("connected to " << sockaddress(&remote_address_) 554 << "Current state is " << ToString(state_)); 555 556 /* 557 struct ibv_qp_attr attr; 558 struct ibv_qp_init_attr init_attr; 559 // 560 if (ibv_query_qp(cmId_->qp, &attr, 561 IBV_QP_STATE | IBV_QP_TIMEOUT, &init_attr)) { 562 LOG_DEVEL_MSG("Failed to query QP state\n"); 563 std::terminate(); 564 return -1; 565 } 566 LOG_DEVEL_MSG("Current state is " << attr.qp_state); 567 LOG_DEVEL_MSG("Current timeout is " << int(attr.timeout)); 568 569 // set retry counter timeout value 570 // 4.096 x 2 ^ attr.timeout microseconds 571 attr.timeout = 13; // 8589935 usec (8.58 sec); 572 // 573 LOG_DEVEL_MSG("Modifying qp " << decnumber(cmId_->qp->qp_num)); 574 if (ibv_modify_qp(cmId_->qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT)) 575 { 576 rdma_error e(errno, 577 LOG_FORMAT_MSG("Failed to set QP timeout : qp " 578 << decnumber(cmId_->qp->qp_num))); 579 throw e; 580 } 581 */ 582 583 } 584 else if (event->event == RDMA_CM_EVENT_REJECTED) { 585 if (state_!=connection_state::aborted && 586 state_!=connection_state::connecting) 587 { 588 rdma_error e(0, "invalid state in handle_establish for " 589 "RDMA_CM_EVENT_REJECTED"); 590 std::terminate(); 591 return -1; 592 } 593 LOG_DEVEL_MSG("2: Connection rejected for " 594 << sockaddress(&remote_address_) 595 << "( " << sockaddress(&local_address_) << ")"); 596 verbs_event_channel::ack_event(event); 597 state_ = connection_state::terminated; 598 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 599 return -2; 600 } 601 return 0; 602 } 603 604 // --------------------------------------------------------------------------- disconnect()605 int disconnect() 606 { 607 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 608 if (state_!=connection_state::connected && 609 state_!=connection_state::aborted && 610 state_!=connection_state::terminated) 611 { 612 rdma_error e(0, 
LOG_FORMAT_MSG("invalid state in disconnect ")); 613 std::terminate(); 614 throw e; 615 } 616 state_ = connection_state::disconnecting; 617 618 LOG_DEVEL_MSG("Sending disconnect to " << sockaddress(&remote_address_) 619 << "Current state is " << ToString(state_)); 620 621 // Disconnect the connection. 622 int err = rdma_disconnect(cmId_); 623 if (err != 0) { 624 err = abs(err); 625 LOG_ERROR_MSG( 626 "error disconnect: " << rdma_error::error_string(err)); 627 return err; 628 } 629 630 return 0; 631 } 632 633 // --------------------------------------------------------------------------- handle_disconnect(struct rdma_cm_event * event)634 int handle_disconnect(struct rdma_cm_event *event) 635 { 636 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 637 if (state_!=connection_state::disconnecting && 638 state_!=connection_state::terminated && 639 state_!=connection_state::connected) 640 { 641 rdma_error e(0, "invalid state in handle_disconnect"); 642 std::terminate(); 643 return -1; 644 } 645 646 state_ = connection_state::disconnected; 647 verbs_event_channel::ack_event(event); 648 649 flush(); 650 651 LOG_DEBUG_MSG("Disconnected " 652 << "from " << sockaddress(&remote_address_) 653 << "( " << sockaddress(&local_address_) << ") " 654 << "Current state is " << ToString(state_)); 655 656 return 0; 657 } 658 659 // --------------------------------------------------------------------------- handle_time_wait_exit(struct rdma_cm_event * event)660 int handle_time_wait_exit(struct rdma_cm_event *event) 661 { 662 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 663 if (state_!=connection_state::disconnecting && 664 state_!=connection_state::terminated && 665 state_!=connection_state::connected) 666 { 667 rdma_error e(0, "invalid state in handle_disconnect"); 668 return -1; 669 } 670 671 state_ = connection_state::disconnected; 672 verbs_event_channel::ack_event(event); 673 674 flush(); 675 676 LOG_DEBUG_MSG("Time Wait Exit " 677 << "from " << 
sockaddress(&remote_address_) 678 << "( " << sockaddress(&local_address_) << ") " 679 << "Current state is " << ToString(state_)); 680 return 0; 681 } 682 683 // --------------------------------------------------------------------------- create_srq(verbs_protection_domain_ptr domain)684 int create_srq(verbs_protection_domain_ptr domain) 685 { 686 try { 687 srq_ = std::make_shared<verbs_shared_receive_queue>(domain); 688 } 689 catch (...) { 690 return 0; 691 } 692 return 1; 693 } 694 695 // --------------------------------------------------------------------------- get_qp_num(void) const696 uint32_t get_qp_num(void) const { 697 return cmId_->qp ? cmId_->qp->qp_num : 0; 698 } 699 700 // --------------------------------------------------------------------------- get_device_context(void) const701 struct ibv_context *get_device_context(void) const { 702 return cmId_->verbs; 703 } 704 705 // --------------------------------------------------------------------------- get_local_ip_address(void) const706 inline uint32_t get_local_ip_address(void) const { 707 return local_address_.sin_addr.s_addr; 708 } 709 710 // --------------------------------------------------------------------------- get_local_port(void)711 inline in_port_t get_local_port(void) { 712 if (local_address_.sin_port == 0) { 713 local_address_.sin_port = rdma_get_src_port(cmId_); 714 } 715 return local_address_.sin_port; 716 } 717 718 // --------------------------------------------------------------------------- get_remote_address(void)719 inline struct sockaddr_in *get_remote_address(void) { 720 return &remote_address_; 721 } 722 723 // --------------------------------------------------------------------------- get_remote_ip_address(void) const724 inline uint32_t get_remote_ip_address(void) const { 725 return remote_address_.sin_addr.s_addr; 726 } 727 728 // --------------------------------------------------------------------------- get_remote_port(void) const729 inline in_port_t get_remote_port(void) const 
{ 730 return remote_address_.sin_port; 731 } 732 733 // --------------------------------------------------------------------------- SRQ()734 inline verbs_shared_receive_queue_ptr SRQ() { 735 return srq_; 736 } 737 738 // --------------------------------------------------------------------------- getsrq() const739 virtual inline struct ibv_srq *getsrq() const { 740 return srq_ ? srq_->getsrq() : nullptr; 741 } 742 743 // --------------------------------------------------------------------------- is_client_endpoint(void) const744 inline bool is_client_endpoint(void) const { 745 return initiated_connection_; 746 } 747 748 // --------------------------------------------------------------------------- get_state(void) const749 connection_state get_state(void) const { 750 return state_; 751 } 752 753 // --------------------------------------------------------------------------- set_state(connection_state s)754 void set_state(connection_state s) { 755 state_ = s; 756 } 757 758 // --------------------------------------------------------------------------- get_event_channel(void) const759 verbs_event_channel_ptr get_event_channel(void) const { 760 return event_channel_; 761 } 762 763 // --------------------------------------------------------------------------- 764 // Transition the qp to an error state flush()765 void flush() 766 { 767 // do noting if the qp was never created 768 if (!cmId_->qp) { 769 return; 770 } 771 // 772 state_ = connection_state::disconnected; 773 // 774 struct ibv_qp_attr attr; 775 memset(&attr, 0, sizeof(attr)); 776 attr.qp_state = IBV_QPS_ERR; 777 // 778 LOG_DEVEL_MSG("Flushing qp " << decnumber(cmId_->qp->qp_num)); 779 if (ibv_modify_qp(cmId_->qp, &attr, IBV_QP_STATE)) 780 { 781 rdma_error e(errno, 782 LOG_FORMAT_MSG("Failed to flush qp " 783 << decnumber(cmId_->qp->qp_num))); 784 throw e; 785 } 786 LOG_DEVEL_MSG("Current state is (flushed) " << ToString(state_)); 787 } 788 789 protected: 790 791 // 
--------------------------------------------------------------------------- init(void)792 void init(void) 793 { 794 // Initialize private data. 795 memset(&local_address_, 0, sizeof(local_address_)); 796 memset(&remote_address_, 0, sizeof(remote_address_)); 797 if (!event_channel_) { 798 event_channel_ = std::make_shared<verbs_event_channel>(); 799 } 800 clear_counters(); 801 initiated_connection_ = false; 802 state_ = connection_state::uninitialized; 803 LOG_DEVEL_MSG("Current state is " << ToString(state_)); 804 return; 805 } 806 807 // --------------------------------------------------------------------------- create_cm_id(void)808 void create_cm_id(void) 809 { 810 // Create the event channel. 811 if (!event_channel_->get_verbs_event_channel()) { 812 event_channel_->create_channel(); 813 } 814 // Create the rdma cm id. 815 LOG_DEVEL_MSG("Creating cmid with event channel " 816 << hexnumber(event_channel_->get_file_descriptor())); 817 int err = rdma_create_id( 818 event_channel_->get_verbs_event_channel(), &cmId_, this, RDMA_PS_TCP); 819 if (err != 0) { 820 rdma_error e(err, "rdma_create_id() failed"); 821 LOG_ERROR_MSG( 822 "error creating rdma cm id: " << rdma_error::error_string(err)); 823 throw e; 824 } 825 LOG_DEVEL_MSG("created rdma cm id " << cmId_); 826 } 827 828 // --------------------------------------------------------------------------- create_queue_pair(verbs_protection_domain_ptr domain,verbs_completion_queue_ptr sendCompletionQ,verbs_completion_queue_ptr recvCompletionQ,uint32_t maxWorkRequests,bool signalSendQueue)829 void create_queue_pair(verbs_protection_domain_ptr domain, 830 verbs_completion_queue_ptr sendCompletionQ, 831 verbs_completion_queue_ptr recvCompletionQ, 832 uint32_t maxWorkRequests, bool signalSendQueue) 833 { 834 // Create a queue pair. 
835 struct ibv_qp_init_attr qpAttributes; 836 memset(&qpAttributes, 0, sizeof qpAttributes); 837 qpAttributes.cap.max_send_wr = maxWorkRequests; 838 qpAttributes.cap.max_recv_wr = maxWorkRequests; 839 qpAttributes.cap.max_send_sge = 3; // 6; 840 qpAttributes.cap.max_recv_sge = 3; // 6; 841 qpAttributes.qp_context = this; // Save this pointer 842 qpAttributes.sq_sig_all = signalSendQueue; 843 qpAttributes.qp_type = IBV_QPT_RC; 844 qpAttributes.send_cq = sendCompletionQ->getQueue(); 845 qpAttributes.recv_cq = recvCompletionQ->getQueue(); 846 LOG_DEVEL_MSG("Setting SRQ to " << getsrq()); 847 qpAttributes.srq = getsrq(); 848 // 849 int rc = rdma_create_qp(cmId_, domain->getDomain(), &qpAttributes); 850 if (rc != 0) { 851 rdma_error e(errno, "rdma_create_qp() failed"); 852 LOG_ERROR_MSG( 853 "error creating queue pair: " << hexpointer(this) 854 "local address " << sockaddress(&local_address_) 855 << " remote address " << sockaddress(&remote_address_) 856 << rdma_error::error_string(e.error_code())); 857 throw e; 858 } 859 860 LOG_DEVEL_MSG("created queue pair " << decnumber(cmId_->qp->qp_num) 861 << "max inline data is " << hexnumber(qpAttributes.cap.max_inline_data)); 862 863 return; 864 } 865 866 rdma_memory_pool_ptr memory_pool_; 867 868 verbs_shared_receive_queue_ptr srq_; 869 870 // Event channel for notification of RDMA connection management events. 871 verbs_event_channel_ptr event_channel_; 872 873 // Address of this (local) side of the connection. 874 struct sockaddr_in local_address_; 875 876 // Address of other (remote) side of the connection. 877 struct sockaddr_in remote_address_; 878 879 // if the client connected to the server, then set this flag so that 880 // at shutdown, we use the correct flag for disconnect(mode) 881 bool initiated_connection_; 882 883 // Memory region for inbound messages. 884 verbs_protection_domain_ptr domain_; 885 886 // Completion queue. 
887 verbs_completion_queue_ptr completion_queue_; 888 }; 889 890 // Smart pointer for verbs_endpoint object. 891 typedef std::shared_ptr<verbs_endpoint> verbs_endpoint_ptr; 892 893 }}}} 894 895 #endif 896