1 // Copyright (c) 2014-2015 Thomas Heller 2 // Copyright (c) 2007-2014 Hartmut Kaiser 3 // Copyright (c) 2007 Richard D Guidry Jr 4 // Copyright (c) 2011 Bryce Lelbach 5 // Copyright (c) 2011 Katelyn Kufahl 6 // 7 // Distributed under the Boost Software License, Version 1.0. (See accompanying 8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 10 #if !defined(HPX_PARCELSET_PARCELPORT_MAR_26_2008_1214PM) 11 #define HPX_PARCELSET_PARCELPORT_MAR_26_2008_1214PM 12 13 #include <hpx/config.hpp> 14 #include <hpx/lcos/local/spinlock.hpp> 15 #include <hpx/performance_counters/parcels/data_point.hpp> 16 #include <hpx/performance_counters/parcels/gatherer.hpp> 17 #include <hpx/runtime/applier_fwd.hpp> 18 #include <hpx/runtime/parcelset/detail/per_action_data_counter.hpp> 19 #include <hpx/runtime/parcelset/locality.hpp> 20 #include <hpx/runtime/parcelset/parcel.hpp> 21 #include <hpx/util/function.hpp> 22 #include <hpx/util/tuple.hpp> 23 #include <hpx/util_fwd.hpp> 24 25 #include <atomic> 26 #include <cstddef> 27 #include <cstdint> 28 #include <cstdint> 29 #include <list> 30 #include <map> 31 #include <memory> 32 #include <mutex> 33 #include <set> 34 #include <string> 35 #include <vector> 36 37 #include <hpx/config/warnings_prefix.hpp> 38 39 /////////////////////////////////////////////////////////////////////////////// 40 namespace hpx { namespace agas 41 { 42 // forward declaration only 43 struct HPX_EXPORT big_boot_barrier; 44 }} 45 46 /////////////////////////////////////////////////////////////////////////////// 47 namespace hpx { namespace parcelset 48 { 49 /// The parcelport is the lowest possible representation of the parcelset 50 /// inside a locality. It provides the minimal functionality to send and 51 /// to receive parcels. 52 class HPX_EXPORT parcelport 53 : public std::enable_shared_from_this<parcelport> 54 { 55 public: 56 HPX_NON_COPYABLE(parcelport); 57 58 private: 59 // avoid warnings about using \a this in member initializer list This()60 parcelport& This() { return *this; } 61 62 friend struct agas::big_boot_barrier; 63 64 public: 65 typedef util::function_nonser< 66 void(boost::system::error_code const&, parcel const&) 67 > write_handler_type; 68 69 typedef util::function_nonser< 70 void(parcelport& pp, std::shared_ptr<std::vector<char> >, 71 threads::thread_priority) 72 > read_handler_type; 73 74 /// Construct the parcelport on the given locality. 75 parcelport(util::runtime_configuration const& ini, locality const & here, 76 std::string const& type); 77 78 /// Virtual destructor ~parcelport()79 virtual ~parcelport() {} 80 81 virtual bool can_bootstrap() const = 0; 82 priority() const83 int priority() const { return priority_; } 84 85 /// Retrieve the type of the locality represented by this parcelport type() const86 std::string const& type() const { return type_; } 87 88 /// Start the parcelport I/O thread pool. 89 /// 90 /// \param blocking [in] If blocking is set to \a true the routine will 91 /// not return before stop() has been called, otherwise 92 /// the routine returns immediately. 93 virtual bool run(bool blocking = true) = 0; 94 95 virtual void flush_parcels() = 0; 96 97 /// Stop the parcelport I/O thread pool. 98 /// 99 /// \param blocking [in] If blocking is set to \a false the routine will 100 /// return immediately, otherwise it will wait for all 101 /// worker threads to exit. 102 virtual void stop(bool blocking = true) = 0; 103 104 /// Check if this parcelport can connect to this locality 105 /// 106 /// The default is to return true if it can be used at bootstrap or alternative 107 /// parcelports are enabled. can_connect(locality const &,bool use_alternative_parcelport)108 virtual bool can_connect(locality const &, bool use_alternative_parcelport) 109 { 110 return use_alternative_parcelport || can_bootstrap(); 111 } 112 113 /// Queues a parcel for transmission to another locality 114 /// 115 /// \note The function put_parcel() is asynchronous, the provided 116 /// function or function object gets invoked on completion of the send 117 /// operation or on any error. 118 /// 119 /// \param p [in] A reference to the parcel to send. 120 /// \param f [in] A function object to be invoked on successful 121 /// completion or on errors. The signature of this 122 /// function object is expected to be: 123 /// 124 /// \code 125 /// void handler(boost::system::error_code const& err, 126 /// std::size_t bytes_written); 127 /// \endcode 128 virtual void put_parcel(locality const & dest, parcel p, 129 write_handler_type f) = 0; 130 131 /// Queues a list of parcels for transmission to another locality 132 /// 133 /// \note The function put_parcels() is asynchronous, the provided 134 /// functions or function objects get invoked on completion of the send 135 /// operation or on any error. 136 /// 137 /// \param parcels [in] A reference to the list of parcels to send. 138 /// \param handlers [in] A list of function objects to be invoked on 139 /// successful completion or on errors. The signature of 140 /// these function objects is expected to be: 141 /// 142 /// \code 143 /// void handler(boost::system::error_code const& err, 144 /// std::size_t bytes_written); 145 /// \endcode 146 virtual void put_parcels(locality const& dests, 147 std::vector<parcel> parcels, 148 std::vector<write_handler_type> handlers) = 0; 149 150 /// Send an early parcel through the TCP parcelport 151 /// 152 /// \param p [in, out] A reference to the parcel to send. The 153 /// parcel \a p will be modified in place, as it will 154 /// get set the resolved destination address and parcel 155 /// id (if not already set). 156 virtual void send_early_parcel(locality const & dest, parcel p) = 0; 157 158 /// Cache specific functionality 159 virtual void remove_from_connection_cache(locality const& loc) = 0; 160 161 /// Return the thread pool if the name matches 162 virtual util::io_service_pool* get_thread_pool(char const* name) = 0; 163 164 /// Return the given connection cache statistic 165 enum connection_cache_statistics_type 166 { 167 connection_cache_insertions = 0, 168 connection_cache_evictions = 1, 169 connection_cache_hits = 2, 170 connection_cache_misses = 3, 171 connection_cache_reclaims = 4 172 }; 173 174 // invoke pending background work 175 virtual bool do_background_work(std::size_t num_thread) = 0; 176 177 // retrieve performance counter value for given statistics type 178 virtual std::int64_t get_connection_cache_statistics( 179 connection_cache_statistics_type, bool reset) = 0; 180 181 /// Return the name of this locality 182 virtual std::string get_locality_name() const = 0; 183 184 /// \brief Allow access to the locality this parcelport is associated 185 /// with. 186 /// 187 /// This accessor returns a reference to the locality this parcelport 188 /// is associated with. here() const189 locality const& here() const 190 { 191 return here_; 192 } 193 194 virtual locality create_locality() const = 0; 195 196 virtual locality agas_locality(util::runtime_configuration const& ini) 197 const = 0; 198 199 /// Performance counter data 200 201 /// number of parcels sent 202 std::int64_t get_parcel_send_count(bool reset); 203 204 /// number of messages sent 205 std::int64_t get_message_send_count(bool reset); 206 207 /// number of parcels received 208 std::int64_t get_parcel_receive_count(bool reset); 209 210 /// number of messages received 211 std::int64_t get_message_receive_count(bool reset); 212 213 /// the total time it took for all sends, from async_write to the 214 /// completion handler (nanoseconds) 215 std::int64_t get_sending_time(bool reset); 216 217 /// the total time it took for all receives, from async_read to the 218 /// completion handler (nanoseconds) 219 std::int64_t get_receiving_time(bool reset); 220 221 /// the total time it took for all sender-side serialization operations 222 /// (nanoseconds) 223 std::int64_t get_sending_serialization_time(bool reset); 224 225 /// the total time it took for all receiver-side serialization 226 /// operations (nanoseconds) 227 std::int64_t get_receiving_serialization_time(bool reset); 228 229 /// total data sent (bytes) 230 std::int64_t get_data_sent(bool reset); 231 232 /// total data (uncompressed) sent (bytes) 233 std::int64_t get_raw_data_sent(bool reset); 234 235 /// total data received (bytes) 236 std::int64_t get_data_received(bool reset); 237 238 /// total data (uncompressed) received (bytes) 239 std::int64_t get_raw_data_received(bool reset); 240 241 std::int64_t get_buffer_allocate_time_sent(bool reset); 242 std::int64_t get_buffer_allocate_time_received(bool reset); 243 244 std::int64_t get_pending_parcels_count(bool /*reset*/); 245 246 #if defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS) 247 // same as above, just separated data for each action 248 // number of parcels sent 249 std::int64_t get_action_parcel_send_count( 250 std::string const&, bool reset); 251 252 // number of parcels received 253 std::int64_t get_action_parcel_receive_count( 254 std::string const&, bool reset); 255 256 // the total time it took for all sender-side serialization operations 257 // (nanoseconds) 258 std::int64_t get_action_sending_serialization_time( 259 std::string const&, bool reset); 260 261 // the total time it took for all receiver-side serialization 262 // operations (nanoseconds) 263 std::int64_t get_action_receiving_serialization_time( 264 std::string const&, bool reset); 265 266 // total data sent (bytes) 267 std::int64_t get_action_data_sent( 268 std::string const&, bool reset); 269 270 // total data received (bytes) 271 std::int64_t get_action_data_received( 272 std::string const&, bool reset); 273 #endif 274 275 /////////////////////////////////////////////////////////////////////// set_applier(applier::applier * applier)276 void set_applier(applier::applier * applier) 277 { 278 applier_ = applier; 279 } 280 281 /// Update performance counter data 282 void add_received_data( 283 performance_counters::parcels::data_point const& data); 284 285 void add_sent_data( 286 performance_counters::parcels::data_point const& data); 287 288 #if defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS) 289 void add_received_data(char const* action, 290 performance_counters::parcels::data_point const& data); 291 292 void add_sent_data(char const* action, 293 performance_counters::parcels::data_point const& data); 294 #endif 295 296 /// Return the configured maximal allowed message data size get_max_inbound_message_size() const297 std::int64_t get_max_inbound_message_size() const 298 { 299 return max_inbound_message_size_; 300 } 301 get_max_outbound_message_size() const302 std::int64_t get_max_outbound_message_size() const 303 { 304 return max_outbound_message_size_; 305 } 306 307 /// Return whether it is allowed to apply array optimizations allow_array_optimizations() const308 bool allow_array_optimizations() const 309 { 310 return allow_array_optimizations_; 311 } 312 313 /// Return whether it is allowed to apply zero copy optimizations allow_zero_copy_optimizations() const314 bool allow_zero_copy_optimizations() const 315 { 316 return allow_zero_copy_optimizations_; 317 } 318 async_serialization() const319 bool async_serialization() const 320 { 321 return async_serialization_; 322 } 323 324 // callback while bootstrap the parcel layer 325 void early_pending_parcel_handler(boost::system::error_code const& ec, 326 parcel const & p); 327 328 protected: 329 /// mutex for all of the member data 330 mutable lcos::local::spinlock mtx_; 331 332 hpx::applier::applier *applier_; 333 334 /// The cache for pending parcels 335 typedef util::tuple< 336 std::vector<parcel> 337 , std::vector<write_handler_type> 338 > map_second_type; 339 typedef std::map<locality, map_second_type> pending_parcels_map; 340 pending_parcels_map pending_parcels_; 341 342 typedef std::set<locality> pending_parcels_destinations; 343 pending_parcels_destinations parcel_destinations_; 344 std::atomic<std::uint32_t> num_parcel_destinations_; 345 346 /// The local locality 347 locality here_; 348 349 /// The maximally allowed message size 350 std::int64_t const max_inbound_message_size_; 351 std::int64_t const max_outbound_message_size_; 352 353 /// Overall parcel statistics 354 performance_counters::parcels::gatherer parcels_sent_; 355 performance_counters::parcels::gatherer parcels_received_; 356 357 #if defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS) 358 // Per-action based parcel statistics 359 detail::per_action_data_counter action_parcels_sent_; 360 detail::per_action_data_counter action_parcels_received_; 361 #endif 362 363 /// serialization is allowed to use array optimization 364 bool allow_array_optimizations_; 365 bool allow_zero_copy_optimizations_; 366 367 /// async serialization of parcels 368 bool async_serialization_; 369 370 /// priority of the parcelport 371 int priority_; 372 std::string type_; 373 }; 374 }} 375 376 #include <hpx/config/warnings_suffix.hpp> 377 378 #endif 379