1 //  Copyright (c) 2014-2015 Thomas Heller
2 //  Copyright (c) 2007-2014 Hartmut Kaiser
3 //  Copyright (c) 2007 Richard D Guidry Jr
4 //  Copyright (c) 2011 Bryce Lelbach
5 //  Copyright (c) 2011 Katelyn Kufahl
6 //
7 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
8 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 
10 #if !defined(HPX_PARCELSET_PARCELPORT_MAR_26_2008_1214PM)
11 #define HPX_PARCELSET_PARCELPORT_MAR_26_2008_1214PM
12 
13 #include <hpx/config.hpp>
14 #include <hpx/lcos/local/spinlock.hpp>
15 #include <hpx/performance_counters/parcels/data_point.hpp>
16 #include <hpx/performance_counters/parcels/gatherer.hpp>
17 #include <hpx/runtime/applier_fwd.hpp>
18 #include <hpx/runtime/parcelset/detail/per_action_data_counter.hpp>
19 #include <hpx/runtime/parcelset/locality.hpp>
20 #include <hpx/runtime/parcelset/parcel.hpp>
21 #include <hpx/util/function.hpp>
22 #include <hpx/util/tuple.hpp>
23 #include <hpx/util_fwd.hpp>
24 
25 #include <atomic>
26 #include <cstddef>
27 #include <cstdint>
28 #include <cstdint>
29 #include <list>
30 #include <map>
31 #include <memory>
32 #include <mutex>
33 #include <set>
34 #include <string>
35 #include <vector>
36 
37 #include <hpx/config/warnings_prefix.hpp>
38 
39 ///////////////////////////////////////////////////////////////////////////////
40 namespace hpx { namespace agas
41 {
42     // forward declaration only
43     struct HPX_EXPORT big_boot_barrier;
44 }}
45 
46 ///////////////////////////////////////////////////////////////////////////////
47 namespace hpx { namespace parcelset
48 {
49     /// The parcelport is the lowest possible representation of the parcelset
50     /// inside a locality. It provides the minimal functionality to send and
51     /// to receive parcels.
52     class HPX_EXPORT parcelport
53       : public std::enable_shared_from_this<parcelport>
54     {
55     public:
56         HPX_NON_COPYABLE(parcelport);
57 
58     private:
59         // avoid warnings about using \a this in member initializer list
This()60         parcelport& This() { return *this; }
61 
62         friend struct agas::big_boot_barrier;
63 
64     public:
65         typedef util::function_nonser<
66             void(boost::system::error_code const&, parcel const&)
67         > write_handler_type;
68 
69         typedef util::function_nonser<
70             void(parcelport& pp, std::shared_ptr<std::vector<char> >,
71                  threads::thread_priority)
72         > read_handler_type;
73 
74         /// Construct the parcelport on the given locality.
75         parcelport(util::runtime_configuration const& ini, locality const & here,
76             std::string const& type);
77 
78         /// Virtual destructor
~parcelport()79         virtual ~parcelport() {}
80 
81         virtual bool can_bootstrap() const = 0;
82 
priority() const83         int priority() const { return priority_; }
84 
85         /// Retrieve the type of the locality represented by this parcelport
type() const86         std::string const& type() const { return type_; }
87 
88         /// Start the parcelport I/O thread pool.
89         ///
90         /// \param blocking [in] If blocking is set to \a true the routine will
91         ///                 not return before stop() has been called, otherwise
92         ///                 the routine returns immediately.
93         virtual bool run(bool blocking = true) = 0;
94 
95         virtual void flush_parcels() = 0;
96 
97         /// Stop the parcelport I/O thread pool.
98         ///
99         /// \param blocking [in] If blocking is set to \a false the routine will
100         ///                 return immediately, otherwise it will wait for all
101         ///                 worker threads to exit.
102         virtual void stop(bool blocking = true) = 0;
103 
104         /// Check if this parcelport can connect to this locality
105         ///
106         /// The default is to return true if it can be used at bootstrap or alternative
107         /// parcelports are enabled.
can_connect(locality const &,bool use_alternative_parcelport)108         virtual bool can_connect(locality const &, bool use_alternative_parcelport)
109         {
110             return use_alternative_parcelport || can_bootstrap();
111         }
112 
113         /// Queues a parcel for transmission to another locality
114         ///
115         /// \note The function put_parcel() is asynchronous, the provided
116         /// function or function object gets invoked on completion of the send
117         /// operation or on any error.
118         ///
119         /// \param p        [in] A reference to the parcel to send.
120         /// \param f        [in] A function object to be invoked on successful
121         ///                 completion or on errors. The signature of this
122         ///                 function object is expected to be:
123         ///
124         /// \code
125         ///      void handler(boost::system::error_code const& err,
126         ///                   std::size_t bytes_written);
127         /// \endcode
128         virtual void put_parcel(locality const & dest, parcel p,
129             write_handler_type f) = 0;
130 
131         /// Queues a list of parcels for transmission to another locality
132         ///
133         /// \note The function put_parcels() is asynchronous, the provided
134         /// functions or function objects get invoked on completion of the send
135         /// operation or on any error.
136         ///
137         /// \param parcels  [in] A reference to the list of parcels to send.
138         /// \param handlers [in] A list of function objects to be invoked on
139         ///                 successful completion or on errors. The signature of
140         ///                 these function objects is expected to be:
141         ///
142         /// \code
143         ///      void handler(boost::system::error_code const& err,
144         ///                   std::size_t bytes_written);
145         /// \endcode
146         virtual void put_parcels(locality const& dests,
147             std::vector<parcel> parcels,
148             std::vector<write_handler_type> handlers) = 0;
149 
150         /// Send an early parcel through the TCP parcelport
151         ///
152         /// \param p        [in, out] A reference to the parcel to send. The
153         ///                 parcel \a p will be modified in place, as it will
154         ///                 get set the resolved destination address and parcel
155         ///                 id (if not already set).
156         virtual void send_early_parcel(locality const & dest, parcel p) = 0;
157 
158         /// Cache specific functionality
159         virtual void remove_from_connection_cache(locality const& loc) = 0;
160 
161         /// Return the thread pool if the name matches
162         virtual util::io_service_pool* get_thread_pool(char const* name) = 0;
163 
164         /// Return the given connection cache statistic
165         enum connection_cache_statistics_type
166         {
167             connection_cache_insertions = 0,
168             connection_cache_evictions = 1,
169             connection_cache_hits = 2,
170             connection_cache_misses = 3,
171             connection_cache_reclaims = 4
172         };
173 
174         // invoke pending background work
175         virtual bool do_background_work(std::size_t num_thread) = 0;
176 
177         // retrieve performance counter value for given statistics type
178         virtual std::int64_t get_connection_cache_statistics(
179             connection_cache_statistics_type, bool reset) = 0;
180 
181         /// Return the name of this locality
182         virtual std::string get_locality_name() const = 0;
183 
184         /// \brief Allow access to the locality this parcelport is associated
185         /// with.
186         ///
187         /// This accessor returns a reference to the locality this parcelport
188         /// is associated with.
here() const189         locality const& here() const
190         {
191             return here_;
192         }
193 
194         virtual locality create_locality() const = 0;
195 
196         virtual locality agas_locality(util::runtime_configuration const& ini)
197             const = 0;
198 
199         /// Performance counter data
200 
201         /// number of parcels sent
202         std::int64_t get_parcel_send_count(bool reset);
203 
204         /// number of messages sent
205         std::int64_t get_message_send_count(bool reset);
206 
207         /// number of parcels received
208         std::int64_t get_parcel_receive_count(bool reset);
209 
210         /// number of messages received
211         std::int64_t get_message_receive_count(bool reset);
212 
213         /// the total time it took for all sends, from async_write to the
214         /// completion handler (nanoseconds)
215         std::int64_t get_sending_time(bool reset);
216 
217         /// the total time it took for all receives, from async_read to the
218         /// completion handler (nanoseconds)
219         std::int64_t get_receiving_time(bool reset);
220 
221         /// the total time it took for all sender-side serialization operations
222         /// (nanoseconds)
223         std::int64_t get_sending_serialization_time(bool reset);
224 
225         /// the total time it took for all receiver-side serialization
226         /// operations (nanoseconds)
227         std::int64_t get_receiving_serialization_time(bool reset);
228 
229         /// total data sent (bytes)
230         std::int64_t get_data_sent(bool reset);
231 
232         /// total data (uncompressed) sent (bytes)
233         std::int64_t get_raw_data_sent(bool reset);
234 
235         /// total data received (bytes)
236         std::int64_t get_data_received(bool reset);
237 
238         /// total data (uncompressed) received (bytes)
239         std::int64_t get_raw_data_received(bool reset);
240 
241         std::int64_t get_buffer_allocate_time_sent(bool reset);
242         std::int64_t get_buffer_allocate_time_received(bool reset);
243 
244         std::int64_t get_pending_parcels_count(bool /*reset*/);
245 
246 #if defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS)
247         // same as above, just separated data for each action
248         // number of parcels sent
249         std::int64_t get_action_parcel_send_count(
250             std::string const&, bool reset);
251 
252         // number of parcels received
253         std::int64_t get_action_parcel_receive_count(
254             std::string const&, bool reset);
255 
256         // the total time it took for all sender-side serialization operations
257         // (nanoseconds)
258         std::int64_t get_action_sending_serialization_time(
259             std::string const&, bool reset);
260 
261         // the total time it took for all receiver-side serialization
262         // operations (nanoseconds)
263         std::int64_t get_action_receiving_serialization_time(
264             std::string const&, bool reset);
265 
266         // total data sent (bytes)
267         std::int64_t get_action_data_sent(
268             std::string const&, bool reset);
269 
270         // total data received (bytes)
271         std::int64_t get_action_data_received(
272             std::string const&, bool reset);
273 #endif
274 
275         ///////////////////////////////////////////////////////////////////////
set_applier(applier::applier * applier)276         void set_applier(applier::applier * applier)
277         {
278             applier_ = applier;
279         }
280 
281         /// Update performance counter data
282         void add_received_data(
283             performance_counters::parcels::data_point const& data);
284 
285         void add_sent_data(
286             performance_counters::parcels::data_point const& data);
287 
288 #if defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS)
289         void add_received_data(char const* action,
290             performance_counters::parcels::data_point const& data);
291 
292         void add_sent_data(char const* action,
293             performance_counters::parcels::data_point const& data);
294 #endif
295 
296         /// Return the configured maximal allowed message data size
get_max_inbound_message_size() const297         std::int64_t get_max_inbound_message_size() const
298         {
299             return max_inbound_message_size_;
300         }
301 
get_max_outbound_message_size() const302         std::int64_t get_max_outbound_message_size() const
303         {
304             return max_outbound_message_size_;
305         }
306 
307         /// Return whether it is allowed to apply array optimizations
allow_array_optimizations() const308         bool allow_array_optimizations() const
309         {
310             return allow_array_optimizations_;
311         }
312 
313         /// Return whether it is allowed to apply zero copy optimizations
allow_zero_copy_optimizations() const314         bool allow_zero_copy_optimizations() const
315         {
316             return allow_zero_copy_optimizations_;
317         }
318 
async_serialization() const319         bool async_serialization() const
320         {
321             return async_serialization_;
322         }
323 
324         // callback while bootstrap the parcel layer
325         void early_pending_parcel_handler(boost::system::error_code const& ec,
326             parcel const & p);
327 
328     protected:
329         /// mutex for all of the member data
330         mutable lcos::local::spinlock mtx_;
331 
332         hpx::applier::applier *applier_;
333 
334         /// The cache for pending parcels
335         typedef util::tuple<
336             std::vector<parcel>
337           , std::vector<write_handler_type>
338         > map_second_type;
339         typedef std::map<locality, map_second_type> pending_parcels_map;
340         pending_parcels_map pending_parcels_;
341 
342         typedef std::set<locality> pending_parcels_destinations;
343         pending_parcels_destinations parcel_destinations_;
344         std::atomic<std::uint32_t> num_parcel_destinations_;
345 
346         /// The local locality
347         locality here_;
348 
349         /// The maximally allowed message size
350         std::int64_t const max_inbound_message_size_;
351         std::int64_t const max_outbound_message_size_;
352 
353         /// Overall parcel statistics
354         performance_counters::parcels::gatherer parcels_sent_;
355         performance_counters::parcels::gatherer parcels_received_;
356 
357 #if defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS)
358         // Per-action based parcel statistics
359         detail::per_action_data_counter action_parcels_sent_;
360         detail::per_action_data_counter action_parcels_received_;
361 #endif
362 
363         /// serialization is allowed to use array optimization
364         bool allow_array_optimizations_;
365         bool allow_zero_copy_optimizations_;
366 
367         /// async serialization of parcels
368         bool async_serialization_;
369 
370         /// priority of the parcelport
371         int priority_;
372         std::string type_;
373     };
374 }}
375 
376 #include <hpx/config/warnings_suffix.hpp>
377 
378 #endif
379