1 //  Copyright (c) 2014-2016 John Biddiscombe
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #include <hpx/config.hpp>
7 
8 #if defined(HPX_HAVE_PARCELPORT_VERBS)
9 
10 #include <hpx/config/parcelport_verbs_defines.hpp>
11 #include <hpx/config/parcelport_defines.hpp>
12 //
13 #include <plugins/parcelport/readers_writers_mutex.hpp>
14 //
15 #include <plugins/parcelport/verbs/rdma/rdma_error.hpp>
16 #include <plugins/parcelport/verbs/rdma/rdma_locks.hpp>
17 #include <plugins/parcelport/verbs/rdma/verbs_event_channel.hpp>
18 #include <plugins/parcelport/verbs/rdma/verbs_device.hpp>
19 #include <plugins/parcelport/verbs/rdma/rdma_controller.hpp>
20 #include <plugins/parcelport/verbs/rdma/verbs_completion_queue.hpp>
21 #include <plugins/parcelport/verbs/rdma/verbs_device.hpp>
22 //
23 #include <boost/lexical_cast.hpp>
24 //
25 #include <poll.h>
26 #include <errno.h>
27 #include <iomanip>
28 #include <sstream>
29 #include <queue>
30 #include <stdio.h>
31 #include <thread>
32 #include <fstream>
33 #include <memory>
34 #include <utility>
35 #include <cstdint>
36 #include <cstring>
37 //
38 #include <netinet/in.h>
39 
// Namespace-scope definition of the static constant member MaxQueueSize of
// verbs_completion_queue; the value is passed as an argument when the
// completion queue is created in rdma_controller::startup().
const int hpx::parcelset::policies::verbs::verbs_completion_queue::MaxQueueSize;

// Bring the verbs parcelport names into scope for the definitions below.
using namespace hpx::parcelset::policies::verbs;
43 
44 //----------------------------------------------------------------------------
rdma_controller(const char * device,const char * interface,int port)45 rdma_controller::rdma_controller(const char *device, const char *interface, int port)
46 {
47     device_    = device;
48     interface_ = interface;
49     //
50     local_addr_.sin_family      = AF_INET;
51     local_addr_.sin_port        = port;
52     local_addr_.sin_addr.s_addr = 0xFFFFFFFF;
53     //
54     event_pause_ = 0;
55 }
56 
57 //----------------------------------------------------------------------------
~rdma_controller()58 rdma_controller::~rdma_controller()
59 {
60     //
61     if (memory_pool_ && server_endpoint_)
62     {
63         memory_pool_->small_.decrement_used_count(
64             server_endpoint_->get_receive_count()
65         );
66     }
67     //
68     LOG_DEVEL_MSG("rdma_controller destructor clearing clients");
69     connections_started_.clear();
70     LOG_DEVEL_MSG("rdma_controller destructor closing server");
71     this->server_endpoint_.reset();
72     LOG_DEVEL_MSG("rdma_controller destructor freeing memory pool");
73     this->memory_pool_.reset();
74     LOG_DEVEL_MSG("rdma_controller destructor releasing protection domain");
75     this->protection_domain_.reset();
76     LOG_DEVEL_MSG("rdma_controller destructor deleting completion queue");
77     this->completion_queue_.reset();
78     LOG_DEVEL_MSG("rdma_controller destructor done");
79 }
80 
81 //----------------------------------------------------------------------------
startup()82 int rdma_controller::startup()
83 {
84     LOG_DEVEL_MSG("creating InfiniBand device for " << device_
85         << " using interface " << interface_);
86 
87     // Find the address of the Infiniband link device.
88     verbs_device linkDevice(device_, interface_);
89 
90     LOG_DEVEL_MSG(
91         "created InfiniBand device for " << linkDevice.get_device_name()
92         << " using interface " << linkDevice.get_interface_name());
93 
94     local_addr_.sin_addr.s_addr = linkDevice.get_address();
95     LOG_DEVEL_MSG("Device returns IP address " << sockaddress(&local_addr_));
96 
97     // Create server/listener for RDMA connections.
98     try {
99         //
100         server_endpoint_ = std::make_shared<verbs_endpoint>(local_addr_);
101         //
102         if (server_endpoint_->get_local_port() != local_addr_.sin_port)
103         {
104             local_addr_.sin_port = server_endpoint_->get_local_port();
105             LOG_DEVEL_MSG("verbs_endpoint port changed to "
106                 << decnumber(local_addr_.sin_port));
107         }
108     } catch (rdma_error& e) {
109         LOG_ERROR_MSG("error creating listening RDMA connection: " << e.what());
110         return e.error_code();
111     }
112 
113     LOG_DEVEL_MSG(
114         "created listening RDMA connection " << hexpointer(server_endpoint_.get())
115         << " on port " << decnumber(local_addr_.sin_port)
116         << " IP address " << sockaddress(&local_addr_));
117 
118     // Create a protection domain object.
119     try {
120         protection_domain_ = verbs_protection_domain_ptr(
121             new verbs_protection_domain(server_endpoint_->get_device_context()));
122     } catch (rdma_error& e) {
123         LOG_ERROR_MSG("error allocating protection domain: " << e.what());
124         return e.error_code();
125     }
126     LOG_DEVEL_MSG("created protection domain " << protection_domain_->get_handle());
127 
128     // Create a memory pool for pinned buffers
129     memory_pool_ = std::make_shared<rdma_memory_pool> (protection_domain_);
130 
131     // Construct a completion queue object that will be shared by all endpoints
132     completion_queue_ = std::make_shared<verbs_completion_queue>(
133         server_endpoint_->get_device_context(),
134         verbs_completion_queue::MaxQueueSize, (ibv_comp_channel*) nullptr);
135 
136     // create a shared receive queue
137     LOG_DEVEL_MSG("Creating SRQ shared receive queue ");
138     server_endpoint_->create_srq(protection_domain_);
139     LOG_DEVEL_MSG("SRQ is " << hexpointer(server_endpoint_->getsrq()));
140     // preposts are made via the server endpoint when using SRQ, so make sure
141     // the memory pool is setup correctly
142     server_endpoint_->set_memory_pool(memory_pool_);
143     server_endpoint_->refill_preposts(HPX_PARCELPORT_VERBS_MAX_PREPOSTS, true);
144 
145     // Listen for connections.
146     LOG_DEVEL_MSG("Calling LISTEN function on "
147         << sockaddress(&local_addr_));
148     int err = server_endpoint_->listen(256);
149     if (err != 0) {
150         LOG_ERROR_MSG(
151             "error listening for new RDMA connections: "
152             << rdma_error::error_string(err));
153         return err;
154     }
155     LOG_DEVEL_MSG("LISTEN enabled for new RDMA connections on "
156         << sockaddress(&local_addr_));
157 
158     return 0;
159 }
160 
161 //----------------------------------------------------------------------------
refill_client_receives(bool force)162 void rdma_controller::refill_client_receives(bool force)
163 {
164     // a copy of the shared receive queue is held by the server_endpoint
165     // so pre-post receives to that to ensure all clients are 'ready'
166     LOG_DEVEL_MSG("refill_client_receives");
167     server_endpoint_->refill_preposts(HPX_PARCELPORT_VERBS_MAX_PREPOSTS, force);
168 }
169 
170 //----------------------------------------------------------------------------
poll_endpoints(bool stopped)171 int rdma_controller::poll_endpoints(bool stopped)
172 {
173     // completions of work requests
174     int handled = poll_for_work_completions(stopped);
175 
176     // no need to check for connection events very often, use a backoff so that
177     // when an event is received, we check frequently, when not, we gradually slow
178     // down our checks to avoid wasting too much time
179     using namespace std::chrono;
180     time_point<system_clock> now = system_clock::now();
181     if (duration_cast<microseconds>(now - event_check_time_).count() > event_pause_)
182     {
183         event_check_time_ = now;
184         // only active when logging is enabled
185         LOG_TIMED_INIT(event_poll);
186         LOG_TIMED_BLOCK(event_poll, DEVEL, 5.0,
187             {
188                 LOG_DEVEL_MSG("Polling event channel");
189                 debug_connections();
190             }
191         )
192         int events = server_endpoint_->poll_for_event(
193             [this](struct rdma_cm_event *cm_event) {
194             return handle_event(cm_event, server_endpoint_.get());
195         }
196         );
197         if (events>0) {
198             event_pause_ = 0;
199         }
200         else {
201             event_pause_ = (event_pause_<500) ? event_pause_ + 10 : 500;
202         }
203         handled += events;
204     }
205     return handled;
206 }
207 
208 //----------------------------------------------------------------------------
poll_for_work_completions(bool stopped)209 int rdma_controller::poll_for_work_completions(bool stopped)
210 {
211     LOG_TIMED_INIT(completion_poll);
212     LOG_TIMED_BLOCK(completion_poll, DEVEL, 5.0,
213         {
214             LOG_DEVEL_MSG("Polling completion_poll channel");
215         }
216     )
217 
218     struct ibv_wc completion;
219     int ntot = 0, nc = 0;
220     //
221     verbs_completion_queue *completionQ = get_completion_queue();
222 
223     // Remove work completions from the completion queue until it is empty.
224     do {
225         nc = completionQ->poll_completion(&completion);
226         // positive result means completion ok
227         if (nc > 0 && !stopped) {
228             verbs_endpoint *client = get_client_from_completion(completion);
229             // handle the completion
230             this->completion_function_(completion, client);
231             ++ntot;
232         }
233         // negative result indicates flushed receive
234         else if (nc < 0) {
235             // flushed receive completion, delete it, disconnection has started
236             verbs_memory_region *region = (verbs_memory_region *)completion.wr_id;
237             // let go of this region
238             memory_pool_->deallocate(region);
239             LOG_DEVEL_MSG("Flushed receive on qp " << decnumber(completion.qp_num));
240         }
241         if (nc != 0 && completion.opcode==IBV_WC_RECV) {
242             // bookkeeping : decrement counter that keeps preposted queue full
243             server_endpoint_->pop_receive_count();
244             if (server_endpoint_->get_receive_count() <
245                 HPX_PARCELPORT_VERBS_MAX_PREPOSTS/2)
246             {
247                 LOG_DEVEL_MSG("refilling preposts");
248                 server_endpoint_->refill_preposts(
249                     HPX_PARCELPORT_VERBS_MAX_PREPOSTS, false);
250             }
251         }
252     } while (nc != 0);
253     //
254     return ntot;
255 }
256 
257 //----------------------------------------------------------------------------
debug_connections()258 void rdma_controller::debug_connections()
259 {
260     map_read_lock_type read_lock(connections_started_.read_write_mutex());
261     //
262     LOG_DEVEL_MSG("qp_endpoint_map_ entries");
263     std::for_each(qp_endpoint_map_.begin(), qp_endpoint_map_.end(),
264         [this](const rdma_controller::QPMapPair &_client) {
265             verbs_endpoint_ptr endpoint = _client.second;
266             if (endpoint->is_client_endpoint()) {
267                 LOG_DEVEL_MSG("Status of connection         from "
268                     << sockaddress(&local_addr_) << "to "
269                     << sockaddress(endpoint->get_remote_address())
270                     << "client " << decnumber(endpoint->get_qp_num())
271                     << " state " << verbs_endpoint::ToString(endpoint->get_state()));
272             }
273             else {
274                 LOG_DEVEL_MSG("Status of connection         from "
275                     << sockaddress(endpoint->get_remote_address()) << "to "
276                     << sockaddress(&local_addr_)
277                     << "server " << decnumber(endpoint->get_qp_num())
278                     << " state " << verbs_endpoint::ToString(endpoint->get_state()));
279             }
280         }
281     );
282     LOG_DEVEL_MSG("connections_started_ entries");
283     std::for_each(connections_started_.begin(), connections_started_.end(),
284         [this](const rdma_controller::ClientMapPair &_client) {
285             verbs_endpoint_ptr endpoint = std::get<0>(_client.second);
286             if (endpoint->is_client_endpoint()) {
287                 LOG_DEVEL_MSG("Status of connection         from "
288                     << sockaddress(&local_addr_) << "to "
289                     << sockaddress(endpoint->get_remote_address())
290                     << "client " << decnumber(endpoint->get_qp_num())
291                     << " state " << verbs_endpoint::ToString(endpoint->get_state()));
292             }
293             else {
294                 LOG_DEVEL_MSG("Status of connection         from "
295                     << sockaddress(endpoint->get_remote_address()) << "to "
296                     << sockaddress(&local_addr_)
297                     << "server " << decnumber(endpoint->get_qp_num())
298                     << " state " << verbs_endpoint::ToString(endpoint->get_state()));
299             }
300         }
301     );
302 }
303 //----------------------------------------------------------------------------
handle_event(struct rdma_cm_event * cm_event,verbs_endpoint * a_client)304 int rdma_controller::handle_event(struct rdma_cm_event *cm_event,
305     verbs_endpoint *a_client)
306 {
307     // Get ip address of source/dest of event
308     // NB: The src and dest fields refer to the event and not the 'request'
309     struct sockaddr *ip_src = &cm_event->id->route.addr.src_addr;
310     struct sockaddr *ip_dst = &cm_event->id->route.addr.dst_addr;
311     struct sockaddr_in *addr_src = reinterpret_cast<struct sockaddr_in *>(ip_src);
312     struct sockaddr_in *addr_dst = reinterpret_cast<struct sockaddr_in *>(ip_dst);
313 
314     LOG_DEVEL_MSG("event src is " << sockaddress(addr_src)
315         << "( " << sockaddress(&local_addr_) << ")");
316 
317     verbs_endpoint_ptr event_client;
318     uint32_t qpnum = (cm_event->id->qp) ? cm_event->id->qp->qp_num : 0;
319     if (qpnum>0) {
320         // Find connection associated with this event if it's not a new request
321         LOG_DEVEL_MSG("handle_event : Looking for qp in map " << decnumber(qpnum));
322         auto present = qp_endpoint_map_.is_in_map(qpnum);
323         if (present.second) {
324             event_client = present.first->second;
325         }
326         else {
327             if (cm_event->event == RDMA_CM_EVENT_TIMEWAIT_EXIT) {
328                 // do nothing
329                 verbs_event_channel::ack_event(cm_event);
330                 return 0;
331             }
332             else {
333                 LOG_DEVEL_MSG("handle_event : could not find client for "
334                     << decnumber(qpnum));
335                 std::terminate();
336             }
337         }
338     }
339     else {
340         LOG_DEVEL_MSG("handle_event : qp num is zero");
341         auto present = connections_started_.is_in_map(addr_dst->sin_addr.s_addr);
342         if (present.second) {
343             event_client = std::get<0>(present.first->second);
344         }
345     }
346     if (!event_client) {
347         LOG_DEVEL_MSG("handle_event : event client not found");
348     }
349     //
350     struct sockaddr_in *conn_src = reinterpret_cast<struct sockaddr_in *>(ip_src);
351     struct sockaddr_in *conn_dst = reinterpret_cast<struct sockaddr_in *>(ip_dst);
352     //
353     // are we the server or client end of the connection, flip the src/dst
354     // pointers if we are the server end (clients init connections to servers)
355     if (event_client && !event_client->is_client_endpoint()) {
356         conn_src = reinterpret_cast<struct sockaddr_in *>(ip_dst);
357         conn_dst = reinterpret_cast<struct sockaddr_in *>(ip_src);
358     }
359 
360     // Handle the event : NB ack_event will delete the event, do not use it afterwards.
361     switch (cm_event->event) {
362 
363     // a connect request event will only ever occur on the server_endpoint_
364     // in response to a new connection request from a client
365     case RDMA_CM_EVENT_CONNECT_REQUEST: {
366         LOG_DEVEL_MSG("RDMA_CM_EVENT_CONNECT_REQUEST     "
367             << sockaddress(conn_dst) << "to "
368             << sockaddress(conn_src)
369             << "( " << sockaddress(&local_addr_) << ")");
370 
371         // We must not allow an new outgoing connection and a new incoming
372         // connect to be started simultaneously - to avoid races on the
373         // connection maps
374         unique_lock lock(controller_mutex_);
375         handle_connect_request(cm_event, conn_dst->sin_addr.s_addr);
376         break;
377     }
378 
379     // this event will be generated after an accept or reject
380     // RDMA_CM_EVENT_ESTABLISHED is sent to both ends of a new connection
381     case RDMA_CM_EVENT_REJECTED:
382     case RDMA_CM_EVENT_ESTABLISHED: {
383         // we use addr_dst because it is the remote end of the connection
384         // regardless of whether we are connecting to, or being connected from
385         uint32_t remote_ip = addr_dst->sin_addr.s_addr;
386 
387         LOG_DEVEL_MSG(rdma_event_str(cm_event->event) << "    from "
388             << sockaddress(conn_src) << "to "
389             << sockaddress(conn_dst)
390             << "( " << sockaddress(&local_addr_) << ")");
391 
392         // process the established event
393         int established = event_client->handle_establish(cm_event);
394 
395         // connection established without problem
396         if (established==0)
397         {
398             LOG_DEVEL_MSG("calling connection callback  from "
399                 << sockaddress(conn_src) << "to "
400                 << sockaddress(conn_dst)
401                 << "( " << sockaddress(&local_addr_) << ")");
402 
403             // call connection function before making the future ready
404             // to avoid a race in the parcelport get connection routines
405             this->connection_function_(event_client);
406 
407             LOG_DEVEL_MSG("established connection       from "
408                 << sockaddress(conn_src) << "to "
409                 << sockaddress(conn_dst)
410                 << "and making future ready, qp = " << decnumber(qpnum));
411 
412             // if there is an entry for a locally started connection on this IP
413             // then set the future ready with the verbs endpoint
414             auto present = connections_started_.is_in_map(remote_ip);
415             if (present.second) {
416                 std::get<1>(connections_started_.find(remote_ip)->second).
417                     set_value(event_client);
418                 // once the future is set, the entry can be removed
419                 connections_started_.erase(remote_ip);
420             }
421         }
422 
423         // @TODO remove this aborted event handler once all is working
424         // send the event to that
425         else if (established==-1)
426         {
427             std::terminate();
428         }
429 
430         // the remote end rejected our connection, so we must abort and clean up
431         else if (established==-2)
432         {
433             // we need to delete the started connection and replace it with a new one
434             LOG_DEVEL_MSG("Abort old connect, rejected from "
435                 << sockaddress(addr_src) << "to "
436                 << sockaddress(addr_dst)
437                 << "( " << sockaddress(&local_addr_) << ")"
438                 << "qp = " << decnumber(qpnum));
439 
440             // if this was a connection started by remote, remove it from the map
441             qp_endpoint_map_.erase(qpnum);
442         }
443         // event acked by handle_establish
444         return 0;
445     }
446 
447     // this event is only ever received on the client end of a connection
448     // after starting to make a connection to a remote server
449     case RDMA_CM_EVENT_ADDR_RESOLVED: {
450         LOG_DEVEL_MSG("RDMA_CM_EVENT_ADDR_RESOLVED         "
451             << sockaddress(conn_src) << "to "
452             << sockaddress(conn_dst)
453             << "( " << sockaddress(&local_addr_) << ")");
454 
455         // When a new connection is started (start_server_connection),
456         // this event might be received before the new endpoint has been added to the map.
457         // protect with the controller lock
458         unique_lock lock(controller_mutex_);
459         //
460         verbs_endpoint_ptr temp_client =
461             std::get<0>(connections_started_.find(conn_dst->sin_addr.s_addr)->second);
462         if (temp_client->handle_addr_resolved(cm_event)==-1) {
463             std::terminate();
464         }
465         // event acked by handle_addr_resolved
466         return 0;
467     }
468 
469     // this event is only ever received on the client end of a connection
470     // after starting to make a connection to a remote server
471     case RDMA_CM_EVENT_ROUTE_RESOLVED: {
472         LOG_DEVEL_MSG("RDMA_CM_EVENT_ROUTE_RESOLVED        "
473             << sockaddress(conn_src) << "to "
474             << sockaddress(conn_dst)
475             << "( " << sockaddress(&local_addr_) << ")");
476 
477         // we don't need the lock on controller_mutex_ here because we cannot get here
478         // until addr_resolved has been completed.
479         verbs_endpoint_ptr temp_client =
480             std::get<0>(connections_started_.find(conn_dst->sin_addr.s_addr)->second);
481         if (temp_client->handle_route_resolved(cm_event)==-1) {
482             std::terminate();
483         }
484         // handle_route_resolved makes the queue-pair valid, add it to qp map
485         uint32_t qpnum = temp_client->get_qp_num();
486         LOG_DEVEL_MSG("Adding new_client to qp_endpoint_map " << decnumber(qpnum)
487             << "in start_server_connection");
488         qp_endpoint_map_.insert(std::make_pair(qpnum, temp_client));
489 
490         // event acked by handle_route_resolved
491         return 0;
492     }
493 
494     case RDMA_CM_EVENT_DISCONNECTED: {
495         LOG_DEVEL_MSG("RDMA_CM_EVENT_DISCONNECTED          "
496             << sockaddress(addr_src) << "to "
497             << sockaddress(addr_dst)
498             << "( " << sockaddress(&local_addr_) << ")");
499         //
500         if (event_client->handle_disconnect(cm_event)==-1) {
501             std::terminate();
502         }
503 
504         LOG_DEVEL_MSG("Erasing client from qp_endpoint_map "
505             << decnumber(event_client->get_qp_num()));
506         qp_endpoint_map_.erase(event_client->get_qp_num());
507 
508         // get cq before we delete client
509 //        verbs_completion_queue_ptr completionQ = event_client->get_completion_queue();
510 //        uint32_t remote_ip = event_client->get_remote_ip_address();
511 
512         // event acked by handle_disconnect
513         return 0;
514     }
515 
516     default: {
517         LOG_ERROR_MSG(
518             "RDMA event: " << rdma_event_str(cm_event->event)
519             << " is not supported "
520             << " event came with "
521             << hexpointer(cm_event->param.conn.private_data) << " , "
522             << decnumber((int)(cm_event->param.conn.private_data_len)) << " , "
523             << decnumber(cm_event->id->qp->qp_num));
524 
525         break;
526     }
527     }
528 
529     // Acknowledge the event. This is always necessary because it tells
530     // rdma_cm that it can delete the structure it allocated for the event data
531     return verbs_event_channel::ack_event(cm_event);
532 }
533 
534 //----------------------------------------------------------------------------
535 // This function is only ever called from inside the event handler and is therefore
536 // protected by the controller mutex
handle_connect_request(struct rdma_cm_event * cm_event,std::uint32_t remote_ip)537 int rdma_controller::handle_connect_request(
538     struct rdma_cm_event *cm_event, std::uint32_t remote_ip)
539 {
540     auto present = connections_started_.is_in_map(remote_ip);
541     if (present.second)
542     {
543         LOG_DEVEL_MSG("Race connection, check priority   "
544             << ipaddress(remote_ip) << "to "
545             << sockaddress(&local_addr_)
546             << "( " << sockaddress(&local_addr_) << ")");
547 
548         // if a connection to this ip address is already being made, and we have
549         // a lower ip than the remote end, reject the incoming connection
550         if (remote_ip>local_addr_.sin_addr.s_addr &&
551              std::get<0>(present.first->second)->get_state() !=
552                  verbs_endpoint::connection_state::terminated)
553          {
554             LOG_DEVEL_MSG("Reject connection , priority from "
555                 << ipaddress(remote_ip) << "to "
556                 << sockaddress(&local_addr_)
557                 << "( " << sockaddress(&local_addr_) << ")");
558             //
559             server_endpoint_->reject(cm_event->id);
560             return 0;
561         }
562         else {
563             // we need to delete the connection we started and replace it with a new one
564             LOG_DEVEL_MSG("Priorty to new, Aborting old from "
565                 << sockaddress(&local_addr_) << "to "
566                 << ipaddress(remote_ip)
567                 << "( " << sockaddress(&local_addr_) << ")");
568 
569             verbs_endpoint_ptr aborted_client = std::get<0>(present.first->second);
570             aborted_client->abort();
571         }
572     }
573 
574     // Construct a new verbs_endpoint object for the new client.
575     verbs_endpoint_ptr new_client;
576     new_client = std::make_shared<verbs_endpoint>
577         (local_addr_, cm_event->id, protection_domain_, completion_queue_,
578         memory_pool_, server_endpoint_->SRQ(),
579         server_endpoint_->get_event_channel());
580     LOG_DEVEL_MSG("Created a new endpoint with pointer "
581         << hexpointer(new_client.get())
582         << "qp " << decnumber(new_client->get_qp_num()));
583 
584     uint32_t qpnum = new_client->get_qp_num();
585     LOG_DEVEL_MSG("Adding new_client to qp_endpoint_map " << decnumber(qpnum)
586         << "in handle_connect_request");
587     qp_endpoint_map_.insert(std::make_pair(qpnum, new_client));
588 
589     LOG_DEVEL_MSG("CR.Map<ip <endpoint,promise>>from "
590         << ipaddress(remote_ip) << "to "
591         << sockaddress(&local_addr_)
592         << "( " << sockaddress(&local_addr_) << ")"
593         << decnumber(qpnum));
594 
595     if (present.second)
596     {
597         // previous attempt was aborted, reset the endpoint in the connection map
598         // use find, because iterator from present.first is const
599         std::get<0>(connections_started_.find(remote_ip)->second) = new_client;
600     }
601 
602     // Accept the connection from the new client.
603     // accept() does not wait or ack
604     if (new_client->accept() != 0)
605     {
606         LOG_ERROR_MSG("error accepting client connection: %s "
607             << rdma_error::error_string(errno));
608         // @TODO : Handle failed connection - is there a correct thing to do
609         std::terminate();
610         return -1;
611     }
612 
613     LOG_DEVEL_MSG("accepted connection from "
614         << ipaddress(remote_ip)
615         << "qp = " << decnumber(qpnum));
616 
617     return 0;
618 }
619 
620 //----------------------------------------------------------------------------
621 // This function is only called from connect_to_server and is therefore
622 // holding the controller_mutex_ lock already
start_server_connection(uint32_t remote_ip)623 int rdma_controller::start_server_connection(uint32_t remote_ip)
624 {
625     sockaddr_in remote_addr;
626     sockaddr_in local_addr;
627     //
628     std::memset(&remote_addr, 0, sizeof(remote_addr));
629     remote_addr.sin_family      = AF_INET;
630     remote_addr.sin_port        = local_addr_.sin_port;
631     remote_addr.sin_addr.s_addr = remote_ip;
632     local_addr.sin_port         = 0;
633     local_addr                  = local_addr_;
634 
635     LOG_DEVEL_MSG("start_server_connection      from "
636         << sockaddress(&local_addr_)
637         << "to " << ipaddress(remote_ip)
638         << "( " << sockaddress(&local_addr_) << ")");
639 
640     // create a new client object for the remote endpoint
641     verbs_endpoint_ptr new_client = std::make_shared<verbs_endpoint>(
642         local_addr, remote_addr, protection_domain_, completion_queue_,
643         memory_pool_, server_endpoint_->SRQ(),
644         server_endpoint_->get_event_channel());
645 
646     LOG_DEVEL_MSG("SS.Map<ip <endpoint,promise>>from "
647         << sockaddress(&local_addr_) << "to "
648         << sockaddress(&remote_addr)
649         << "( " << sockaddress(&local_addr_) << ")");
650 
651     // create a future for this connection
652     hpx::promise<verbs_endpoint_ptr> new_endpoint_promise;
653     hpx::future<verbs_endpoint_ptr>  new_endpoint_future =
654         new_endpoint_promise.get_future();
655 
656     connections_started_.insert(
657         std::make_pair(
658             remote_ip,
659             std::make_tuple(
660                 new_client,
661                 std::move(new_endpoint_promise),
662                 std::move(new_endpoint_future))));
663 
664     return 0;
665 }
666 
667 //----------------------------------------------------------------------------
668 // return a future to a client - it will become ready when the
669 // connection is setup and ready for use
670 hpx::shared_future<verbs_endpoint_ptr>
connect_to_server(uint32_t remote_ip)671 rdma_controller::connect_to_server(uint32_t remote_ip)
672 {
673     // Prevent an incoming event handler connection request,
674     // and an outgoing server connect request from colliding
675     scoped_lock lock(controller_mutex_);
676 
677     bool delete_connection_on_exit = false;
678 
679     // has a connection been started from here already?
680     bool connection = connections_started_.is_in_map(remote_ip).second;
681     LOG_DEVEL_MSG("connect to server : connections_started_.is_in_map " << connection)
682 
683     // has someone tried to connect to us already?
684     if (!connection) {
685         for (const auto &client_pair : qp_endpoint_map_) {
686             verbs_endpoint *client = client_pair.second.get();
687             if (client->get_remote_ip_address() == remote_ip)
688             {
689                 LOG_DEVEL_MSG("connect_to_server : Found a remote connection ip "
690                     << ipaddress(remote_ip));
691                 // we must create a future for this connection as there is no entry
692                 // in the connections_started_ map (a connect request from remote ip)
693                 hpx::promise<verbs_endpoint_ptr> new_endpoint_promise;
694                 hpx::future<verbs_endpoint_ptr>  new_endpoint_future =
695                     new_endpoint_promise.get_future();
696                 //
697                 // if the connection was made by a connection request from outside
698                 // it might have already become established/ready but won't have set
699                 // the future ready, so do it here
700                 if (client->get_state()==verbs_endpoint::connection_state::connected) {
701                     LOG_DEVEL_MSG("state already connected - setting promise"
702                         << ipaddress(remote_ip));
703                     new_endpoint_promise.set_value(client_pair.second);
704                     // once the future is set, the entry can be removed
705                     delete_connection_on_exit = true;
706                 }
707 
708                 auto position = connections_started_.insert(
709                     std::make_pair(
710                         remote_ip,
711                         std::make_tuple(
712                             client_pair.second,
713                             std::move(new_endpoint_promise),
714                             std::move(new_endpoint_future))));
715 
716                 connection = true;
717                 break;
718             }
719         }
720         LOG_DEVEL_MSG("connect to server : qp_endpoint_map_.is_in_map " << connection)
721     }
722 
723     // if no connection either to or from here to the remote_ip has been started ...
724     if (!connection) {
725         start_server_connection(remote_ip);
726     }
727 
728     // the future will become ready when the remote end accepts/rejects our connection
729     // or we accept a connection from a remote
730     auto it = connections_started_.find(remote_ip);
731     hpx::shared_future<verbs_endpoint_ptr> result = std::get<2>(it->second);
732     if (delete_connection_on_exit) {
733         connections_started_.erase(it);
734     }
735     return result;
736 }
737 
738 //----------------------------------------------------------------------------
disconnect_all()739 void rdma_controller::disconnect_all()
740 {
741     // removing connections will affect the map, so lock it and loop over
742     // each element triggering a disconnect on each
743     map_read_lock_type read_lock(qp_endpoint_map_.read_write_mutex());
744     //
745     std::for_each(qp_endpoint_map_.begin(), qp_endpoint_map_.end(),
746         [this](const rdma_controller::QPMapPair &_client) {
747 //            if (!_client.second->is_client_endpoint()) {
748                 LOG_DEVEL_MSG("Removing a connection        from "
749                     << sockaddress(&local_addr_) << "to "
750                     << sockaddress(_client.second->get_remote_address())
751                     << "( " << sockaddress(&local_addr_) << ")");
752                 _client.second->disconnect();
753 //            }
754         }
755     );
756 }
757 
758 //----------------------------------------------------------------------------
active()759 bool rdma_controller::active()
760 {
761     map_read_lock_type read_lock(qp_endpoint_map_.read_write_mutex());
762     //
763     for (const auto &_client : qp_endpoint_map_) {
764         verbs_endpoint *client = _client.second.get();
765         if (client->get_state()!=verbs_endpoint::connection_state::terminated) {
766             LOG_TIMED_INIT(terminated);
767             LOG_TIMED_BLOCK(terminated, DEVEL, 5.0,
768                 {
769                     LOG_DEVEL_MSG("still active because client in state "
770                         << verbs_endpoint::ToString(client->get_state()));
771                 }
772             )
773             return true;
774         }
775     }
776     return false;
777 }
778 
779 #endif
780