1 // Copyright (C) 2004-2008 The Trustees of Indiana University.
2 // Copyright (C) 2007   Douglas Gregor
3 // Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>
4 
5 // Use, modification and distribution is subject to the Boost Software
6 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
7 // http://www.boost.org/LICENSE_1_0.txt)
8 
9 //  Authors: Douglas Gregor
10 //           Matthias Troyer
11 //           Andrew Lumsdaine
12 #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
13 #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
14 
15 #ifndef BOOST_GRAPH_USE_MPI
16 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
17 #endif
18 
19 //#define NO_SPLIT_BATCHES
20 #define SEND_OOB_BSEND
21 
22 #include <boost/optional.hpp>
23 #include <boost/shared_ptr.hpp>
24 #include <boost/weak_ptr.hpp>
25 #include <utility>
26 #include <memory>
27 #include <boost/function/function1.hpp>
28 #include <boost/function/function2.hpp>
29 #include <boost/function/function0.hpp>
30 #include <boost/mpi.hpp>
31 #include <boost/property_map/parallel/process_group.hpp>
32 #include <boost/serialization/vector.hpp>
33 #include <boost/utility/enable_if.hpp>
34 
35 namespace boost { namespace graph { namespace distributed {
36 
37 // Process group tags
38 struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
39 
class mpi_process_group
{
  // Pimpl: shared state for the whole process-group equivalence class
  // (communicator, message batches, trigger blocks). Defined in the
  // accompanying .ipp implementation file.
  struct impl;

 public:
  /// Number of tags available to each data structure.
  static const int max_tags = 256;

  /**
   * The type of a "receive" handler, that will be provided with
   * (source, tag) pairs when a message is received. Users can provide a
   * receive handler for a distributed data structure, for example, to
   * automatically pick up and respond to messages as needed.
   */
  typedef function<void(int source, int tag)> receiver_type;

  /**
   * The type of a handler for the on-synchronize event, which will be
   * executed at the beginning of synchronize().
   */
  typedef function0<void>      on_synchronize_event_type;

  /// Used as a tag to help create an "empty" process group.
  struct create_empty {};

  /// The type used to buffer message data
  typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;

  /// The type used to identify a process
  typedef int process_id_type;

  /// The type used to count the number of processes
  typedef int process_size_type;

  /// The type of communicator used to transmit data via MPI
  typedef boost::mpi::communicator communicator_type;

  /// Classification of the capabilities of this process group
  struct communication_category
    : virtual boost::parallel::bsp_process_group_tag,
      virtual mpi_process_group_tag { };

  // TBD: We can eliminate the "source" field and possibly the
  // "offset" field.
  /// Describes one message inside a batched transmission buffer.
  struct message_header {
    /// The process that sent the message
    process_id_type source;

    /// The message tag
    int tag;

    /// The offset of the message into the buffer
    std::size_t offset;

    /// The length of the message in the buffer, in bytes
    std::size_t bytes;

    // Serializes all four fields; also transmitted bitwise as an MPI
    // datatype (see the trait macros at the bottom of this header).
    template <class Archive>
    void serialize(Archive& ar, int)
    {
      ar & source & tag & offset & bytes;
    }
  };

  /**
   * Stores the outgoing messages for a particular processor.
   *
   * @todo Evaluate whether we should use a deque instance, which
   * could reduce the cost of "sending" messages but
   * increases the time spent in the synchronization step.
   */
  struct outgoing_messages {
        outgoing_messages() {}
        ~outgoing_messages() {}

    /// One header per queued message, indexing into @c buffer.
    std::vector<message_header> headers;
    /// Concatenated payload bytes of all queued messages.
    buffer_type                 buffer;

    template <class Archive>
    void serialize(Archive& ar, int)
    {
      ar & headers & buffer;
    }

    /// Constant-time exchange of contents with @p x (no payload copying).
    void swap(outgoing_messages& x)
    {
      headers.swap(x.headers);
      buffer.swap(x.buffer);
    }
  };

private:
  /**
   * Virtual base from which every trigger will be launched. See @c
   * trigger_launcher for more information.
   */
  class trigger_base : boost::noncopyable
  {
  public:
    explicit trigger_base(int tag) : tag_(tag) { }

    /// Retrieve the tag associated with this trigger
    int tag() const { return tag_; }

    virtual ~trigger_base() { }

    /**
     * Invoked to receive a message that matches a particular trigger.
     *
     * @param source      the source of the message
     * @param tag         the (local) tag of the message
     * @param context     the context under which the trigger is being
     *                    invoked
     */
    virtual void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const = 0;

  protected:
    // The message tag associated with this trigger
    int tag_;
  };

  /**
   * Launches a specific handler in response to a trigger. This
   * function object wraps up the handler function object and a buffer
   * for incoming data.
   */
  template<typename Type, typename Handler>
  class trigger_launcher : public trigger_base
  {
  public:
    explicit trigger_launcher(mpi_process_group& self, int tag,
                              const Handler& handler)
      : trigger_base(tag), self(self), handler(handler)
      {}

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mpi_process_group& self;
    // mutable: receive() is const but may invoke a stateful handler.
    mutable Handler handler;
  };

  /**
   * Launches a specific handler with a message reply in response to a
   * trigger. This function object wraps up the handler function
   * object and a buffer for incoming data.
   */
  template<typename Type, typename Handler>
  class reply_trigger_launcher : public trigger_base
  {
  public:
    explicit reply_trigger_launcher(mpi_process_group& self, int tag,
                                    const Handler& handler)
      : trigger_base(tag), self(self), handler(handler)
      {}

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mpi_process_group& self;
    // mutable: receive() is const but may invoke a stateful handler.
    mutable Handler handler;
  };

  /**
   * Trigger installed by global_trigger(). Unlike trigger_launcher, it
   * keeps no reference back to the owning process group.
   */
  template<typename Type, typename Handler>
  class global_trigger_launcher : public trigger_base
  {
  public:
    explicit global_trigger_launcher(mpi_process_group& self, int tag,
                              const Handler& handler)
      : trigger_base(tag), handler(handler)
      {
      }

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    mutable Handler handler;
    // TBD: do not forget to cancel any outstanding Irecv when deleted,
    // if we decide to use Irecv
  };

  /**
   * Trigger variant that calls prepare_receive() at construction with a
   * fixed receive-buffer size (@c sz).
   */
  template<typename Type, typename Handler>
  class global_irecv_trigger_launcher : public trigger_base
  {
  public:
    explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
                              const Handler& handler, int sz)
      : trigger_base(tag), handler(handler), buffer_size(sz)
      {
        prepare_receive(self,tag);
      }

    void
    receive(mpi_process_group const& pg, int source, int tag,
            trigger_receive_context context, int block=-1) const;

  private:
    // Posts a receive for this trigger's tag; defined in the .ipp.
    void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
    Handler handler;
    int buffer_size;
    // TBD: do not forget to cancel any outstanding Irecv when deleted,
    // if we decide to use Irecv
  };

public:
  /**
   * Construct a new BSP process group from an MPI communicator. The
   * MPI communicator will be duplicated to create a new communicator
   * for this process group to use.
   */
  mpi_process_group(communicator_type parent_comm = communicator_type());

  /**
   * Construct a new BSP process group from an MPI communicator. The
   * MPI communicator will be duplicated to create a new communicator
   * for this process group to use. This constructor allows to tune the
   * size of message batches.
   *
   *   @param num_headers The maximum number of headers in a message batch
   *
   *   @param buffer_size The maximum size of the message buffer in a batch.
   *
   */
  mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
                     communicator_type parent_comm = communicator_type());

  /**
   * Construct a copy of the BSP process group for a new distributed
   * data structure. This data structure will synchronize with all
   * other members of the process group's equivalence class (including
   * @p other), but will have its own set of tags.
   *
   *   @param other The process group that this new process group will
   *   be based on, using a different set of tags within the same
   *   communication and synchronization space.
   *
   *   @param handler A message handler that will be passed (source,
   *   tag) pairs for each message received by this data
   *   structure. The handler is expected to receive the messages
   *   immediately. The handler can be changed after-the-fact by
   *   calling @c replace_handler.
   *
   *   @param out_of_band_receive An anachronism. TODO: remove this.
   */
  mpi_process_group(const mpi_process_group& other,
                    const receiver_type& handler,
                    bool out_of_band_receive = false);

  /**
   * Construct a copy of the BSP process group for a new distributed
   * data structure. This data structure will synchronize with all
   * other members of the process group's equivalence class (including
   * @p other), but will have its own set of tags.
   */
  mpi_process_group(const mpi_process_group& other,
                    attach_distributed_object,
                    bool out_of_band_receive = false);

  /**
   * Create an "empty" process group, with no information. This is an
   * internal routine that users should never need.
   */
  explicit mpi_process_group(create_empty) {}

  /**
   * Destroys this copy of the process group.
   */
  ~mpi_process_group();

  /**
   * Replace the current message handler with a new message handler.
   *
   * @param handler The new message handler.
   * @param out_of_band_receive An anachronism: remove this
   */
  void replace_handler(const receiver_type& handler,
                       bool out_of_band_receive = false);

  /**
   * Turns this process group into the process group for a new
   * distributed data structure or object, allocating its own tag
   * block.
   */
  void make_distributed_object();

  /**
   * Replace the handler to be invoked at the beginning of synchronize.
   */
  void
  replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);

  /**
   * Return the block number of the current data structure. A value of
   * 0 indicates that this particular instance of the process group is
   * not associated with any distributed data structure.
   */
  int my_block_number() const { return block_num? *block_num : 0; }

  /**
   * Encode a block number/tag pair into a single encoded tag for
   * transmission.
   */
  int encode_tag(int block_num, int tag) const
  { return block_num * max_tags + tag; }

  /**
   * Decode an encoded tag into a block number/tag pair.
   */
  std::pair<int, int> decode_tag(int encoded_tag) const
  { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }

  // @todo Actually write up the friend declarations so these could be
  // private.

  // private:

  /** Allocate a block of tags for this instance. The block should not
   * have been allocated already, e.g., my_block_number() ==
   * 0. Returns the newly-allocated block number.
   */
  int allocate_block(bool out_of_band_receive = false);

  /** Potentially emit a receive event out of band. Returns true if an event
   *  was actually sent, false otherwise.
   */
  bool maybe_emit_receive(int process, int encoded_tag) const;

  /** Emit a receive event. Returns true if an event was actually
   * sent, false otherwise.
   */
  bool emit_receive(int process, int encoded_tag) const;

  /** Emit an on-synchronize event to all block handlers. */
  void emit_on_synchronize() const;

  /** Retrieve a reference to the stored receiver in this block.  */
  template<typename Receiver>
  Receiver* get_receiver();

  // Queue a value for delivery; dispatched on whether T is an MPI datatype
  // (direct copy) or needs Boost.Serialization (packed archive).
  template<typename T>
  void
  send_impl(int dest, int tag, const T& value,
            mpl::true_ /*is_mpi_datatype*/) const;

  template<typename T>
  void
  send_impl(int dest, int tag, const T& value,
            mpl::false_ /*is_mpi_datatype*/) const;

  // Queue an array of n serialized values (non-MPI-datatype case only).
  template<typename T>
  typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
  array_send_impl(int dest, int tag, const T values[], std::size_t n) const;

  // Receive a single value; same MPI-datatype/serialization dispatch as
  // send_impl. Returns whether a matching message was found.
  template<typename T>
  bool
  receive_impl(int source, int tag, T& value,
               mpl::true_ /*is_mpi_datatype*/) const;

  template<typename T>
  bool
  receive_impl(int source, int tag, T& value,
               mpl::false_ /*is_mpi_datatype*/) const;

  // Receive an array of values
  template<typename T>
  typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
  array_receive_impl(int source, int tag, T* values, std::size_t& n) const;

  /// Check for an incoming message; yields its (source, tag) if one exists.
  optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;

  void synchronize() const;

  /// True when this copy references shared state; false only for a group
  /// constructed with create_empty (impl_ left null).
  operator bool() { return bool(impl_); }

  mpi_process_group base() const;

  /**
   * Create a new trigger for a specific message tag. Triggers handle
   * out-of-band messaging, and the handler itself will be called
   * whenever a message is available. The handler itself accepts four
   * arguments: the source of the message, the message tag (which will
   * be the same as @p tag), the message data (of type @c Type), and a
   * boolean flag that states whether the message was received
   * out-of-band. The last will be @c true for out-of-band receives,
   * or @c false for receives at the end of a synchronization step.
   */
  template<typename Type, typename Handler>
  void trigger(int tag, const Handler& handler);

  /**
   * Create a new trigger for a specific message tag, along with a way
   * to send a reply with data back to the sender. Triggers handle
   * out-of-band messaging, and the handler itself will be called
   * whenever a message is available. The handler itself accepts four
   * arguments: the source of the message, the message tag (which will
   * be the same as @p tag), the message data (of type @c Type), and a
   * boolean flag that states whether the message was received
   * out-of-band. The last will be @c true for out-of-band receives,
   * or @c false for receives at the end of a synchronization
   * step. The handler also returns a value, which will be routed back
   * to the sender.
   */
  template<typename Type, typename Handler>
  void trigger_with_reply(int tag, const Handler& handler);

  /// Install a global trigger for @p tag; when @p buffer_size is nonzero a
  /// global_irecv_trigger_launcher with that buffer size is used.
  template<typename Type, typename Handler>
  void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);



  /**
   * Poll for any out-of-band messages. This routine will check if any
   * out-of-band messages are available. Those that are available will
   * be handled immediately, if possible.
   *
   * @returns if an out-of-band message has been received, but we are
   * unable to actually receive the message, a (source, tag) pair will
   * be returned. Otherwise, returns an empty optional.
   *
   * @param wait When true, we should block until a message comes in.
   *
   * @param synchronizing whether we are currently synchronizing the
   *                      process group
   */
  optional<std::pair<int, int> >
  poll(bool wait = false, int block = -1, bool synchronizing = false) const;

  /**
   * Determines the context of the trigger currently executing. If
   * multiple triggers are executing (recursively), then the context
   * for the most deeply nested trigger will be returned. If no
   * triggers are executing, returns @c trc_none. This might be used,
   * for example, to determine whether a reply to a message should
   * itself be sent out-of-band or whether it can go via the normal,
   * slower communication route.
   */
  trigger_receive_context trigger_context() const;

  /// INTERNAL ONLY
  void receive_batch(process_id_type source, outgoing_messages& batch) const;

  /// INTERNAL ONLY
  ///
  /// Determine the actual communicator and tag will be used for a
  /// transmission with the given tag.
  std::pair<boost::mpi::communicator, int>
  actual_communicator_and_tag(int tag, int block) const;

  /// set the size of the message buffer used for buffered oob sends

  static void set_message_buffer_size(std::size_t s);

  /// get the size of the message buffer used for buffered oob sends

  static std::size_t message_buffer_size();
  // Previous MPI buffer-attach state, saved so it can be restored when the
  // message buffer is resized (see set_message_buffer_size in the .ipp).
  static int old_buffer_size;
  static void* old_buffer;
private:

  // Register a trigger launcher for (tag, block).
  void install_trigger(int tag, int block,
      shared_ptr<trigger_base> const& launcher);

  // Service outstanding MPI requests for the given block (-1 = all).
  void poll_requests(int block=-1) const;


  // send a batch if the buffer is full now or would get full
  void maybe_send_batch(process_id_type dest) const;

  // actually send a batch
  void send_batch(process_id_type dest, outgoing_messages& batch) const;
  void send_batch(process_id_type dest) const;

  void pack_headers() const;

  /**
   * Process a batch of incoming messages immediately.
   *
   * @param source         the source of these messages
   */
  void process_batch(process_id_type source) const;
  void receive_batch(boost::mpi::status& status) const;

  //void free_finished_sends() const;

  /// Status messages used internally by the process group
  enum status_messages {
    /// the first of the reserved message tags
    msg_reserved_first = 126,
    /// Sent from a processor when sending batched messages
    msg_batch = 126,
    /// Sent from a processor when sending large batched messages, larger than
    /// the maximum buffer size for messages to be received by MPI_Irecv
    msg_large_batch = 127,
    /// Sent from a source processor to everyone else when that
    /// processor has entered the synchronize() function.
    msg_synchronizing = 128,
    /// the last of the reserved message tags
    msg_reserved_last = 128
  };

  /**
   * Description of a block of tags associated to a particular
   * distributed data structure. This structure will live as long as
   * the distributed data structure is around, and will be used to
   * help send messages to the data structure.
   */
  struct block_type
  {
    block_type() { }

    /// Handler for receive events
    receiver_type     on_receive;

    /// Handler executed at the start of  synchronization
    on_synchronize_event_type  on_synchronize;

    /// Individual message triggers. Note: at present, this vector is
    /// indexed by the (local) tag of the trigger.  Any tags that
    /// don't have triggers will have NULL pointers in that spot.
    std::vector<shared_ptr<trigger_base> > triggers;
  };

  /**
   * Data structure containing all of the blocks for the distributed
   * data structures attached to a process group.
   */
  typedef std::vector<block_type*> blocks_type;

  /// Iterator into @c blocks_type.
  typedef blocks_type::iterator block_iterator;

  /**
   * Deleter used to deallocate a block when its distributed data
   * structure is destroyed. This type will be used as the deleter for
   * @c block_num.
   */
  struct deallocate_block;

  // Storage attached to MPI for buffered (MPI_Bsend) out-of-band sends;
  // sized via set_message_buffer_size().
  static std::vector<char> message_buffer;

public:
  /**
   * Data associated with the process group and all of its attached
   * distributed data structures.
   */
  shared_ptr<impl> impl_;

  /**
   * When non-null, indicates that this copy of the process group is
   * associated with a particular distributed data structure. The
   * integer value contains the block number (a value > 0) associated
   * with that data structure. The deleter for this @c shared_ptr is a
   * @c deallocate_block object that will deallocate the associated
   * block in @c impl_->blocks.
   */
  shared_ptr<int>  block_num;

  /**
   * Rank of this process, to avoid having to call rank() repeatedly.
   */
  int rank;

  /**
   * Number of processes in this process group, to avoid having to
   * call communicator::size() repeatedly.
   */
  int size;
};
616 
617 inline mpi_process_group::process_id_type
process_id(const mpi_process_group & pg)618 process_id(const mpi_process_group& pg)
619 { return pg.rank; }
620 
621 inline mpi_process_group::process_size_type
num_processes(const mpi_process_group & pg)622 num_processes(const mpi_process_group& pg)
623 { return pg.size; }
624 
/// Retrieve the MPI communicator used by @p pg (defined in the .ipp).
mpi_process_group::communicator_type communicator(const mpi_process_group& pg);

/// Send a single value to @p dest with the given tag (defined in the .ipp).
template<typename T>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T& value);

/// Send the values in the iterator range [first, last) to @p dest with the
/// given tag (defined in the .ipp).
template<typename InputIterator>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, InputIterator first, InputIterator last);
636 
637 template<typename T>
638 inline void
send(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,T * first,T * last)639 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
640      int tag, T* first, T* last)
641 { send(pg, dest, tag, first, last - first); }
642 
643 template<typename T>
644 inline void
send(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T * first,const T * last)645 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
646      int tag, const T* first, const T* last)
647 { send(pg, dest, tag, first, last - first); }
648 
/// Receive a value with the given tag from any process; returns the id of
/// the process the message came from (defined in the .ipp).
template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg, int tag, T& value);

/// Receive a value with the given tag from the given source process;
/// returns the id of the process the message came from.
template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value);

/// Check for an incoming message; yields its (source, tag) pair if one is
/// pending, an empty optional otherwise.
optional<std::pair<mpi_process_group::process_id_type, int> >
probe(const mpi_process_group& pg);

/// Synchronize the process group (see mpi_process_group::synchronize).
void synchronize(const mpi_process_group& pg);

// Collective operations over the process group; all are defined in the
// .ipp. Each combines/distributes the range [first, last) across all
// processes, writing results through @p out where applicable.

template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op);

template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op);

template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg,
           InputIterator first, InputIterator last, std::vector<T>& out);

/// Create a process group containing only the processes named in
/// [first, last).
template<typename InputIterator>
mpi_process_group
process_subgroup(const mpi_process_group& pg,
                 InputIterator first, InputIterator last);

/// Broadcast @p val from process @p root to every process in @p pg.
template<typename T>
void
broadcast(const mpi_process_group& pg, T& val,
          mpi_process_group::process_id_type root);
688 
689 /// optimized swap for outgoing messages
690 inline void
swap(mpi_process_group::outgoing_messages & x,mpi_process_group::outgoing_messages & y)691 swap(mpi_process_group::outgoing_messages& x,
692      mpi_process_group::outgoing_messages& y)
693 {
694   x.swap(y);
695 }
696 
697 
698 /*******************************************************************
699  * Out-of-band communication                                       *
700  *******************************************************************/
701 
/**
 * Out-of-band send of a single value whose type is an MPI datatype.
 *
 * Resolves the (communicator, tag) pair for @p tag within @p block, then
 * either performs a buffered send (MPI_Bsend) when a message buffer has
 * been configured, or issues a nonblocking MPI_Isend and keeps polling the
 * process group until the send completes, so that incoming messages are
 * still serviced while this process waits.
 */
template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
         int tag, const T& value, int block=-1)
{
  using boost::mpi::get_mpi_datatype;

  // Determine the actual message tag we will use for the send, and which
  // communicator we will use.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

#ifdef SEND_OOB_BSEND
  // Buffered path: completes locally using the buffer attached via
  // set_message_buffer_size().
  if (mpi_process_group::message_buffer_size()) {
    MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
              actual.second, actual.first);
    return;
  }
#endif
  MPI_Request request;
  MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
            actual.second, actual.first, &request);

  // Busy-wait on completion, servicing incoming traffic between tests.
  int done=0;
  do {
    pg.poll();
    MPI_Test(&request,&done,MPI_STATUS_IGNORE);
  } while (!done);
}
731 
/**
 * Out-of-band send of a single value that is not an MPI datatype.
 *
 * The value is serialized into a packed archive and transmitted as
 * MPI_PACKED bytes: via MPI_Bsend when a message buffer is configured,
 * otherwise via a nonblocking MPI_Isend polled to completion so incoming
 * messages are still serviced while waiting.
 */
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
         int tag, const T& value, int block=-1)
{
  using boost::mpi::packed_oarchive;

  // Determine the actual message tag we will use for the send, and which
  // communicator we will use.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Serialize the data into a buffer
  packed_oarchive out(actual.first);
  out << value;
  // NOTE(review): size is a std::size_t passed where MPI takes an int
  // count; payloads over INT_MAX bytes would be truncated — confirm
  // callers never send that much out-of-band.
  std::size_t size = out.size();

  // Send the actual message data
#ifdef SEND_OOB_BSEND
  // Buffered path: completes locally using the buffer attached via
  // set_message_buffer_size().
  if (mpi_process_group::message_buffer_size()) {
    MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
            dest, actual.second, actual.first);
   return;
  }
#endif
  MPI_Request request;
  MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
            dest, actual.second, actual.first, &request);

  // Busy-wait on completion, servicing incoming traffic between tests.
  int done=0;
  do {
    pg.poll();
    MPI_Test(&request,&done,MPI_STATUS_IGNORE);
  } while (!done);
}
767 
/// Out-of-band receive of a single value (MPI-datatype version; defined in
/// the .ipp).
template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value, int block=-1);

/// Out-of-band receive of a single value (serialized version).
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value, int block=-1);

/// Out-of-band send of @p send_value that also receives a reply into
/// @p reply_value (see trigger_with_reply). Overloads dispatch on whether
/// the reply type is an MPI datatype.
template<typename SendT, typename ReplyT>
typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block = -1);

template<typename SendT, typename ReplyT>
typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block = -1);
791 
792 } } } // end namespace boost::graph::distributed
793 
// Mark message_header as bitwise-serializable and as an MPI datatype, so
// header batches can be transmitted directly.
BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
namespace boost { namespace mpi {
    template<>
    struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
} } // end namespace boost::mpi

// Serialize outgoing_messages without class/version metadata
// (object_serializable) and without object tracking (track_never).
BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
802 
803 #include <boost/graph/distributed/detail/mpi_process_group.ipp>
804 
#endif // BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
806