1 // Copyright (c) 2014-2016 John Biddiscombe
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6 #include <hpx/config.hpp>
7
8 #if defined(HPX_HAVE_PARCELPORT_VERBS)
9
10 #include <hpx/config/parcelport_verbs_defines.hpp>
11 #include <hpx/config/parcelport_defines.hpp>
12 //
13 #include <plugins/parcelport/readers_writers_mutex.hpp>
14 //
15 #include <plugins/parcelport/verbs/rdma/rdma_error.hpp>
16 #include <plugins/parcelport/verbs/rdma/rdma_locks.hpp>
17 #include <plugins/parcelport/verbs/rdma/verbs_event_channel.hpp>
18 #include <plugins/parcelport/verbs/rdma/verbs_device.hpp>
19 #include <plugins/parcelport/verbs/rdma/rdma_controller.hpp>
20 #include <plugins/parcelport/verbs/rdma/verbs_completion_queue.hpp>
21 #include <plugins/parcelport/verbs/rdma/verbs_device.hpp>
22 //
23 #include <boost/lexical_cast.hpp>
24 //
25 #include <poll.h>
26 #include <errno.h>
27 #include <iomanip>
28 #include <sstream>
29 #include <queue>
30 #include <stdio.h>
31 #include <thread>
32 #include <fstream>
33 #include <memory>
34 #include <utility>
35 #include <cstdint>
36 #include <cstring>
37 //
38 #include <netinet/in.h>
39
40 const int hpx::parcelset::policies::verbs::verbs_completion_queue::MaxQueueSize;
41
42 using namespace hpx::parcelset::policies::verbs;
43
44 //----------------------------------------------------------------------------
rdma_controller(const char * device,const char * interface,int port)45 rdma_controller::rdma_controller(const char *device, const char *interface, int port)
46 {
47 device_ = device;
48 interface_ = interface;
49 //
50 local_addr_.sin_family = AF_INET;
51 local_addr_.sin_port = port;
52 local_addr_.sin_addr.s_addr = 0xFFFFFFFF;
53 //
54 event_pause_ = 0;
55 }
56
57 //----------------------------------------------------------------------------
rdma_controller::~rdma_controller()
{
    // Receives pre-posted via the server endpoint are counted against the
    // 'small' chunk pool; hand that count back before the pool is destroyed
    if (memory_pool_ && server_endpoint_)
    {
        memory_pool_->small_.decrement_used_count(
            server_endpoint_->get_receive_count()
        );
    }
    // NOTE: teardown order below is deliberate - endpoints go first, then the
    // memory pool, protection domain and completion queue they were built on
    LOG_DEVEL_MSG("rdma_controller destructor clearing clients");
    connections_started_.clear();
    LOG_DEVEL_MSG("rdma_controller destructor closing server");
    this->server_endpoint_.reset();
    LOG_DEVEL_MSG("rdma_controller destructor freeing memory pool");
    this->memory_pool_.reset();
    LOG_DEVEL_MSG("rdma_controller destructor releasing protection domain");
    this->protection_domain_.reset();
    LOG_DEVEL_MSG("rdma_controller destructor deleting completion queue");
    this->completion_queue_.reset();
    LOG_DEVEL_MSG("rdma_controller destructor done");
}
80
81 //----------------------------------------------------------------------------
// Bring up all the RDMA machinery this controller owns :
// link device -> listening endpoint -> protection domain -> memory pool ->
// completion queue -> shared receive queue -> listen.
// Returns 0 on success, otherwise an rdma error code.
int rdma_controller::startup()
{
    LOG_DEVEL_MSG("creating InfiniBand device for " << device_
        << " using interface " << interface_);

    // Find the address of the Infiniband link device.
    verbs_device linkDevice(device_, interface_);

    LOG_DEVEL_MSG(
        "created InfiniBand device for " << linkDevice.get_device_name()
        << " using interface " << linkDevice.get_interface_name());

    local_addr_.sin_addr.s_addr = linkDevice.get_address();
    LOG_DEVEL_MSG("Device returns IP address " << sockaddress(&local_addr_));

    // Create server/listener for RDMA connections.
    try {
        //
        server_endpoint_ = std::make_shared<verbs_endpoint>(local_addr_);
        //
        // the endpoint may have been bound to a different port than the one
        // requested; remember whichever port we actually got
        if (server_endpoint_->get_local_port() != local_addr_.sin_port)
        {
            local_addr_.sin_port = server_endpoint_->get_local_port();
            LOG_DEVEL_MSG("verbs_endpoint port changed to "
                << decnumber(local_addr_.sin_port));
        }
    } catch (rdma_error& e) {
        LOG_ERROR_MSG("error creating listening RDMA connection: " << e.what());
        return e.error_code();
    }

    LOG_DEVEL_MSG(
        "created listening RDMA connection " << hexpointer(server_endpoint_.get())
        << " on port " << decnumber(local_addr_.sin_port)
        << " IP address " << sockaddress(&local_addr_));

    // Create a protection domain object.
    try {
        protection_domain_ = verbs_protection_domain_ptr(
            new verbs_protection_domain(server_endpoint_->get_device_context()));
    } catch (rdma_error& e) {
        LOG_ERROR_MSG("error allocating protection domain: " << e.what());
        return e.error_code();
    }
    LOG_DEVEL_MSG("created protection domain " << protection_domain_->get_handle());

    // Create a memory pool for pinned buffers
    memory_pool_ = std::make_shared<rdma_memory_pool> (protection_domain_);

    // Construct a completion queue object that will be shared by all endpoints
    // (no completion channel - completions are polled, not event driven)
    completion_queue_ = std::make_shared<verbs_completion_queue>(
        server_endpoint_->get_device_context(),
        verbs_completion_queue::MaxQueueSize, (ibv_comp_channel*) nullptr);

    // create a shared receive queue
    LOG_DEVEL_MSG("Creating SRQ shared receive queue ");
    server_endpoint_->create_srq(protection_domain_);
    LOG_DEVEL_MSG("SRQ is " << hexpointer(server_endpoint_->getsrq()));
    // preposts are made via the server endpoint when using SRQ, so make sure
    // the memory pool is setup correctly
    server_endpoint_->set_memory_pool(memory_pool_);
    server_endpoint_->refill_preposts(HPX_PARCELPORT_VERBS_MAX_PREPOSTS, true);

    // Listen for connections.
    LOG_DEVEL_MSG("Calling LISTEN function on "
        << sockaddress(&local_addr_));
    int err = server_endpoint_->listen(256);
    if (err != 0) {
        LOG_ERROR_MSG(
            "error listening for new RDMA connections: "
            << rdma_error::error_string(err));
        return err;
    }
    LOG_DEVEL_MSG("LISTEN enabled for new RDMA connections on "
        << sockaddress(&local_addr_));

    return 0;
}
160
161 //----------------------------------------------------------------------------
refill_client_receives(bool force)162 void rdma_controller::refill_client_receives(bool force)
163 {
164 // a copy of the shared receive queue is held by the server_endpoint
165 // so pre-post receives to that to ensure all clients are 'ready'
166 LOG_DEVEL_MSG("refill_client_receives");
167 server_endpoint_->refill_preposts(HPX_PARCELPORT_VERBS_MAX_PREPOSTS, force);
168 }
169
170 //----------------------------------------------------------------------------
// Main progress routine : drain work completions, then (subject to a backoff)
// drain connection-management events. Returns the number of items handled.
int rdma_controller::poll_endpoints(bool stopped)
{
    // completions of work requests
    int handled = poll_for_work_completions(stopped);

    // no need to check for connection events very often, use a backoff so that
    // when an event is received, we check frequently, when not, we gradually slow
    // down our checks to avoid wasting too much time
    using namespace std::chrono;
    time_point<system_clock> now = system_clock::now();
    if (duration_cast<microseconds>(now - event_check_time_).count() > event_pause_)
    {
        event_check_time_ = now;
        // only active when logging is enabled
        LOG_TIMED_INIT(event_poll);
        LOG_TIMED_BLOCK(event_poll, DEVEL, 5.0,
            {
                LOG_DEVEL_MSG("Polling event channel");
                debug_connections();
            }
        )
        // dispatch any pending connection-manager events to handle_event
        int events = server_endpoint_->poll_for_event(
            [this](struct rdma_cm_event *cm_event) {
                return handle_event(cm_event, server_endpoint_.get());
            }
        );
        // backoff: reset the pause on activity, otherwise lengthen it by
        // 10us per idle poll, capped at 500us
        if (events>0) {
            event_pause_ = 0;
        }
        else {
            event_pause_ = (event_pause_<500) ? event_pause_ + 10 : 500;
        }
        handled += events;
    }
    return handled;
}
207
208 //----------------------------------------------------------------------------
// Drain the shared completion queue, dispatching each completion to
// completion_function_. Returns the number of completions handled.
int rdma_controller::poll_for_work_completions(bool stopped)
{
    LOG_TIMED_INIT(completion_poll);
    LOG_TIMED_BLOCK(completion_poll, DEVEL, 5.0,
        {
            LOG_DEVEL_MSG("Polling completion_poll channel");
        }
    )

    struct ibv_wc completion;
    int ntot = 0, nc = 0;
    //
    verbs_completion_queue *completionQ = get_completion_queue();

    // Remove work completions from the completion queue until it is empty.
    do {
        nc = completionQ->poll_completion(&completion);
        // positive result means completion ok
        // NOTE(review): when 'stopped' is true a successful completion is
        // neither dispatched nor counted - confirm this drop is intended
        if (nc > 0 && !stopped) {
            verbs_endpoint *client = get_client_from_completion(completion);
            // handle the completion
            this->completion_function_(completion, client);
            ++ntot;
        }
        // negative result indicates flushed receive
        else if (nc < 0) {
            // flushed receive completion, delete it, disconnection has started
            verbs_memory_region *region = (verbs_memory_region *)completion.wr_id;
            // let go of this region
            memory_pool_->deallocate(region);
            LOG_DEVEL_MSG("Flushed receive on qp " << decnumber(completion.qp_num));
        }
        // any receive (handled or flushed) consumes a pre-posted buffer
        if (nc != 0 && completion.opcode==IBV_WC_RECV) {
            // bookkeeping : decrement counter that keeps preposted queue full
            server_endpoint_->pop_receive_count();
            if (server_endpoint_->get_receive_count() <
                HPX_PARCELPORT_VERBS_MAX_PREPOSTS/2)
            {
                LOG_DEVEL_MSG("refilling preposts");
                server_endpoint_->refill_preposts(
                    HPX_PARCELPORT_VERBS_MAX_PREPOSTS, false);
            }
        }
    } while (nc != 0);
    //
    return ntot;
}
256
257 //----------------------------------------------------------------------------
debug_connections()258 void rdma_controller::debug_connections()
259 {
260 map_read_lock_type read_lock(connections_started_.read_write_mutex());
261 //
262 LOG_DEVEL_MSG("qp_endpoint_map_ entries");
263 std::for_each(qp_endpoint_map_.begin(), qp_endpoint_map_.end(),
264 [this](const rdma_controller::QPMapPair &_client) {
265 verbs_endpoint_ptr endpoint = _client.second;
266 if (endpoint->is_client_endpoint()) {
267 LOG_DEVEL_MSG("Status of connection from "
268 << sockaddress(&local_addr_) << "to "
269 << sockaddress(endpoint->get_remote_address())
270 << "client " << decnumber(endpoint->get_qp_num())
271 << " state " << verbs_endpoint::ToString(endpoint->get_state()));
272 }
273 else {
274 LOG_DEVEL_MSG("Status of connection from "
275 << sockaddress(endpoint->get_remote_address()) << "to "
276 << sockaddress(&local_addr_)
277 << "server " << decnumber(endpoint->get_qp_num())
278 << " state " << verbs_endpoint::ToString(endpoint->get_state()));
279 }
280 }
281 );
282 LOG_DEVEL_MSG("connections_started_ entries");
283 std::for_each(connections_started_.begin(), connections_started_.end(),
284 [this](const rdma_controller::ClientMapPair &_client) {
285 verbs_endpoint_ptr endpoint = std::get<0>(_client.second);
286 if (endpoint->is_client_endpoint()) {
287 LOG_DEVEL_MSG("Status of connection from "
288 << sockaddress(&local_addr_) << "to "
289 << sockaddress(endpoint->get_remote_address())
290 << "client " << decnumber(endpoint->get_qp_num())
291 << " state " << verbs_endpoint::ToString(endpoint->get_state()));
292 }
293 else {
294 LOG_DEVEL_MSG("Status of connection from "
295 << sockaddress(endpoint->get_remote_address()) << "to "
296 << sockaddress(&local_addr_)
297 << "server " << decnumber(endpoint->get_qp_num())
298 << " state " << verbs_endpoint::ToString(endpoint->get_state()));
299 }
300 }
301 );
302 }
303 //----------------------------------------------------------------------------
// Central dispatcher for rdma_cm connection-management events.
// Looks up the endpoint the event belongs to (by qp number once a queue-pair
// exists, otherwise by the remote ip of a connection we started), then
// switches on the event type. Returns 0 on success.
// NB: most branches ack (and thereby free) the event themselves and return
// early; only fall-through paths reach the final ack_event call.
int rdma_controller::handle_event(struct rdma_cm_event *cm_event,
    verbs_endpoint *a_client)
{
    // Get ip address of source/dest of event
    // NB: The src and dest fields refer to the event and not the 'request'
    struct sockaddr *ip_src = &cm_event->id->route.addr.src_addr;
    struct sockaddr *ip_dst = &cm_event->id->route.addr.dst_addr;
    struct sockaddr_in *addr_src = reinterpret_cast<struct sockaddr_in *>(ip_src);
    struct sockaddr_in *addr_dst = reinterpret_cast<struct sockaddr_in *>(ip_dst);

    LOG_DEVEL_MSG("event src is " << sockaddress(addr_src)
        << "( " << sockaddress(&local_addr_) << ")");

    verbs_endpoint_ptr event_client;
    uint32_t qpnum = (cm_event->id->qp) ? cm_event->id->qp->qp_num : 0;
    if (qpnum>0) {
        // Find connection associated with this event if it's not a new request
        LOG_DEVEL_MSG("handle_event : Looking for qp in map " << decnumber(qpnum));
        auto present = qp_endpoint_map_.is_in_map(qpnum);
        if (present.second) {
            event_client = present.first->second;
        }
        else {
            if (cm_event->event == RDMA_CM_EVENT_TIMEWAIT_EXIT) {
                // do nothing - the endpoint has already been removed from the
                // map; just acknowledge the event
                verbs_event_channel::ack_event(cm_event);
                return 0;
            }
            else {
                LOG_DEVEL_MSG("handle_event : could not find client for "
                    << decnumber(qpnum));
                std::terminate();
            }
        }
    }
    else {
        // no queue-pair yet : try to match a connection we initiated locally
        LOG_DEVEL_MSG("handle_event : qp num is zero");
        auto present = connections_started_.is_in_map(addr_dst->sin_addr.s_addr);
        if (present.second) {
            event_client = std::get<0>(present.first->second);
        }
    }
    // NOTE(review): event_client can still be null here; the ESTABLISHED and
    // DISCONNECTED branches below dereference it - confirm those events can
    // never arrive without a matching map entry
    if (!event_client) {
        LOG_DEVEL_MSG("handle_event : event client not found");
    }
    //
    struct sockaddr_in *conn_src = reinterpret_cast<struct sockaddr_in *>(ip_src);
    struct sockaddr_in *conn_dst = reinterpret_cast<struct sockaddr_in *>(ip_dst);
    //
    // are we the server or client end of the connection, flip the src/dst
    // pointers if we are the server end (clients init connections to servers)
    if (event_client && !event_client->is_client_endpoint()) {
        conn_src = reinterpret_cast<struct sockaddr_in *>(ip_dst);
        conn_dst = reinterpret_cast<struct sockaddr_in *>(ip_src);
    }

    // Handle the event : NB ack_event will delete the event, do not use it afterwards.
    switch (cm_event->event) {

    // a connect request event will only ever occur on the server_endpoint_
    // in response to a new connection request from a client
    case RDMA_CM_EVENT_CONNECT_REQUEST: {
        LOG_DEVEL_MSG("RDMA_CM_EVENT_CONNECT_REQUEST "
            << sockaddress(conn_dst) << "to "
            << sockaddress(conn_src)
            << "( " << sockaddress(&local_addr_) << ")");

        // We must not allow an new outgoing connection and a new incoming
        // connect to be started simultaneously - to avoid races on the
        // connection maps
        unique_lock lock(controller_mutex_);
        handle_connect_request(cm_event, conn_dst->sin_addr.s_addr);
        break;
    }

    // this event will be generated after an accept or reject
    // RDMA_CM_EVENT_ESTABLISHED is sent to both ends of a new connection
    case RDMA_CM_EVENT_REJECTED:
    case RDMA_CM_EVENT_ESTABLISHED: {
        // we use addr_dst because it is the remote end of the connection
        // regardless of whether we are connecting to, or being connected from
        uint32_t remote_ip = addr_dst->sin_addr.s_addr;

        LOG_DEVEL_MSG(rdma_event_str(cm_event->event) << " from "
            << sockaddress(conn_src) << "to "
            << sockaddress(conn_dst)
            << "( " << sockaddress(&local_addr_) << ")");

        // process the established event
        // (0 = connected ok, -1 = error, -2 = remote rejected us)
        int established = event_client->handle_establish(cm_event);

        // connection established without problem
        if (established==0)
        {
            LOG_DEVEL_MSG("calling connection callback from "
                << sockaddress(conn_src) << "to "
                << sockaddress(conn_dst)
                << "( " << sockaddress(&local_addr_) << ")");

            // call connection function before making the future ready
            // to avoid a race in the parcelport get connection routines
            this->connection_function_(event_client);

            LOG_DEVEL_MSG("established connection from "
                << sockaddress(conn_src) << "to "
                << sockaddress(conn_dst)
                << "and making future ready, qp = " << decnumber(qpnum));

            // if there is an entry for a locally started connection on this IP
            // then set the future ready with the verbs endpoint
            auto present = connections_started_.is_in_map(remote_ip);
            if (present.second) {
                std::get<1>(connections_started_.find(remote_ip)->second).
                    set_value(event_client);
                // once the future is set, the entry can be removed
                connections_started_.erase(remote_ip);
            }
        }

        // @TODO remove this aborted event handler once all is working
        // send the event to that
        else if (established==-1)
        {
            std::terminate();
        }

        // the remote end rejected our connection, so we must abort and clean up
        else if (established==-2)
        {
            // we need to delete the started connection and replace it with a new one
            LOG_DEVEL_MSG("Abort old connect, rejected from "
                << sockaddress(addr_src) << "to "
                << sockaddress(addr_dst)
                << "( " << sockaddress(&local_addr_) << ")"
                << "qp = " << decnumber(qpnum));

            // if this was a connection started by remote, remove it from the map
            qp_endpoint_map_.erase(qpnum);
        }
        // event acked by handle_establish
        return 0;
    }

    // this event is only ever received on the client end of a connection
    // after starting to make a connection to a remote server
    case RDMA_CM_EVENT_ADDR_RESOLVED: {
        LOG_DEVEL_MSG("RDMA_CM_EVENT_ADDR_RESOLVED "
            << sockaddress(conn_src) << "to "
            << sockaddress(conn_dst)
            << "( " << sockaddress(&local_addr_) << ")");

        // When a new connection is started (start_server_connection),
        // this event might be received before the new endpoint has been added to the map.
        // protect with the controller lock
        unique_lock lock(controller_mutex_);
        //
        verbs_endpoint_ptr temp_client =
            std::get<0>(connections_started_.find(conn_dst->sin_addr.s_addr)->second);
        if (temp_client->handle_addr_resolved(cm_event)==-1) {
            std::terminate();
        }
        // event acked by handle_addr_resolved
        return 0;
    }

    // this event is only ever received on the client end of a connection
    // after starting to make a connection to a remote server
    case RDMA_CM_EVENT_ROUTE_RESOLVED: {
        LOG_DEVEL_MSG("RDMA_CM_EVENT_ROUTE_RESOLVED "
            << sockaddress(conn_src) << "to "
            << sockaddress(conn_dst)
            << "( " << sockaddress(&local_addr_) << ")");

        // we don't need the lock on controller_mutex_ here because we cannot get here
        // until addr_resolved has been completed.
        verbs_endpoint_ptr temp_client =
            std::get<0>(connections_started_.find(conn_dst->sin_addr.s_addr)->second);
        if (temp_client->handle_route_resolved(cm_event)==-1) {
            std::terminate();
        }
        // handle_route_resolved makes the queue-pair valid, add it to qp map
        uint32_t qpnum = temp_client->get_qp_num();
        LOG_DEVEL_MSG("Adding new_client to qp_endpoint_map " << decnumber(qpnum)
            << "in start_server_connection");
        qp_endpoint_map_.insert(std::make_pair(qpnum, temp_client));

        // event acked by handle_route_resolved
        return 0;
    }

    case RDMA_CM_EVENT_DISCONNECTED: {
        LOG_DEVEL_MSG("RDMA_CM_EVENT_DISCONNECTED "
            << sockaddress(addr_src) << "to "
            << sockaddress(addr_dst)
            << "( " << sockaddress(&local_addr_) << ")");
        //
        if (event_client->handle_disconnect(cm_event)==-1) {
            std::terminate();
        }

        LOG_DEVEL_MSG("Erasing client from qp_endpoint_map "
            << decnumber(event_client->get_qp_num()));
        qp_endpoint_map_.erase(event_client->get_qp_num());

        // get cq before we delete client
        // verbs_completion_queue_ptr completionQ = event_client->get_completion_queue();
        // uint32_t remote_ip = event_client->get_remote_ip_address();

        // event acked by handle_disconnect
        return 0;
    }

    default: {
        // NOTE(review): cm_event->id->qp may be null for an unexpected event,
        // in which case this log line dereferences a null pointer - confirm
        LOG_ERROR_MSG(
            "RDMA event: " << rdma_event_str(cm_event->event)
            << " is not supported "
            << " event came with "
            << hexpointer(cm_event->param.conn.private_data) << " , "
            << decnumber((int)(cm_event->param.conn.private_data_len)) << " , "
            << decnumber(cm_event->id->qp->qp_num));

        break;
    }
    }

    // Acknowledge the event. This is always necessary because it tells
    // rdma_cm that it can delete the structure it allocated for the event data
    return verbs_event_channel::ack_event(cm_event);
}
533
534 //----------------------------------------------------------------------------
535 // This function is only ever called from inside the event handler and is therefore
536 // protected by the controller mutex
int rdma_controller::handle_connect_request(
    struct rdma_cm_event *cm_event, std::uint32_t remote_ip)
{
    // have we already started an outgoing connection to this ip ourselves?
    auto present = connections_started_.is_in_map(remote_ip);
    if (present.second)
    {
        LOG_DEVEL_MSG("Race connection, check priority "
            << ipaddress(remote_ip) << "to "
            << sockaddress(&local_addr_)
            << "( " << sockaddress(&local_addr_) << ")");

        // if a connection to this ip address is already being made, and we have
        // a lower ip than the remote end, reject the incoming connection
        // (both ends apply the same ip-comparison rule, so exactly one of the
        // two racing connections survives)
        if (remote_ip>local_addr_.sin_addr.s_addr &&
            std::get<0>(present.first->second)->get_state() !=
                verbs_endpoint::connection_state::terminated)
        {
            LOG_DEVEL_MSG("Reject connection , priority from "
                << ipaddress(remote_ip) << "to "
                << sockaddress(&local_addr_)
                << "( " << sockaddress(&local_addr_) << ")");
            //
            server_endpoint_->reject(cm_event->id);
            return 0;
        }
        else {
            // we need to delete the connection we started and replace it with a new one
            LOG_DEVEL_MSG("Priorty to new, Aborting old from "
                << sockaddress(&local_addr_) << "to "
                << ipaddress(remote_ip)
                << "( " << sockaddress(&local_addr_) << ")");

            verbs_endpoint_ptr aborted_client = std::get<0>(present.first->second);
            aborted_client->abort();
        }
    }

    // Construct a new verbs_endpoint object for the new client.
    // It shares the protection domain, completion queue, memory pool, SRQ
    // and event channel owned by the server endpoint
    verbs_endpoint_ptr new_client;
    new_client = std::make_shared<verbs_endpoint>
        (local_addr_, cm_event->id, protection_domain_, completion_queue_,
        memory_pool_, server_endpoint_->SRQ(),
        server_endpoint_->get_event_channel());
    LOG_DEVEL_MSG("Created a new endpoint with pointer "
        << hexpointer(new_client.get())
        << "qp " << decnumber(new_client->get_qp_num()));

    uint32_t qpnum = new_client->get_qp_num();
    LOG_DEVEL_MSG("Adding new_client to qp_endpoint_map " << decnumber(qpnum)
        << "in handle_connect_request");
    qp_endpoint_map_.insert(std::make_pair(qpnum, new_client));

    LOG_DEVEL_MSG("CR.Map<ip <endpoint,promise>>from "
        << ipaddress(remote_ip) << "to "
        << sockaddress(&local_addr_)
        << "( " << sockaddress(&local_addr_) << ")"
        << decnumber(qpnum));

    if (present.second)
    {
        // previous attempt was aborted, reset the endpoint in the connection map
        // use find, because iterator from present.first is const
        std::get<0>(connections_started_.find(remote_ip)->second) = new_client;
    }

    // Accept the connection from the new client.
    // accept() does not wait or ack
    if (new_client->accept() != 0)
    {
        LOG_ERROR_MSG("error accepting client connection: %s "
            << rdma_error::error_string(errno));
        // @TODO : Handle failed connection - is there a correct thing to do
        std::terminate();
        return -1;
    }

    LOG_DEVEL_MSG("accepted connection from "
        << ipaddress(remote_ip)
        << "qp = " << decnumber(qpnum));

    return 0;
}
619
620 //----------------------------------------------------------------------------
621 // This function is only called from connect_to_server and is therefore
622 // holding the controller_mutex_ lock already
start_server_connection(uint32_t remote_ip)623 int rdma_controller::start_server_connection(uint32_t remote_ip)
624 {
625 sockaddr_in remote_addr;
626 sockaddr_in local_addr;
627 //
628 std::memset(&remote_addr, 0, sizeof(remote_addr));
629 remote_addr.sin_family = AF_INET;
630 remote_addr.sin_port = local_addr_.sin_port;
631 remote_addr.sin_addr.s_addr = remote_ip;
632 local_addr.sin_port = 0;
633 local_addr = local_addr_;
634
635 LOG_DEVEL_MSG("start_server_connection from "
636 << sockaddress(&local_addr_)
637 << "to " << ipaddress(remote_ip)
638 << "( " << sockaddress(&local_addr_) << ")");
639
640 // create a new client object for the remote endpoint
641 verbs_endpoint_ptr new_client = std::make_shared<verbs_endpoint>(
642 local_addr, remote_addr, protection_domain_, completion_queue_,
643 memory_pool_, server_endpoint_->SRQ(),
644 server_endpoint_->get_event_channel());
645
646 LOG_DEVEL_MSG("SS.Map<ip <endpoint,promise>>from "
647 << sockaddress(&local_addr_) << "to "
648 << sockaddress(&remote_addr)
649 << "( " << sockaddress(&local_addr_) << ")");
650
651 // create a future for this connection
652 hpx::promise<verbs_endpoint_ptr> new_endpoint_promise;
653 hpx::future<verbs_endpoint_ptr> new_endpoint_future =
654 new_endpoint_promise.get_future();
655
656 connections_started_.insert(
657 std::make_pair(
658 remote_ip,
659 std::make_tuple(
660 new_client,
661 std::move(new_endpoint_promise),
662 std::move(new_endpoint_future))));
663
664 return 0;
665 }
666
667 //----------------------------------------------------------------------------
668 // return a future to a client - it will become ready when the
669 // connection is setup and ready for use
670 hpx::shared_future<verbs_endpoint_ptr>
connect_to_server(uint32_t remote_ip)671 rdma_controller::connect_to_server(uint32_t remote_ip)
672 {
673 // Prevent an incoming event handler connection request,
674 // and an outgoing server connect request from colliding
675 scoped_lock lock(controller_mutex_);
676
677 bool delete_connection_on_exit = false;
678
679 // has a connection been started from here already?
680 bool connection = connections_started_.is_in_map(remote_ip).second;
681 LOG_DEVEL_MSG("connect to server : connections_started_.is_in_map " << connection)
682
683 // has someone tried to connect to us already?
684 if (!connection) {
685 for (const auto &client_pair : qp_endpoint_map_) {
686 verbs_endpoint *client = client_pair.second.get();
687 if (client->get_remote_ip_address() == remote_ip)
688 {
689 LOG_DEVEL_MSG("connect_to_server : Found a remote connection ip "
690 << ipaddress(remote_ip));
691 // we must create a future for this connection as there is no entry
692 // in the connections_started_ map (a connect request from remote ip)
693 hpx::promise<verbs_endpoint_ptr> new_endpoint_promise;
694 hpx::future<verbs_endpoint_ptr> new_endpoint_future =
695 new_endpoint_promise.get_future();
696 //
697 // if the connection was made by a connection request from outside
698 // it might have already become established/ready but won't have set
699 // the future ready, so do it here
700 if (client->get_state()==verbs_endpoint::connection_state::connected) {
701 LOG_DEVEL_MSG("state already connected - setting promise"
702 << ipaddress(remote_ip));
703 new_endpoint_promise.set_value(client_pair.second);
704 // once the future is set, the entry can be removed
705 delete_connection_on_exit = true;
706 }
707
708 auto position = connections_started_.insert(
709 std::make_pair(
710 remote_ip,
711 std::make_tuple(
712 client_pair.second,
713 std::move(new_endpoint_promise),
714 std::move(new_endpoint_future))));
715
716 connection = true;
717 break;
718 }
719 }
720 LOG_DEVEL_MSG("connect to server : qp_endpoint_map_.is_in_map " << connection)
721 }
722
723 // if no connection either to or from here to the remote_ip has been started ...
724 if (!connection) {
725 start_server_connection(remote_ip);
726 }
727
728 // the future will become ready when the remote end accepts/rejects our connection
729 // or we accept a connection from a remote
730 auto it = connections_started_.find(remote_ip);
731 hpx::shared_future<verbs_endpoint_ptr> result = std::get<2>(it->second);
732 if (delete_connection_on_exit) {
733 connections_started_.erase(it);
734 }
735 return result;
736 }
737
738 //----------------------------------------------------------------------------
disconnect_all()739 void rdma_controller::disconnect_all()
740 {
741 // removing connections will affect the map, so lock it and loop over
742 // each element triggering a disconnect on each
743 map_read_lock_type read_lock(qp_endpoint_map_.read_write_mutex());
744 //
745 std::for_each(qp_endpoint_map_.begin(), qp_endpoint_map_.end(),
746 [this](const rdma_controller::QPMapPair &_client) {
747 // if (!_client.second->is_client_endpoint()) {
748 LOG_DEVEL_MSG("Removing a connection from "
749 << sockaddress(&local_addr_) << "to "
750 << sockaddress(_client.second->get_remote_address())
751 << "( " << sockaddress(&local_addr_) << ")");
752 _client.second->disconnect();
753 // }
754 }
755 );
756 }
757
758 //----------------------------------------------------------------------------
active()759 bool rdma_controller::active()
760 {
761 map_read_lock_type read_lock(qp_endpoint_map_.read_write_mutex());
762 //
763 for (const auto &_client : qp_endpoint_map_) {
764 verbs_endpoint *client = _client.second.get();
765 if (client->get_state()!=verbs_endpoint::connection_state::terminated) {
766 LOG_TIMED_INIT(terminated);
767 LOG_TIMED_BLOCK(terminated, DEVEL, 5.0,
768 {
769 LOG_DEVEL_MSG("still active because client in state "
770 << verbs_endpoint::ToString(client->get_state()));
771 }
772 )
773 return true;
774 }
775 }
776 return false;
777 }
778
779 #endif
780