// Copyright (C) 2004-2008 The Trustees of Indiana University.
// Copyright (C) 2007   Douglas Gregor
// Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Matthias Troyer
//           Andrew Lumsdaine
#ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
#define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

//#define NO_SPLIT_BATCHES
#define SEND_OOB_BSEND

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/function/function0.hpp>
#include <boost/function/function1.hpp>
#include <boost/function/function2.hpp>
#include <boost/mpi.hpp>
#include <boost/property_map/parallel/process_group.hpp>
#include <boost/utility/enable_if.hpp>
34 namespace boost { namespace graph { namespace distributed {
35 
36 // Process group tags
37 struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
38 
39 class mpi_process_group
40 {
41   struct impl;
42 
43  public:
44   /// Number of tags available to each data structure.
45   static const int max_tags = 256;
46 
47   /**
48    * The type of a "receive" handler, that will be provided with
49    * (source, tag) pairs when a message is received. Users can provide a
50    * receive handler for a distributed data structure, for example, to
51    * automatically pick up and respond to messages as needed.
52    */
53   typedef function<void(int source, int tag)> receiver_type;
54 
55   /**
56    * The type of a handler for the on-synchronize event, which will be
57    * executed at the beginning of synchronize().
58    */
59   typedef function0<void>      on_synchronize_event_type;
60 
61   /// Used as a tag to help create an "empty" process group.
62   struct create_empty {};
63 
64   /// The type used to buffer message data
65   typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
66 
67   /// The type used to identify a process
68   typedef int process_id_type;
69 
70   /// The type used to count the number of processes
71   typedef int process_size_type;
72 
73   /// The type of communicator used to transmit data via MPI
74   typedef boost::mpi::communicator communicator_type;
75 
76   /// Classification of the capabilities of this process group
77   struct communication_category
78     : virtual boost::parallel::bsp_process_group_tag,
79       virtual mpi_process_group_tag { };
80 
81   // TBD: We can eliminate the "source" field and possibly the
82   // "offset" field.
83   struct message_header {
84     /// The process that sent the message
85     process_id_type source;
86 
87     /// The message tag
88     int tag;
89 
90     /// The offset of the message into the buffer
91     std::size_t offset;
92 
93     /// The length of the message in the buffer, in bytes
94     std::size_t bytes;
95 
96     template <class Archive>
serializeboost::graph::distributed::mpi_process_group::message_header97     void serialize(Archive& ar, int)
98     {
99       ar & source & tag & offset & bytes;
100     }
101   };
102 
103   /**
104    * Stores the outgoing messages for a particular processor.
105    *
106    * @todo Evaluate whether we should use a deque instance, which
107    * would reduce could reduce the cost of "sending" messages but
108    * increases the time spent in the synchronization step.
109    */
110   struct outgoing_messages {
outgoing_messagesboost::graph::distributed::mpi_process_group::outgoing_messages111         outgoing_messages() {}
~outgoing_messagesboost::graph::distributed::mpi_process_group::outgoing_messages112         ~outgoing_messages() {}
113 
114     std::vector<message_header> headers;
115     buffer_type                 buffer;
116 
117     template <class Archive>
serializeboost::graph::distributed::mpi_process_group::outgoing_messages118     void serialize(Archive& ar, int)
119     {
120       ar & headers & buffer;
121     }
122 
swapboost::graph::distributed::mpi_process_group::outgoing_messages123     void swap(outgoing_messages& x)
124     {
125       headers.swap(x.headers);
126       buffer.swap(x.buffer);
127     }
128   };
129 
130 private:
131   /**
132    * Virtual base from which every trigger will be launched. See @c
133    * trigger_launcher for more information.
134    */
135   class trigger_base : boost::noncopyable
136   {
137   public:
trigger_base(int tag)138     explicit trigger_base(int tag) : tag_(tag) { }
139 
140     /// Retrieve the tag associated with this trigger
tag() const141     int tag() const { return tag_; }
142 
~trigger_base()143     virtual ~trigger_base() { }
144 
145     /**
146      * Invoked to receive a message that matches a particular trigger.
147      *
148      * @param source      the source of the message
149      * @param tag         the (local) tag of the message
150      * @param context     the context under which the trigger is being
151      *                    invoked
152      */
153     virtual void
154     receive(mpi_process_group const& pg, int source, int tag,
155             trigger_receive_context context, int block=-1) const = 0;
156 
157   protected:
158     // The message tag associated with this trigger
159     int tag_;
160   };
161 
162   /**
163    * Launches a specific handler in response to a trigger. This
164    * function object wraps up the handler function object and a buffer
165    * for incoming data.
166    */
167   template<typename Type, typename Handler>
168   class trigger_launcher : public trigger_base
169   {
170   public:
trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)171     explicit trigger_launcher(mpi_process_group& self, int tag,
172                               const Handler& handler)
173       : trigger_base(tag), self(self), handler(handler)
174       {}
175 
176     void
177     receive(mpi_process_group const& pg, int source, int tag,
178             trigger_receive_context context, int block=-1) const;
179 
180   private:
181     mpi_process_group& self;
182     mutable Handler handler;
183   };
184 
185   /**
186    * Launches a specific handler with a message reply in response to a
187    * trigger. This function object wraps up the handler function
188    * object and a buffer for incoming data.
189    */
190   template<typename Type, typename Handler>
191   class reply_trigger_launcher : public trigger_base
192   {
193   public:
reply_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)194     explicit reply_trigger_launcher(mpi_process_group& self, int tag,
195                                     const Handler& handler)
196       : trigger_base(tag), self(self), handler(handler)
197       {}
198 
199     void
200     receive(mpi_process_group const& pg, int source, int tag,
201             trigger_receive_context context, int block=-1) const;
202 
203   private:
204     mpi_process_group& self;
205     mutable Handler handler;
206   };
207 
208   template<typename Type, typename Handler>
209   class global_trigger_launcher : public trigger_base
210   {
211   public:
global_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)212     explicit global_trigger_launcher(mpi_process_group& self, int tag,
213                               const Handler& handler)
214       : trigger_base(tag), handler(handler)
215       {
216       }
217 
218     void
219     receive(mpi_process_group const& pg, int source, int tag,
220             trigger_receive_context context, int block=-1) const;
221 
222   private:
223     mutable Handler handler;
224     // TBD: do not forget to cancel any outstanding Irecv when deleted,
225     // if we decide to use Irecv
226   };
227 
228   template<typename Type, typename Handler>
229   class global_irecv_trigger_launcher : public trigger_base
230   {
231   public:
global_irecv_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler,int sz)232     explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
233                               const Handler& handler, int sz)
234       : trigger_base(tag), handler(handler), buffer_size(sz)
235       {
236         prepare_receive(self,tag);
237       }
238 
239     void
240     receive(mpi_process_group const& pg, int source, int tag,
241             trigger_receive_context context, int block=-1) const;
242 
243   private:
244     void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
245     Handler handler;
246     int buffer_size;
247     // TBD: do not forget to cancel any outstanding Irecv when deleted,
248     // if we decide to use Irecv
249   };
250 
251 public:
252   /**
253    * Construct a new BSP process group from an MPI communicator. The
254    * MPI communicator will be duplicated to create a new communicator
255    * for this process group to use.
256    */
257   mpi_process_group(communicator_type parent_comm = communicator_type());
258 
259   /**
260    * Construct a new BSP process group from an MPI communicator. The
261    * MPI communicator will be duplicated to create a new communicator
262    * for this process group to use. This constructor allows to tune the
263    * size of message batches.
264    *
265    *   @param num_headers The maximum number of headers in a message batch
266    *
267    *   @param buffer_size The maximum size of the message buffer in a batch.
268    *
269    */
270   mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
271                      communicator_type parent_comm = communicator_type());
272 
273   /**
274    * Construct a copy of the BSP process group for a new distributed
275    * data structure. This data structure will synchronize with all
276    * other members of the process group's equivalence class (including
277    * @p other), but will have its own set of tags.
278    *
279    *   @param other The process group that this new process group will
280    *   be based on, using a different set of tags within the same
281    *   communication and synchronization space.
282    *
283    *   @param handler A message handler that will be passed (source,
284    *   tag) pairs for each message received by this data
285    *   structure. The handler is expected to receive the messages
286    *   immediately. The handler can be changed after-the-fact by
287    *   calling @c replace_handler.
288    *
289    *   @param out_of_band_receive An anachronism. TODO: remove this.
290    */
291   mpi_process_group(const mpi_process_group& other,
292                     const receiver_type& handler,
293                     bool out_of_band_receive = false);
294 
295   /**
296    * Construct a copy of the BSP process group for a new distributed
297    * data structure. This data structure will synchronize with all
298    * other members of the process group's equivalence class (including
299    * @p other), but will have its own set of tags.
300    */
301   mpi_process_group(const mpi_process_group& other,
302                     attach_distributed_object,
303                     bool out_of_band_receive = false);
304 
305   /**
306    * Create an "empty" process group, with no information. This is an
307    * internal routine that users should never need.
308    */
mpi_process_group(create_empty)309   explicit mpi_process_group(create_empty) {}
310 
311   /**
312    * Destroys this copy of the process group.
313    */
314   ~mpi_process_group();
315 
316   /**
317    * Replace the current message handler with a new message handler.
318    *
319    * @param handle The new message handler.
320    * @param out_of_band_receive An anachronism: remove this
321    */
322   void replace_handler(const receiver_type& handler,
323                        bool out_of_band_receive = false);
324 
325   /**
326    * Turns this process group into the process group for a new
327    * distributed data structure or object, allocating its own tag
328    * block.
329    */
330   void make_distributed_object();
331 
332   /**
333    * Replace the handler to be invoked at the beginning of synchronize.
334    */
335   void
336   replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
337 
338   /**
339    * Return the block number of the current data structure. A value of
340    * 0 indicates that this particular instance of the process group is
341    * not associated with any distributed data structure.
342    */
my_block_number() const343   int my_block_number() const { return block_num? *block_num : 0; }
344 
345   /**
346    * Encode a block number/tag pair into a single encoded tag for
347    * transmission.
348    */
encode_tag(int block_num,int tag) const349   int encode_tag(int block_num, int tag) const
350   { return block_num * max_tags + tag; }
351 
352   /**
353    * Decode an encoded tag into a block number/tag pair.
354    */
decode_tag(int encoded_tag) const355   std::pair<int, int> decode_tag(int encoded_tag) const
356   { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
357 
358   // @todo Actually write up the friend declarations so these could be
359   // private.
360 
361   // private:
362 
363   /** Allocate a block of tags for this instance. The block should not
364    * have been allocated already, e.g., my_block_number() ==
365    * 0. Returns the newly-allocated block number.
366    */
367   int allocate_block(bool out_of_band_receive = false);
368 
369   /** Potentially emit a receive event out of band. Returns true if an event
370    *  was actually sent, false otherwise.
371    */
372   bool maybe_emit_receive(int process, int encoded_tag) const;
373 
374   /** Emit a receive event. Returns true if an event was actually
375    * sent, false otherwise.
376    */
377   bool emit_receive(int process, int encoded_tag) const;
378 
379   /** Emit an on-synchronize event to all block handlers. */
380   void emit_on_synchronize() const;
381 
382   /** Retrieve a reference to the stored receiver in this block.  */
383   template<typename Receiver>
384   Receiver* get_receiver();
385 
386   template<typename T>
387   void
388   send_impl(int dest, int tag, const T& value,
389             mpl::true_ /*is_mpi_datatype*/) const;
390 
391   template<typename T>
392   void
393   send_impl(int dest, int tag, const T& value,
394             mpl::false_ /*is_mpi_datatype*/) const;
395 
396   template<typename T>
397   typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
398   array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
399 
400   template<typename T>
401   bool
402   receive_impl(int source, int tag, T& value,
403                mpl::true_ /*is_mpi_datatype*/) const;
404 
405   template<typename T>
406   bool
407   receive_impl(int source, int tag, T& value,
408                mpl::false_ /*is_mpi_datatype*/) const;
409 
410   // Receive an array of values
411   template<typename T>
412   typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
413   array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
414 
415   optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;
416 
417   void synchronize() const;
418 
operator bool()419   operator bool() { return bool(impl_); }
420 
421   mpi_process_group base() const;
422 
423   /**
424    * Create a new trigger for a specific message tag. Triggers handle
425    * out-of-band messaging, and the handler itself will be called
426    * whenever a message is available. The handler itself accepts four
427    * arguments: the source of the message, the message tag (which will
428    * be the same as @p tag), the message data (of type @c Type), and a
429    * boolean flag that states whether the message was received
430    * out-of-band. The last will be @c true for out-of-band receives,
431    * or @c false for receives at the end of a synchronization step.
432    */
433   template<typename Type, typename Handler>
434   void trigger(int tag, const Handler& handler);
435 
436   /**
437    * Create a new trigger for a specific message tag, along with a way
438    * to send a reply with data back to the sender. Triggers handle
439    * out-of-band messaging, and the handler itself will be called
440    * whenever a message is available. The handler itself accepts four
441    * arguments: the source of the message, the message tag (which will
442    * be the same as @p tag), the message data (of type @c Type), and a
443    * boolean flag that states whether the message was received
444    * out-of-band. The last will be @c true for out-of-band receives,
445    * or @c false for receives at the end of a synchronization
446    * step. The handler also returns a value, which will be routed back
447    * to the sender.
448    */
449   template<typename Type, typename Handler>
450   void trigger_with_reply(int tag, const Handler& handler);
451 
452   template<typename Type, typename Handler>
453   void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);
454 
455 
456 
457   /**
458    * Poll for any out-of-band messages. This routine will check if any
459    * out-of-band messages are available. Those that are available will
460    * be handled immediately, if possible.
461    *
462    * @returns if an out-of-band message has been received, but we are
463    * unable to actually receive the message, a (source, tag) pair will
464    * be returned. Otherwise, returns an empty optional.
465    *
466    * @param wait When true, we should block until a message comes in.
467    *
468    * @param synchronizing whether we are currently synchronizing the
469    *                      process group
470    */
471   optional<std::pair<int, int> >
472   poll(bool wait = false, int block = -1, bool synchronizing = false) const;
473 
474   /**
475    * Determines the context of the trigger currently executing. If
476    * multiple triggers are executing (recursively), then the context
477    * for the most deeply nested trigger will be returned. If no
478    * triggers are executing, returns @c trc_none. This might be used,
479    * for example, to determine whether a reply to a message should
480    * itself be sent out-of-band or whether it can go via the normal,
481    * slower communication route.
482    */
483   trigger_receive_context trigger_context() const;
484 
485   /// INTERNAL ONLY
486   void receive_batch(process_id_type source, outgoing_messages& batch) const;
487 
488   /// INTERNAL ONLY
489   ///
490   /// Determine the actual communicator and tag will be used for a
491   /// transmission with the given tag.
492   std::pair<boost::mpi::communicator, int>
493   actual_communicator_and_tag(int tag, int block) const;
494 
495   /// set the size of the message buffer used for buffered oob sends
496 
497   static void set_message_buffer_size(std::size_t s);
498 
499   /// get the size of the message buffer used for buffered oob sends
500 
501   static std::size_t message_buffer_size();
502   static int old_buffer_size;
503   static void* old_buffer;
504 private:
505 
506   void install_trigger(int tag, int block,
507       shared_ptr<trigger_base> const& launcher);
508 
509   void poll_requests(int block=-1) const;
510 
511 
512   // send a batch if the buffer is full now or would get full
513   void maybe_send_batch(process_id_type dest) const;
514 
515   // actually send a batch
516   void send_batch(process_id_type dest, outgoing_messages& batch) const;
517   void send_batch(process_id_type dest) const;
518 
519   void pack_headers() const;
520 
521   /**
522    * Process a batch of incoming messages immediately.
523    *
524    * @param source         the source of these messages
525    */
526   void process_batch(process_id_type source) const;
527   void receive_batch(boost::mpi::status& status) const;
528 
529   //void free_finished_sends() const;
530 
531   /// Status messages used internally by the process group
532   enum status_messages {
533     /// the first of the reserved message tags
534     msg_reserved_first = 126,
535     /// Sent from a processor when sending batched messages
536     msg_batch = 126,
537     /// Sent from a processor when sending large batched messages, larger than
538     /// the maximum buffer size for messages to be received by MPI_Irecv
539     msg_large_batch = 127,
540     /// Sent from a source processor to everyone else when that
541     /// processor has entered the synchronize() function.
542     msg_synchronizing = 128,
543     /// the last of the reserved message tags
544     msg_reserved_last = 128
545   };
546 
547   /**
548    * Description of a block of tags associated to a particular
549    * distributed data structure. This structure will live as long as
550    * the distributed data structure is around, and will be used to
551    * help send messages to the data structure.
552    */
553   struct block_type
554   {
block_typeboost::graph::distributed::mpi_process_group::block_type555     block_type() { }
556 
557     /// Handler for receive events
558     receiver_type     on_receive;
559 
560     /// Handler executed at the start of  synchronization
561     on_synchronize_event_type  on_synchronize;
562 
563     /// Individual message triggers. Note: at present, this vector is
564     /// indexed by the (local) tag of the trigger.  Any tags that
565     /// don't have triggers will have NULL pointers in that spot.
566     std::vector<shared_ptr<trigger_base> > triggers;
567   };
568 
569   /**
570    * Data structure containing all of the blocks for the distributed
571    * data structures attached to a process group.
572    */
573   typedef std::vector<block_type*> blocks_type;
574 
575   /// Iterator into @c blocks_type.
576   typedef blocks_type::iterator block_iterator;
577 
578   /**
579    * Deleter used to deallocate a block when its distributed data
580    * structure is destroyed. This type will be used as the deleter for
581    * @c block_num.
582    */
583   struct deallocate_block;
584 
585   static std::vector<char> message_buffer;
586 
587 public:
588   /**
589    * Data associated with the process group and all of its attached
590    * distributed data structures.
591    */
592   shared_ptr<impl> impl_;
593 
594   /**
595    * When non-null, indicates that this copy of the process group is
596    * associated with a particular distributed data structure. The
597    * integer value contains the block number (a value > 0) associated
598    * with that data structure. The deleter for this @c shared_ptr is a
599    * @c deallocate_block object that will deallocate the associated
600    * block in @c impl_->blocks.
601    */
602   shared_ptr<int>  block_num;
603 
604   /**
605    * Rank of this process, to avoid having to call rank() repeatedly.
606    */
607   int rank;
608 
609   /**
610    * Number of processes in this process group, to avoid having to
611    * call communicator::size() repeatedly.
612    */
613   int size;
614 };
615 
616 
617 
618 inline mpi_process_group::process_id_type
process_id(const mpi_process_group & pg)619 process_id(const mpi_process_group& pg)
620 { return pg.rank; }
621 
622 inline mpi_process_group::process_size_type
num_processes(const mpi_process_group & pg)623 num_processes(const mpi_process_group& pg)
624 { return pg.size; }
625 
626 mpi_process_group::communicator_type communicator(const mpi_process_group& pg);
627 
628 template<typename T>
629 void
630 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
631      int tag, const T& value);
632 
633 template<typename InputIterator>
634 void
635 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
636      int tag, InputIterator first, InputIterator last);
637 
638 template<typename T>
639 inline void
send(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,T * first,T * last)640 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
641      int tag, T* first, T* last)
642 { send(pg, dest, tag, first, last - first); }
643 
644 template<typename T>
645 inline void
send(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T * first,const T * last)646 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
647      int tag, const T* first, const T* last)
648 { send(pg, dest, tag, first, last - first); }
649 
650 template<typename T>
651 mpi_process_group::process_id_type
652 receive(const mpi_process_group& pg, int tag, T& value);
653 
654 template<typename T>
655 mpi_process_group::process_id_type
656 receive(const mpi_process_group& pg,
657         mpi_process_group::process_id_type source, int tag, T& value);
658 
659 optional<std::pair<mpi_process_group::process_id_type, int> >
660 probe(const mpi_process_group& pg);
661 
662 void synchronize(const mpi_process_group& pg);
663 
664 template<typename T, typename BinaryOperation>
665 T*
666 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
667            BinaryOperation bin_op);
668 
669 template<typename T, typename BinaryOperation>
670 T*
671 scan(const mpi_process_group& pg, T* first, T* last, T* out,
672            BinaryOperation bin_op);
673 
674 template<typename InputIterator, typename T>
675 void
676 all_gather(const mpi_process_group& pg,
677            InputIterator first, InputIterator last, std::vector<T>& out);
678 
679 template<typename InputIterator>
680 mpi_process_group
681 process_subgroup(const mpi_process_group& pg,
682                  InputIterator first, InputIterator last);
683 
684 template<typename T>
685 void
686 broadcast(const mpi_process_group& pg, T& val,
687           mpi_process_group::process_id_type root);
688 
689 
690 /*******************************************************************
691  * Out-of-band communication                                       *
692  *******************************************************************/
693 
694 template<typename T>
695 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T & value,int block=-1)696 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
697          int tag, const T& value, int block=-1)
698 {
699   using boost::mpi::get_mpi_datatype;
700 
701   // Determine the actual message tag we will use for the send, and which
702   // communicator we will use.
703   std::pair<boost::mpi::communicator, int> actual
704     = pg.actual_communicator_and_tag(tag, block);
705 
706 #ifdef SEND_OOB_BSEND
707   if (mpi_process_group::message_buffer_size()) {
708     MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
709               actual.second, actual.first);
710     return;
711   }
712 #endif
713   MPI_Request request;
714   MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
715             actual.second, actual.first, &request);
716 
717   int done=0;
718   do {
719     pg.poll();
720     MPI_Test(&request,&done,MPI_STATUS_IGNORE);
721   } while (!done);
722 }
723 
724 template<typename T>
725 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T & value,int block=-1)726 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
727          int tag, const T& value, int block=-1)
728 {
729   using boost::mpi::packed_oarchive;
730 
731   // Determine the actual message tag we will use for the send, and which
732   // communicator we will use.
733   std::pair<boost::mpi::communicator, int> actual
734     = pg.actual_communicator_and_tag(tag, block);
735 
736   // Serialize the data into a buffer
737   packed_oarchive out(actual.first);
738   out << value;
739   std::size_t size = out.size();
740 
741   // Send the actual message data
742 #ifdef SEND_OOB_BSEND
743   if (mpi_process_group::message_buffer_size()) {
744     MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
745             dest, actual.second, actual.first);
746    return;
747   }
748 #endif
749   MPI_Request request;
750   MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
751             dest, actual.second, actual.first, &request);
752 
753   int done=0;
754   do {
755     pg.poll();
756     MPI_Test(&request,&done,MPI_STATUS_IGNORE);
757   } while (!done);
758 }
759 
760 template<typename T>
761 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
762 receive_oob(const mpi_process_group& pg,
763             mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
764 
765 template<typename T>
766 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
767 receive_oob(const mpi_process_group& pg,
768             mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
769 
770 template<typename SendT, typename ReplyT>
771 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
772 send_oob_with_reply(const mpi_process_group& pg,
773                     mpi_process_group::process_id_type dest,
774                     int tag, const SendT& send_value, ReplyT& reply_value,
775                     int block = -1);
776 
777 template<typename SendT, typename ReplyT>
778 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
779 send_oob_with_reply(const mpi_process_group& pg,
780                     mpi_process_group::process_id_type dest,
781                     int tag, const SendT& send_value, ReplyT& reply_value,
782                     int block = -1);
783 
784 } } } // end namespace boost::graph::distributed
785 
786 BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
787 namespace boost { namespace mpi {
788     template<>
789     struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
790 } } // end namespace boost::mpi
791 
792 namespace std {
793 /// optimized swap for outgoing messages
794 inline void
swap(boost::graph::distributed::mpi_process_group::outgoing_messages & x,boost::graph::distributed::mpi_process_group::outgoing_messages & y)795 swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x,
796      boost::graph::distributed::mpi_process_group::outgoing_messages& y)
797 {
798   x.swap(y);
799 }
800 
801 
802 }
803 
804 BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
805 BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
806 
807 #include <boost/graph/distributed/detail/mpi_process_group.ipp>
808 
809 #endif // BOOST_PARALLEL_MPI_MPI_PROCESS_GROUP_HPP
810