1 // Copyright (c) 2014 Thomas Heller 2 // Copyright (c) 2007-2017 Hartmut Kaiser 3 // Copyright (c) 2007 Richard D Guidry Jr 4 // Copyright (c) 2011 Bryce Lelbach 5 // Copyright (c) 2011 Katelyn Kufahl 6 // 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 10 #ifndef HPX_PARCELSET_PARCELPORT_IMPL_HPP 11 #define HPX_PARCELSET_PARCELPORT_IMPL_HPP 12 13 #include <hpx/config.hpp> 14 #include <hpx/error_code.hpp> 15 #include <hpx/runtime/config_entry.hpp> 16 #include <hpx/runtime/parcelset/detail/call_for_each.hpp> 17 #include <hpx/runtime/parcelset/detail/parcel_await.hpp> 18 #include <hpx/runtime/parcelset/encode_parcels.hpp> 19 #include <hpx/runtime/parcelset/parcelport.hpp> 20 #include <hpx/runtime/threads/thread.hpp> 21 #include <hpx/throw_exception.hpp> 22 #include <hpx/util/assert.hpp> 23 #include <hpx/util/atomic_count.hpp> 24 #include <hpx/util/bind_front.hpp> 25 #include <hpx/util/connection_cache.hpp> 26 #include <hpx/util/deferred_call.hpp> 27 #include <hpx/util/detail/yield_k.hpp> 28 #include <hpx/util/io_service_pool.hpp> 29 #include <hpx/util/runtime_configuration.hpp> 30 #include <hpx/util/safe_lexical_cast.hpp> 31 32 #include <boost/detail/endian.hpp> 33 34 #include <atomic> 35 #include <chrono> 36 #include <cstddef> 37 #include <cstdint> 38 #include <limits> 39 #include <memory> 40 #include <mutex> 41 #include <string> 42 #include <type_traits> 43 #include <utility> 44 #include <vector> 45 46 /////////////////////////////////////////////////////////////////////////////// 47 namespace hpx { namespace parcelset 48 { 49 /////////////////////////////////////////////////////////////////////////// 50 template <typename ConnectionHandler> 51 struct connection_handler_traits; 52 53 template <typename ConnectionHandler> 54 class HPX_EXPORT parcelport_impl 55 : public parcelport 56 { 57 typedef 58 typename connection_handler_traits<ConnectionHandler>::connection_type 59 connection; 60 public: connection_handler_type()61 static const char * connection_handler_type() 62 { 63 return connection_handler_traits<ConnectionHandler>::type(); 64 } 65 thread_pool_size(util::runtime_configuration const & ini)66 static std::size_t thread_pool_size(util::runtime_configuration const& ini) 67 { 68 std::string key("hpx.parcel."); 69 key += connection_handler_type(); 70 71 return hpx::util::get_entry_as<std::size_t>( 72 ini, key + ".io_pool_size", "2"); 73 } 74 pool_name()75 static const char *pool_name() 76 { 77 return connection_handler_traits<ConnectionHandler>::pool_name(); 78 } 79 pool_name_postfix()80 static const char *pool_name_postfix() 81 { 82 return connection_handler_traits<ConnectionHandler>::pool_name_postfix(); 83 } 84 max_connections(util::runtime_configuration const & ini)85 static std::size_t max_connections(util::runtime_configuration const& ini) 86 { 87 std::string key("hpx.parcel."); 88 key += connection_handler_type(); 89 90 return hpx::util::get_entry_as<std::size_t>( 91 ini, key + ".max_connections", HPX_PARCEL_MAX_CONNECTIONS); 92 } 93 94 static max_connections_per_loc(util::runtime_configuration const & ini)95 std::size_t max_connections_per_loc(util::runtime_configuration const& ini) 96 { 97 std::string key("hpx.parcel."); 98 key += connection_handler_type(); 99 100 return hpx::util::get_entry_as<std::size_t>( 101 ini, key + ".max_connections_per_locality", 102 HPX_PARCEL_MAX_CONNECTIONS_PER_LOCALITY); 103 } 104 105 public: 106 /// Construct the parcelport on the given locality. parcelport_impl(util::runtime_configuration const & ini,locality const & here,util::function_nonser<void (std::size_t,char const *)> const & on_start_thread,util::function_nonser<void (std::size_t,char const *)> const & on_stop_thread)107 parcelport_impl(util::runtime_configuration const& ini, 108 locality const& here, 109 util::function_nonser<void(std::size_t, char const*)> const& 110 on_start_thread, 111 util::function_nonser<void(std::size_t, char const*)> const& 112 on_stop_thread) 113 : parcelport(ini, here, connection_handler_type()) 114 , io_service_pool_(thread_pool_size(ini), on_start_thread, 115 on_stop_thread, pool_name(), pool_name_postfix()) 116 , connection_cache_( 117 max_connections(ini), max_connections_per_loc(ini)) 118 , archive_flags_(0) 119 , operations_in_flight_(0) 120 , num_thread_(0) 121 , max_background_thread_(hpx::util::safe_lexical_cast<std::size_t>( 122 hpx::get_config_entry("hpx.max_background_threads", 123 (std::numeric_limits<std::size_t>::max)()))) 124 { 125 #ifdef BOOST_BIG_ENDIAN 126 std::string endian_out = get_config_entry("hpx.parcel.endian_out", "big"); 127 #else 128 std::string endian_out = get_config_entry("hpx.parcel.endian_out", "little"); 129 #endif 130 if (endian_out == "little") 131 archive_flags_ |= serialization::endian_little; 132 else if (endian_out == "big") 133 archive_flags_ |= serialization::endian_big; 134 else { 135 HPX_ASSERT(endian_out =="little" || endian_out == "big"); 136 } 137 138 if (!this->allow_array_optimizations()) { 139 archive_flags_ |= serialization::disable_array_optimization; 140 archive_flags_ |= serialization::disable_data_chunking; 141 } 142 else { 143 if (!this->allow_zero_copy_optimizations()) 144 archive_flags_ |= serialization::disable_data_chunking; 145 } 146 } 147 ~parcelport_impl()148 ~parcelport_impl() override 149 { 150 connection_cache_.clear(); 151 } 152 can_bootstrap() const153 bool can_bootstrap() const override 154 { 155 return 156 connection_handler_traits< 157 ConnectionHandler 158 >::send_early_parcel::value; 159 } 160 run(bool blocking=true)161 bool run(bool blocking = true) override 162 { 163 io_service_pool_.run(false); // start pool 164 165 bool success = connection_handler().do_run(); 166 167 if (blocking) 168 io_service_pool_.join(); 169 170 return success; 171 } 172 flush_parcels()173 void flush_parcels() override 174 { 175 // We suspend our thread, which will make progress on the network 176 if(threads::get_self_ptr()) 177 { 178 hpx::this_thread::suspend(hpx::threads::pending_boost, 179 "parcelport_impl::flush_parcels"); 180 } 181 182 // make sure no more work is pending, wait for service pool to get 183 // empty 184 185 while(operations_in_flight_ != 0 || get_pending_parcels_count(false) != 0) 186 { 187 if(threads::get_self_ptr()) 188 { 189 hpx::this_thread::suspend(hpx::threads::pending_boost, 190 "parcelport_impl::flush_parcels"); 191 } 192 } 193 } 194 stop(bool blocking=true)195 void stop(bool blocking = true) override 196 { 197 flush_parcels(); 198 199 io_service_pool_.stop(); 200 if (blocking) { 201 connection_cache_.shutdown(); 202 connection_handler().do_stop(); 203 io_service_pool_.join(); 204 connection_cache_.clear(); 205 io_service_pool_.clear(); 206 } 207 208 } 209 210 public: put_parcel(locality const & dest,parcel p,write_handler_type f)211 void put_parcel( 212 locality const& dest, parcel p, write_handler_type f) override 213 { 214 HPX_ASSERT(dest.type() == type()); 215 216 // We create a shared pointer of the parcels_await object since it 217 // needs to be kept alive as long as there are futures not ready 218 // or GIDs to be split. This is necessary to preserve the identity 219 // of the this pointer. 220 detail::parcel_await_apply(std::move(p), std::move(f), 221 archive_flags_, [this, dest](parcel&& p, write_handler_type&& f) 222 { 223 if (connection_handler_traits<ConnectionHandler>:: 224 send_immediate_parcels::value && 225 can_send_immediate_impl<ConnectionHandler>()) 226 { 227 send_immediate_impl<ConnectionHandler>( 228 *this, dest, &f, &p, 1); 229 } 230 else 231 { 232 // enqueue the outgoing parcel ... 233 enqueue_parcel(dest, std::move(p), std::move(f)); 234 235 get_connection_and_send_parcels(dest); 236 } 237 }); 238 } 239 put_parcels(locality const & dest,std::vector<parcel> parcels,std::vector<write_handler_type> handlers)240 void put_parcels(locality const& dest, std::vector<parcel> parcels, 241 std::vector<write_handler_type> handlers) override 242 { 243 if (parcels.size() != handlers.size()) 244 { 245 HPX_THROW_EXCEPTION(bad_parameter, "parcelport::put_parcels", 246 "mismatched number of parcels and handlers"); 247 return; 248 } 249 250 #if defined(HPX_DEBUG) 251 // make sure all parcels go to the same locality 252 HPX_ASSERT(dest.type() == type()); 253 for (std::size_t i = 1; i != parcels.size(); ++i) 254 { 255 HPX_ASSERT(parcels[0].destination_locality() == 256 parcels[i].destination_locality()); 257 } 258 #endif 259 // We create a shared pointer of the parcels_await object since it 260 // needs to be kept alive as long as there are futures not ready 261 // or GIDs to be split. This is necessary to preserve the identity 262 // of the this pointer. 263 detail::parcels_await_apply(std::move(parcels), 264 std::move(handlers), archive_flags_, 265 [this, dest](std::vector<parcel>&& parcels, 266 std::vector<write_handler_type>&& handlers) 267 { 268 if (connection_handler_traits<ConnectionHandler>:: 269 send_immediate_parcels::value && 270 can_send_immediate_impl<ConnectionHandler>()) 271 { 272 send_immediate_impl<ConnectionHandler>( 273 *this, dest, handlers.data(), parcels.data(), 274 parcels.size()); 275 } 276 else 277 { 278 enqueue_parcels( 279 dest, std::move(parcels), std::move(handlers)); 280 281 get_connection_and_send_parcels(dest); 282 } 283 }); 284 } 285 send_early_parcel(locality const & dest,parcel p)286 void send_early_parcel(locality const & dest, parcel p) override 287 { 288 send_early_parcel_impl<ConnectionHandler>(dest, std::move(p)); 289 } 290 get_thread_pool(char const * name)291 util::io_service_pool* get_thread_pool(char const* name) override 292 { 293 if (0 == std::strcmp(name, io_service_pool_.get_name())) 294 return &io_service_pool_; 295 return nullptr; 296 } 297 do_background_work(std::size_t num_thread)298 bool do_background_work(std::size_t num_thread) override 299 { 300 trigger_pending_work(); 301 return do_background_work_impl<ConnectionHandler>(num_thread); 302 } 303 304 /// support enable_shared_from_this shared_from_this()305 std::shared_ptr<parcelport_impl> shared_from_this() 306 { 307 return std::static_pointer_cast<parcelport_impl>( 308 parcelset::parcelport::shared_from_this()); 309 } 310 shared_from_this() const311 std::shared_ptr<parcelport_impl const> shared_from_this() const 312 { 313 return std::static_pointer_cast<parcelport_impl const>( 314 parcelset::parcelport::shared_from_this()); 315 } 316 317 /// Cache specific functionality remove_from_connection_cache_delayed(locality const & loc)318 void remove_from_connection_cache_delayed(locality const& loc) 319 { 320 if (operations_in_flight_ != 0) 321 { 322 error_code ec(lightweight); 323 hpx::applier::register_thread_nullary( 324 util::deferred_call( 325 &parcelport_impl::remove_from_connection_cache, 326 this, loc), 327 "remove_from_connection_cache_delayed", 328 threads::pending, true, threads::thread_priority_normal, 329 threads::thread_schedule_hint( 330 static_cast<std::int16_t>(get_next_num_thread())), 331 threads::thread_stacksize_default, 332 ec); 333 if (!ec) return; 334 } 335 336 connection_cache_.clear(loc); 337 } 338 remove_from_connection_cache(locality const & loc)339 void remove_from_connection_cache(locality const& loc) override 340 { 341 error_code ec(lightweight); 342 threads::thread_id_type id = 343 hpx::applier::register_thread_nullary( 344 util::deferred_call( 345 &parcelport_impl::remove_from_connection_cache_delayed, 346 this, loc), 347 "remove_from_connection_cache", 348 threads::suspended, true, threads::thread_priority_normal, 349 threads::thread_schedule_hint( 350 static_cast<std::int16_t>(get_next_num_thread())), 351 threads::thread_stacksize_default, 352 ec); 353 if (ec) return; 354 355 threads::set_thread_state(id, 356 std::chrono::milliseconds(100), threads::pending, 357 threads::wait_signaled, threads::thread_priority_boost, ec); 358 } 359 360 /// Return the name of this locality get_locality_name() const361 std::string get_locality_name() const override 362 { 363 return connection_handler().get_locality_name(); 364 } 365 366 //////////////////////////////////////////////////////////////////////// 367 // Return the given connection cache statistic get_connection_cache_statistics(connection_cache_statistics_type t,bool reset)368 std::int64_t get_connection_cache_statistics( 369 connection_cache_statistics_type t, bool reset) override 370 { 371 switch (t) { 372 case connection_cache_insertions: 373 return connection_cache_.get_cache_insertions(reset); 374 375 case connection_cache_evictions: 376 return connection_cache_.get_cache_evictions(reset); 377 378 case connection_cache_hits: 379 return connection_cache_.get_cache_hits(reset); 380 381 case connection_cache_misses: 382 return connection_cache_.get_cache_misses(reset); 383 384 case connection_cache_reclaims: 385 return connection_cache_.get_cache_reclaims(reset); 386 387 default: 388 break; 389 } 390 391 HPX_THROW_EXCEPTION(bad_parameter, 392 "parcelport_impl::get_connection_cache_statistics", 393 "invalid connection cache statistics type"); 394 return 0; 395 } 396 397 private: connection_handler()398 ConnectionHandler & connection_handler() 399 { 400 return static_cast<ConnectionHandler &>(*this); 401 } 402 connection_handler() const403 ConnectionHandler const & connection_handler() const 404 { 405 return static_cast<ConnectionHandler const &>(*this); 406 } 407 408 template <typename ConnectionHandler_> 409 typename std::enable_if< 410 connection_handler_traits< 411 ConnectionHandler_ 412 >::send_early_parcel::value 413 >::type send_early_parcel_impl(locality const & dest,parcel p)414 send_early_parcel_impl(locality const & dest, parcel p) 415 { 416 put_parcel( 417 dest 418 , std::move(p) 419 , util::bind_front( 420 &parcelport::early_pending_parcel_handler 421 , this 422 ) 423 ); 424 } 425 426 template <typename ConnectionHandler_> 427 typename std::enable_if< 428 !connection_handler_traits< 429 ConnectionHandler_ 430 >::send_early_parcel::value 431 >::type send_early_parcel_impl(locality const & dest,parcel p)432 send_early_parcel_impl(locality const & dest, parcel p) 433 { 434 HPX_THROW_EXCEPTION(network_error, "send_early_parcel", 435 "This parcelport does not support sending early parcels"); 436 } 437 438 template <typename ConnectionHandler_> 439 typename std::enable_if< 440 connection_handler_traits< 441 ConnectionHandler_ 442 >::do_background_work::value, 443 bool 444 >::type do_background_work_impl(std::size_t num_thread)445 do_background_work_impl(std::size_t num_thread) 446 { 447 return connection_handler().background_work(num_thread); 448 } 449 450 template <typename ConnectionHandler_> 451 typename std::enable_if< 452 !connection_handler_traits< 453 ConnectionHandler_ 454 >::do_background_work::value, 455 bool 456 >::type do_background_work_impl(std::size_t)457 do_background_work_impl(std::size_t) 458 { 459 return false; 460 } 461 462 template <typename ConnectionHandler_> 463 typename std::enable_if< 464 connection_handler_traits< 465 ConnectionHandler_ 466 >::send_immediate_parcels::value, 467 bool 468 >::type can_send_immediate_impl()469 can_send_immediate_impl() 470 { 471 return connection_handler().can_send_immediate(); 472 } 473 474 template <typename ConnectionHandler_> 475 typename std::enable_if< 476 !connection_handler_traits< 477 ConnectionHandler_ 478 >::send_immediate_parcels::value, 479 bool 480 >::type can_send_immediate_impl()481 can_send_immediate_impl() 482 { 483 return false; 484 } 485 486 protected: 487 template <typename ConnectionHandler_> 488 typename std::enable_if< 489 connection_handler_traits< 490 ConnectionHandler_ 491 >::send_immediate_parcels::value, 492 void 493 >::type send_immediate_impl(parcelport_impl & this_,locality const & dest_,write_handler_type * fs,parcel * ps,std::size_t num_parcels)494 send_immediate_impl( 495 parcelport_impl &this_, locality const&dest_, 496 write_handler_type *fs, parcel *ps, std::size_t num_parcels) 497 { 498 std::uint64_t addr; 499 error_code ec; 500 // First try to get a connection ... 501 connection *sender = this_.connection_handler().get_connection(dest_, addr); 502 503 // If we couldn't get one ... enqueue the parcel and move on 504 std::size_t encoded_parcels = 0; 505 std::vector<parcel> parcels; 506 std::vector<write_handler_type> handlers; 507 if (sender != nullptr) 508 { 509 if (fs == nullptr) 510 { 511 HPX_ASSERT(ps == nullptr); 512 HPX_ASSERT(num_parcels == 0u); 513 514 if(!dequeue_parcels(dest_, parcels, handlers)) 515 { 516 // Give this connection back to the connection handler as 517 // we couldn't dequeue parcels. 518 this_.connection_handler().reclaim_connection(sender); 519 520 return; 521 } 522 523 ps = parcels.data(); 524 fs = handlers.data(); 525 num_parcels = parcels.size(); 526 HPX_ASSERT(parcels.size() == handlers.size()); 527 } 528 529 auto encoded_buffer = sender->get_new_buffer(); 530 // encode the parcels 531 encoded_parcels = encode_parcels(this_, ps, num_parcels, 532 encoded_buffer, 533 this_.archive_flags_, 534 this_.get_max_outbound_message_size()); 535 536 typedef detail::call_for_each handler_type; 537 538 if (sender->parcelport_->async_write( 539 handler_type( 540 handler_type::handlers_type( 541 std::make_move_iterator(fs), 542 std::make_move_iterator(fs + encoded_parcels)), 543 handler_type::parcels_type( 544 std::make_move_iterator(ps), 545 std::make_move_iterator(ps + encoded_parcels)) 546 ), 547 sender, addr, 548 encoded_buffer)) 549 { 550 // we don't propagate errors for now 551 } 552 } 553 if (num_parcels != encoded_parcels && fs != nullptr) 554 { 555 std::vector<parcel> overflow_parcels( 556 std::make_move_iterator(ps + encoded_parcels), 557 std::make_move_iterator(ps + num_parcels)); 558 std::vector<write_handler_type> overflow_handlers( 559 std::make_move_iterator(fs + encoded_parcels), 560 std::make_move_iterator(fs + num_parcels)); 561 enqueue_parcels(dest_, std::move(overflow_parcels), 562 std::move(overflow_handlers)); 563 } 564 } 565 566 template <typename ConnectionHandler_> 567 typename std::enable_if< 568 !connection_handler_traits< 569 ConnectionHandler_ 570 >::send_immediate_parcels::value, 571 void 572 >::type send_immediate_impl(parcelport_impl & this_,locality const & dest_,write_handler_type * fs,parcel * ps,std::size_t num_parcels)573 send_immediate_impl( 574 parcelport_impl &this_, locality const&dest_, 575 write_handler_type *fs, parcel *ps, std::size_t num_parcels) 576 { 577 HPX_ASSERT(false); 578 } 579 580 private: 581 /////////////////////////////////////////////////////////////////////// get_connection(locality const & l,bool force,error_code & ec)582 std::shared_ptr<connection> get_connection( 583 locality const& l, bool force, error_code& ec) 584 { 585 // Request new connection from connection cache. 586 std::shared_ptr<connection> sender_connection; 587 588 if (connection_handler_traits<ConnectionHandler>:: 589 send_immediate_parcels::value) 590 { 591 std::terminate(); 592 } 593 else { 594 // Get a connection or reserve space for a new connection. 595 if (!connection_cache_.get_or_reserve(l, sender_connection)) 596 { 597 // If no slot is available it's not a problem as the parcel 598 // will be sent out whenever the next connection is returned 599 // to the cache. 600 if (&ec != &throws) 601 ec = make_success_code(); 602 return sender_connection; 603 } 604 } 605 606 // Check if we need to create the new connection. 607 if (!sender_connection) 608 return connection_handler().create_connection(l, ec); 609 610 if (&ec != &throws) 611 ec = make_success_code(); 612 613 return sender_connection; 614 } 615 616 /////////////////////////////////////////////////////////////////////// enqueue_parcel(locality const & locality_id,parcel && p,write_handler_type && f)617 void enqueue_parcel(locality const& locality_id, 618 parcel&& p, write_handler_type&& f) 619 { 620 typedef pending_parcels_map::mapped_type mapped_type; 621 622 std::unique_lock<lcos::local::spinlock> l(mtx_); 623 // We ignore the lock here. It might happen that while enqueuing, 624 // we need to acquire a lock. This should not cause any problems 625 // (famous last words) 626 util::ignore_while_checking< 627 std::unique_lock<lcos::local::spinlock> 628 > il(&l); 629 630 mapped_type& e = pending_parcels_[locality_id]; 631 util::get<0>(e).push_back(std::move(p)); 632 util::get<1>(e).push_back(std::move(f)); 633 634 parcel_destinations_.insert(locality_id); 635 ++num_parcel_destinations_; 636 } 637 enqueue_parcels(locality const & locality_id,std::vector<parcel> && parcels,std::vector<write_handler_type> && handlers)638 void enqueue_parcels(locality const& locality_id, 639 std::vector<parcel>&& parcels, 640 std::vector<write_handler_type>&& handlers) 641 { 642 typedef pending_parcels_map::mapped_type mapped_type; 643 644 std::unique_lock<lcos::local::spinlock> l(mtx_); 645 // We ignore the lock here. It might happen that while enqueuing, 646 // we need to acquire a lock. This should not cause any problems 647 // (famous last words) 648 util::ignore_while_checking< 649 std::unique_lock<lcos::local::spinlock> 650 > il(&l); 651 652 HPX_ASSERT(parcels.size() == handlers.size()); 653 654 mapped_type& e = pending_parcels_[locality_id]; 655 if (util::get<0>(e).empty()) 656 { 657 HPX_ASSERT(util::get<1>(e).empty()); 658 std::swap(util::get<0>(e), parcels); 659 std::swap(util::get<1>(e), handlers); 660 } 661 else 662 { 663 HPX_ASSERT(util::get<0>(e).size() == util::get<1>(e).size()); 664 std::size_t new_size = util::get<0>(e).size() + parcels.size(); 665 util::get<0>(e).reserve(new_size); 666 667 std::move(parcels.begin(), parcels.end(), 668 std::back_inserter(util::get<0>(e))); 669 util::get<1>(e).reserve(new_size); 670 std::move(handlers.begin(), handlers.end(), 671 std::back_inserter(util::get<1>(e))); 672 } 673 674 parcel_destinations_.insert(locality_id); 675 ++num_parcel_destinations_; 676 } 677 dequeue_parcels(locality const & locality_id,std::vector<parcel> & parcels,std::vector<write_handler_type> & handlers)678 bool dequeue_parcels(locality const& locality_id, 679 std::vector<parcel>& parcels, 680 std::vector<write_handler_type>& handlers) 681 { 682 typedef pending_parcels_map::iterator iterator; 683 684 { 685 std::unique_lock<lcos::local::spinlock> l(mtx_, std::try_to_lock); 686 687 if (!l) return false; 688 689 iterator it = pending_parcels_.find(locality_id); 690 691 // do nothing if parcels have already been picked up by 692 // another thread 693 if (it != pending_parcels_.end() && !util::get<0>(it->second).empty()) 694 { 695 HPX_ASSERT(it->first == locality_id); 696 HPX_ASSERT(handlers.size() == 0); 697 HPX_ASSERT(handlers.size() == parcels.size()); 698 std::swap(parcels, util::get<0>(it->second)); 699 HPX_ASSERT(util::get<0>(it->second).size() == 0); 700 std::swap(handlers, util::get<1>(it->second)); 701 HPX_ASSERT(handlers.size() == parcels.size()); 702 703 HPX_ASSERT(!handlers.empty()); 704 } 705 else 706 { 707 HPX_ASSERT(util::get<1>(it->second).empty()); 708 return false; 709 } 710 711 parcel_destinations_.erase(locality_id); 712 713 HPX_ASSERT(0 != num_parcel_destinations_.load()); 714 --num_parcel_destinations_; 715 716 return true; 717 } 718 } 719 720 protected: dequeue_parcel(locality & dest,parcel & p,write_handler_type & handler)721 bool dequeue_parcel(locality& dest, parcel& p, write_handler_type& handler) 722 { 723 { 724 std::unique_lock<lcos::local::spinlock> l(mtx_, std::try_to_lock); 725 726 if (!l) return false; 727 728 for (auto &pending: pending_parcels_) 729 { 730 auto &parcels = util::get<0>(pending.second); 731 if (!parcels.empty()) 732 { 733 auto& handlers = util::get<1>(pending.second); 734 dest = pending.first; 735 p = std::move(parcels.back()); 736 parcels.pop_back(); 737 handler = std::move(handlers.back()); 738 handlers.pop_back(); 739 740 if (parcels.empty()) 741 { 742 pending_parcels_.erase(dest); 743 } 744 return true; 745 } 746 } 747 } 748 return false; 749 } 750 trigger_pending_work()751 bool trigger_pending_work() 752 { 753 if (0 == num_parcel_destinations_.load(std::memory_order_relaxed)) 754 return true; 755 756 std::vector<locality> destinations; 757 758 { 759 std::unique_lock<lcos::local::spinlock> l(mtx_, std::try_to_lock); 760 if(l.owns_lock()) 761 { 762 if (parcel_destinations_.empty()) 763 return true; 764 765 destinations.reserve(parcel_destinations_.size()); 766 for (locality const& loc : parcel_destinations_) 767 { 768 destinations.push_back(loc); 769 } 770 } 771 } 772 773 // Create new HPX threads which send the parcels that are still 774 // pending. 775 for (locality const& loc : destinations) 776 { 777 get_connection_and_send_parcels(loc); 778 } 779 780 return true; 781 } 782 783 private: 784 /////////////////////////////////////////////////////////////////////// get_connection_and_send_parcels(locality const & locality_id,bool background=false)785 void get_connection_and_send_parcels( 786 locality const& locality_id, bool background = false) 787 { 788 789 if (connection_handler_traits<ConnectionHandler>:: 790 send_immediate_parcels::value) 791 { 792 this->send_immediate_impl<ConnectionHandler>( 793 *this, locality_id, nullptr, nullptr, 0); 794 return; 795 } 796 797 // If one of the sending threads are in suspended state, we 798 // need to force a new connection to avoid deadlocks. 799 bool force_connection = true; 800 801 error_code ec; 802 std::shared_ptr<connection> sender_connection = 803 get_connection(locality_id, force_connection, ec); 804 805 if (!sender_connection) 806 { 807 // We can safely return if no connection is available 808 // at this point. As soon as a connection becomes 809 // available it checks for pending parcels and sends 810 // those out. 811 return; 812 } 813 814 // repeat until no more parcels are to be sent 815 std::vector<parcel> parcels; 816 std::vector<write_handler_type> handlers; 817 818 if(!dequeue_parcels(locality_id, parcels, handlers)) 819 { 820 // Give this connection back to the cache as we couldn't dequeue 821 // parcels. 822 connection_cache_.reclaim(locality_id, sender_connection); 823 824 return; 825 } 826 827 // send parcels if they didn't get sent by another connection 828 send_pending_parcels( 829 locality_id, 830 sender_connection, std::move(parcels), 831 std::move(handlers)); 832 833 // We yield here for a short amount of time to give another 834 // HPX thread the chance to put a subsequent parcel which 835 // leads to a more effective parcel buffering 836 // if (hpx::threads::get_self_ptr()) 837 // hpx::this_thread::yield(); 838 } 839 840 send_pending_parcels_trampoline(boost::system::error_code const & ec,locality const & locality_id,std::shared_ptr<connection> sender_connection)841 void send_pending_parcels_trampoline( 842 boost::system::error_code const& ec, 843 locality const& locality_id, 844 std::shared_ptr<connection> sender_connection) 845 { 846 HPX_ASSERT(operations_in_flight_ != 0); 847 --operations_in_flight_; 848 849 #if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION) 850 sender_connection->set_state(connection::state_scheduled_thread); 851 #endif 852 if (!ec) 853 { 854 // Give this connection back to the cache as it's not 855 // needed anymore. 856 connection_cache_.reclaim(locality_id, sender_connection); 857 } 858 else 859 { 860 // remove this connection from cache 861 connection_cache_.clear(locality_id, sender_connection); 862 } 863 { 864 std::lock_guard<lcos::local::spinlock> l(mtx_); 865 866 // HPX_ASSERT(locality_id == sender_connection->destination()); 867 pending_parcels_map::iterator it = pending_parcels_.find(locality_id); 868 if (it == pending_parcels_.end() || util::get<0>(it->second).empty()) 869 return; 870 } 871 872 // Create a new HPX thread which sends parcels that are still 873 // pending. 874 get_connection_and_send_parcels(locality_id); 875 } 876 send_pending_parcels(parcelset::locality const & parcel_locality_id,std::shared_ptr<connection> sender_connection,std::vector<parcel> && parcels,std::vector<write_handler_type> && handlers)877 void send_pending_parcels( 878 parcelset::locality const & parcel_locality_id, 879 std::shared_ptr<connection> sender_connection, 880 std::vector<parcel>&& parcels, 881 std::vector<write_handler_type>&& handlers) 882 { 883 #if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION) 884 sender_connection->set_state(connection::state_send_pending); 885 #endif 886 887 #if defined(HPX_DEBUG) 888 // verify the connection points to the right destination 889 // HPX_ASSERT(parcel_locality_id == sender_connection->destination()); 890 sender_connection->verify_(parcel_locality_id); 891 #endif 892 // encode the parcels 893 std::size_t num_parcels = encode_parcels(*this, &parcels[0], 894 parcels.size(), sender_connection->buffer_, 895 archive_flags_, 896 this->get_max_outbound_message_size()); 897 898 using hpx::parcelset::detail::call_for_each; 899 if (num_parcels == parcels.size()) 900 { 901 ++operations_in_flight_; 902 // send all of the parcels 903 sender_connection->async_write( 904 call_for_each(std::move(handlers), std::move(parcels)), 905 util::bind_front(&parcelport_impl::send_pending_parcels_trampoline, 906 this)); 907 } 908 else 909 { 910 ++operations_in_flight_; 911 HPX_ASSERT(num_parcels < parcels.size()); 912 913 std::vector<write_handler_type> handled_handlers; 914 handled_handlers.reserve(num_parcels); 915 916 std::move(handlers.begin(), handlers.begin()+num_parcels, 917 std::back_inserter(handled_handlers)); 918 919 std::vector<parcel> handled_parcels; 920 handled_parcels.reserve(num_parcels); 921 922 std::move(parcels.begin(), parcels.begin()+num_parcels, 923 std::back_inserter(handled_parcels)); 924 925 // send only part of the parcels 926 sender_connection->async_write( 927 call_for_each( 928 std::move(handled_handlers), std::move(handled_parcels)), 929 util::bind_front(&parcelport_impl::send_pending_parcels_trampoline, 930 this)); 931 932 // give back unhandled parcels 933 parcels.erase(parcels.begin(), parcels.begin()+num_parcels); 934 handlers.erase(handlers.begin(), handlers.begin()+num_parcels); 935 936 enqueue_parcels(parcel_locality_id, std::move(parcels), 937 std::move(handlers)); 938 } 939 940 if(threads::get_self_ptr()) 941 { 942 // We suspend our thread, which will make progress on the network 943 hpx::this_thread::suspend(hpx::threads::pending_boost, 944 "parcelport_impl::send_pending_parcels"); 945 } 946 } 947 948 public: get_next_num_thread()949 std::size_t get_next_num_thread() 950 { 951 return ++num_thread_ % max_background_thread_; 952 } 953 954 protected: 955 /// The pool of io_service objects used to perform asynchronous operations. 956 util::io_service_pool io_service_pool_; 957 958 /// The connection cache for sending connections 959 util::connection_cache<connection, locality> connection_cache_; 960 961 typedef hpx::lcos::local::spinlock mutex_type; 962 963 int archive_flags_; 964 hpx::util::atomic_count operations_in_flight_; 965 966 std::atomic<std::size_t> num_thread_; 967 std::size_t const max_background_thread_; 968 }; 969 }} 970 971 #endif 972