// Copyright (C) 2004-2008 The Trustees of Indiana University.
// Copyright (C) 2007  Douglas Gregor
// Copyright (C) 2007  Matthias Troyer <troyer@boost-consulting.com>

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Matthias Troyer
//           Andrew Lumsdaine
12 #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
13 #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
14
15 #ifndef BOOST_GRAPH_USE_MPI
16 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
17 #endif
18
19 //#define NO_SPLIT_BATCHES
20 #define SEND_OOB_BSEND
21
22 #include <boost/optional.hpp>
23 #include <boost/shared_ptr.hpp>
24 #include <boost/weak_ptr.hpp>
25 #include <utility>
26 #include <memory>
27 #include <boost/function/function1.hpp>
28 #include <boost/function/function2.hpp>
29 #include <boost/function/function0.hpp>
30 #include <boost/mpi.hpp>
31 #include <boost/property_map/parallel/process_group.hpp>
32 #include <boost/utility/enable_if.hpp>
33
34 namespace boost { namespace graph { namespace distributed {
35
36 // Process group tags
37 struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
38
39 class mpi_process_group
40 {
41 struct impl;
42
43 public:
44 /// Number of tags available to each data structure.
45 static const int max_tags = 256;
46
47 /**
48 * The type of a "receive" handler, that will be provided with
49 * (source, tag) pairs when a message is received. Users can provide a
50 * receive handler for a distributed data structure, for example, to
51 * automatically pick up and respond to messages as needed.
52 */
53 typedef function<void(int source, int tag)> receiver_type;
54
55 /**
56 * The type of a handler for the on-synchronize event, which will be
57 * executed at the beginning of synchronize().
58 */
59 typedef function0<void> on_synchronize_event_type;
60
61 /// Used as a tag to help create an "empty" process group.
62 struct create_empty {};
63
64 /// The type used to buffer message data
65 typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
66
67 /// The type used to identify a process
68 typedef int process_id_type;
69
70 /// The type used to count the number of processes
71 typedef int process_size_type;
72
73 /// The type of communicator used to transmit data via MPI
74 typedef boost::mpi::communicator communicator_type;
75
76 /// Classification of the capabilities of this process group
77 struct communication_category
78 : virtual boost::parallel::bsp_process_group_tag,
79 virtual mpi_process_group_tag { };
80
81 // TBD: We can eliminate the "source" field and possibly the
82 // "offset" field.
83 struct message_header {
84 /// The process that sent the message
85 process_id_type source;
86
87 /// The message tag
88 int tag;
89
90 /// The offset of the message into the buffer
91 std::size_t offset;
92
93 /// The length of the message in the buffer, in bytes
94 std::size_t bytes;
95
96 template <class Archive>
serializeboost::graph::distributed::mpi_process_group::message_header97 void serialize(Archive& ar, int)
98 {
99 ar & source & tag & offset & bytes;
100 }
101 };
102
103 /**
104 * Stores the outgoing messages for a particular processor.
105 *
106 * @todo Evaluate whether we should use a deque instance, which
107 * would reduce could reduce the cost of "sending" messages but
108 * increases the time spent in the synchronization step.
109 */
110 struct outgoing_messages {
outgoing_messagesboost::graph::distributed::mpi_process_group::outgoing_messages111 outgoing_messages() {}
~outgoing_messagesboost::graph::distributed::mpi_process_group::outgoing_messages112 ~outgoing_messages() {}
113
114 std::vector<message_header> headers;
115 buffer_type buffer;
116
117 template <class Archive>
serializeboost::graph::distributed::mpi_process_group::outgoing_messages118 void serialize(Archive& ar, int)
119 {
120 ar & headers & buffer;
121 }
122
swapboost::graph::distributed::mpi_process_group::outgoing_messages123 void swap(outgoing_messages& x)
124 {
125 headers.swap(x.headers);
126 buffer.swap(x.buffer);
127 }
128 };
129
130 private:
131 /**
132 * Virtual base from which every trigger will be launched. See @c
133 * trigger_launcher for more information.
134 */
135 class trigger_base : boost::noncopyable
136 {
137 public:
trigger_base(int tag)138 explicit trigger_base(int tag) : tag_(tag) { }
139
140 /// Retrieve the tag associated with this trigger
tag() const141 int tag() const { return tag_; }
142
~trigger_base()143 virtual ~trigger_base() { }
144
145 /**
146 * Invoked to receive a message that matches a particular trigger.
147 *
148 * @param source the source of the message
149 * @param tag the (local) tag of the message
150 * @param context the context under which the trigger is being
151 * invoked
152 */
153 virtual void
154 receive(mpi_process_group const& pg, int source, int tag,
155 trigger_receive_context context, int block=-1) const = 0;
156
157 protected:
158 // The message tag associated with this trigger
159 int tag_;
160 };
161
162 /**
163 * Launches a specific handler in response to a trigger. This
164 * function object wraps up the handler function object and a buffer
165 * for incoming data.
166 */
167 template<typename Type, typename Handler>
168 class trigger_launcher : public trigger_base
169 {
170 public:
trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)171 explicit trigger_launcher(mpi_process_group& self, int tag,
172 const Handler& handler)
173 : trigger_base(tag), self(self), handler(handler)
174 {}
175
176 void
177 receive(mpi_process_group const& pg, int source, int tag,
178 trigger_receive_context context, int block=-1) const;
179
180 private:
181 mpi_process_group& self;
182 mutable Handler handler;
183 };
184
185 /**
186 * Launches a specific handler with a message reply in response to a
187 * trigger. This function object wraps up the handler function
188 * object and a buffer for incoming data.
189 */
190 template<typename Type, typename Handler>
191 class reply_trigger_launcher : public trigger_base
192 {
193 public:
reply_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)194 explicit reply_trigger_launcher(mpi_process_group& self, int tag,
195 const Handler& handler)
196 : trigger_base(tag), self(self), handler(handler)
197 {}
198
199 void
200 receive(mpi_process_group const& pg, int source, int tag,
201 trigger_receive_context context, int block=-1) const;
202
203 private:
204 mpi_process_group& self;
205 mutable Handler handler;
206 };
207
208 template<typename Type, typename Handler>
209 class global_trigger_launcher : public trigger_base
210 {
211 public:
global_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler)212 explicit global_trigger_launcher(mpi_process_group& self, int tag,
213 const Handler& handler)
214 : trigger_base(tag), handler(handler)
215 {
216 }
217
218 void
219 receive(mpi_process_group const& pg, int source, int tag,
220 trigger_receive_context context, int block=-1) const;
221
222 private:
223 mutable Handler handler;
224 // TBD: do not forget to cancel any outstanding Irecv when deleted,
225 // if we decide to use Irecv
226 };
227
228 template<typename Type, typename Handler>
229 class global_irecv_trigger_launcher : public trigger_base
230 {
231 public:
global_irecv_trigger_launcher(mpi_process_group & self,int tag,const Handler & handler,int sz)232 explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
233 const Handler& handler, int sz)
234 : trigger_base(tag), handler(handler), buffer_size(sz)
235 {
236 prepare_receive(self,tag);
237 }
238
239 void
240 receive(mpi_process_group const& pg, int source, int tag,
241 trigger_receive_context context, int block=-1) const;
242
243 private:
244 void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
245 Handler handler;
246 int buffer_size;
247 // TBD: do not forget to cancel any outstanding Irecv when deleted,
248 // if we decide to use Irecv
249 };
250
251 public:
252 /**
253 * Construct a new BSP process group from an MPI communicator. The
254 * MPI communicator will be duplicated to create a new communicator
255 * for this process group to use.
256 */
257 mpi_process_group(communicator_type parent_comm = communicator_type());
258
259 /**
260 * Construct a new BSP process group from an MPI communicator. The
261 * MPI communicator will be duplicated to create a new communicator
262 * for this process group to use. This constructor allows to tune the
263 * size of message batches.
264 *
265 * @param num_headers The maximum number of headers in a message batch
266 *
267 * @param buffer_size The maximum size of the message buffer in a batch.
268 *
269 */
270 mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
271 communicator_type parent_comm = communicator_type());
272
273 /**
274 * Construct a copy of the BSP process group for a new distributed
275 * data structure. This data structure will synchronize with all
276 * other members of the process group's equivalence class (including
277 * @p other), but will have its own set of tags.
278 *
279 * @param other The process group that this new process group will
280 * be based on, using a different set of tags within the same
281 * communication and synchronization space.
282 *
283 * @param handler A message handler that will be passed (source,
284 * tag) pairs for each message received by this data
285 * structure. The handler is expected to receive the messages
286 * immediately. The handler can be changed after-the-fact by
287 * calling @c replace_handler.
288 *
289 * @param out_of_band_receive An anachronism. TODO: remove this.
290 */
291 mpi_process_group(const mpi_process_group& other,
292 const receiver_type& handler,
293 bool out_of_band_receive = false);
294
295 /**
296 * Construct a copy of the BSP process group for a new distributed
297 * data structure. This data structure will synchronize with all
298 * other members of the process group's equivalence class (including
299 * @p other), but will have its own set of tags.
300 */
301 mpi_process_group(const mpi_process_group& other,
302 attach_distributed_object,
303 bool out_of_band_receive = false);
304
305 /**
306 * Create an "empty" process group, with no information. This is an
307 * internal routine that users should never need.
308 */
mpi_process_group(create_empty)309 explicit mpi_process_group(create_empty) {}
310
311 /**
312 * Destroys this copy of the process group.
313 */
314 ~mpi_process_group();
315
316 /**
317 * Replace the current message handler with a new message handler.
318 *
319 * @param handle The new message handler.
320 * @param out_of_band_receive An anachronism: remove this
321 */
322 void replace_handler(const receiver_type& handler,
323 bool out_of_band_receive = false);
324
325 /**
326 * Turns this process group into the process group for a new
327 * distributed data structure or object, allocating its own tag
328 * block.
329 */
330 void make_distributed_object();
331
332 /**
333 * Replace the handler to be invoked at the beginning of synchronize.
334 */
335 void
336 replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
337
338 /**
339 * Return the block number of the current data structure. A value of
340 * 0 indicates that this particular instance of the process group is
341 * not associated with any distributed data structure.
342 */
my_block_number() const343 int my_block_number() const { return block_num? *block_num : 0; }
344
345 /**
346 * Encode a block number/tag pair into a single encoded tag for
347 * transmission.
348 */
encode_tag(int block_num,int tag) const349 int encode_tag(int block_num, int tag) const
350 { return block_num * max_tags + tag; }
351
352 /**
353 * Decode an encoded tag into a block number/tag pair.
354 */
decode_tag(int encoded_tag) const355 std::pair<int, int> decode_tag(int encoded_tag) const
356 { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
357
358 // @todo Actually write up the friend declarations so these could be
359 // private.
360
361 // private:
362
363 /** Allocate a block of tags for this instance. The block should not
364 * have been allocated already, e.g., my_block_number() ==
365 * 0. Returns the newly-allocated block number.
366 */
367 int allocate_block(bool out_of_band_receive = false);
368
369 /** Potentially emit a receive event out of band. Returns true if an event
370 * was actually sent, false otherwise.
371 */
372 bool maybe_emit_receive(int process, int encoded_tag) const;
373
374 /** Emit a receive event. Returns true if an event was actually
375 * sent, false otherwise.
376 */
377 bool emit_receive(int process, int encoded_tag) const;
378
379 /** Emit an on-synchronize event to all block handlers. */
380 void emit_on_synchronize() const;
381
382 /** Retrieve a reference to the stored receiver in this block. */
383 template<typename Receiver>
384 Receiver* get_receiver();
385
386 template<typename T>
387 void
388 send_impl(int dest, int tag, const T& value,
389 mpl::true_ /*is_mpi_datatype*/) const;
390
391 template<typename T>
392 void
393 send_impl(int dest, int tag, const T& value,
394 mpl::false_ /*is_mpi_datatype*/) const;
395
396 template<typename T>
397 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
398 array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
399
400 template<typename T>
401 bool
402 receive_impl(int source, int tag, T& value,
403 mpl::true_ /*is_mpi_datatype*/) const;
404
405 template<typename T>
406 bool
407 receive_impl(int source, int tag, T& value,
408 mpl::false_ /*is_mpi_datatype*/) const;
409
410 // Receive an array of values
411 template<typename T>
412 typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
413 array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
414
415 optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;
416
417 void synchronize() const;
418
operator bool()419 operator bool() { return bool(impl_); }
420
421 mpi_process_group base() const;
422
423 /**
424 * Create a new trigger for a specific message tag. Triggers handle
425 * out-of-band messaging, and the handler itself will be called
426 * whenever a message is available. The handler itself accepts four
427 * arguments: the source of the message, the message tag (which will
428 * be the same as @p tag), the message data (of type @c Type), and a
429 * boolean flag that states whether the message was received
430 * out-of-band. The last will be @c true for out-of-band receives,
431 * or @c false for receives at the end of a synchronization step.
432 */
433 template<typename Type, typename Handler>
434 void trigger(int tag, const Handler& handler);
435
436 /**
437 * Create a new trigger for a specific message tag, along with a way
438 * to send a reply with data back to the sender. Triggers handle
439 * out-of-band messaging, and the handler itself will be called
440 * whenever a message is available. The handler itself accepts four
441 * arguments: the source of the message, the message tag (which will
442 * be the same as @p tag), the message data (of type @c Type), and a
443 * boolean flag that states whether the message was received
444 * out-of-band. The last will be @c true for out-of-band receives,
445 * or @c false for receives at the end of a synchronization
446 * step. The handler also returns a value, which will be routed back
447 * to the sender.
448 */
449 template<typename Type, typename Handler>
450 void trigger_with_reply(int tag, const Handler& handler);
451
452 template<typename Type, typename Handler>
453 void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);
454
455
456
457 /**
458 * Poll for any out-of-band messages. This routine will check if any
459 * out-of-band messages are available. Those that are available will
460 * be handled immediately, if possible.
461 *
462 * @returns if an out-of-band message has been received, but we are
463 * unable to actually receive the message, a (source, tag) pair will
464 * be returned. Otherwise, returns an empty optional.
465 *
466 * @param wait When true, we should block until a message comes in.
467 *
468 * @param synchronizing whether we are currently synchronizing the
469 * process group
470 */
471 optional<std::pair<int, int> >
472 poll(bool wait = false, int block = -1, bool synchronizing = false) const;
473
474 /**
475 * Determines the context of the trigger currently executing. If
476 * multiple triggers are executing (recursively), then the context
477 * for the most deeply nested trigger will be returned. If no
478 * triggers are executing, returns @c trc_none. This might be used,
479 * for example, to determine whether a reply to a message should
480 * itself be sent out-of-band or whether it can go via the normal,
481 * slower communication route.
482 */
483 trigger_receive_context trigger_context() const;
484
485 /// INTERNAL ONLY
486 void receive_batch(process_id_type source, outgoing_messages& batch) const;
487
488 /// INTERNAL ONLY
489 ///
490 /// Determine the actual communicator and tag will be used for a
491 /// transmission with the given tag.
492 std::pair<boost::mpi::communicator, int>
493 actual_communicator_and_tag(int tag, int block) const;
494
495 /// set the size of the message buffer used for buffered oob sends
496
497 static void set_message_buffer_size(std::size_t s);
498
499 /// get the size of the message buffer used for buffered oob sends
500
501 static std::size_t message_buffer_size();
502 static int old_buffer_size;
503 static void* old_buffer;
504 private:
505
506 void install_trigger(int tag, int block,
507 shared_ptr<trigger_base> const& launcher);
508
509 void poll_requests(int block=-1) const;
510
511
512 // send a batch if the buffer is full now or would get full
513 void maybe_send_batch(process_id_type dest) const;
514
515 // actually send a batch
516 void send_batch(process_id_type dest, outgoing_messages& batch) const;
517 void send_batch(process_id_type dest) const;
518
519 void pack_headers() const;
520
521 /**
522 * Process a batch of incoming messages immediately.
523 *
524 * @param source the source of these messages
525 */
526 void process_batch(process_id_type source) const;
527 void receive_batch(boost::mpi::status& status) const;
528
529 //void free_finished_sends() const;
530
531 /// Status messages used internally by the process group
532 enum status_messages {
533 /// the first of the reserved message tags
534 msg_reserved_first = 126,
535 /// Sent from a processor when sending batched messages
536 msg_batch = 126,
537 /// Sent from a processor when sending large batched messages, larger than
538 /// the maximum buffer size for messages to be received by MPI_Irecv
539 msg_large_batch = 127,
540 /// Sent from a source processor to everyone else when that
541 /// processor has entered the synchronize() function.
542 msg_synchronizing = 128,
543 /// the last of the reserved message tags
544 msg_reserved_last = 128
545 };
546
547 /**
548 * Description of a block of tags associated to a particular
549 * distributed data structure. This structure will live as long as
550 * the distributed data structure is around, and will be used to
551 * help send messages to the data structure.
552 */
553 struct block_type
554 {
block_typeboost::graph::distributed::mpi_process_group::block_type555 block_type() { }
556
557 /// Handler for receive events
558 receiver_type on_receive;
559
560 /// Handler executed at the start of synchronization
561 on_synchronize_event_type on_synchronize;
562
563 /// Individual message triggers. Note: at present, this vector is
564 /// indexed by the (local) tag of the trigger. Any tags that
565 /// don't have triggers will have NULL pointers in that spot.
566 std::vector<shared_ptr<trigger_base> > triggers;
567 };
568
569 /**
570 * Data structure containing all of the blocks for the distributed
571 * data structures attached to a process group.
572 */
573 typedef std::vector<block_type*> blocks_type;
574
575 /// Iterator into @c blocks_type.
576 typedef blocks_type::iterator block_iterator;
577
578 /**
579 * Deleter used to deallocate a block when its distributed data
580 * structure is destroyed. This type will be used as the deleter for
581 * @c block_num.
582 */
583 struct deallocate_block;
584
585 static std::vector<char> message_buffer;
586
587 public:
588 /**
589 * Data associated with the process group and all of its attached
590 * distributed data structures.
591 */
592 shared_ptr<impl> impl_;
593
594 /**
595 * When non-null, indicates that this copy of the process group is
596 * associated with a particular distributed data structure. The
597 * integer value contains the block number (a value > 0) associated
598 * with that data structure. The deleter for this @c shared_ptr is a
599 * @c deallocate_block object that will deallocate the associated
600 * block in @c impl_->blocks.
601 */
602 shared_ptr<int> block_num;
603
604 /**
605 * Rank of this process, to avoid having to call rank() repeatedly.
606 */
607 int rank;
608
609 /**
610 * Number of processes in this process group, to avoid having to
611 * call communicator::size() repeatedly.
612 */
613 int size;
614 };
615
616
617
618 inline mpi_process_group::process_id_type
process_id(const mpi_process_group & pg)619 process_id(const mpi_process_group& pg)
620 { return pg.rank; }
621
622 inline mpi_process_group::process_size_type
num_processes(const mpi_process_group & pg)623 num_processes(const mpi_process_group& pg)
624 { return pg.size; }
625
626 mpi_process_group::communicator_type communicator(const mpi_process_group& pg);
627
628 template<typename T>
629 void
630 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
631 int tag, const T& value);
632
633 template<typename InputIterator>
634 void
635 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
636 int tag, InputIterator first, InputIterator last);
637
638 template<typename T>
639 inline void
send(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,T * first,T * last)640 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
641 int tag, T* first, T* last)
642 { send(pg, dest, tag, first, last - first); }
643
644 template<typename T>
645 inline void
send(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T * first,const T * last)646 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
647 int tag, const T* first, const T* last)
648 { send(pg, dest, tag, first, last - first); }
649
650 template<typename T>
651 mpi_process_group::process_id_type
652 receive(const mpi_process_group& pg, int tag, T& value);
653
654 template<typename T>
655 mpi_process_group::process_id_type
656 receive(const mpi_process_group& pg,
657 mpi_process_group::process_id_type source, int tag, T& value);
658
659 optional<std::pair<mpi_process_group::process_id_type, int> >
660 probe(const mpi_process_group& pg);
661
662 void synchronize(const mpi_process_group& pg);
663
664 template<typename T, typename BinaryOperation>
665 T*
666 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
667 BinaryOperation bin_op);
668
669 template<typename T, typename BinaryOperation>
670 T*
671 scan(const mpi_process_group& pg, T* first, T* last, T* out,
672 BinaryOperation bin_op);
673
674 template<typename InputIterator, typename T>
675 void
676 all_gather(const mpi_process_group& pg,
677 InputIterator first, InputIterator last, std::vector<T>& out);
678
679 template<typename InputIterator>
680 mpi_process_group
681 process_subgroup(const mpi_process_group& pg,
682 InputIterator first, InputIterator last);
683
684 template<typename T>
685 void
686 broadcast(const mpi_process_group& pg, T& val,
687 mpi_process_group::process_id_type root);
688
689
690 /*******************************************************************
691 * Out-of-band communication *
692 *******************************************************************/
693
694 template<typename T>
695 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T & value,int block=-1)696 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
697 int tag, const T& value, int block=-1)
698 {
699 using boost::mpi::get_mpi_datatype;
700
701 // Determine the actual message tag we will use for the send, and which
702 // communicator we will use.
703 std::pair<boost::mpi::communicator, int> actual
704 = pg.actual_communicator_and_tag(tag, block);
705
706 #ifdef SEND_OOB_BSEND
707 if (mpi_process_group::message_buffer_size()) {
708 MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
709 actual.second, actual.first);
710 return;
711 }
712 #endif
713 MPI_Request request;
714 MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
715 actual.second, actual.first, &request);
716
717 int done=0;
718 do {
719 pg.poll();
720 MPI_Test(&request,&done,MPI_STATUS_IGNORE);
721 } while (!done);
722 }
723
724 template<typename T>
725 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
send_oob(const mpi_process_group & pg,mpi_process_group::process_id_type dest,int tag,const T & value,int block=-1)726 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
727 int tag, const T& value, int block=-1)
728 {
729 using boost::mpi::packed_oarchive;
730
731 // Determine the actual message tag we will use for the send, and which
732 // communicator we will use.
733 std::pair<boost::mpi::communicator, int> actual
734 = pg.actual_communicator_and_tag(tag, block);
735
736 // Serialize the data into a buffer
737 packed_oarchive out(actual.first);
738 out << value;
739 std::size_t size = out.size();
740
741 // Send the actual message data
742 #ifdef SEND_OOB_BSEND
743 if (mpi_process_group::message_buffer_size()) {
744 MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
745 dest, actual.second, actual.first);
746 return;
747 }
748 #endif
749 MPI_Request request;
750 MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
751 dest, actual.second, actual.first, &request);
752
753 int done=0;
754 do {
755 pg.poll();
756 MPI_Test(&request,&done,MPI_STATUS_IGNORE);
757 } while (!done);
758 }
759
760 template<typename T>
761 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
762 receive_oob(const mpi_process_group& pg,
763 mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
764
765 template<typename T>
766 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
767 receive_oob(const mpi_process_group& pg,
768 mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
769
770 template<typename SendT, typename ReplyT>
771 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
772 send_oob_with_reply(const mpi_process_group& pg,
773 mpi_process_group::process_id_type dest,
774 int tag, const SendT& send_value, ReplyT& reply_value,
775 int block = -1);
776
777 template<typename SendT, typename ReplyT>
778 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
779 send_oob_with_reply(const mpi_process_group& pg,
780 mpi_process_group::process_id_type dest,
781 int tag, const SendT& send_value, ReplyT& reply_value,
782 int block = -1);
783
784 } } } // end namespace boost::graph::distributed
785
786 BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
787 namespace boost { namespace mpi {
788 template<>
789 struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
790 } } // end namespace boost::mpi
791
792 namespace std {
793 /// optimized swap for outgoing messages
794 inline void
swap(boost::graph::distributed::mpi_process_group::outgoing_messages & x,boost::graph::distributed::mpi_process_group::outgoing_messages & y)795 swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x,
796 boost::graph::distributed::mpi_process_group::outgoing_messages& y)
797 {
798 x.swap(y);
799 }
800
801
802 }
803
804 BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
805 BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
806
807 #include <boost/graph/distributed/detail/mpi_process_group.ipp>
808
809 #endif // BOOST_PARALLEL_MPI_MPI_PROCESS_GROUP_HPP
810