1 //  Copyright (c) 2015-2016 John Biddiscombe
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #include <plugins/parcelport/libfabric/parcelport_libfabric.hpp>
7 
8 // TODO: cleanup includes
9 
10 // config
11 #include <hpx/config.hpp>
12 // util
13 #include <hpx/lcos/local/condition_variable.hpp>
14 #include <hpx/runtime/threads/thread_data.hpp>
15 #include <hpx/util/command_line_handling.hpp>
16 #include <hpx/util/high_resolution_timer.hpp>
17 #include <hpx/util/runtime_configuration.hpp>
18 
// The memory pool specialization needs to be pulled in before encode_parcels
20 #include <hpx/plugins/parcelport_factory.hpp>
21 #include <hpx/runtime.hpp>
22 #include <hpx/runtime/parcelset/decode_parcels.hpp>
23 #include <hpx/runtime/parcelset/encode_parcels.hpp>
24 #include <hpx/runtime/parcelset/parcel_buffer.hpp>
25 #include <hpx/runtime/parcelset/parcelport.hpp>
26 #include <hpx/runtime/parcelset/parcelport_impl.hpp>
27 //
28 #include <hpx/util/assert.hpp>
29 #include <hpx/util/debug/thread_stacktrace.hpp>
30 //
31 #include <boost/asio/ip/host_name.hpp>
32 //
// This header is generated by CMake and contains a number of configurable
// settings that affect the parcelport. It needs to be #included before
// other files that are part of the parcelport
36 #include <hpx/config/parcelport_defines.hpp>
37 
38 // --------------------------------------------------------------------
39 // Controls whether we are allowed to suspend threads that are sending
40 // when we have maxed out the number of sends we can handle
41 #define HPX_PARCELPORT_LIBFABRIC_SUSPEND_WAKE  (HPX_PARCELPORT_LIBFABRIC_THROTTLE_SENDS/2)
42 
43 // --------------------------------------------------------------------
44 // Enable the use of boost small_vector for certain short lived storage
45 // elements within the parcelport. This can reduce some memory allocations
46 #define HPX_PARCELPORT_LIBFABRIC_USE_SMALL_VECTOR    true
47 
48 // --------------------------------------------------------------------
49 #include <plugins/parcelport/unordered_map.hpp>
50 #include <plugins/parcelport/libfabric/header.hpp>
51 #include <plugins/parcelport/libfabric/locality.hpp>
52 
53 #include <plugins/parcelport/libfabric/libfabric_region_provider.hpp>
54 #include <plugins/parcelport/performance_counter.hpp>
55 #include <plugins/parcelport/rma_memory_pool.hpp>
56 #include <plugins/parcelport/libfabric/connection_handler.hpp>
57 #include <plugins/parcelport/parcelport_logging.hpp>
58 #include <plugins/parcelport/libfabric/rdma_locks.hpp>
59 #include <plugins/parcelport/libfabric/libfabric_controller.hpp>
60 
61 //
62 #if HPX_PARCELPORT_LIBFABRIC_USE_SMALL_VECTOR
63 # include <boost/container/small_vector.hpp>
64 #endif
65 //
66 #include <unordered_map>
67 #include <memory>
68 #include <mutex>
69 #include <sstream>
70 #include <cstddef>
71 #include <cstdint>
72 #include <cstring>
73 #include <iostream>
74 #include <list>
75 #include <string>
76 #include <utility>
77 #include <vector>
78 
79 using namespace hpx::parcelset::policies;
80 
81 namespace hpx {
82 namespace parcelset {
83 namespace policies {
84 namespace libfabric
85 {
86     // --------------------------------------------------------------------
87     // parcelport, the implementation of the parcelport itself
88     // --------------------------------------------------------------------
89 
90     // --------------------------------------------------------------------
91     // Constructor : mostly just initializes the superclass with 'here'
92     // --------------------------------------------------------------------
parcelport(util::runtime_configuration const & ini,util::function_nonser<void (std::size_t,char const *)> const & on_start_thread,util::function_nonser<void (std::size_t,char const *)> const & on_stop_thread)93     parcelport::parcelport(util::runtime_configuration const& ini,
94         util::function_nonser<void(std::size_t, char const*)> const& on_start_thread,
95         util::function_nonser<void(std::size_t, char const*)> const& on_stop_thread)
96         : base_type(ini, locality(), on_start_thread, on_stop_thread)
97         , stopped_(false)
98         , completions_handled_(0)
99         , senders_in_use_(0)
100     {
101         FUNC_START_DEBUG_MSG;
102 
103         // if we are not enabled, then skip allocating resources
104         parcelport_enabled_ = hpx::util::get_entry_as<bool>(ini,
105             "hpx.parcel.libfabric.enable", 0);
106         LOG_DEBUG_MSG("Got enabled " << parcelport_enabled_);
107 
108         bootstrap_enabled_ = ("libfabric" ==
109             hpx::util::get_entry_as<std::string>(ini, "hpx.parcel.bootstrap", ""));
110         LOG_DEBUG_MSG("Got bootstrap " << bootstrap_enabled_);
111 
112         if (!parcelport_enabled_) return;
113 
114         // Get parameters that determine our fabric selection
115         std::string provider = ini.get_entry("hpx.parcel.libfabric.provider",
116             HPX_PARCELPORT_LIBFABRIC_PROVIDER);
117         std::string domain = ini.get_entry("hpx.parcel.libfabric.domain",
118             HPX_PARCELPORT_LIBFABRIC_DOMAIN);
119         std::string endpoint = ini.get_entry("hpx.parcel.libfabric.endpoint",
120             HPX_PARCELPORT_LIBFABRIC_ENDPOINT);
121 
122         LOG_DEBUG_MSG("libfabric parcelport function using attributes "
123             << provider << " " << domain << " " << endpoint);
124 
125         // create our main fabric control structure
126         libfabric_controller_ = std::make_shared<libfabric_controller>(
127             provider, domain, endpoint);
128 
129         // get 'this' locality from the controller
130         LOG_DEBUG_MSG("Getting local locality object");
131         const locality & local = libfabric_controller_->here();
132         here_ = parcelset::locality(local);
133         // and make a note of our ip address for convenience
134         ip_addr_ = local.ip_address();
135 
136         FUNC_END_DEBUG_MSG;
137     }
138 
139     // --------------------------------------------------------------------
140     // during bootup, this is used by the service threads
io_service_work()141     void parcelport::io_service_work()
142     {
143         while (hpx::is_starting())
144         {
145             background_work(0);
146         }
147         LOG_DEBUG_MSG("io service task completed");
148     }
149 
150     // --------------------------------------------------------------------
151     // Start the handling of communication.
do_run()152     bool parcelport::do_run()
153     {
154         if (!parcelport_enabled_) return false;
155 
156 #ifndef HPX_PARCELPORT_LIBFABRIC_HAVE_PMI
157         auto &as = this->applier_->get_agas_client();
158         libfabric_controller_->initialize_localities(as);
159 #endif
160 
161         FUNC_START_DEBUG_MSG;
162         libfabric_controller_->startup(this);
163 
164         LOG_DEBUG_MSG("Fetching memory pool");
165         chunk_pool_ = &libfabric_controller_->get_memory_pool();
166 
167         for (std::size_t i = 0; i < HPX_PARCELPORT_LIBFABRIC_THROTTLE_SENDS; ++i)
168         {
169             sender *snd =
170                new sender(this,
171                     libfabric_controller_->ep_active_,
172                     libfabric_controller_->get_domain(),
173                     chunk_pool_);
174             snd->postprocess_handler_ = [this](sender* s)
175                 {
176                     --senders_in_use_;
177                     senders_.push(s);
178                     trigger_pending_work();
179                 };
180            senders_.push(snd);
181         }
182 
183         if (bootstrap_enabled_)
184         {
185             for (std::size_t i = 0; i != io_service_pool_.size(); ++i)
186             {
187                 io_service_pool_.get_io_service(int(i)).post(
188                     hpx::util::bind(
189                         &parcelport::io_service_work, this));
190             }
191         }
192         return true;
193    }
194 
195     // --------------------------------------------------------------------
196     // return a sender object back to the parcelport_impl
197     // this is used by the send_immediate version of parcelport_impl
198     // --------------------------------------------------------------------
get_connection(parcelset::locality const & dest,fi_addr_t & fi_addr)199     sender* parcelport::get_connection(
200         parcelset::locality const& dest, fi_addr_t &fi_addr)
201     {
202         sender* snd = nullptr;
203         if (senders_.pop(snd))
204         {
205             FUNC_START_DEBUG_MSG;
206             const locality &fabric_locality = dest.get<locality>();
207             LOG_DEBUG_MSG("get_fabric_address           from "
208                 << ipaddress(here_.get<locality>().ip_address()) << "to "
209                 << ipaddress(fabric_locality.ip_address()));
210             ++senders_in_use_;
211             fi_addr = libfabric_controller_->get_fabric_address(fabric_locality);
212             FUNC_END_DEBUG_MSG;
213             return snd;
214         }
215 //         else if(threads::get_self_ptr())
216 // //         else if(this_thread::has_sufficient_stack_space())
217 //         {
218 // //             background_work_OS_thread();
219 //             hpx::this_thread::suspend(hpx::threads::pending_boost,
220 //                 "libfabric::parcelport::async_write");
221 //         }
222 
223         // if no senders are available shutdown
224         FUNC_END_DEBUG_MSG;
225         return nullptr;
226     }
227 
reclaim_connection(sender * s)228     void parcelport::reclaim_connection(sender* s)
229     {
230         --senders_in_use_;
231         senders_.push(s);
232     }
233 
234     // --------------------------------------------------------------------
235     // return a sender object back to the parcelport_impl
236     // this is for compatibility with non send_immediate operation
237     // --------------------------------------------------------------------
create_connection(parcelset::locality const & dest,error_code & ec)238     std::shared_ptr<sender> parcelport::create_connection(
239         parcelset::locality const& dest, error_code& ec)
240     {
241         LOG_DEBUG_MSG("Creating new sender");
242         return std::shared_ptr<sender>();
243     }
244 
245     // --------------------------------------------------------------------
246     // cleanup
~parcelport()247     parcelport::~parcelport() {
248         FUNC_START_DEBUG_MSG;
249         scoped_lock lk(stop_mutex);
250         sender *snd = nullptr;
251 
252         unsigned int sends_posted  = 0;
253         unsigned int sends_deleted = 0;
254         unsigned int acks_received = 0;
255         //
256         while (senders_.pop(snd)) {
257             LOG_DEBUG_MSG("Popped a sender for delete " << hexpointer(snd));
258             sends_posted  += snd->sends_posted_;
259             sends_deleted += snd->sends_deleted_;
260             acks_received += snd->acks_received_;
261             delete snd;
262         }
263         LOG_DEBUG_MSG(
264                "sends_posted "  << decnumber(sends_posted)
265             << "sends_deleted " << decnumber(sends_deleted)
266             << "acks_received " << decnumber(acks_received)
267             << "non_rma-send "  << decnumber(sends_posted-acks_received));
268         //
269         libfabric_controller_ = nullptr;
270         FUNC_END_DEBUG_MSG;
271     }
272 
273     // --------------------------------------------------------------------
274     /// Should not be used any more as parcelport_impl handles this?
can_bootstrap() const275     bool parcelport::can_bootstrap() const {
276         FUNC_START_DEBUG_MSG;
277         bool can_boot = HPX_PARCELPORT_LIBFABRIC_HAVE_BOOTSTRAPPING();
278         LOG_TRACE_MSG("Returning " << can_boot << " from can_bootstrap")
279         FUNC_END_DEBUG_MSG;
280         return can_boot;
281     }
282 
283     // --------------------------------------------------------------------
284     /// return a string form of the locality name
get_locality_name() const285     std::string parcelport::get_locality_name() const
286     {
287         FUNC_START_DEBUG_MSG;
288         // return hostname:iblibfabric ip address
289         std::stringstream temp;
290         temp << boost::asio::ip::host_name() << ":" << ipaddress(ip_addr_);
291         std::string tstr = temp.str();
292         FUNC_END_DEBUG_MSG;
293         return tstr.substr(0, tstr.size()-1);
294     }
295 
296     // --------------------------------------------------------------------
    // the root node has special handling, this returns its Id
298     parcelset::locality parcelport::
agas_locality(util::runtime_configuration const & ini) const299     agas_locality(util::runtime_configuration const & ini) const
300     {
301         FUNC_START_DEBUG_MSG;
302         // load all components as described in the configuration information
303         if (!bootstrap_enabled_)
304         {
305             LOG_ERROR_MSG("Should only return agas locality when bootstrapping");
306         }
307         FUNC_END_DEBUG_MSG;
308         return libfabric_controller_->agas_;
309     }
310 
311     // --------------------------------------------------------------------
create_locality() const312     parcelset::locality parcelport::create_locality() const {
313         FUNC_START_DEBUG_MSG;
314         FUNC_END_DEBUG_MSG;
315         return parcelset::locality(locality());
316     }
317 
318     // --------------------------------------------------------------------
319     /// for debugging
suspended_task_debug(const std::string & match)320     void parcelport::suspended_task_debug(const std::string &match)
321     {
322         std::string temp = hpx::util::debug::suspended_task_backtraces();
323         if (match.size()==0 ||
324             temp.find(match)!=std::string::npos)
325         {
326             LOG_DEBUG_MSG("Suspended threads " << temp);
327         }
328     }
329 
330     // --------------------------------------------------------------------
331     /// stop the parcelport, prior to shutdown
do_stop()332     void parcelport::do_stop() {
333         LOG_DEBUG_MSG("Entering libfabric stop ");
334         FUNC_START_DEBUG_MSG;
335         if (!stopped_) {
336             // we don't want multiple threads trying to stop the clients
337             scoped_lock lock(stop_mutex);
338 
339             LOG_DEBUG_MSG("Removing all initiated connections");
340             libfabric_controller_->disconnect_all();
341 
342             // wait for all clients initiated elsewhere to be disconnected
343             while (libfabric_controller_->active() /*&& !hpx::is_stopped()*/) {
344                 completions_handled_ += libfabric_controller_->poll_endpoints(true);
345                 LOG_TIMED_INIT(disconnect_poll);
346                 LOG_TIMED_BLOCK(disconnect_poll, DEVEL, 5.0,
347                     {
348                         LOG_DEBUG_MSG("Polling before shutdown");
349                     }
350                 )
351             }
352             LOG_DEBUG_MSG("stopped removing clients and terminating");
353         }
354         stopped_ = true;
355         // Stop receiving and sending of parcels
356     }
357 
358     // --------------------------------------------------------------------
can_send_immediate()359     bool parcelport::can_send_immediate()
360     {
361         // hpx::util::yield_while([this]()
362         //     {
363         //         this->background_work(0);
364         //         return this->senders_.empty();
365         //     }, "libfabric::can_send_immediate");
366 
367         return true;
368     }
369 
370     // --------------------------------------------------------------------
371     template <typename Handler>
async_write(Handler && handler,sender * snd,fi_addr_t addr,snd_buffer_type & buffer)372     bool parcelport::async_write(Handler && handler,
373         sender *snd, fi_addr_t addr,
374         snd_buffer_type &buffer)
375     {
376         LOG_DEBUG_MSG("parcelport::async_write using sender " << hexpointer(snd));
377         snd->dst_addr_ = addr;
378         snd->buffer_ = std::move(buffer);
379         HPX_ASSERT(!snd->handler_);
380         snd->handler_ = std::forward<Handler>(handler);
381         snd->async_write_impl();
382         // after a send poll to make progress on the network and
383         // reduce latencies for receives coming in
384 //         background_work_OS_thread();
385 //         if (hpx::threads::get_self_ptr())
386 //             hpx::this_thread::suspend(hpx::threads::pending_boost,
387 //                 "libfabric::parcelport::async_write");
388         return true;
389     }
390 
391     // --------------------------------------------------------------------
392     // This is called to poll for completions and handle all incoming messages
393     // as well as complete outgoing messages.
394     //
395     // Since the parcelport can be serviced by hpx threads or by OS threads,
396     // we must use extra care when dealing with mutexes and condition_variables
397     // since we do not want to suspend an OS thread, but we do want to suspend
398     // hpx threads when necessary.
399     //
400     // NB: There is no difference any more between background polling work
401     // on OS or HPX as all has been tested thoroughly
402     // --------------------------------------------------------------------
background_work_OS_thread()403     inline bool parcelport::background_work_OS_thread() {
404         LOG_TIMED_INIT(background);
405         bool done = false;
406         do {
407             LOG_TIMED_BLOCK(background, DEVEL, 5.0, {
408                 LOG_DEBUG_MSG("number of senders in use "
409                     << decnumber(senders_in_use_));
410             });
411             // if an event comes in, we may spend time processing/handling it
412             // and another may arrive during this handling,
413             // so keep checking until none are received
414             // libfabric_controller_->refill_client_receives(false);
415             int numc = libfabric_controller_->poll_endpoints();
416             completions_handled_ += numc;
417             done = (numc==0);
418         } while (!done);
419         return (done!=0);
420     }
421 
422     // --------------------------------------------------------------------
423     // Background work
424     //
425     // This is called whenever the main thread scheduler is idling,
426     // is used to poll for events, messages on the libfabric connection
427     // --------------------------------------------------------------------
background_work(std::size_t num_thread)428     bool parcelport::background_work(std::size_t num_thread) {
429         if (stopped_ || hpx::is_stopped()) {
430             return false;
431         }
432         return background_work_OS_thread();
433     }
434 }}}}
435 
436 HPX_REGISTER_PARCELPORT(hpx::parcelset::policies::libfabric::parcelport, libfabric);
437