1 //  Copyright (c) 2016 John Biddiscombe
2 //  Copyright (c) 2017 Thomas Heller
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
7 #ifndef HPX_PARCELSET_POLICIES_LIBFABRIC_CONTROLLER_HPP
8 #define HPX_PARCELSET_POLICIES_LIBFABRIC_CONTROLLER_HPP
9 
10 // config
11 #include <hpx/config/defines.hpp>
12 //
13 #include <hpx/lcos/local/shared_mutex.hpp>
14 #include <hpx/lcos/promise.hpp>
15 #include <hpx/lcos/future.hpp>
16 //
17 #include <hpx/runtime/parcelset/parcelport_impl.hpp>
18 #include <hpx/runtime/agas/addressing_service.hpp>
19 //
20 #include <plugins/parcelport/parcelport_logging.hpp>
21 #include <plugins/parcelport/libfabric/rdma_locks.hpp>
22 #include <plugins/parcelport/libfabric/receiver.hpp>
23 #include <plugins/parcelport/libfabric/sender.hpp>
24 #include <plugins/parcelport/libfabric/rma_receiver.hpp>
25 #include <plugins/parcelport/libfabric/locality.hpp>
26 //
27 #include <plugins/parcelport/unordered_map.hpp>
28 //
29 #include <memory>
30 #include <deque>
31 #include <chrono>
32 #include <iostream>
33 #include <functional>
34 #include <map>
35 #include <atomic>
36 #include <string>
37 #include <utility>
38 #include <array>
39 #include <vector>
40 #include <unordered_map>
41 #include <sstream>
42 #include <cstdint>
43 #include <cstddef>
44 #include <cstring>
45 //
46 #include <rdma/fabric.h>
47 #include <rdma/fi_cm.h>
48 #include <rdma/fi_domain.h>
49 #include <rdma/fi_endpoint.h>
50 #include <rdma/fi_eq.h>
51 #include <rdma/fi_errno.h>
52 #include <rdma/fi_rma.h>
53 #include "fabric_error.hpp"
54 
55 #if defined(HPX_PARCELPORT_LIBFABRIC_GNI) || \
56     defined(HPX_PARCELPORT_LIBFABRIC_SOCKETS) || \
57     defined(HPX_PARCELPORT_LIBFABRIC_PSM2)
58 # define HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM
59 #else
60 # define HPX_PARCELPORT_LIBFABRIC_ENDPOINT_MSG
61 #endif
62 
63 #ifdef HPX_PARCELPORT_LIBFABRIC_GNI
64 # include "rdma/fi_ext_gni.h"
65 #endif
66 
67 #ifdef HPX_PARCELPORT_LIBFABRIC_HAVE_PMI
68 //
69 # include <pmi2.h>
70 //
71 # include <boost/archive/iterators/base64_from_binary.hpp>
72 # include <boost/archive/iterators/binary_from_base64.hpp>
73 # include <boost/archive/iterators/transform_width.hpp>
74 
75     using namespace boost::archive::iterators;
76 
77     typedef
78         base64_from_binary<
79             transform_width<std::string::const_iterator, 6, 8>
80             > base64_t;
81 
82     typedef
83         transform_width<
84             binary_from_base64<std::string::const_iterator>, 8, 6
85     > binary_t;
86 #endif
87 
88 namespace hpx {
89 namespace parcelset {
90 namespace policies {
91 namespace libfabric
92 {
93 
94     class libfabric_controller
95     {
96     public:
97         typedef hpx::lcos::local::spinlock mutex_type;
98         typedef hpx::parcelset::policies::libfabric::unique_lock<mutex_type> unique_lock;
99         typedef hpx::parcelset::policies::libfabric::scoped_lock<mutex_type> scoped_lock;
100 
101         // NOTE: Connection maps are not used for endpoint type RDM
102         // when a new connection is requested, it will be completed asynchronously
103         // we need a promise/future for each endpoint so that we can set the new
104         // endpoint when the connection completes and is ready
105         // Note - only used during connection, then deleted
106         typedef std::tuple<
107             hpx::promise<fid_ep *>,
108             hpx::shared_future<fid_ep *>
109         > promise_tuple_type;
110 
111         // lock types for maps
112         typedef hpx::concurrent::unordered_map<uint32_t, promise_tuple_type>
113             ::map_read_lock_type map_read_lock_type;
114         typedef hpx::concurrent::unordered_map<uint32_t, promise_tuple_type>
115             ::map_write_lock_type map_write_lock_type;
116 
        // Map of connections started, needed until connection is completed
        hpx::concurrent::unordered_map<uint32_t, promise_tuple_type> endpoint_tmp_;
        // fabric addresses of known localities; get_fabric_address looks
        // them up with the key (ip_address << 32 | port)
        std::unordered_map<uint64_t, fi_addr_t> endpoint_av_;

        // our own address, assigned from create_local_endpoint in the constructor
        locality here_;
        // set by boot_PMI to the locality published by PMI rank 0
        locality agas_;

        // result of fi_getinfo in open_fabric, released in the destructor
        struct fi_info    *fabric_info_;
        // opened by fi_fabric in open_fabric
        struct fid_fabric *fabric_;
        // opened by fi_domain in open_fabric
        struct fid_domain *fabric_domain_;
        // Server/Listener for RDMA connections.
        struct fid_pep    *ep_passive_;
        // endpoint handed to the receivers created in startup
        struct fid_ep     *ep_active_;
        // shared receive context, allocated in create_event_queue for
        // endpoint type FI_EP_MSG
        struct fid_ep     *ep_shared_rx_cxt_;

        // we will use just one event queue for all connections
        struct fid_eq     *event_queue_;
        // transmit/receive completion queues and address vector; they are
        // bound to endpoints in bind_endpoint_to_queues when non-null
        // NOTE(review): presumably created by create_completion_queues,
        // which is not shown here - confirm
        struct fid_cq     *txcq_, *rxcq_;
        struct fid_av     *av_;

        // true when fi_getinfo reported FI_RX_CQ_DATA in the rx mode bits
        bool immediate_;
138 
139         // --------------------------------------------------------------------
140         // constructor gets info from device and sets up all necessary
141         // maps, queues and server endpoint etc
libfabric_controller(std::string provider,std::string domain,std::string endpoint,int port=7910)142         libfabric_controller(
143             std::string provider,
144             std::string domain,
145             std::string endpoint, int port=7910)
146           : fabric_info_(nullptr)
147           , fabric_(nullptr)
148           , fabric_domain_(nullptr)
149           , ep_passive_(nullptr)
150           , ep_active_(nullptr)
151           , ep_shared_rx_cxt_(nullptr)
152           , event_queue_(nullptr)
153             //
154           , txcq_(nullptr)
155           , rxcq_(nullptr)
156           , av_(nullptr)
157             //
158           , immediate_(false)
159         {
160             FUNC_START_DEBUG_MSG;
161             open_fabric(provider, domain, endpoint);
162 
163             // Create a memory pool for pinned buffers
164             memory_pool_.reset(
165                 new rma_memory_pool<libfabric_region_provider>(fabric_domain_));
166 
167             // setup a passive listener, or an active RDM endpoint
168             here_ = create_local_endpoint();
169 #ifndef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM
170             create_event_queue();
171 #endif
172 
173             LOG_DEBUG_MSG("Calling boot PMI");
174             boot_PMI();
175             FUNC_END_DEBUG_MSG;
176         }
177 
boot_PMI()178         void boot_PMI()
179         {
180 #ifdef HPX_PARCELPORT_LIBFABRIC_HAVE_PMI
181             int spawned;
182             int size;
183             int rank;
184             int appnum;
185 
186             LOG_DEBUG_MSG("Calling PMI init");
187             PMI2_Init(&spawned, &size, &rank, &appnum);
188             LOG_DEBUG_MSG("Called PMI init on rank" << decnumber(rank));
189 
190             // create address vector and queues we need if bootstrapping
191             create_completion_queues(fabric_info_, size);
192 
193             // we must pass out libfabric data to other nodes
194             // encode it as a string to put into the PMI KV store
195             std::string encoded_locality(
196                 base64_t((const char*)(here_.fabric_data())),
197                 base64_t((const char*)(here_.fabric_data()) + locality::array_size));
198             int encoded_length = encoded_locality.size();
199             LOG_DEBUG_MSG("Encoded locality as " << encoded_locality
200                 << " with length " << decnumber(encoded_length));
201 
202             // Key name for PMI
203             std::string pmi_key = "hpx_libfabric_" + std::to_string(rank);
204             // insert out data in the KV store
205             LOG_DEBUG_MSG("Calling PMI2_KVS_Put on rank " << decnumber(rank));
206             PMI2_KVS_Put(pmi_key.data(), encoded_locality.data());
207 
208             // Wait for all to do the same
209             LOG_DEBUG_MSG("Calling PMI2_KVS_Fence on rank " << decnumber(rank));
210             PMI2_KVS_Fence();
211 
212             // read libfabric data for all nodes and insert into our Address vector
213             for (int i = 0; i < size; ++i)
214             {
215                 // read one locality key
216                 std::string pmi_key = "hpx_libfabric_" + std::to_string(i);
217                 char encoded_data[locality::array_size*2];
218                 int length = 0;
219                 PMI2_KVS_Get(0, i, pmi_key.data(), encoded_data,
220                     encoded_length + 1, &length);
221                 if (length != encoded_length)
222                 {
223                     LOG_ERROR_MSG("PMI value length mismatch, expected "
224                         << decnumber(encoded_length) << "got " << decnumber(length));
225                 }
226 
227                 // decode the string back to raw locality data
228                 LOG_DEBUG_MSG("Calling decode for " << decnumber(i)
229                     << " locality data on rank " << decnumber(rank));
230                 locality new_locality;
231                 std::copy(binary_t(encoded_data),
232                           binary_t(encoded_data + encoded_length),
233                           (new_locality.fabric_data_writable()));
234 
235                 // insert locality into address vector
236                 LOG_DEBUG_MSG("Calling insert_address for " << decnumber(i)
237                     << "on rank " << decnumber(rank));
238                 insert_address(new_locality);
239                 LOG_DEBUG_MSG("rank " << decnumber(i)
240                     << "added to address vector");
241                 if (i == 0)
242                 {
243                     agas_ = new_locality;
244                 }
245             }
246 
247             PMI2_Finalize();
248 #endif
249         }
250 
251         // --------------------------------------------------------------------
252         // clean up all resources
~libfabric_controller()253         ~libfabric_controller()
254         {
255             unsigned int messages_handled = 0;
256             unsigned int acks_received    = 0;
257             unsigned int msg_plain        = 0;
258             unsigned int msg_rma          = 0;
259             unsigned int sent_ack         = 0;
260             unsigned int rma_reads        = 0;
261             unsigned int recv_deletes     = 0;
262             //
263             for (auto &r : receivers_) {
264                 r.cleanup();
265                 // from receiver
266                 messages_handled += r.messages_handled_;
267                 acks_received    += r.acks_received_;
268                 // from rma_receivers
269                 msg_plain        += r.msg_plain_;
270                 msg_rma          += r.msg_rma_;
271                 sent_ack         += r.sent_ack_;
272                 rma_reads        += r.rma_reads_;
273                 recv_deletes     += r.recv_deletes_;
274             }
275 
276             LOG_DEBUG_MSG(
277                    "Received messages " << decnumber(messages_handled)
278                 << "Received acks "     << decnumber(acks_received)
279                 << "Sent acks "     << decnumber(sent_ack)
280                 << "Total reads "       << decnumber(rma_reads)
281                 << "Total deletes "     << decnumber(recv_deletes)
282                 << "deletes error " << decnumber(messages_handled - recv_deletes));
283 
284             // Cleaning up receivers to avoid memory leak errors.
285             receivers_.clear();
286 
287             LOG_DEBUG_MSG("closing fabric_->fid");
288             if (fabric_)
289                 fi_close(&fabric_->fid);
290 #ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM
291             LOG_DEBUG_MSG("closing ep_active_->fid");
292             if (ep_active_)
293                 fi_close(&ep_active_->fid);
294 #else
295             LOG_DEBUG_MSG("closing ep_passive_->fid");
296             if (ep_passive_)
297                 fi_close(&ep_passive_->fid);
298 #endif
299             LOG_DEBUG_MSG("closing event_queue_->fid");
300             if (event_queue_)
301                 fi_close(&event_queue_->fid);
302             LOG_DEBUG_MSG("closing fabric_domain_->fid");
303             if (fabric_domain_)
304                 fi_close(&fabric_domain_->fid);
305             LOG_DEBUG_MSG("closing ep_shared_rx_cxt_->fid");
306             if (ep_shared_rx_cxt_)
307                 fi_close(&ep_shared_rx_cxt_->fid);
308             // clean up
309             LOG_DEBUG_MSG("freeing fabric_info");
310             fi_freeinfo(fabric_info_);
311         }
312 
313         // --------------------------------------------------------------------
314         // initialize the basic fabric/domain/name
open_fabric(std::string provider,std::string domain,std::string endpoint_type)315         void open_fabric(
316             std::string provider, std::string domain, std::string endpoint_type)
317         {
318             FUNC_START_DEBUG_MSG;
319             struct fi_info *fabric_hints_ = fi_allocinfo();
320             if (!fabric_hints_) {
321                 throw fabric_error(-1, "Failed to allocate fabric hints");
322             }
323             // we require message and RMA support, so ask for them
324             // we also want receives to carry source address info
325             fabric_hints_->caps                   = FI_MSG | FI_RMA | FI_SOURCE |
326                 FI_WRITE | FI_READ | FI_REMOTE_READ | FI_REMOTE_WRITE | FI_RMA_EVENT;
327             fabric_hints_->mode                   = FI_CONTEXT | FI_LOCAL_MR;
328             fabric_hints_->fabric_attr->prov_name = strdup(provider.c_str());
329             LOG_DEBUG_MSG("fabric provider " << fabric_hints_->fabric_attr->prov_name);
330             if (domain.size()>0) {
331                 fabric_hints_->domain_attr->name  = strdup(domain.c_str());
332                 LOG_DEBUG_MSG("fabric domain "   << fabric_hints_->domain_attr->name);
333             }
334 
335             // use infiniband type basic registration for now
336             fabric_hints_->domain_attr->mr_mode = FI_MR_BASIC;
337 
338             // Disable the use of progress threads
339             fabric_hints_->domain_attr->control_progress = FI_PROGRESS_MANUAL;
340             fabric_hints_->domain_attr->data_progress = FI_PROGRESS_MANUAL;
341 
342             // Enable thread safe mode Does not work with psm2 provider
343             fabric_hints_->domain_attr->threading = FI_THREAD_SAFE;
344 
345             // Enable resource management
346             fabric_hints_->domain_attr->resource_mgmt = FI_RM_ENABLED;
347 
348 #ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM
349             LOG_DEBUG_MSG("Selecting endpoint type RDM");
350             fabric_hints_->ep_attr->type = FI_EP_RDM;
351 #else
352             // we will use a shared receive context for active endpoints
353             fabric_hints_->ep_attr->rx_ctx_cnt = FI_SHARED_CONTEXT;
354 
355             if (endpoint_type=="msg") {
356                 fabric_hints_->ep_attr->type = FI_EP_MSG;
357             } else if (endpoint_type=="rdm") {
358                 fabric_hints_->ep_attr->type = FI_EP_RDM;
359             } else if (endpoint_type=="dgram") {
360                 fabric_hints_->ep_attr->type = FI_EP_DGRAM;
361             }
362             else {
363                 LOG_DEBUG_MSG("endpoint type not set, using RDM");
364                 fabric_hints_->ep_attr->type = FI_EP_RDM;
365             }
366 #endif
367 
368             // by default, we will always want completions on both tx/rx events
369             fabric_hints_->tx_attr->op_flags = FI_COMPLETION;
370             fabric_hints_->rx_attr->op_flags = FI_COMPLETION;
371 
372             uint64_t flags = 0;
373             LOG_DEBUG_MSG("Getting initial info about fabric");
374             int ret = fi_getinfo(FI_VERSION(1,4), nullptr, nullptr,
375                 flags, fabric_hints_, &fabric_info_);
376             if (ret) {
377                 throw fabric_error(ret, "Failed to get fabric info");
378             }
379             LOG_DEBUG_MSG("Fabric info " << fi_tostr(fabric_info_, FI_TYPE_INFO));
380 
381             immediate_ = (fabric_info_->rx_attr->mode & FI_RX_CQ_DATA)!=0;
382             LOG_DEBUG_MSG("Fabric supports immediate data " << immediate_);
383             LOG_EXCLUSIVE(bool context = (fabric_hints_->mode & FI_CONTEXT)!=0);
384             LOG_DEBUG_MSG("Fabric requires FI_CONTEXT " << context);
385 
386             LOG_DEBUG_MSG("Creating fabric object");
387             ret = fi_fabric(fabric_info_->fabric_attr, &fabric_, nullptr);
388             if (ret) {
389                 throw fabric_error(ret, "Failed to get fi_fabric");
390             }
391 
392             // Allocate a domain.
393             LOG_DEBUG_MSG("Allocating domain ");
394             ret = fi_domain(fabric_, fabric_info_, &fabric_domain_, nullptr);
395             if (ret) throw fabric_error(ret, "fi_domain");
396 
397             // Cray specific. Disable memory registration cache
398             _set_disable_registration();
399 
400             fi_freeinfo(fabric_hints_);
401             FUNC_END_DEBUG_MSG;
402         }
403 
404         // -------------------------------------------------------------------
405         // create endpoint and get ready for possible communications
startup(parcelport * pp)406         void startup(parcelport *pp)
407         {
408             FUNC_START_DEBUG_MSG;
409             //
410 #ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM
411             bind_endpoint_to_queues(ep_active_);
412 #else
413             bind_endpoint_to_queues(ep_passive_);
414             fabric_info_->handle = &(ep_passive->fid);
415 
416             LOG_DEBUG_MSG("Creating active endpoint");
417             new_endpoint_active(fabric_info_, &ep_active_);
418             LOG_DEBUG_MSG("active endpoint " << hexpointer(ep_active_);
419 
420             bind_endpoint_to_queues(ep_active_);
421 #endif
422 
423             // filling our vector of receivers...
424             std::size_t num_receivers = HPX_PARCELPORT_LIBFABRIC_MAX_PREPOSTS;
425             receivers_.reserve(num_receivers);
426             for(std::size_t i = 0; i != num_receivers; ++i)
427             {
428                 receivers_.emplace_back(pp, ep_active_, *memory_pool_);
429             }
430         }
431 
432         // --------------------------------------------------------------------
433         // Special GNI extensions to disable memory registration cache
434 
435         // this helper function only works for string ops
436         void _set_check_domain_op_value(int op, const char *value)
437         {
438 #ifdef HPX_PARCELPORT_LIBFABRIC_GNI
439             int ret;
440             struct fi_gni_ops_domain *gni_domain_ops;
441             char *get_val;
442 
443             ret = fi_open_ops(&fabric_domain_->fid, FI_GNI_DOMAIN_OPS_1,
444                       0, (void **) &gni_domain_ops, nullptr);
445             if (ret) throw fabric_error(ret, "fi_open_ops");
446             LOG_DEBUG_MSG("domain ops returned " << hexpointer(gni_domain_ops));
447 
448             ret = gni_domain_ops->set_val(&fabric_domain_->fid,
449                     (dom_ops_val_t)(op), &value);
450             if (ret) throw fabric_error(ret, "set val (ops)");
451 
452             ret = gni_domain_ops->get_val(&fabric_domain_->fid,
453                     (dom_ops_val_t)(op), &get_val);
454             LOG_DEBUG_MSG("Cache mode set to " << get_val);
455             if (std::string(value) != std::string(get_val))
456                 throw fabric_error(ret, "get val");
457 #endif
458         }
459 
460         void _set_disable_registration()
461         {
462 #ifdef HPX_PARCELPORT_LIBFABRIC_GNI
463             _set_check_domain_op_value(GNI_MR_CACHE, "none");
464 #endif
465         }
466 
467         // -------------------------------------------------------------------
468         void create_event_queue()
469         {
470             LOG_DEBUG_MSG("Creating event queue");
471             fi_eq_attr eq_attr = {};
472             eq_attr.wait_obj = FI_WAIT_NONE;
473             int ret = fi_eq_open(fabric_, &eq_attr, &event_queue_, nullptr);
474             if (ret) throw fabric_error(ret, "fi_eq_open");
475 
476             if (fabric_info_->ep_attr->type == FI_EP_MSG) {
477                 LOG_DEBUG_MSG("Binding event queue to passive endpoint");
478                 ret = fi_pep_bind(ep_passive_, &event_queue_->fid, 0);
479                 if (ret) throw fabric_error(ret, "fi_pep_bind");
480 
481                 LOG_DEBUG_MSG("Passive endpoint : listen");
482                 ret = fi_listen(ep_passive_);
483                 if (ret) throw fabric_error(ret, "fi_listen");
484 
485                 LOG_DEBUG_MSG("Allocating shared receive context");
486                 ret = fi_srx_context(fabric_domain_, fabric_info_->rx_attr,
487                     &ep_shared_rx_cxt_, nullptr);
488                 if (ret) throw fabric_error(ret, "fi_srx_context");
489             }
490             FUNC_END_DEBUG_MSG;
491         }
492 
493         // --------------------------------------------------------------------
494         locality create_local_endpoint()
495         {
496             struct fid *id;
497             int ret;
498 #ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_RDM
499             LOG_DEBUG_MSG("Creating active endpoint");
500             new_endpoint_active(fabric_info_, &ep_active_);
501             LOG_DEBUG_MSG("active endpoint " << hexpointer(ep_active_));
502             id = &ep_active_->fid;
503 #else
504             LOG_DEBUG_MSG("Creating passive endpoint");
505             ret = fi_passive_ep(fabric_, fabric_info_, &ep_passive_, nullptr);
506             if (ret) {
507                 throw fabric_error(ret, "Failed to create fi_passive_ep");
508             }
509             LOG_DEBUG_MSG("passive endpoint " << hexpointer(ep_passive_));
510             id = &ep_passive_->fid;
511 #endif
512 
513 #ifdef HPX_HAVE_PARCELPORT_TCP
514             // with tcp we do not use PMI boot, so enable the endpoint now
515             LOG_DEBUG_MSG("Enabling endpoint (TCP) " << hexpointer(ep_active_));
516             ret = fi_enable(ep_active_);
517             if (ret) throw fabric_error(ret, "fi_enable");
518 #endif
519 
520             locality::locality_data local_addr;
521             std::size_t addrlen = locality::array_size;
522             LOG_DEBUG_MSG("Fetching local address using size " << decnumber(addrlen));
523             ret = fi_getname(id, local_addr.data(), &addrlen);
524             if (ret || (addrlen>locality::array_size)) {
525                 fabric_error(ret, "fi_getname - size error or other problem");
526             }
527 
528             LOG_EXCLUSIVE(
529             {
530                 std::stringstream temp1;
531                 for (std::size_t i=0; i<locality::array_length; ++i) {
532                     temp1 << ipaddress(local_addr[i]);
533                 }
534                 LOG_DEBUG_MSG("address info is " << temp1.str().c_str());
535                 std::stringstream temp2;
536                 for (std::size_t i=0; i<locality::array_length; ++i) {
537                     temp2 << hexuint32(local_addr[i]);
538                 }
539                 LOG_DEBUG_MSG("address info is " << temp2.str().c_str());
540             });
541             FUNC_END_DEBUG_MSG;
542             return locality(local_addr);
543         }
544 
545         // --------------------------------------------------------------------
546         void new_endpoint_active(struct fi_info *info, struct fid_ep **new_endpoint)
547         {
548             FUNC_START_DEBUG_MSG;
549             // create an 'active' endpoint that can be used for sending/receiving
550             LOG_DEBUG_MSG("Creating active endpoint");
551             LOG_DEBUG_MSG("Got info mode " << (info->mode & FI_NOTIFY_FLAGS_ONLY));
552             int ret = fi_endpoint(fabric_domain_, info, new_endpoint, nullptr);
553             if (ret) throw fabric_error(ret, "fi_endpoint");
554 
555             if (info->ep_attr->type == FI_EP_MSG) {
556                 if (event_queue_) {
557                     LOG_DEBUG_MSG("Binding endpoint to EQ");
558                     ret = fi_ep_bind(*new_endpoint, &event_queue_->fid, 0);
559                     if (ret) throw fabric_error(ret, "bind event_queue_");
560                 }
561             }
562         }
563 
564         // --------------------------------------------------------------------
565         void bind_endpoint_to_queues(struct fid_ep *endpoint)
566         {
567             int ret;
568             if (av_) {
569                 LOG_DEBUG_MSG("Binding endpoint to AV");
570                 ret = fi_ep_bind(endpoint, &av_->fid, 0);
571                 if (ret) throw fabric_error(ret, "bind event_queue_");
572             }
573 
574             if (txcq_) {
575                 LOG_DEBUG_MSG("Binding endpoint to TX CQ");
576                 ret = fi_ep_bind(endpoint, &txcq_->fid, FI_TRANSMIT);
577                 if (ret) throw fabric_error(ret, "bind txcq");
578             }
579 
580             if (rxcq_) {
581                 LOG_DEBUG_MSG("Binding endpoint to RX CQ");
582                 ret = fi_ep_bind(endpoint, &rxcq_->fid, FI_RECV);
583                 if (ret) throw fabric_error(ret, "rxcq");
584             }
585 
586             if (ep_shared_rx_cxt_) {
587                 LOG_DEBUG_MSG("Binding endpoint to shared receive context");
588                 ret = fi_ep_bind(endpoint, &ep_shared_rx_cxt_->fid, 0);
589                 if (ret) throw fabric_error(ret, "ep_shared_rx_cxt_");
590             }
591 
592             LOG_DEBUG_MSG("Enabling endpoint " << hexpointer(endpoint));
593             ret = fi_enable(endpoint);
594             if (ret) throw fabric_error(ret, "fi_enable");
595 
596             FUNC_END_DEBUG_MSG;
597         }
598 
599         // --------------------------------------------------------------------
600         void initialize_localities(hpx::agas::addressing_service &as)
601         {
602             FUNC_START_DEBUG_MSG;
603 #ifndef HPX_PARCELPORT_LIBFABRIC_HAVE_PMI
604             std::uint32_t N = hpx::get_config().get_num_localities();
605             LOG_DEBUG_MSG("Parcelport initialize_localities with " << N << " localities");
606 
607             // make sure address vector is created
608             create_completion_queues(fabric_info_, N);
609 
610             for (std::uint32_t i=0; i<N; ++i) {
611                 hpx::naming::gid_type l = hpx::naming::get_gid_from_locality_id(i);
612                 LOG_DEBUG_MSG("Resolving locality" << l);
613                 // each locality may be reachable by mutiplte parcelports
614                 const parcelset::endpoints_type &res = as.resolve_locality(l);
615                 // get the fabric related data
616                 auto it = res.find("libfabric");
617                 LOG_DEBUG_MSG("locality resolution " << it->first << " => " <<it->second);
618                 const hpx::parcelset::locality &fabric_locality = it->second;
619                 const locality &loc = fabric_locality.get<locality>();
620                 // put the provide specific data into the address vector
621                 // so that we can look it  up later
622                 /*fi_addr_t dummy =*/ insert_address(loc);
623             }
624 #endif
625             LOG_DEBUG_MSG("Done getting localities ");
626             FUNC_END_DEBUG_MSG;
627         }
628 
629         // --------------------------------------------------------------------
630         fi_addr_t get_fabric_address(const locality &dest_fabric)
631         {
632             uint64_t key = (uint64_t)dest_fabric.ip_address() << 32 | dest_fabric.port();
633             return endpoint_av_.find(key)->second;
634         }
635 
636         // --------------------------------------------------------------------
637         const locality & here() const { return here_; }
638 
639         // --------------------------------------------------------------------
640         const bool & immedate_data_supported() const { return immediate_; }
641 
642         // --------------------------------------------------------------------
643         // returns true when all connections have been disconnected and none are active
644         bool isTerminated() {
645             return false;
646             //return (qp_endpoint_map_.size() == 0);
647         }
648 
649         // types we need for connection and disconnection callback functions
650         // into the main parcelport code.
651         typedef std::function<void(fid_ep *endpoint, uint32_t ipaddr)>
652             ConnectionFunction;
653         typedef std::function<void(fid_ep *endpoint, uint32_t ipaddr)>
654             DisconnectionFunction;
655 
656         // --------------------------------------------------------------------
657         // Set a callback which will be called immediately after
658         // RDMA_CM_EVENT_ESTABLISHED has been received.
659         // This should be used to initialize all structures for handling a new connection
660         void setConnectionFunction(ConnectionFunction f) {
661             connection_function_ = f;
662         }
663 
664         // --------------------------------------------------------------------
665         // currently not used.
666         void setDisconnectionFunction(DisconnectionFunction f) {
667             disconnection_function_ = f;
668         }
669 
670         // --------------------------------------------------------------------
671         // This is the main polling function that checks for work completions
672         // and connection manager events, if stopped is true, then completions
673         // are thrown away, otherwise the completion callback is triggered
674         int poll_endpoints(bool stopped=false)
675         {
676             int work = poll_for_work_completions();
677 
678 #ifdef HPX_PARCELPORT_LIBFABRIC_ENDPOINT_MSG
679             work += poll_event_queue(stopped);
680 #endif
681             return work;
682         }
683 
684         // --------------------------------------------------------------------
685         int poll_for_work_completions()
686         {
687             // @TODO, disable polling until queues are initialized to avoid this check
688             // if queues are not setup, don't poll
689             if (HPX_UNLIKELY(!rxcq_)) return 0;
690             //
691             return poll_send_queue() + poll_recv_queue();
692         }
693 
694         // --------------------------------------------------------------------
695         int poll_send_queue()
696         {
697             LOG_TIMED_INIT(poll);
698             LOG_TIMED_BLOCK(poll, DEVEL, 5.0, { LOG_DEBUG_MSG("poll_send_queue"); });
699 
700             fi_cq_msg_entry entry;
701             int ret = 0;
702             {
703                 std::unique_lock<mutex_type> l(polling_mutex_, std::try_to_lock);
704                 if (l)
705                     ret = fi_cq_read(txcq_, &entry, 1);
706             }
707             if (ret>0) {
708                 LOG_DEBUG_MSG("Completion txcq wr_id "
709                     << fi_tostr(&entry.flags, FI_TYPE_OP_FLAGS)
710                     << " (" << decnumber(entry.flags) << ") "
711                     << "context " << hexpointer(entry.op_context)
712                     << "length " << hexuint32(entry.len));
713                 if (entry.flags & FI_RMA) {
714                     LOG_DEBUG_MSG("Received a txcq RMA completion "
715                         << "Context " << hexpointer(entry.op_context));
716                     rma_receiver* rcv = reinterpret_cast<rma_receiver*>(entry.op_context);
717                     rcv->handle_rma_read_completion();
718                 }
719                 else if (entry.flags == (FI_MSG | FI_SEND)) {
720                     LOG_DEBUG_MSG("Received a txcq RMA send completion");
721                     sender* handler = reinterpret_cast<sender*>(entry.op_context);
722                     handler->handle_send_completion();
723                 }
724                 else {
725                     LOG_DEBUG_MSG("$$$$$ Received an unknown txcq completion ***** "
726                         << decnumber(entry.flags));
727                     std::terminate();
728                 }
729                 return 1;
730             }
731             else if (ret==0 || ret==-FI_EAGAIN) {
732                 // do nothing, we will try again on the next check
733                 LOG_TIMED_MSG(poll, DEVEL, 10, "txcq FI_EAGAIN");
734             }
735             else if (ret == -FI_EAVAIL) {
736                 struct fi_cq_err_entry e = {};
737                 int err_sz = fi_cq_readerr(txcq_, &e ,0);
738                 // from the manpage 'man 3 fi_cq_readerr'
739                 //
740                 // On error, a negative value corresponding to
741                 // 'fabric errno' is returned
742                 //
743                 if(e.err == err_sz) {
744                     LOG_ERROR_MSG("txcq_ Error with len " << hexlength(e.len)
745                         << "context " << hexpointer(e.op_context));
746                 }
747                 // flags might not be set correctly
748                 if (e.flags == (FI_MSG | FI_SEND)) {
749                     LOG_ERROR_MSG("txcq Error for FI_SEND with len " << hexlength(e.len)
750                         << "context " << hexpointer(e.op_context));
751                 }
752                 if (e.flags & FI_RMA) {
753                     LOG_ERROR_MSG("txcq Error for FI_RMA with len " << hexlength(e.len)
754                         << "context " << hexpointer(e.op_context));
755                 }
756                 rma_base *base = reinterpret_cast<rma_base*>(e.op_context);
757                 base->handle_error(e);
758             }
759             else {
760                 LOG_ERROR_MSG("unknown error in completion txcq read");
761             }
762             return 0;
763         }
764 
765         // --------------------------------------------------------------------
766         int poll_recv_queue()
767         {
768             LOG_TIMED_INIT(poll);
769             LOG_TIMED_BLOCK(poll, DEVEL, 5.0, { LOG_DEBUG_MSG("poll_recv_queue"); });
770 
771             int result = 0;
772             fi_addr_t src_addr;
773             fi_cq_msg_entry entry;
774 
775             // receives will use fi_cq_readfrom as we want the source address
776             int ret = 0;
777             {
778                 std::unique_lock<mutex_type> l(polling_mutex_, std::try_to_lock);
779                 if (l)
780                     ret = fi_cq_readfrom(rxcq_, &entry, 1, &src_addr);
781             }
782             if (ret>0) {
783                 LOG_DEBUG_MSG("Completion rxcq wr_id "
784                     << fi_tostr(&entry.flags, FI_TYPE_OP_FLAGS)
785                     << " (" << decnumber(entry.flags) << ") "
786                     << "source " << hexpointer(src_addr)
787                     << "context " << hexpointer(entry.op_context)
788                     << "length " << hexuint32(entry.len));
789                 if (src_addr == FI_ADDR_NOTAVAIL)
790                 {
791                     LOG_DEBUG_MSG("Source address not available...\n");
792                     std::terminate();
793                 }
794 //                     if ((entry.flags & FI_RMA) == FI_RMA) {
795 //                         LOG_DEBUG_MSG("Received an rxcq RMA completion");
796 //                     }
797                 else if (entry.flags == (FI_MSG | FI_RECV)) {
798                     LOG_DEBUG_MSG("Received an rxcq recv completion "
799                         << hexpointer(entry.op_context));
800                     reinterpret_cast<receiver *>(entry.op_context)->
801                         handle_recv(src_addr, entry.len);
802                 }
803                 else {
804                     LOG_DEBUG_MSG("Received an unknown rxcq completion "
805                         << decnumber(entry.flags));
806                     std::terminate();
807                 }
808                 result = 1;
809             }
810             else if (ret==0 || ret==-FI_EAGAIN) {
811                 // do nothing, we will try again on the next check
812                 LOG_TIMED_MSG(poll, DEVEL, 10, "rxcq FI_EAGAIN");
813             }
814             else if (ret == -FI_EAVAIL) {
815                 struct fi_cq_err_entry e = {};
816                 int err_sz = fi_cq_readerr(rxcq_, &e ,0);
817                 // from the manpage 'man 3 fi_cq_readerr'
818                 //
819                 // On error, a negative value corresponding to
820                 // 'fabric errno' is returned
821                 //
822                 if(e.err == err_sz) {
823                     LOG_ERROR_MSG("txcq_ Error with len " << hexlength(e.len)
824                         << "context " << hexpointer(e.op_context));
825                 }
826                 LOG_ERROR_MSG("rxcq Error with flags " << hexlength(e.flags)
827                     << "len " << hexlength(e.len));
828             }
829             else {
830                 LOG_ERROR_MSG("unknown error in completion rxcq read");
831             }
832             return result;
833         }
834 
835         // --------------------------------------------------------------------
836         int poll_event_queue(bool stopped=false)
837         {
838             LOG_TIMED_INIT(poll);
839             LOG_TIMED_BLOCK(poll, DEVEL, 5.0,
840                 {
841                     LOG_DEBUG_MSG("Polling event completion channel");
842                 }
843             )
844             struct fi_eq_cm_entry *cm_entry;
845 //             struct fi_eq_entry    *entry;
846             struct fid_ep         *new_ep;
847 //             uint32_t *addr;
848             uint32_t event;
849             std::array<char, 256> buffer;
850             ssize_t rd = fi_eq_read(event_queue_, &event,
851                 buffer.data(), sizeof(buffer), 0);
852             if (rd > 0) {
853                 LOG_DEBUG_MSG("fi_eq_cm_entry " << decnumber(sizeof(fi_eq_cm_entry))
854                     << " fi_eq_entry " << decnumber(sizeof(fi_eq_entry)));
855                 LOG_DEBUG_MSG("got event " << event << " with bytes = " << decnumber(rd));
856                 switch (event) {
857                 case FI_CONNREQ:
858                 {
859                     cm_entry = reinterpret_cast<struct fi_eq_cm_entry*>(buffer.data());
860                     locality::locality_data addressinfo;
861                     std::memcpy(addressinfo.data(), cm_entry->info->dest_addr,
862                         locality::array_size);
863                     locality loc(addressinfo);
864                     LOG_DEBUG_MSG("FI_CONNREQ                 from "
865                         << ipaddress(loc.ip_address()) << "-> "
866                         << ipaddress(here_.ip_address())
867                         << "( " << ipaddress(here_.ip_address()) << " )");
868                     {
869                         auto result = insert_new_future(loc.ip_address());
870                         // if the insert fails, it means we have a connection
871                         // already in progress, reject if we are a lower ip address
872                         if (!result.first && loc.ip_address()>here_.ip_address()) {
873                             LOG_DEBUG_MSG("FI_CONNREQ priority fi_reject   "
874                                 << ipaddress(loc.ip_address()) << "-> "
875                                 << ipaddress(here_.ip_address())
876                                 << "( " << ipaddress(here_.ip_address()) << " )");
877 //                            int ret = fi_reject(ep_passive_, cm_entry->info->handle,
878 //                                nullptr, 0);
879 //                            if (ret) throw fabric_error(ret, "new_ep fi_reject failed");
880                             fi_freeinfo(cm_entry->info);
881                             return 0;
882                         }
883                         // create a new endpoint for this request and accept it
884                         new_endpoint_active(cm_entry->info, &new_ep);
885                         LOG_DEBUG_MSG("Calling fi_accept               "
886                             << ipaddress(loc.ip_address()) << "-> "
887                             << ipaddress(here_.ip_address())
888                             << "( " << ipaddress(here_.ip_address()) << " )");
889                         int ret = fi_accept(new_ep, &here_.ip_address(),
890                             sizeof(uint32_t));
891                         if (ret) throw fabric_error(ret, "new_ep fi_accept failed");
892                     }
893                     fi_freeinfo(cm_entry->info);
894                     break;
895                 }
896                 case FI_CONNECTED:
897                 {
898                     cm_entry = reinterpret_cast<struct fi_eq_cm_entry*>(buffer.data());
899                     new_ep = container_of(cm_entry->fid, struct fid_ep, fid);
900                     locality::locality_data address;
901                     std::size_t len = sizeof(locality::locality_data);
902                     fi_getpeer(new_ep, address.data(), &len);
903                     //
904                     auto present1 = endpoint_tmp_.is_in_map(address[1]);
905                     if (!present1.second) {
906                         throw fabric_error(0, "FI_CONNECTED, endpoint map error");
907                     }
908                     LOG_DEBUG_MSG("FI_CONNECTED " << hexpointer(new_ep)
909                         << ipaddress(locality::ip_address(address)) << "<> "
910                         << ipaddress(here_.ip_address())
911                         << "( " << ipaddress(here_.ip_address()) << " )");
912 
913                     // call parcelport connection function before setting future
914                     connection_function_(new_ep, address[1]);
915 
916                     // if there is an entry for a locally started connection on this IP
917                     // then set the future ready with the verbs endpoint
918                     LOG_DEBUG_MSG("FI_CONNECTED setting future     "
919                             << ipaddress(locality::ip_address(address)) << "<> "
920                             << ipaddress(here_.ip_address())
921                         << "( " << ipaddress(here_.ip_address()) << " )");
922 
923                     std::get<0>(endpoint_tmp_.find(address[1])->second).
924                         set_value(new_ep);
925 
926                     // once the future is set, the entry can be removed?
927 //                    endpoint_tmp_.erase(present1.first);
928                 }
929                 break;
930                 case FI_NOTIFY:
931                     LOG_DEBUG_MSG("Got FI_NOTIFY");
932                     break;
933                 case FI_SHUTDOWN:
934                     LOG_DEBUG_MSG("Got FI_SHUTDOWN");
935                     break;
936                 case FI_MR_COMPLETE:
937                     LOG_DEBUG_MSG("Got FI_MR_COMPLETE");
938                     break;
939                 case FI_AV_COMPLETE:
940                     LOG_DEBUG_MSG("Got FI_AV_COMPLETE");
941                     break;
942                 case FI_JOIN_COMPLETE:
943                     LOG_DEBUG_MSG("Got FI_JOIN_COMPLETE");
944                     break;
945                 }
946                 //                   HPX_ASSERT(rd == sizeof(struct fi_eq_cm_entry));
947                 //                   HPX_ASSERT(cm_entry->fid == event_queue_->fid);
948             }
949             else {
950                 LOG_TIMED_MSG(poll, DEVEL, 5, "We did not get an event completion")
951             }
952             return 0;
953         }
954 
955         // --------------------------------------------------------------------
956         inline struct fid_domain * get_domain() {
957             return fabric_domain_;
958         }
959 
960         // --------------------------------------------------------------------
961         inline rma_memory_pool<libfabric_region_provider>& get_memory_pool() {
962             return *memory_pool_;
963         }
964 
965         // --------------------------------------------------------------------
966         void create_completion_queues(struct fi_info *info, int N)
967         {
968             FUNC_START_DEBUG_MSG;
969 
970             // only one thread must be allowed to create queues,
971             // and it is only required once
972             scoped_lock lock(initialization_mutex_);
973             if (txcq_!=nullptr || rxcq_!=nullptr || av_!=nullptr) {
974                 return;
975             }
976 
977             int ret;
978 
979             fi_cq_attr cq_attr = {};
980             // @TODO - why do we check this
981 //             if (cq_attr.format == FI_CQ_FORMAT_UNSPEC) {
982                 LOG_DEBUG_MSG("Setting CQ attribute to FI_CQ_FORMAT_MSG");
983                 cq_attr.format = FI_CQ_FORMAT_MSG;
984 //             }
985 
986             // open completion queue on fabric domain and set context ptr to tx queue
987             cq_attr.wait_obj = FI_WAIT_NONE;
988             cq_attr.size = info->tx_attr->size;
989             info->tx_attr->op_flags |= FI_COMPLETION;
990             cq_attr.flags = 0;//|= FI_COMPLETION;
991             LOG_DEBUG_MSG("Creating CQ with tx size " << decnumber(info->tx_attr->size));
992             ret = fi_cq_open(fabric_domain_, &cq_attr, &txcq_, &txcq_);
993             if (ret) throw fabric_error(ret, "fi_cq_open");
994 
995             // open completion queue on fabric domain and set context ptr to rx queue
996             cq_attr.size = info->rx_attr->size;
997             LOG_DEBUG_MSG("Creating CQ with rx size " << decnumber(info->rx_attr->size));
998             ret = fi_cq_open(fabric_domain_, &cq_attr, &rxcq_, &rxcq_);
999             if (ret) throw fabric_error(ret, "fi_cq_open");
1000 
1001 
1002             fi_av_attr av_attr = {};
1003             if (info->ep_attr->type == FI_EP_RDM || info->ep_attr->type == FI_EP_DGRAM) {
1004                 if (info->domain_attr->av_type != FI_AV_UNSPEC)
1005                     av_attr.type = info->domain_attr->av_type;
1006                 else {
1007                     LOG_DEBUG_MSG("Setting map type to FI_AV_MAP");
1008                     av_attr.type  = FI_AV_MAP;
1009                     av_attr.count = N;
1010                 }
1011 
1012                 LOG_DEBUG_MSG("Creating address vector ");
1013                 ret = fi_av_open(fabric_domain_, &av_attr, &av_, nullptr);
1014                 if (ret) throw fabric_error(ret, "fi_av_open");
1015             }
1016             FUNC_END_DEBUG_MSG;
1017         }
1018 
1019         // --------------------------------------------------------------------
1020         std::pair<bool, hpx::shared_future<struct fid_ep*>> insert_new_future(
1021             uint32_t remote_ip)
1022         {
1023 
1024             LOG_DEBUG_MSG("insert_new_future : Obsolete in RDM mode");
1025             std::terminate();
1026 
1027             LOG_DEBUG_MSG("Inserting future in map         "
1028                 << ipaddress(here_.ip_address()) << "-> "
1029                 << ipaddress(remote_ip)
1030                 << "( " << ipaddress(here_.ip_address()) << " )");
1031 
1032             //
1033             hpx::promise<struct fid_ep*> new_endpoint_promise;
1034             hpx::future<struct fid_ep*>  new_endpoint_future =
1035                 new_endpoint_promise.get_future();
1036             //
1037             auto fp_pair = std::make_pair(
1038                     remote_ip,
1039                     std::make_tuple(
1040                         std::move(new_endpoint_promise),
1041                         std::move(new_endpoint_future)));
1042              //
1043             auto it = endpoint_tmp_.insert(std::move(fp_pair));
1044             // if the insert failed, we must safely delete the future/promise
1045             if (!it.second) {
1046                 LOG_DEBUG_MSG("Must safely delete promise");
1047             }
1048 
1049             // get the future that was inserted or already present
1050             // the future will become ready when remote end accepts/rejects connection
1051             // or we accept a connection from a remote
1052             hpx::shared_future<struct fid_ep*> result = std::get<1>(it.first->second);
1053 
1054             // if the insert fails due to a duplicate value, return the duplicate
1055             if (!it.second) {
1056                 return std::make_pair(false, result);
1057             }
1058             return std::make_pair(true, result);
1059         }
1060 
1061         // --------------------------------------------------------------------
1062         fi_addr_t insert_address(const locality &remote)
1063         {
1064             FUNC_START_DEBUG_MSG;
1065             LOG_DEBUG_MSG("inserting address in vector "
1066                 << ipaddress(remote.ip_address()));
1067             fi_addr_t result = 0xffffffff;
1068             int ret = fi_av_insert(av_, remote.fabric_data(), 1, &result, 0, nullptr);
1069             if (ret < 0) {
1070                 fabric_error(ret, "fi_av_insert");
1071             }
1072             else if (ret != 1) {
1073                 fabric_error(ret, "fi_av_insert did not return 1");
1074             }
1075             uint64_t key = (uint64_t)remote.ip_address() << 32 | remote.port();
1076             endpoint_av_.insert(std::make_pair(key, result));
1077             LOG_DEBUG_MSG("Address inserted in vector "
1078                 << ipaddress(remote.ip_address()) << ":"
1079                 << remote.port() << hexuint64(result));
1080             FUNC_END_DEBUG_MSG;
1081             return result;
1082         }
1083 
1084         // --------------------------------------------------------------------
1085         hpx::shared_future<struct fid_ep*> connect_to_server(const locality &remote)
1086         {
1087 
1088             LOG_DEBUG_MSG("connect_to_server : Obsolete in RDM mode");
1089             std::terminate();
1090 
1091             const uint32_t &remote_ip = remote.ip_address();
1092 
1093             // Has a connection been started from here already?
1094             // Note: The future must be created before we call fi_connect
1095             // otherwise a connection may complete before the future is setup
1096             auto connection = insert_new_future(remote_ip);
1097 
1098             // if a connection is already underway, just return the future
1099             if (!connection.first) {
1100                 LOG_DEBUG_MSG("connect to server : returning existing future");
1101                 // the future will become ready when the remote end accepts/rejects
1102                 // our connection - or we accept a connection from a remote
1103                 return connection.second;
1104             }
1105 
1106             // for thread safety, make a copy of the fi_info before setting
1107             // the address in it. fi_freeinfo will free the dest_addr field.
1108             struct fi_info *new_info = fi_dupinfo(fabric_info_);
1109             new_info->dest_addrlen = locality::array_size;
1110             new_info->dest_addr = malloc(locality::array_size);
1111             std::memcpy(new_info->dest_addr, remote.fabric_data(), locality::array_size);
1112 
1113             uint64_t flags = 0;
1114             struct fi_info *fabric_info_active_;
1115             int ret = fi_getinfo(FI_VERSION(1,4), nullptr, nullptr,
1116                 flags, new_info, &fabric_info_active_);
1117             if (ret) throw fabric_error(ret, "fi_getinfo");
1118 
1119             LOG_DEBUG_MSG("New connection for IP address "
1120                 << ipaddress(remote.ip_address())
1121                 << "Fabric info " << fi_tostr(fabric_info_active_, FI_TYPE_INFO));
1122             create_completion_queues(fabric_info_active_, 0);
1123 
1124             fid_ep *new_endpoint;
1125             new_endpoint_active(fabric_info_active_, &new_endpoint);
1126 
1127             // now it is safe to call connect
1128             LOG_DEBUG_MSG("Calling fi_connect         from "
1129                 << ipaddress(here_.ip_address()) << "-> "
1130                 << ipaddress(remote.ip_address())
1131                 << "( " << ipaddress(here_.ip_address()) << " )");
1132 
1133             ret = fi_connect(new_endpoint, remote.fabric_data(), nullptr, 0);
1134             if (ret) throw fabric_error(ret, "fi_connect");
1135 
1136             LOG_DEBUG_MSG("Deleting new endpoint info structure");
1137             fi_freeinfo(fabric_info_active_);
1138             fi_freeinfo(new_info);
1139 
1140             return connection.second;
1141         }
1142 
1143         void disconnect_all() {}
1144 
1145         bool active() { return false; }
1146 
1147     private:
1148         // store info about local device
1149         std::string  device_;
1150         std::string  interface_;
1151         sockaddr_in  local_addr_;
1152 
1153         // callback functions used for connection event handling
1154         ConnectionFunction    connection_function_;
1155         DisconnectionFunction disconnection_function_;
1156 
1157         // Pinned memory pool used for allocating buffers
1158         std::unique_ptr<rma_memory_pool<libfabric_region_provider>>  memory_pool_;
1159 
        // Shared completion queue for all endpoints
1161         // Count outstanding receives posted to SRQ + Completion queue
1162         std::vector<receiver> receivers_;
1163 
1164         // only allow one thread to handle connect/disconnect events etc
1165         mutex_type            initialization_mutex_;
1166         mutex_type            endpoint_map_mutex_;
1167         mutex_type            polling_mutex_;
1168 
1169         // used to skip polling event channel too frequently
1170         typedef std::chrono::time_point<std::chrono::system_clock> time_type;
1171         time_type event_check_time_;
1172         uint32_t  event_pause_;
1173 
1174     };
1175 
    // Smart pointer for libfabric_controller objects
1177     typedef std::shared_ptr<libfabric_controller> libfabric_controller_ptr;
1178 
1179 }}}}
1180 
1181 #endif
1182