//  Copyright (c) 2014 Thomas Heller
//  Copyright (c) 2007-2017 Hartmut Kaiser
//  Copyright (c) 2007 Richard D Guidry Jr
//  Copyright (c) 2011 Bryce Lelbach
//  Copyright (c) 2011 Katelyn Kufahl
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#ifndef HPX_PARCELSET_PARCELPORT_IMPL_HPP
#define HPX_PARCELSET_PARCELPORT_IMPL_HPP

#include <hpx/config.hpp>
#include <hpx/error_code.hpp>
#include <hpx/runtime/config_entry.hpp>
#include <hpx/runtime/parcelset/detail/call_for_each.hpp>
#include <hpx/runtime/parcelset/detail/parcel_await.hpp>
#include <hpx/runtime/parcelset/encode_parcels.hpp>
#include <hpx/runtime/parcelset/parcelport.hpp>
#include <hpx/runtime/threads/thread.hpp>
#include <hpx/throw_exception.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/atomic_count.hpp>
#include <hpx/util/bind_front.hpp>
#include <hpx/util/connection_cache.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/detail/yield_k.hpp>
#include <hpx/util/io_service_pool.hpp>
#include <hpx/util/runtime_configuration.hpp>
#include <hpx/util/safe_lexical_cast.hpp>

#include <boost/detail/endian.hpp>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace parcelset
{
    ///////////////////////////////////////////////////////////////////////////
    template <typename ConnectionHandler>
    struct connection_handler_traits;
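    // Each concrete parcelport is expected to specialize
    // connection_handler_traits for its ConnectionHandler. Judging from how
    // the traits are used below, a specialization provides at least a
    // connection_type typedef, the static functions type(), pool_name() and
    // pool_name_postfix(), and the boolean integral constants
    // send_early_parcel, send_immediate_parcels and do_background_work.
    // A rough, illustrative sketch only (my_connection_handler and the
    // returned strings are hypothetical):
    //
    //   template <>
    //   struct connection_handler_traits<my_connection_handler>
    //   {
    //       typedef my_sender_connection connection_type;
    //       typedef std::true_type  send_early_parcel;
    //       typedef std::false_type do_background_work;
    //       typedef std::false_type send_immediate_parcels;
    //
    //       static const char* type() { return "mine"; }
    //       static const char* pool_name() { return "parcel-pool-mine"; }
    //       static const char* pool_name_postfix() { return "-mine"; }
    //   };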
    template <typename ConnectionHandler>
    class HPX_EXPORT parcelport_impl
      : public parcelport
    {
        typedef
            typename connection_handler_traits<ConnectionHandler>::connection_type
            connection;
    public:
        static const char * connection_handler_type()
        {
            return connection_handler_traits<ConnectionHandler>::type();
        }

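        // The size of the I/O service pool is read from the configuration
        // entry "hpx.parcel.<handler-type>.io_pool_size" (for a handler whose
        // type() returns "tcp" this would presumably be
        // hpx.parcel.tcp.io_pool_size) and defaults to 2 threads.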
        static std::size_t thread_pool_size(util::runtime_configuration const& ini)
        {
            std::string key("hpx.parcel.");
            key += connection_handler_type();

            return hpx::util::get_entry_as<std::size_t>(
                ini, key + ".io_pool_size", "2");
        }

        static const char *pool_name()
        {
            return connection_handler_traits<ConnectionHandler>::pool_name();
        }

        static const char *pool_name_postfix()
        {
            return connection_handler_traits<ConnectionHandler>::pool_name_postfix();
        }

        static std::size_t max_connections(util::runtime_configuration const& ini)
        {
            std::string key("hpx.parcel.");
            key += connection_handler_type();

            return hpx::util::get_entry_as<std::size_t>(
                ini, key + ".max_connections", HPX_PARCEL_MAX_CONNECTIONS);
        }

        static
            std::size_t max_connections_per_loc(util::runtime_configuration const& ini)
        {
            std::string key("hpx.parcel.");
            key += connection_handler_type();

            return hpx::util::get_entry_as<std::size_t>(
                ini, key + ".max_connections_per_locality",
                HPX_PARCEL_MAX_CONNECTIONS_PER_LOCALITY);
        }

    public:
        /// Construct the parcelport on the given locality.
        parcelport_impl(util::runtime_configuration const& ini,
            locality const& here,
            util::function_nonser<void(std::size_t, char const*)> const&
                on_start_thread,
            util::function_nonser<void(std::size_t, char const*)> const&
                on_stop_thread)
          : parcelport(ini, here, connection_handler_type())
          , io_service_pool_(thread_pool_size(ini), on_start_thread,
                on_stop_thread, pool_name(), pool_name_postfix())
          , connection_cache_(
                max_connections(ini), max_connections_per_loc(ini))
          , archive_flags_(0)
          , operations_in_flight_(0)
          , num_thread_(0)
          , max_background_thread_(hpx::util::safe_lexical_cast<std::size_t>(
                hpx::get_config_entry("hpx.max_background_threads",
                    (std::numeric_limits<std::size_t>::max)())))
        {
#ifdef BOOST_BIG_ENDIAN
            std::string endian_out = get_config_entry("hpx.parcel.endian_out", "big");
#else
            std::string endian_out = get_config_entry("hpx.parcel.endian_out", "little");
#endif
            if (endian_out == "little")
                archive_flags_ |= serialization::endian_little;
            else if (endian_out == "big")
                archive_flags_ |= serialization::endian_big;
            else {
                HPX_ASSERT(endian_out == "little" || endian_out == "big");
            }

            if (!this->allow_array_optimizations()) {
                archive_flags_ |= serialization::disable_array_optimization;
                archive_flags_ |= serialization::disable_data_chunking;
            }
            else {
                if (!this->allow_zero_copy_optimizations())
                    archive_flags_ |= serialization::disable_data_chunking;
            }
        }

        ~parcelport_impl() override
        {
            connection_cache_.clear();
        }

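        /// Returns true if this parcelport can be used to exchange the early
        /// (bootstrap) parcels, i.e. if the connection handler declares
        /// send_early_parcel in its traits.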
        bool can_bootstrap() const override
        {
            return
                connection_handler_traits<
                    ConnectionHandler
                >::send_early_parcel::value;
        }

        bool run(bool blocking = true) override
        {
            io_service_pool_.run(false);    // start pool

            bool success = connection_handler().do_run();

            if (blocking)
                io_service_pool_.join();

            return success;
        }

        void flush_parcels() override
        {
            // We suspend our thread, which will make progress on the network
            if (threads::get_self_ptr())
            {
                hpx::this_thread::suspend(hpx::threads::pending_boost,
                    "parcelport_impl::flush_parcels");
            }

            // make sure no more work is pending, wait for service pool to get
            // empty
            while (operations_in_flight_ != 0 ||
                get_pending_parcels_count(false) != 0)
            {
                if (threads::get_self_ptr())
                {
                    hpx::this_thread::suspend(hpx::threads::pending_boost,
                        "parcelport_impl::flush_parcels");
                }
            }
        }

        void stop(bool blocking = true) override
        {
            flush_parcels();

            io_service_pool_.stop();
            if (blocking) {
                connection_cache_.shutdown();
                connection_handler().do_stop();
                io_service_pool_.join();
                connection_cache_.clear();
                io_service_pool_.clear();
            }
        }

    public:
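        /// Queue a single parcel for delivery to the given destination
        /// locality. The handler f is invoked once the write operation for
        /// this parcel has completed (successfully or with an error).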
        void put_parcel(
            locality const& dest, parcel p, write_handler_type f) override
        {
            HPX_ASSERT(dest.type() == type());

            // We create a shared pointer of the parcels_await object since it
            // needs to be kept alive as long as there are futures not ready
            // or GIDs to be split. This is necessary to preserve the identity
            // of the this pointer.
            detail::parcel_await_apply(std::move(p), std::move(f),
                archive_flags_, [this, dest](parcel&& p, write_handler_type&& f)
                {
                    if (connection_handler_traits<ConnectionHandler>::
                        send_immediate_parcels::value &&
                        can_send_immediate_impl<ConnectionHandler>())
                    {
                        send_immediate_impl<ConnectionHandler>(
                            *this, dest, &f, &p, 1);
                    }
                    else
                    {
                        // enqueue the outgoing parcel ...
                        enqueue_parcel(dest, std::move(p), std::move(f));

                        get_connection_and_send_parcels(dest);
                    }
                });
        }

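        /// Queue a batch of parcels, all destined for the same locality. The
        /// i-th handler corresponds to the i-th parcel, so both vectors have
        /// to be of the same size.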
        void put_parcels(locality const& dest, std::vector<parcel> parcels,
            std::vector<write_handler_type> handlers) override
        {
            if (parcels.size() != handlers.size())
            {
                HPX_THROW_EXCEPTION(bad_parameter, "parcelport::put_parcels",
                    "mismatched number of parcels and handlers");
                return;
            }

#if defined(HPX_DEBUG)
            // make sure all parcels go to the same locality
            HPX_ASSERT(dest.type() == type());
            for (std::size_t i = 1; i != parcels.size(); ++i)
            {
                HPX_ASSERT(parcels[0].destination_locality() ==
                    parcels[i].destination_locality());
            }
#endif
            // We create a shared pointer of the parcels_await object since it
            // needs to be kept alive as long as there are futures not ready
            // or GIDs to be split. This is necessary to preserve the identity
            // of the this pointer.
            detail::parcels_await_apply(std::move(parcels),
                std::move(handlers), archive_flags_,
                [this, dest](std::vector<parcel>&& parcels,
                    std::vector<write_handler_type>&& handlers)
                {
                    if (connection_handler_traits<ConnectionHandler>::
                        send_immediate_parcels::value &&
                        can_send_immediate_impl<ConnectionHandler>())
                    {
                        send_immediate_impl<ConnectionHandler>(
                            *this, dest, handlers.data(), parcels.data(),
                            parcels.size());
                    }
                    else
                    {
                        enqueue_parcels(
                            dest, std::move(parcels), std::move(handlers));

                        get_connection_and_send_parcels(dest);
                    }
                });
        }

        void send_early_parcel(locality const& dest, parcel p) override
        {
            send_early_parcel_impl<ConnectionHandler>(dest, std::move(p));
        }

        util::io_service_pool* get_thread_pool(char const* name) override
        {
            if (0 == std::strcmp(name, io_service_pool_.get_name()))
                return &io_service_pool_;
            return nullptr;
        }

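        /// Background-work hook: re-trigger sends for destinations that still
        /// have parcels pending and give the connection handler a chance to
        /// make progress. Returns whatever the handler's background_work
        /// reported (false if the handler declares no background work).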
        bool do_background_work(std::size_t num_thread) override
        {
            trigger_pending_work();
            return do_background_work_impl<ConnectionHandler>(num_thread);
        }

        /// support enable_shared_from_this
        std::shared_ptr<parcelport_impl> shared_from_this()
        {
            return std::static_pointer_cast<parcelport_impl>(
                parcelset::parcelport::shared_from_this());
        }

        std::shared_ptr<parcelport_impl const> shared_from_this() const
        {
            return std::static_pointer_cast<parcelport_impl const>(
                parcelset::parcelport::shared_from_this());
        }

        /// Cache specific functionality
        void remove_from_connection_cache_delayed(locality const& loc)
        {
            if (operations_in_flight_ != 0)
            {
                error_code ec(lightweight);
                hpx::applier::register_thread_nullary(
                    util::deferred_call(
                        &parcelport_impl::remove_from_connection_cache,
                        this, loc),
                    "remove_from_connection_cache_delayed",
                    threads::pending, true, threads::thread_priority_normal,
                    threads::thread_schedule_hint(
                        static_cast<std::int16_t>(get_next_num_thread())),
                    threads::thread_stacksize_default,
                    ec);
                if (!ec) return;
            }

            connection_cache_.clear(loc);
        }

        void remove_from_connection_cache(locality const& loc) override
        {
            error_code ec(lightweight);
            threads::thread_id_type id =
                hpx::applier::register_thread_nullary(
                    util::deferred_call(
                        &parcelport_impl::remove_from_connection_cache_delayed,
                        this, loc),
                    "remove_from_connection_cache",
                    threads::suspended, true, threads::thread_priority_normal,
                    threads::thread_schedule_hint(
                        static_cast<std::int16_t>(get_next_num_thread())),
                    threads::thread_stacksize_default,
                    ec);
            if (ec) return;

            threads::set_thread_state(id,
                std::chrono::milliseconds(100), threads::pending,
                threads::wait_signaled, threads::thread_priority_boost, ec);
        }

        /// Return the name of this locality
        std::string get_locality_name() const override
        {
            return connection_handler().get_locality_name();
        }

        ////////////////////////////////////////////////////////////////////////
        // Return the given connection cache statistic
        std::int64_t get_connection_cache_statistics(
            connection_cache_statistics_type t, bool reset) override
        {
            switch (t) {
                case connection_cache_insertions:
                    return connection_cache_.get_cache_insertions(reset);

                case connection_cache_evictions:
                    return connection_cache_.get_cache_evictions(reset);

                case connection_cache_hits:
                    return connection_cache_.get_cache_hits(reset);

                case connection_cache_misses:
                    return connection_cache_.get_cache_misses(reset);

                case connection_cache_reclaims:
                    return connection_cache_.get_cache_reclaims(reset);

                default:
                    break;
            }

            HPX_THROW_EXCEPTION(bad_parameter,
                "parcelport_impl::get_connection_cache_statistics",
                "invalid connection cache statistics type");
            return 0;
        }

    private:
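        // parcelport_impl is a CRTP base: the concrete ConnectionHandler
        // (for instance the TCP parcelport) derives from
        // parcelport_impl<ConnectionHandler>, so casting *this to the handler
        // type is always valid here.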
        ConnectionHandler& connection_handler()
        {
            return static_cast<ConnectionHandler&>(*this);
        }

        ConnectionHandler const& connection_handler() const
        {
            return static_cast<ConnectionHandler const&>(*this);
        }

        template <typename ConnectionHandler_>
        typename std::enable_if<
            connection_handler_traits<
                ConnectionHandler_
            >::send_early_parcel::value
        >::type
        send_early_parcel_impl(locality const& dest, parcel p)
        {
            put_parcel(
                dest
              , std::move(p)
              , util::bind_front(
                    &parcelport::early_pending_parcel_handler
                  , this
                )
            );
        }

        template <typename ConnectionHandler_>
        typename std::enable_if<
            !connection_handler_traits<
                ConnectionHandler_
            >::send_early_parcel::value
        >::type
        send_early_parcel_impl(locality const& dest, parcel p)
        {
            HPX_THROW_EXCEPTION(network_error, "send_early_parcel",
                "This parcelport does not support sending early parcels");
        }

        template <typename ConnectionHandler_>
        typename std::enable_if<
            connection_handler_traits<
                ConnectionHandler_
            >::do_background_work::value,
            bool
        >::type
        do_background_work_impl(std::size_t num_thread)
        {
            return connection_handler().background_work(num_thread);
        }

        template <typename ConnectionHandler_>
        typename std::enable_if<
            !connection_handler_traits<
                ConnectionHandler_
            >::do_background_work::value,
            bool
        >::type
        do_background_work_impl(std::size_t)
        {
            return false;
        }

        template <typename ConnectionHandler_>
        typename std::enable_if<
            connection_handler_traits<
                ConnectionHandler_
            >::send_immediate_parcels::value,
            bool
        >::type
        can_send_immediate_impl()
        {
            return connection_handler().can_send_immediate();
        }

        template <typename ConnectionHandler_>
        typename std::enable_if<
            !connection_handler_traits<
                ConnectionHandler_
            >::send_immediate_parcels::value,
            bool
        >::type
        can_send_immediate_impl()
        {
            return false;
        }

    protected:
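        // The "send immediate" path is used by parcelports that can write a
        // parcel without first acquiring a cached connection (the traits flag
        // send_immediate_parcels is true). If fs/ps are null, the parcels are
        // taken from the pending queue for this destination instead; whatever
        // does not fit into the current buffer is re-enqueued.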
        template <typename ConnectionHandler_>
        typename std::enable_if<
            connection_handler_traits<
                ConnectionHandler_
            >::send_immediate_parcels::value,
            void
        >::type
        send_immediate_impl(
            parcelport_impl& this_, locality const& dest_,
            write_handler_type* fs, parcel* ps, std::size_t num_parcels)
        {
            std::uint64_t addr;
            error_code ec;
            // First try to get a connection ...
            connection* sender =
                this_.connection_handler().get_connection(dest_, addr);

            // If we couldn't get one ... enqueue the parcel and move on
            std::size_t encoded_parcels = 0;
            std::vector<parcel> parcels;
            std::vector<write_handler_type> handlers;
            if (sender != nullptr)
            {
                if (fs == nullptr)
                {
                    HPX_ASSERT(ps == nullptr);
                    HPX_ASSERT(num_parcels == 0u);

                    if (!dequeue_parcels(dest_, parcels, handlers))
                    {
                        // Give this connection back to the connection handler
                        // as we couldn't dequeue parcels.
                        this_.connection_handler().reclaim_connection(sender);

                        return;
                    }

                    ps = parcels.data();
                    fs = handlers.data();
                    num_parcels = parcels.size();
                    HPX_ASSERT(parcels.size() == handlers.size());
                }

                auto encoded_buffer = sender->get_new_buffer();
                // encode the parcels
                encoded_parcels = encode_parcels(this_, ps, num_parcels,
                    encoded_buffer,
                    this_.archive_flags_,
                    this_.get_max_outbound_message_size());

                typedef detail::call_for_each handler_type;

                if (sender->parcelport_->async_write(
                    handler_type(
                        handler_type::handlers_type(
                            std::make_move_iterator(fs),
                            std::make_move_iterator(fs + encoded_parcels)),
                        handler_type::parcels_type(
                            std::make_move_iterator(ps),
                            std::make_move_iterator(ps + encoded_parcels))
                    ),
                    sender, addr,
                    encoded_buffer))
                {
                    // we don't propagate errors for now
                }
            }
            if (num_parcels != encoded_parcels && fs != nullptr)
            {
                std::vector<parcel> overflow_parcels(
                    std::make_move_iterator(ps + encoded_parcels),
                    std::make_move_iterator(ps + num_parcels));
                std::vector<write_handler_type> overflow_handlers(
                    std::make_move_iterator(fs + encoded_parcels),
                    std::make_move_iterator(fs + num_parcels));
                enqueue_parcels(dest_, std::move(overflow_parcels),
                    std::move(overflow_handlers));
            }
        }

        template <typename ConnectionHandler_>
        typename std::enable_if<
            !connection_handler_traits<
                ConnectionHandler_
            >::send_immediate_parcels::value,
            void
        >::type
        send_immediate_impl(
            parcelport_impl& this_, locality const& dest_,
            write_handler_type* fs, parcel* ps, std::size_t num_parcels)
        {
            HPX_ASSERT(false);
        }

    private:
        ///////////////////////////////////////////////////////////////////////
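        // Request a connection to the given locality from the connection
        // cache, creating a new one if a slot could be reserved. Returns an
        // empty shared_ptr if the cache is full; in that case the pending
        // parcels are picked up as soon as an existing connection is returned
        // to the cache.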
        std::shared_ptr<connection> get_connection(
            locality const& l, bool force, error_code& ec)
        {
            // Request new connection from connection cache.
            std::shared_ptr<connection> sender_connection;

            if (connection_handler_traits<ConnectionHandler>::
                    send_immediate_parcels::value)
            {
                std::terminate();
            }
            else {
                // Get a connection or reserve space for a new connection.
                if (!connection_cache_.get_or_reserve(l, sender_connection))
                {
                    // If no slot is available it's not a problem as the parcel
                    // will be sent out whenever the next connection is returned
                    // to the cache.
                    if (&ec != &throws)
                        ec = make_success_code();
                    return sender_connection;
                }
            }

            // Check if we need to create the new connection.
            if (!sender_connection)
                return connection_handler().create_connection(l, ec);

            if (&ec != &throws)
                ec = make_success_code();

            return sender_connection;
        }

        ///////////////////////////////////////////////////////////////////////
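        // Parcels that could not be sent immediately are kept in
        // pending_parcels_, a map from destination locality to a tuple of
        // parcel and handler vectors; parcel_destinations_ and
        // num_parcel_destinations_ track which destinations currently have
        // work queued.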
        void enqueue_parcel(locality const& locality_id,
            parcel&& p, write_handler_type&& f)
        {
            typedef pending_parcels_map::mapped_type mapped_type;

            std::unique_lock<lcos::local::spinlock> l(mtx_);
            // We ignore the lock here. It might happen that while enqueuing,
            // we need to acquire a lock. This should not cause any problems
            // (famous last words)
            util::ignore_while_checking<
                std::unique_lock<lcos::local::spinlock>
            > il(&l);

            mapped_type& e = pending_parcels_[locality_id];
            util::get<0>(e).push_back(std::move(p));
            util::get<1>(e).push_back(std::move(f));

            parcel_destinations_.insert(locality_id);
            ++num_parcel_destinations_;
        }

        void enqueue_parcels(locality const& locality_id,
            std::vector<parcel>&& parcels,
            std::vector<write_handler_type>&& handlers)
        {
            typedef pending_parcels_map::mapped_type mapped_type;

            std::unique_lock<lcos::local::spinlock> l(mtx_);
            // We ignore the lock here. It might happen that while enqueuing,
            // we need to acquire a lock. This should not cause any problems
            // (famous last words)
            util::ignore_while_checking<
                std::unique_lock<lcos::local::spinlock>
            > il(&l);

            HPX_ASSERT(parcels.size() == handlers.size());

            mapped_type& e = pending_parcels_[locality_id];
            if (util::get<0>(e).empty())
            {
                HPX_ASSERT(util::get<1>(e).empty());
                std::swap(util::get<0>(e), parcels);
                std::swap(util::get<1>(e), handlers);
            }
            else
            {
                HPX_ASSERT(util::get<0>(e).size() == util::get<1>(e).size());
                std::size_t new_size = util::get<0>(e).size() + parcels.size();
                util::get<0>(e).reserve(new_size);

                std::move(parcels.begin(), parcels.end(),
                    std::back_inserter(util::get<0>(e)));
                util::get<1>(e).reserve(new_size);
                std::move(handlers.begin(), handlers.end(),
                    std::back_inserter(util::get<1>(e)));
            }

            parcel_destinations_.insert(locality_id);
            ++num_parcel_destinations_;
        }

        bool dequeue_parcels(locality const& locality_id,
            std::vector<parcel>& parcels,
            std::vector<write_handler_type>& handlers)
        {
            typedef pending_parcels_map::iterator iterator;

            {
                std::unique_lock<lcos::local::spinlock> l(mtx_, std::try_to_lock);

                if (!l) return false;

                iterator it = pending_parcels_.find(locality_id);

                // do nothing if parcels have already been picked up by
                // another thread
                if (it != pending_parcels_.end() &&
                    !util::get<0>(it->second).empty())
                {
                    HPX_ASSERT(it->first == locality_id);
                    HPX_ASSERT(handlers.size() == 0);
                    HPX_ASSERT(handlers.size() == parcels.size());
                    std::swap(parcels, util::get<0>(it->second));
                    HPX_ASSERT(util::get<0>(it->second).size() == 0);
                    std::swap(handlers, util::get<1>(it->second));
                    HPX_ASSERT(handlers.size() == parcels.size());

                    HPX_ASSERT(!handlers.empty());
                }
                else
                {
                    // guard against dereferencing the end iterator
                    HPX_ASSERT(it == pending_parcels_.end() ||
                        util::get<1>(it->second).empty());
                    return false;
                }

                parcel_destinations_.erase(locality_id);

                HPX_ASSERT(0 != num_parcel_destinations_.load());
                --num_parcel_destinations_;

                return true;
            }
        }

    protected:
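        /// Pop a single pending parcel (from any destination) together with
        /// its handler. Returns false if the queue lock could not be taken or
        /// if nothing is pending.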
        bool dequeue_parcel(locality& dest, parcel& p, write_handler_type& handler)
        {
            {
                std::unique_lock<lcos::local::spinlock> l(mtx_, std::try_to_lock);

                if (!l) return false;

                for (auto& pending : pending_parcels_)
                {
                    auto& parcels = util::get<0>(pending.second);
                    if (!parcels.empty())
                    {
                        auto& handlers = util::get<1>(pending.second);
                        dest = pending.first;
                        p = std::move(parcels.back());
                        parcels.pop_back();
                        handler = std::move(handlers.back());
                        handlers.pop_back();

                        if (parcels.empty())
                        {
                            pending_parcels_.erase(dest);
                        }
                        return true;
                    }
                }
            }
            return false;
        }

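        /// Walk the set of destinations that still have parcels queued and
        /// kick off a send attempt for each of them. Called from the
        /// background-work hook above.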
        bool trigger_pending_work()
        {
            if (0 == num_parcel_destinations_.load(std::memory_order_relaxed))
                return true;

            std::vector<locality> destinations;

            {
                std::unique_lock<lcos::local::spinlock> l(mtx_, std::try_to_lock);
                if (l.owns_lock())
                {
                    if (parcel_destinations_.empty())
                        return true;

                    destinations.reserve(parcel_destinations_.size());
                    for (locality const& loc : parcel_destinations_)
                    {
                        destinations.push_back(loc);
                    }
                }
            }

            // Create new HPX threads which send the parcels that are still
            // pending.
            for (locality const& loc : destinations)
            {
                get_connection_and_send_parcels(loc);
            }

            return true;
        }

    private:
        ///////////////////////////////////////////////////////////////////////
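        // Try to obtain a (cached) connection for the given destination and
        // push out whatever parcels are queued for it. If no connection or no
        // parcels are available the function simply returns; the work is
        // picked up again when a connection is reclaimed or new parcels
        // arrive.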
        void get_connection_and_send_parcels(
            locality const& locality_id, bool background = false)
        {
            if (connection_handler_traits<ConnectionHandler>::
                    send_immediate_parcels::value)
            {
                this->send_immediate_impl<ConnectionHandler>(
                    *this, locality_id, nullptr, nullptr, 0);
                return;
            }

            // If one of the sending threads is in suspended state, we
            // need to force a new connection to avoid deadlocks.
            bool force_connection = true;

            error_code ec;
            std::shared_ptr<connection> sender_connection =
                get_connection(locality_id, force_connection, ec);

            if (!sender_connection)
            {
                // We can safely return if no connection is available
                // at this point. As soon as a connection becomes
                // available it checks for pending parcels and sends
                // those out.
                return;
            }

            // repeat until no more parcels are to be sent
            std::vector<parcel> parcels;
            std::vector<write_handler_type> handlers;

            if (!dequeue_parcels(locality_id, parcels, handlers))
            {
                // Give this connection back to the cache as we couldn't
                // dequeue parcels.
                connection_cache_.reclaim(locality_id, sender_connection);

                return;
            }

            // send parcels if they didn't get sent by another connection
            send_pending_parcels(
                locality_id,
                sender_connection, std::move(parcels),
                std::move(handlers));

            // We could yield here for a short amount of time to give another
            // HPX thread the chance to put a subsequent parcel, which leads
            // to more effective parcel buffering:
//                 if (hpx::threads::get_self_ptr())
//                     hpx::this_thread::yield();
        }

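        // Completion handler invoked once an asynchronous write has finished.
        // It returns the connection to the cache (or drops it on error) and,
        // if more parcels have been queued for this destination in the
        // meantime, schedules another send.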
        void send_pending_parcels_trampoline(
            boost::system::error_code const& ec,
            locality const& locality_id,
            std::shared_ptr<connection> sender_connection)
        {
            HPX_ASSERT(operations_in_flight_ != 0);
            --operations_in_flight_;

#if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION)
            sender_connection->set_state(connection::state_scheduled_thread);
#endif
            if (!ec)
            {
                // Give this connection back to the cache as it's not
                // needed anymore.
                connection_cache_.reclaim(locality_id, sender_connection);
            }
            else
            {
                // remove this connection from cache
                connection_cache_.clear(locality_id, sender_connection);
            }
            {
                std::lock_guard<lcos::local::spinlock> l(mtx_);

//                HPX_ASSERT(locality_id == sender_connection->destination());
                pending_parcels_map::iterator it =
                    pending_parcels_.find(locality_id);
                if (it == pending_parcels_.end() ||
                    util::get<0>(it->second).empty())
                {
                    return;
                }
            }

            // Create a new HPX thread which sends parcels that are still
            // pending.
            get_connection_and_send_parcels(locality_id);
        }

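        // Encode the given parcels into the connection's buffer and start the
        // asynchronous write. If the buffer cannot hold all parcels, only the
        // first num_parcels are sent and the remainder is re-enqueued for a
        // later attempt.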
        void send_pending_parcels(
            parcelset::locality const& parcel_locality_id,
            std::shared_ptr<connection> sender_connection,
            std::vector<parcel>&& parcels,
            std::vector<write_handler_type>&& handlers)
        {
#if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION)
            sender_connection->set_state(connection::state_send_pending);
#endif

#if defined(HPX_DEBUG)
            // verify the connection points to the right destination
//            HPX_ASSERT(parcel_locality_id == sender_connection->destination());
            sender_connection->verify_(parcel_locality_id);
#endif
            // encode the parcels
            std::size_t num_parcels = encode_parcels(*this, &parcels[0],
                    parcels.size(), sender_connection->buffer_,
                    archive_flags_,
                    this->get_max_outbound_message_size());

            using hpx::parcelset::detail::call_for_each;
            if (num_parcels == parcels.size())
            {
                ++operations_in_flight_;
                // send all of the parcels
                sender_connection->async_write(
                    call_for_each(std::move(handlers), std::move(parcels)),
                    util::bind_front(
                        &parcelport_impl::send_pending_parcels_trampoline,
                        this));
            }
            else
            {
                ++operations_in_flight_;
                HPX_ASSERT(num_parcels < parcels.size());

                std::vector<write_handler_type> handled_handlers;
                handled_handlers.reserve(num_parcels);

                std::move(handlers.begin(), handlers.begin() + num_parcels,
                    std::back_inserter(handled_handlers));

                std::vector<parcel> handled_parcels;
                handled_parcels.reserve(num_parcels);

                std::move(parcels.begin(), parcels.begin() + num_parcels,
                    std::back_inserter(handled_parcels));

                // send only part of the parcels
                sender_connection->async_write(
                    call_for_each(
                        std::move(handled_handlers), std::move(handled_parcels)),
                    util::bind_front(
                        &parcelport_impl::send_pending_parcels_trampoline,
                        this));

                // give back unhandled parcels
                parcels.erase(parcels.begin(), parcels.begin() + num_parcels);
                handlers.erase(handlers.begin(), handlers.begin() + num_parcels);

                enqueue_parcels(parcel_locality_id, std::move(parcels),
                    std::move(handlers));
            }

            if (threads::get_self_ptr())
            {
                // We suspend our thread, which will make progress on the network
                hpx::this_thread::suspend(hpx::threads::pending_boost,
                    "parcelport_impl::send_pending_parcels");
            }
        }

    public:
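        /// Round-robin over the first max_background_thread_ worker threads;
        /// used as a scheduling hint when registering helper HPX threads.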
        std::size_t get_next_num_thread()
        {
            return ++num_thread_ % max_background_thread_;
        }

    protected:
        /// The pool of io_service objects used to perform asynchronous operations.
        util::io_service_pool io_service_pool_;

        /// The connection cache for sending connections
        util::connection_cache<connection, locality> connection_cache_;

        typedef hpx::lcos::local::spinlock mutex_type;

        int archive_flags_;
        hpx::util::atomic_count operations_in_flight_;

        std::atomic<std::size_t> num_thread_;
        std::size_t const max_background_thread_;
    };
}}

#endif