1 // Copyright (c) 2016 John Biddiscombe
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #ifndef HPX_PARCELSET_POLICIES_VERBS_ENDPOINT_HPP
7 #define HPX_PARCELSET_POLICIES_VERBS_ENDPOINT_HPP
8 
9 #include <hpx/lcos/promise.hpp>
10 #include <hpx/lcos/future.hpp>
11 //
12 #include <hpx/config/parcelport_defines.hpp>
13 //
14 #include <plugins/parcelport/parcelport_logging.hpp>
15 #include <plugins/parcelport/verbs/rdma/rdma_error.hpp>
16 #include <plugins/parcelport/verbs/rdma/verbs_event_channel.hpp>
17 #include <plugins/parcelport/verbs/rdma/verbs_shared_receive_queue.hpp>
18 #include <plugins/parcelport/verbs/rdma/verbs_sender_receiver.hpp>
19 #include <plugins/parcelport/verbs/rdma/verbs_completion_queue.hpp>
20 #include <plugins/parcelport/verbs/rdma/verbs_memory_region.hpp>
21 #include <plugins/parcelport/verbs/rdma/verbs_protection_domain.hpp>
22 //
23 #include <inttypes.h>
24 #include <atomic>
25 #include <iomanip>
26 #include <iostream>
27 #include <memory>
28 #include <stdexcept>
29 #include <string>
30 
31 #define HPX_PARCELPORT_VERBS_MAX_WORK_REQUESTS 1024
32 
33 namespace hpx {
34 namespace parcelset {
35 namespace policies {
36 namespace verbs
37 {
38     // Connection for RDMA operations with a remote partner.
39     class verbs_endpoint : public verbs_sender_receiver
40     {
41     public:
42         // ---------------------------------------------------------------------------
43         // enum definition macro that generates logging string conversion
44         // see rdma_logging.hpp for macro
45         DEFINE_ENUM_WITH_STRING_CONVERSIONS(connection_state,
46             (uninitialized)
47             (resolving_address)
48             (address_resolved)
49             (resolving_route)
50             (route_resolved)
51             (connecting)
52             (accepting)
53             (rejecting)
54             (connected)
55             (disconnecting)
56             (disconnected)
57             (terminated)
58             (aborted)
59         )
60 
61         std::atomic<connection_state> state_;
62 
63         // ---------------------------------------------------------------------------
64         // This constructor is used to create a local server endpoint.
65         // It can accept incoming connections, or make outgoing ones.
66         // Every node will create one server endpoint, and then for each connection
67         // made to another node a new client endpoint will be constructed.
68         // The endpoint constructed here will represent the local node.
verbs_endpoint(struct sockaddr_in local_address)69         verbs_endpoint(
70             struct sockaddr_in local_address)
71             : verbs_sender_receiver(nullptr)
72          {
73             LOG_DEVEL_MSG("verbs_endpoint Listening Server Constructor");
74             //
75             init();
76             create_cm_id();
77             bind(local_address);
78          }
79 
80         // ---------------------------------------------------------------------------
81         // This constructor is used when we have received a connection request and
82         // create a client endpoint to represent the remote end of the link.
83         // NB. we only use one CompletionQ, send/recv share the same one.
84         // This could be changed if need arises
verbs_endpoint(struct sockaddr_in localAddress,struct rdma_cm_id * cmId,verbs_protection_domain_ptr domain,verbs_completion_queue_ptr CompletionQ,rdma_memory_pool_ptr pool,verbs_shared_receive_queue_ptr SRQ,verbs_event_channel_ptr event_channel)85         verbs_endpoint(
86             struct sockaddr_in localAddress,
87             struct rdma_cm_id *cmId,
88             verbs_protection_domain_ptr domain,
89             verbs_completion_queue_ptr CompletionQ,
90             rdma_memory_pool_ptr pool,
91             verbs_shared_receive_queue_ptr SRQ,
92             verbs_event_channel_ptr event_channel) :
93                 verbs_sender_receiver(nullptr)
94         {
95             LOG_DEVEL_MSG("verbs_endpoint receive connection constructor");
96             //
97             event_channel_    = event_channel;
98             init();
99 
100             // Use the input rdma connection management id.
101             cmId_             = cmId;
102             local_address_    = localAddress;
103             remote_address_   = *(sockaddr_in*)(&cmId->route.addr.dst_addr);
104             srq_              = SRQ;
105             completion_queue_ = CompletionQ;
106             memory_pool_      = pool;
107             domain_           = domain;
108             event_channel_    = event_channel;
109 
110             LOG_DEVEL_MSG("endpoint created with CQ "
111                 << hexpointer(completion_queue_.get()));
112 
113             // Create a queue pair. Both send and receive share a completion queue
114             create_queue_pair(domain_, completion_queue_, completion_queue_,
115                 HPX_PARCELPORT_VERBS_MAX_WORK_REQUESTS, false);
116         }
117 
118         // ---------------------------------------------------------------------------
119         // This constructor is used when we make/start a new connection to a remote
120         // node (as opposed to when they connect to us).
121         // The constructed endpoint will represent the remote node.
verbs_endpoint(struct sockaddr_in localAddress,struct sockaddr_in remoteAddress,verbs_protection_domain_ptr domain,verbs_completion_queue_ptr CompletionQ,rdma_memory_pool_ptr pool,verbs_shared_receive_queue_ptr SRQ,verbs_event_channel_ptr event_channel)122         verbs_endpoint(
123             struct sockaddr_in localAddress,
124             struct sockaddr_in remoteAddress,
125             verbs_protection_domain_ptr domain,
126             verbs_completion_queue_ptr CompletionQ,
127             rdma_memory_pool_ptr pool,
128             verbs_shared_receive_queue_ptr SRQ,
129             verbs_event_channel_ptr event_channel) :
130                 verbs_sender_receiver(nullptr)
131         {
132             LOG_DEVEL_MSG("verbs_endpoint create connection constructor "
133                 << sockaddress(&localAddress) << "to "
134                 << sockaddress(&remoteAddress));
135             //
136             event_channel_        = event_channel;
137             init();
138             create_cm_id();
139             //
140             srq_                  = SRQ;
141             completion_queue_     = CompletionQ;
142             memory_pool_          = pool;
143             domain_               = domain;
144             initiated_connection_ = true;
145 
146             // resolve ib addresses
147             resolve_address(&localAddress, &remoteAddress,
148                 std::chrono::milliseconds(10000));
149         }
150 
151         // ---------------------------------------------------------------------------
~verbs_endpoint(void)152         ~verbs_endpoint(void)
153         {
154             LOG_DEVEL_MSG("reset domain");
155             domain_.reset();
156 
157             // Destroy the rdma cm id and queue pair.
158             if (cmId_ != nullptr) {
159                 if (cmId_->qp != nullptr) {
160                     rdma_destroy_qp(cmId_); // No return code
161                     LOG_DEVEL_MSG("destroyed queue pair");
162                 }
163 
164                 if (rdma_destroy_id(cmId_) == 0) {
165                     LOG_DEVEL_MSG("destroyed rdma cm id " << cmId_);
166                     cmId_ = nullptr;
167                 }
168                 else {
169                     int err = errno;
170                     LOG_ERROR_MSG(
171                         "error destroying rdma cm id " << cmId_ << ": "
172                         << rdma_error::error_string(err));
173                 }
174             }
175 
176             LOG_DEVEL_MSG("reset CQ ");
177             completion_queue_.reset();
178 
179             LOG_DEBUG_MSG("releasing memory pool reference");
180             memory_pool_.reset();
181 
182             // event channel is cleaned up by unique ptr destructor
183             LOG_DEBUG_MSG("destroyed connection");
184         }
185 
186         // ---------------------------------------------------------------------------
get_completion_queue(void)187         verbs_completion_queue_ptr& get_completion_queue(void) {
188             return completion_queue_;
189         }
190 
191         // ---------------------------------------------------------------------------
refill_preposts(unsigned int preposts,bool force=true)192         void refill_preposts(unsigned int preposts, bool force=true) {
193             //            LOG_DEBUG_MSG("Entering refill size of waiting receives is "
194             //                << decnumber(get_receive_count()));
195             while (get_receive_count() < preposts) {
196                 // if the pool has spare small blocks (just use 0 size) then
197                 // refill the queues, but don't wait, just abort if none are available
198                 if (force || this->memory_pool_->can_allocate_unsafe(
199                     this->memory_pool_->small_.chunk_size_))
200                 {
201                     LOG_TRACE_MSG("Pre-Posting a receive to client size "
202                         << hexnumber(this->memory_pool_->small_.chunk_size_));
203                     verbs_memory_region *region =
204                         this->get_free_region(
205                             this->memory_pool_->small_.chunk_size_);
206                     this->post_recv_region_as_id_counted_srq(region,
207                         region->get_size(), getsrq());
208                 }
209                 else {
210                     LOG_DEVEL_MSG("aborting refill can_allocate_unsafe false");
211                     break; // don't block, if there are no free memory blocks
212                 }
213             }
214         }
215 
216         // ---------------------------------------------------------------------------
set_memory_pool(rdma_memory_pool_ptr pool)217         inline void set_memory_pool(rdma_memory_pool_ptr pool) {
218             this->memory_pool_ = pool;
219         }
220 
221         // ---------------------------------------------------------------------------
get_free_region(size_t size)222         inline verbs_memory_region *get_free_region(size_t size)
223         {
224             verbs_memory_region* region = this->memory_pool_->allocate_region(size);
225             if (!region) {
226                 LOG_ERROR_MSG("Error creating free memory region");
227             }
228             region->set_message_length(size);
229 
230             return region;
231         }
232 
233         // ---------------------------------------------------------------------------
234         // Called by server side prior to listening for connections
bind(struct sockaddr_in local_address)235         int bind(struct sockaddr_in local_address)
236         {
237            local_address_ = local_address;
238            // Bind address to the listening connection.
239            LOG_DEVEL_MSG("binding " << sockaddress(&local_address_)
240                << "to port " << decnumber(local_address_.sin_port));
241            //
242            int err = rdma_bind_addr(cmId_, (struct sockaddr *)&local_address_);
243            if (err != 0) {
244               err = abs(err);
245               LOG_ERROR_MSG("error binding to address "
246                   << sockaddress(&local_address_) << ": "
247                   << rdma_error::error_string(err));
248               return err;
249            }
250            LOG_DEBUG_MSG("bound rdma cm id to address " << sockaddress(&local_address_));
251            return 0;
252         }
253 
254         // ---------------------------------------------------------------------------
255         // called by server side to enable clients to connect
listen(int backlog)256         int listen(int backlog)
257         {
258            // Start listening for connections.
259            int err = rdma_listen(cmId_, backlog);
260            if (err != 0) {
261               err = abs(err);
262               LOG_ERROR_MSG("error listening for connections: "
263                   << rdma_error::error_string(err));
264               return err;
265            }
266            LOG_DEBUG_MSG("listening for connections with backlog " << backlog);
267            return 0;
268         }
269 
270         // ---------------------------------------------------------------------------
271         // this poll for event function is used by the main server endpoint when
272         // it is waiting for connection/disconnection requests etc
273         // ack_event, deletes the cm_event data structure allocated by the CM,
274         // so we do not ack and alow the event handler routine to do it
275         template<typename Func>
poll_for_event(Func && f)276         int poll_for_event(Func &&f)
277         {
278             return event_channel_->poll_verbs_event_channel(
279                 [this, &f]()
280                 {
281                     struct rdma_cm_event *cm_event;
282                     int err = event_channel_->get_event(verbs_event_channel::no_ack_event,
283                         nullptr, cm_event);
284                     if (err != 0) return 0;
285                     return f(cm_event);
286                 }
287             );
288         }
289 
290         // ---------------------------------------------------------------------------
get_event(verbs_event_channel::event_ack_type ack,rdma_cm_event_type event,struct rdma_cm_event * & cm_event)291         int get_event(verbs_event_channel::event_ack_type ack,
292             rdma_cm_event_type event, struct rdma_cm_event *&cm_event)
293         {
294             return event_channel_->get_event(ack, &event, cm_event);
295         }
296 
297         // ---------------------------------------------------------------------------
298         // resolve_address is called before we wish to make a connection to another node.
299         // an address resolved event will be generated once this completes
resolve_address(struct sockaddr_in * localAddr,struct sockaddr_in * remoteAddr,std::chrono::milliseconds timeout)300         int resolve_address(
301             struct sockaddr_in *localAddr,
302             struct sockaddr_in *remoteAddr,
303             std::chrono::milliseconds timeout)
304         {
305             // Resolve the addresses.
306             LOG_DEVEL_MSG("resolving remote address "
307                 << sockaddress(localAddr) << ": "
308                 << sockaddress(remoteAddr));
309 
310             state_ = connection_state::resolving_address;
311 
312             // set our port to zero and let it find one
313             localAddr->sin_port = 0 ;
314             int rc = rdma_resolve_addr(cmId_,
315                 (struct sockaddr *) localAddr,
316                 (struct sockaddr *) remoteAddr, 1000); // Configurable timeout?
317 
318             if (rc != 0) {
319                 rdma_error e(errno, "rdma_resolve_addr() failed");
320                 LOG_ERROR_MSG("error resolving remote address "
321                     << sockaddress(localAddr) << ": "
322                     << sockaddress(remoteAddr) << ": "
323                     << rdma_error::error_string(e.error_code()));
324                 throw e;
325             }
326 
327             // Save the addresses.
328             memcpy(&remote_address_, remoteAddr, sizeof(struct sockaddr_in));
329             if (localAddr != nullptr) {
330                 memcpy(&local_address_, localAddr, sizeof(struct sockaddr_in));
331             }
332 
333             LOG_DEVEL_MSG("rdma_resolve_addr     "
334                 << hexnumber(event_channel_->get_file_descriptor()) << "from "
335                 << sockaddress(&local_address_)
336                 << "to " << sockaddress(&remote_address_)
337                 << "( " << sockaddress(&local_address_) << ")");
338 
339             return 0;
340         }
341 
342         // ---------------------------------------------------------------------------
343         // called when we receive address resolved event
handle_addr_resolved(struct rdma_cm_event * event)344         int handle_addr_resolved(struct rdma_cm_event *event)
345         {
346             LOG_DEVEL_MSG("Current state is " << ToString(state_));
347             if (state_!=connection_state::resolving_address) {
348                 rdma_error e(0, "invalid state in resolving_address");
349                 std::terminate();
350                 return -1;
351             }
352             verbs_event_channel::ack_event(event);
353 
354             // set new state
355             state_ = connection_state::address_resolved;
356 
357             LOG_DEVEL_MSG("resolved to " << sockaddress(&remote_address_)
358                 << "Current state is " << ToString(state_));
359 
360             // after resolving address, we must resolve route
361             resolve_route();
362 
363             return 0;
364         }
365 
366         // ---------------------------------------------------------------------------
367         // after resolving address and before making connection, we must resolve route
resolve_route(void)368         int resolve_route(void)
369         {
370             if (state_!=connection_state::address_resolved) {
371                 rdma_error e(0, LOG_FORMAT_MSG("invalid state in resolve_route"));
372                 std::terminate();
373                 throw e;
374             }
375 
376             LOG_DEBUG_MSG("Calling rdma_resolve_route   "
377                 << "from " << sockaddress(&local_address_)
378                 << "to " << sockaddress(&remote_address_)
379                 << "( " << sockaddress(&local_address_) << ")");
380 
381             state_ = connection_state::resolving_route;
382 
383             // Resolve a route.
384             int rc = rdma_resolve_route(cmId_, 1000); // Configurable timeout?
385             if (rc != 0) {
386                 rdma_error e(rc, LOG_FORMAT_MSG("error resolving route to "
387                     << sockaddress(&remote_address_)
388                     << "from " << sockaddress(&local_address_)));
389                 return rc;
390             }
391             return 0;
392         }
393 
394         // ---------------------------------------------------------------------------
395         // Handles route_resolved event and starts a connection to the remote endpoint
handle_route_resolved(struct rdma_cm_event * event)396         int handle_route_resolved(struct rdma_cm_event *event)
397         {
398             LOG_DEVEL_MSG("Current state is " << ToString(state_));
399             if (state_!=connection_state::resolving_route) {
400                 rdma_error e(0, "invalid state in handle_route_resolved");
401                 std::terminate();
402                 return -1;
403             }
404             state_ = connection_state::route_resolved;
405             verbs_event_channel::ack_event(event);
406 
407             LOG_DEBUG_MSG("resolved route to " << sockaddress(&remote_address_)
408                 << "Current state is " << ToString(state_));
409 
410             // after resolving route, we must create a queue pair
411             create_queue_pair(domain_, completion_queue_, completion_queue_,
412                 HPX_PARCELPORT_VERBS_MAX_WORK_REQUESTS, false);
413 
414             // open the connection between this endpoint and the remote endpoint
415             return connect();
416         }
417 
418         // ---------------------------------------------------------------------------
419         // if we start a connection but have to abort it, then this function sets the
420         // aborted state
abort()421         int abort()
422         {
423             LOG_DEVEL_MSG("Current state is " << ToString(state_));
424             if (state_==connection_state::resolving_address ||
425                 state_==connection_state::address_resolved ||
426                 state_==connection_state::resolving_route ||
427                 state_==connection_state::route_resolved ||
428                 state_==connection_state::connecting ||
429                 state_==connection_state::terminated)
430             {
431                 state_ = connection_state::aborted;
432             }
433             else {
434                 rdma_error e(0, "invalid state in abort");
435                 std::terminate();
436                 return -1;
437             }
438             LOG_DEBUG_MSG("Aborted " << sockaddress(&remote_address_)
439                 << "Current state is " << ToString(state_));
440             return 0;
441         }
442 
443         // ---------------------------------------------------------------------------
accept()444         int accept()
445         {
446             LOG_DEVEL_MSG("Calling rdma_accept          from "
447                 << sockaddress(&remote_address_)
448                 << "to " << sockaddress(&local_address_)
449                 << "( " << sockaddress(&local_address_) << ")");
450 
451             // Accept the connection request.
452             struct rdma_conn_param param;
453             memset(&param, 0, sizeof(param));
454             param.responder_resources = 1;
455             param.initiator_depth     = 1;
456             param.retry_count         = 0;  // ignored in accept (connect sets it)
457             param.rnr_retry_count     = 7;  // 7 = special code for infinite retries
458             //
459             int rc = rdma_accept(cmId_, &param);
460             if (rc != 0) {
461                 int err = errno;
462                 LOG_ERROR_MSG(
463                     "error accepting connection: " << rdma_error::error_string(err));
464                 return err;
465             }
466             LOG_DEBUG_MSG("accepted connection from client "
467                 << sockaddress(&remote_address_));
468 
469             state_ = connection_state::accepting;
470             LOG_DEVEL_MSG("Current state is " << ToString(state_));
471             return 0;
472         }
473 
474         // ---------------------------------------------------------------------------
reject(rdma_cm_id * id)475         int reject(rdma_cm_id *id)
476         {
477             //
478             // Debugging code to get ip address of soure/dest of event
479             // NB: The src and dest fields refer to the message - not the connect request
480             // so we are actually receiving a request from dest (but src of the msg)
481             //
482             struct sockaddr *ip_src = &cmId_->route.addr.src_addr;
483             struct sockaddr_in *addr_src =
484                 reinterpret_cast<struct sockaddr_in *>(ip_src);
485             //
486             local_address_ = *addr_src;
487 
488             LOG_DEVEL_MSG("Calling rdma_reject          from "
489                 << sockaddress(&remote_address_)
490                 << "to " << sockaddress(&local_address_)
491                 << "( " << sockaddress(&local_address_) << ")");
492 
493             // Reject a connection request.
494             int err = rdma_reject(id, 0, 0);
495             if (err != 0) {
496                 LOG_ERROR_MSG("error rejecting connection on cmid "
497                     << hexpointer(id) << rdma_error::error_string(errno));
498                 return -1;
499             }
500 
501             LOG_DEVEL_MSG("Rejected connection from new client");
502             return 0;
503         }
504 
505         // ---------------------------------------------------------------------------
506         // Initiate a connection to another node's server endpoint
connect()507         int connect()
508         {
509             LOG_DEVEL_MSG("rdma_connect          "
510                 << hexnumber(event_channel_->get_file_descriptor()) << "from "
511                 << sockaddress(&local_address_)
512                 << "to " << sockaddress(&remote_address_)
513                 << "( " << sockaddress(&local_address_) << ")");
514 
515             // Connect to the server.
516             struct rdma_conn_param param;
517             memset(&param, 0, sizeof(param));
518             param.responder_resources = 1;
519             param.initiator_depth = 2;
520             // retry_count * 4.096 x 2 ^ qpattr.timeout microseconds
521             param.retry_count     = 0;  // retries before ack timeout signals error
522             param.rnr_retry_count = 7;  // 7 = special code for infinite retries
523             //
524             int rc = rdma_connect(cmId_, &param);
525             if (rc != 0) {
526                 int err = errno;
527                 LOG_ERROR_MSG("error in rdma_connect to "
528                     << sockaddress(&remote_address_)
529                     << "from " << sockaddress(&local_address_)
530                     << ": " << rdma_error::error_string(err));
531                 return err;
532             }
533             state_ = connection_state::connecting;
534             LOG_DEVEL_MSG("Current state is " << ToString(state_));
535             return 0;
536         }
537 
538         // ---------------------------------------------------------------------------
handle_establish(struct rdma_cm_event * event)539         int handle_establish(struct rdma_cm_event *event)
540         {
541             LOG_DEVEL_MSG("Current state is " << ToString(state_));
542             if (event->event == RDMA_CM_EVENT_ESTABLISHED) {
543                 if (state_!=connection_state::connecting &&
544                     state_!=connection_state::accepting)
545                 {
546                     rdma_error e(0, "invalid state in handle_establish for "
547                         "RDMA_CM_EVENT_ESTABLISHED");
548                     std::terminate();
549                     return -1;
550                 }
551                 state_ = connection_state::connected;
552                 verbs_event_channel::ack_event(event);
553                 LOG_DEVEL_MSG("connected to " << sockaddress(&remote_address_)
554                     << "Current state is " << ToString(state_));
555 
556 /*
557                 struct ibv_qp_attr      attr;
558                 struct ibv_qp_init_attr init_attr;
559                 //
560                 if (ibv_query_qp(cmId_->qp, &attr,
561                     IBV_QP_STATE | IBV_QP_TIMEOUT, &init_attr)) {
562                     LOG_DEVEL_MSG("Failed to query QP state\n");
563                     std::terminate();
564                     return -1;
565                 }
566                 LOG_DEVEL_MSG("Current state is " << attr.qp_state);
567                 LOG_DEVEL_MSG("Current timeout is " << int(attr.timeout));
568 
569                 // set retry counter timeout value
570                 // 4.096 x 2 ^ attr.timeout microseconds
571                 attr.timeout = 13; // 8589935 usec (8.58 sec);
572                 //
573                 LOG_DEVEL_MSG("Modifying qp " << decnumber(cmId_->qp->qp_num));
574                 if (ibv_modify_qp(cmId_->qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT))
575                 {
576                     rdma_error e(errno,
577                         LOG_FORMAT_MSG("Failed to set QP timeout : qp "
578                             << decnumber(cmId_->qp->qp_num)));
579                     throw e;
580                 }
581 */
582 
583             }
584             else if (event->event == RDMA_CM_EVENT_REJECTED) {
585                 if (state_!=connection_state::aborted &&
586                     state_!=connection_state::connecting)
587                 {
588                     rdma_error e(0, "invalid state in handle_establish for "
589                         "RDMA_CM_EVENT_REJECTED");
590                     std::terminate();
591                     return -1;
592                 }
593                 LOG_DEVEL_MSG("2: Connection rejected for "
594                     << sockaddress(&remote_address_)
595                     << "( " << sockaddress(&local_address_) << ")");
596                 verbs_event_channel::ack_event(event);
597                 state_ = connection_state::terminated;
598                 LOG_DEVEL_MSG("Current state is " << ToString(state_));
599                 return -2;
600             }
601             return 0;
602         }
603 
604         // ---------------------------------------------------------------------------
disconnect()605         int disconnect()
606         {
607             LOG_DEVEL_MSG("Current state is " << ToString(state_));
608             if (state_!=connection_state::connected &&
609                 state_!=connection_state::aborted   &&
610                 state_!=connection_state::terminated)
611             {
612                 rdma_error e(0, LOG_FORMAT_MSG("invalid state in disconnect "));
613                 std::terminate();
614                 throw e;
615             }
616             state_ = connection_state::disconnecting;
617 
618             LOG_DEVEL_MSG("Sending disconnect to " << sockaddress(&remote_address_)
619                 << "Current state is " << ToString(state_));
620 
621             // Disconnect the connection.
622             int err = rdma_disconnect(cmId_);
623             if (err != 0) {
624                 err = abs(err);
625                 LOG_ERROR_MSG(
626                     "error disconnect: " << rdma_error::error_string(err));
627                 return err;
628             }
629 
630             return 0;
631         }
632 
633         // ---------------------------------------------------------------------------
handle_disconnect(struct rdma_cm_event * event)634         int handle_disconnect(struct rdma_cm_event *event)
635         {
636             LOG_DEVEL_MSG("Current state is " << ToString(state_));
637             if (state_!=connection_state::disconnecting &&
638                 state_!=connection_state::terminated &&
639                 state_!=connection_state::connected)
640             {
641                 rdma_error e(0, "invalid state in handle_disconnect");
642                 std::terminate();
643                 return -1;
644             }
645 
646             state_ = connection_state::disconnected;
647             verbs_event_channel::ack_event(event);
648 
649             flush();
650 
651             LOG_DEBUG_MSG("Disconnected               "
652                 << "from " << sockaddress(&remote_address_)
653                 << "( " << sockaddress(&local_address_) << ") "
654                 << "Current state is " << ToString(state_));
655 
656             return 0;
657         }
658 
659         // ---------------------------------------------------------------------------
handle_time_wait_exit(struct rdma_cm_event * event)660         int handle_time_wait_exit(struct rdma_cm_event *event)
661         {
662             LOG_DEVEL_MSG("Current state is " << ToString(state_));
663             if (state_!=connection_state::disconnecting &&
664                 state_!=connection_state::terminated &&
665                 state_!=connection_state::connected)
666             {
667                 rdma_error e(0, "invalid state in handle_disconnect");
668                 return -1;
669             }
670 
671             state_ = connection_state::disconnected;
672             verbs_event_channel::ack_event(event);
673 
674             flush();
675 
676             LOG_DEBUG_MSG("Time Wait Exit               "
677                 << "from " << sockaddress(&remote_address_)
678                 << "( " << sockaddress(&local_address_) << ") "
679                 << "Current state is " << ToString(state_));
680             return 0;
681         }
682 
683         // ---------------------------------------------------------------------------
create_srq(verbs_protection_domain_ptr domain)684         int create_srq(verbs_protection_domain_ptr domain)
685         {
686             try {
687                 srq_ = std::make_shared<verbs_shared_receive_queue>(domain);
688             }
689             catch (...) {
690                 return 0;
691             }
692             return 1;
693         }
694 
695         // ---------------------------------------------------------------------------
get_qp_num(void) const696         uint32_t get_qp_num(void) const {
697             return cmId_->qp ? cmId_->qp->qp_num : 0;
698         }
699 
700         // ---------------------------------------------------------------------------
get_device_context(void) const701         struct ibv_context *get_device_context(void) const {
702             return cmId_->verbs;
703         }
704 
705         // ---------------------------------------------------------------------------
get_local_ip_address(void) const706         inline uint32_t get_local_ip_address(void) const {
707             return local_address_.sin_addr.s_addr;
708         }
709 
710         // ---------------------------------------------------------------------------
get_local_port(void)711         inline in_port_t get_local_port(void) {
712             if (local_address_.sin_port == 0) {
713                 local_address_.sin_port = rdma_get_src_port(cmId_);
714             }
715             return local_address_.sin_port;
716         }
717 
718         // ---------------------------------------------------------------------------
get_remote_address(void)719         inline struct sockaddr_in *get_remote_address(void) {
720             return &remote_address_;
721         }
722 
723         // ---------------------------------------------------------------------------
get_remote_ip_address(void) const724         inline uint32_t get_remote_ip_address(void) const {
725             return remote_address_.sin_addr.s_addr;
726         }
727 
728         // ---------------------------------------------------------------------------
get_remote_port(void) const729         inline in_port_t get_remote_port(void) const {
730             return remote_address_.sin_port;
731         }
732 
733         // ---------------------------------------------------------------------------
SRQ()734         inline verbs_shared_receive_queue_ptr SRQ() {
735             return srq_;
736         }
737 
738         // ---------------------------------------------------------------------------
getsrq() const739         virtual inline struct ibv_srq *getsrq() const {
740             return srq_ ? srq_->getsrq() : nullptr;
741         }
742 
743         // ---------------------------------------------------------------------------
is_client_endpoint(void) const744         inline bool is_client_endpoint(void) const {
745             return initiated_connection_;
746         }
747 
748         // ---------------------------------------------------------------------------
get_state(void) const749         connection_state get_state(void) const {
750             return state_;
751         }
752 
753         // ---------------------------------------------------------------------------
set_state(connection_state s)754         void set_state(connection_state s) {
755             state_ = s;
756         }
757 
758         // ---------------------------------------------------------------------------
get_event_channel(void) const759         verbs_event_channel_ptr get_event_channel(void) const {
760             return event_channel_;
761         }
762 
763         // ---------------------------------------------------------------------------
764         // Transition the qp to an error state
flush()765         void flush()
766         {
767             // do noting if the qp was never created
768             if (!cmId_->qp) {
769                 return;
770             }
771             //
772             state_ = connection_state::disconnected;
773             //
774             struct ibv_qp_attr attr;
775             memset(&attr, 0, sizeof(attr));
776             attr.qp_state = IBV_QPS_ERR;
777             //
778             LOG_DEVEL_MSG("Flushing qp " << decnumber(cmId_->qp->qp_num));
779             if (ibv_modify_qp(cmId_->qp, &attr, IBV_QP_STATE))
780             {
781                 rdma_error e(errno,
782                     LOG_FORMAT_MSG("Failed to flush qp "
783                         << decnumber(cmId_->qp->qp_num)));
784                 throw e;
785             }
786             LOG_DEVEL_MSG("Current state is (flushed) " << ToString(state_));
787         }
788 
789     protected:
790 
791         // ---------------------------------------------------------------------------
init(void)792         void init(void)
793         {
794             // Initialize private data.
795             memset(&local_address_, 0, sizeof(local_address_));
796             memset(&remote_address_, 0, sizeof(remote_address_));
797             if (!event_channel_) {
798                 event_channel_ = std::make_shared<verbs_event_channel>();
799             }
800             clear_counters();
801             initiated_connection_ = false;
802             state_ = connection_state::uninitialized;
803             LOG_DEVEL_MSG("Current state is " << ToString(state_));
804             return;
805         }
806 
807         // ---------------------------------------------------------------------------
create_cm_id(void)808         void create_cm_id(void)
809         {
810             // Create the event channel.
811             if (!event_channel_->get_verbs_event_channel()) {
812                 event_channel_->create_channel();
813             }
814             // Create the rdma cm id.
815             LOG_DEVEL_MSG("Creating cmid with event channel "
816                 << hexnumber(event_channel_->get_file_descriptor()));
817             int err = rdma_create_id(
818                 event_channel_->get_verbs_event_channel(), &cmId_, this, RDMA_PS_TCP);
819             if (err != 0) {
820                 rdma_error e(err, "rdma_create_id() failed");
821                 LOG_ERROR_MSG(
822                     "error creating rdma cm id: " << rdma_error::error_string(err));
823                 throw e;
824             }
825             LOG_DEVEL_MSG("created rdma cm id " << cmId_);
826         }
827 
828         // ---------------------------------------------------------------------------
create_queue_pair(verbs_protection_domain_ptr domain,verbs_completion_queue_ptr sendCompletionQ,verbs_completion_queue_ptr recvCompletionQ,uint32_t maxWorkRequests,bool signalSendQueue)829         void create_queue_pair(verbs_protection_domain_ptr domain,
830             verbs_completion_queue_ptr sendCompletionQ,
831             verbs_completion_queue_ptr recvCompletionQ,
832             uint32_t maxWorkRequests, bool signalSendQueue)
833         {
834             // Create a queue pair.
835             struct ibv_qp_init_attr qpAttributes;
836             memset(&qpAttributes, 0, sizeof qpAttributes);
837             qpAttributes.cap.max_send_wr  = maxWorkRequests;
838             qpAttributes.cap.max_recv_wr  = maxWorkRequests;
839             qpAttributes.cap.max_send_sge = 3; // 6;
840             qpAttributes.cap.max_recv_sge = 3; // 6;
841             qpAttributes.qp_context       = this;    // Save this pointer
842             qpAttributes.sq_sig_all       = signalSendQueue;
843             qpAttributes.qp_type          = IBV_QPT_RC;
844             qpAttributes.send_cq          = sendCompletionQ->getQueue();
845             qpAttributes.recv_cq          = recvCompletionQ->getQueue();
846             LOG_DEVEL_MSG("Setting SRQ to " << getsrq());
847             qpAttributes.srq              = getsrq();
848             //
849             int rc = rdma_create_qp(cmId_, domain->getDomain(), &qpAttributes);
850             if (rc != 0) {
851                 rdma_error e(errno, "rdma_create_qp() failed");
852                 LOG_ERROR_MSG(
853                     "error creating queue pair: " << hexpointer(this)
854                     "local address " << sockaddress(&local_address_)
855                 << " remote address " << sockaddress(&remote_address_)
856                 << rdma_error::error_string(e.error_code()));
857                 throw e;
858             }
859 
860             LOG_DEVEL_MSG("created queue pair " << decnumber(cmId_->qp->qp_num)
861                 << "max inline data is " << hexnumber(qpAttributes.cap.max_inline_data));
862 
863             return;
864         }
865 
        // Memory pool handle. NOTE(review): not referenced by the methods in
        // this part of the file; presumably used for message buffers - confirm.
        rdma_memory_pool_ptr memory_pool_;

        // Shared receive queue, assigned by create_srq(). Its raw handle is
        // attached to new queue pairs through getsrq(); may be empty.
        verbs_shared_receive_queue_ptr srq_;

        // Event channel for notification of RDMA connection management events.
        // The object is allocated by init() if not already set; the underlying
        // channel is created on demand by create_cm_id().
        verbs_event_channel_ptr event_channel_;

        // Address of this (local) side of the connection. Zeroed by init();
        // the port is filled in lazily by get_local_port().
        struct sockaddr_in local_address_;

        // Address of the other (remote) side of the connection. Zeroed by init().
        struct sockaddr_in remote_address_;

        // Set when this side (the client) connected to the server, so that at
        // shutdown the correct mode is used for disconnect(mode).
        // Reported by is_client_endpoint(); reset to false by init().
        bool initiated_connection_;

        // Protection domain handle (verbs_protection_domain_ptr).
        // NOTE(review): where this is assigned is not visible here - confirm.
        verbs_protection_domain_ptr domain_;

        // Completion queue.
        verbs_completion_queue_ptr completion_queue_;
888     };
889 
890     // Smart pointer for verbs_endpoint object.
891     typedef std::shared_ptr<verbs_endpoint> verbs_endpoint_ptr;
892 
893 }}}}
894 
895 #endif
896