// -*- C++ -*-

// Copyright (C) 2004-2008 The Trustees of Indiana University.
// Copyright (C) 2007  Douglas Gregor <doug.gregor@gmail.com>
// Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine
//           Matthias Troyer

//#define PBGL_PROCESS_GROUP_DEBUG

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

#include <boost/assert.hpp>
#include <algorithm>
#include <boost/graph/parallel/detail/untracked_pair.hpp>
#include <numeric>
#include <iterator>
#include <functional>
#include <vector>
#include <queue>
#include <stack>
#include <list>
#include <map>
#include <boost/graph/distributed/detail/tag_allocator.hpp>
#include <stdio.h>
#include <stdlib.h>

#ifdef PBGL_PROCESS_GROUP_DEBUG
#  include <iostream>
#endif

namespace boost { namespace graph { namespace distributed {

struct mpi_process_group::impl
{

  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  /**
   * Stores the incoming messages from a particular processor.
   *
   * @todo Evaluate whether we should use a deque instead, which
   * could reduce the cost of "receiving" messages and allow us to
   * deallocate memory earlier, but increases the time spent in the
   * synchronization step.
   */
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    std::vector<message_header> headers;
    buffer_type                 buffer;
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // Send once we have a certain number of messages or bytes in the buffer.
  // These numbers need tuning; we keep them small at first for testing.
  std::size_t batch_header_number;
  std::size_t batch_buffer_size;
  std::size_t batch_message_size;

  /**
   * The actual MPI communicator used to transmit data.
   */
  boost::mpi::communicator             comm;

  /**
   * The MPI communicator used to transmit out-of-band replies.
   */
  boost::mpi::communicator             oob_reply_comm;

  /// Outgoing message information, indexed by destination processor.
  std::vector<outgoing_messages> outgoing;

  /// Incoming message information, indexed by source processor.
  std::vector<incoming_messages> incoming;

  /// The number of processors that have entered each synchronization stage
  std::vector<int> processors_synchronizing_stage;

  /// The synchronization stage of a processor
  std::vector<int> synchronizing_stage;

  /// Number of processors still sending messages
  std::vector<int> synchronizing_unfinished;

  /// Number of batches sent since last synchronization stage
  std::vector<int> number_sent_batches;

  /// Number of batches received minus number of expected batches
  std::vector<int> number_received_batches;

  /// The context of the currently-executing trigger, or @c trc_none
  /// if no trigger is executing.
  trigger_receive_context trigger_context;

  /// Non-zero indicates that we're processing batches.
  /// Increment this when processing batches,
  /// decrement it when you're done.
  int processing_batches;

  /**
   * Contains all of the active blocks corresponding to attached
   * distributed data structures.
   */
  blocks_type blocks;

  /// Whether we are currently synchronizing
  bool synchronizing;

  /// The MPI requests for posted sends of oob messages
  std::vector<MPI_Request> requests;

  /// The MPI buffers for posted irecvs of oob messages
  std::map<int,buffer_type> buffers;

  /// Queue for message batches received while already processing messages
  std::queue<std::pair<int,outgoing_messages> > new_batches;
  /// Maximum encountered size of the new_batches queue
  std::size_t max_received;

  /// The MPI requests and buffers for batches being sent
  std::list<batch_request> sent_batches;
  /// Maximum encountered size of the sent_batches list
  std::size_t max_sent;

  /// Pre-allocated requests in a pool
  std::vector<batch_request> batch_pool;
  /// A stack controlling which batches are available
  std::stack<std::size_t> free_batches;

  void free_sent_batches();

  // Tag allocator
  detail::tag_allocator allocated_tags;

  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();

private:
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};

inline trigger_receive_context mpi_process_group::trigger_context() const
{
  return impl_->trigger_context;
}

template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::true_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
  oa << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif

  maybe_send_batch(dest);
}

template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::false_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);
  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T& value)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
               boost::mpi::is_mpi_datatype<T>());
}
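
// Example usage (an illustrative sketch, not part of the library): a
// buffered point-to-point send. The value is packed into the batch for
// process 1 and delivered during message processing, at the latest by
// the next synchronize() over the group.
//
//   mpi_process_group pg;
//   int payload = 42;
//   if (process_id(pg) == 0) send(pg, /*dest=*/1, /*tag=*/0, payload);
//   synchronize(pg);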

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
               boost::serialization::make_array(values,n),
               boost::mpl::true_());
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
mpi_process_group::
array_send_impl(int dest, int tag, const T values[], std::size_t n) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << n;

  for (std::size_t i = 0; i < n; ++i)
    out << values[i];

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);
  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
                     values, n);
}

template<typename InputIterator>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, InputIterator first, InputIterator last)
{
  typedef typename std::iterator_traits<InputIterator>::value_type value_type;
  std::vector<value_type> values(first, last);
  if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
  else send(pg, dest, tag, &values[0], values.size());
}
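
// Example usage (an illustrative sketch, not part of the library):
// sending an arbitrary input range; the values are first copied into a
// contiguous vector so they can be shipped as an array.
//
//   std::list<double> readings;      // assume this has been filled in
//   send(pg, /*dest=*/1, /*tag=*/0, readings.begin(), readings.end());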

template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::true_ /*is_mpi_datatype*/) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
            << tag << std::endl;
#endif

  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Unpack the data
  if (header->bytes > 0) {
    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
                                   archive::no_header, header->offset);
    ia >> value;
  }

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::false_ /*is_mpi_datatype*/) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  in >> value;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
mpi_process_group::
array_receive_impl(int source, int tag, T* values, std::size_t& n) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  std::size_t num_sent;
  in >> num_sent;
  if (num_sent > n)
    std::cerr << "ERROR: Have " << num_sent << " items but only space for "
              << n << " items\n";

  // Never read past the end of the caller's buffer, even if more items
  // were sent than we have room for.
  n = (std::min)(num_sent, n);
  for (std::size_t i = 0; i < n; ++i)
    in >> values[i];

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

// Construct triggers
template<typename Type, typename Handler>
void mpi_process_group::trigger(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
    new trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::global_trigger(int tag, const Handler& handler,
      std::size_t sz)
{
  if (sz==0) // normal trigger
    install_trigger(tag,0,shared_ptr<trigger_base>(
      new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
  else // trigger with irecv
    install_trigger(tag,0,shared_ptr<trigger_base>(
      new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));
}
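
// Example usage (an illustrative sketch, not part of the library):
// installing a trigger so that int messages on tag 0 are handled
// asynchronously whenever the group is polled or synchronized. The
// handler signature matches what trigger_launcher::receive invokes.
//
//   struct my_handler {
//     void operator()(int source, int tag, const int& data,
//                     trigger_receive_context) const
//     { /* react to data */ }
//   };
//   pg.trigger<int>(/*tag=*/0, my_handler());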

namespace detail {

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
    int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/)
{
  using boost::mpi::get_mpi_datatype;

  //self.impl_->comm.recv(source,tag,data);
  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
    int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/)
{
  //  self.impl_->comm.recv(source,tag,data);
  // Receive the size of the data packet
  boost::mpi::status status;
  status = self.impl_->comm.probe(source, tag);

#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  int size;
  MPI_Status& mpi_status = status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Receive the packed data itself
  boost::mpi::packed_iarchive in(self.impl_->comm);
  in.resize(size);
  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);

  // Deserialize the data
  in >> data;
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data)
{
  do_oob_receive(self, source, tag, data,
                 boost::mpi::is_mpi_datatype<Type>());
}
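
// The three-argument do_oob_receive above tag-dispatches on
// boost::mpi::is_mpi_datatype<Type>: types with a native MPI datatype
// take the direct MPI_Recv path, while serialized types are probed for
// their packed size and then deserialized from a packed archive.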

} // namespace detail

template<typename Type, typename Handler>
void
mpi_process_group::trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self,source,realtag,data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::reply_trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif
  BOOST_ASSERT(context == trc_out_of_band);

  boost::parallel::detail::untracked_pair<int, Type> data;

  // Receive the message directly off the wire
  int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
                                tag);
  detail::do_oob_receive(self, source, realtag, data);

  // Pass the message off to the handler and send the result back to
  // the source.
  send_oob(self, source, data.first,
           handler(source, tag, data.second, context), -2);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self,source,realtag,data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    return;
  }
  BOOST_ASSERT (context == trc_irecv_out_of_band);

  // force posting of new MPI_Irecv, even though buffer is already allocated
  boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
  ia >> data;
  // Start a new receive
  prepare_receive(self,tag,true);
  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
prepare_receive(mpi_process_group const& self, int tag, bool force) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "Posting Irecv for trigger"
            << " receive with tag " << tag << std::endl;
#endif
  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
    self.impl_->buffers[tag].resize(buffer_size);
    force = true;
  }
  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);

  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
  if (force) {
    self.impl_->requests.push_back(MPI_Request());
    MPI_Request* request = &self.impl_->requests.back();
    MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
              MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
  }
}

template<typename T>
inline mpi_process_group::process_id_type
receive(const mpi_process_group& pg, int tag, T& value)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        value, boost::mpi::is_mpi_datatype<T>()))
      return source;
  }
  BOOST_ASSERT(false);
  return -1; // unreachable: a matching message must have been buffered
}
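
// Example usage (an illustrative sketch, not part of the library):
// draining a message that is known to be pending, e.g. right after a
// synchronize() that delivered it; receive asserts if nothing matches.
//
//   int payload;
//   mpi_process_group::process_id_type who = receive(pg, /*tag=*/0, payload);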

template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    bool result =
      pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values,n),
                      boost::mpl::true_());
    if (result)
      return std::make_pair(source, n);
  }
  BOOST_ASSERT(false);
  return std::make_pair(-1, 0); // unreachable
}

template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                              values, n))
      return std::make_pair(source, n);
  }
  BOOST_ASSERT(false);
  return std::make_pair(-1, 0); // unreachable
}

template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      value, boost::mpi::is_mpi_datatype<T>()))
    return source;
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());

    BOOST_ASSERT(false);
    exit(1);
  }
}

template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values,n),
                      boost::mpl::true_()))
    return std::make_pair(source,n);
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());

    BOOST_ASSERT(false);
    exit(1);
  }
}

template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        values, n);

  return std::make_pair(source, n);
}

template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T [last-first];

  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
                                                  boost::mpi::comm_attach),
                         first, last-first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}
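
// Example usage (an illustrative sketch, not part of the library): an
// in-place element-wise sum across all processes; when first == out,
// the result is written back over the input and last is returned.
//
//   int local[2] = { process_id(pg), 1 };
//   all_reduce(pg, local, local + 2, local, std::plus<int>());
//   // local[1] == num_processes(pg) on every process afterwards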

template<typename T>
void
broadcast(const mpi_process_group& pg, T& val,
          mpi_process_group::process_id_type root)
{
  // Broadcast the value from the root over a plain MPI view of the
  // group's communicator
  boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);
  boost::mpi::broadcast(comm,val,root);
}
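
// Example usage (an illustrative sketch, not part of the library):
// distributing a value chosen on the root to every process.
//
//   int seed = 0;
//   if (process_id(pg) == 0) seed = 12345;
//   broadcast(pg, seed, /*root=*/0);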

template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
     BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T [last-first];

  boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}
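
// Example usage (an illustrative sketch, not part of the library): an
// inclusive prefix sum with one value per process; afterwards, out on
// process p holds the sum of the values contributed by processes 0..p.
//
//   int value = 1, out = 0;
//   scan(pg, &value, &value + 1, &out, std::plus<int>());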

template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg, InputIterator first,
           InputIterator last, std::vector<T>& out)
{
  synchronize(pg);

  // Stick a copy of the local values into a vector, so we can broadcast it
  std::vector<T> local_values(first, last);

  // Collect the number of values stored in each process
  int size = local_values.size();
  std::vector<int> sizes(num_processes(pg));
  int result = MPI_Allgather(&size, 1, MPI_INT,
                             &sizes[0], 1, MPI_INT,
                             communicator(pg));
  BOOST_ASSERT(result == MPI_SUCCESS);

  // Convert the sizes from numbers of elements to numbers of bytes
  std::transform(sizes.begin(), sizes.end(), sizes.begin(),
                 std::bind2nd(std::multiplies<int>(), sizeof(T)));

  // Compute displacements
  std::vector<int> displacements;
  displacements.reserve(sizes.size() + 1);
  displacements.push_back(0);
  std::partial_sum(sizes.begin(), sizes.end(),
                   std::back_inserter(displacements));

  // Gather all of the values
  out.resize(displacements.back() / sizeof(T));
  if (!out.empty()) {
    result = MPI_Allgatherv(local_values.empty()
                              ? (void*)&local_values    // dummy non-null pointer
                              : (void*)&local_values[0] /* local results */,
                            local_values.size() * sizeof(T),
                            MPI_BYTE,
                            &out[0], &sizes[0], &displacements[0], MPI_BYTE,
                            communicator(pg));
  }
  BOOST_ASSERT(result == MPI_SUCCESS);
}
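
// Example usage (an illustrative sketch, not part of the library):
// gathering one value from each process into a vector that is identical
// everywhere afterwards. T must be safe to ship as raw bytes.
//
//   std::vector<int> all;
//   int mine = process_id(pg);
//   all_gather(pg, &mine, &mine + 1, all);
//   // all.size() == num_processes(pg), all[p] == p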

template<typename InputIterator>
mpi_process_group
process_subgroup(const mpi_process_group& pg,
                 InputIterator first, InputIterator last)
{
/*
  boost::mpi::group current_group = communicator(pg).group();
  boost::mpi::group new_group = current_group.include(first,last);
  boost::mpi::communicator new_comm(communicator(pg),new_group);
  return mpi_process_group(new_comm);
*/
  std::vector<int> ranks(first, last);

  MPI_Group current_group;
  int result = MPI_Comm_group(communicator(pg), &current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  MPI_Group new_group;
  result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  MPI_Comm new_comm;
  result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
  BOOST_ASSERT(result == MPI_SUCCESS);

  result = MPI_Group_free(&new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);
  result = MPI_Group_free(&current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  if (new_comm != MPI_COMM_NULL) {
    mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));
    result = MPI_Comm_free(&new_comm);
    BOOST_ASSERT(result == MPI_SUCCESS);
    return result_pg;
  } else {
    return mpi_process_group(mpi_process_group::create_empty());
  }
}
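
// Example usage (an illustrative sketch, not part of the library):
// building a subgroup of the even-ranked processes. Every process in
// the parent group must make this call, since it performs collective
// MPI operations; processes outside the new group receive an empty
// process group.
//
//   std::vector<int> evens;
//   for (int i = 0; i < num_processes(pg); i += 2) evens.push_back(i);
//   mpi_process_group sub = process_subgroup(pg, evens.begin(), evens.end());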

template<typename Receiver>
Receiver* mpi_process_group::get_receiver()
{
  return impl_->blocks[my_block_number()]->on_receive
           .template target<Receiver>();
}

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value, int block)
{
  using boost::mpi::get_mpi_datatype;

  // Determine the actual message we expect to receive and the
  // communicator on which it will arrive.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Post a non-blocking receive, then poll the process group until it
  // completes.
  MPI_Request request;
  MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
            source, actual.second, actual.first, &request);

  int done = 0;
  do {
    MPI_Test(&request, &done, MPI_STATUS_IGNORE);
    if (!done)
      pg.poll(/*wait=*/false, block);
  } while (!done);
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value, int block)
{
  // Determine the actual message we expect to receive and the
  // communicator on which it will arrive.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  boost::optional<boost::mpi::status> status;
  do {
    status = actual.first.iprobe(source, actual.second);
    if (!status)
      pg.poll();
  } while (!status);

  //actual.first.recv(status->source(), status->tag(),value);

  // Allocate the receive buffer
  boost::mpi::packed_iarchive in(actual.first);

#if BOOST_VERSION >= 103600
  in.resize(status->count<boost::mpi::packed>().get());
#else
  int size;
  MPI_Status mpi_status = *status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
  in.resize(size);
#endif

  // Receive the message data
  MPI_Recv(in.address(), in.size(), MPI_PACKED,
           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);

  // Unpack the message data
  in >> value;
}
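
// Both receive_oob overloads poll the process group while waiting
// instead of blocking inside MPI: this keeps triggers firing, and it
// avoids a potential deadlock in which the peer is itself waiting for
// this process to service one of its requests.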

template<typename SendT, typename ReplyT>
typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
        (int)reply_tag, send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}

template<typename SendT, typename ReplyT>
typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag,
           boost::parallel::detail::make_untracked_pair((int)reply_tag,
                                                        send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}
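
// Example usage (an illustrative sketch, not part of the library): a
// blocking remote query. The destination must have installed a matching
// handler via trigger_with_reply on the same tag; the reply comes back
// on a freshly allocated tag. The block argument is assumed to take its
// default here.
//
//   int question = 7, answer = 0;
//   send_oob_with_reply(pg, /*dest=*/1, /*tag=*/0, question, answer);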

} } } // end namespace boost::graph::distributed
