1// -*- C++ -*-
2
3// Copyright (C) 2004-2008 The Trustees of Indiana University.
4// Copyright (C) 2007  Douglas Gregor <doug.gregor@gmail.com>
5// Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>
6
7// Use, modification and distribution is subject to the Boost Software
8// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
9// http://www.boost.org/LICENSE_1_0.txt)
10
11//  Authors: Douglas Gregor
12//           Andrew Lumsdaine
13//           Matthias Troyer
14
16
17#ifndef BOOST_GRAPH_USE_MPI
18#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
19#endif
20
21#include <boost/assert.hpp>
22#include <algorithm>
23#include <boost/graph/parallel/detail/untracked_pair.hpp>
24#include <numeric>
25#include <iterator>
26#include <functional>
27#include <vector>
28#include <queue>
29#include <stack>
30#include <list>
31#include <boost/graph/distributed/detail/tag_allocator.hpp>
#include <stdio.h>
#include <stdlib.h> // for abort()
33
34// #define PBGL_PROCESS_GROUP_DEBUG
35
36#ifdef PBGL_PROCESS_GROUP_DEBUG
37#  include <iostream>
38#endif
39
40namespace boost { namespace graph { namespace distributed {
41
42struct mpi_process_group::impl
43{
44
45  typedef mpi_process_group::message_header message_header;
46  typedef mpi_process_group::outgoing_messages outgoing_messages;
47
48  /**
49   * Stores the incoming messages from a particular processor.
50   *
51   * @todo Evaluate whether we should use a deque instance, which
52   * would reduce could reduce the cost of "receiving" messages and
53     allow us to deallocate memory earlier, but increases the time
54     spent in the synchronization step.
55   */
56  struct incoming_messages {
57    incoming_messages();
58    ~incoming_messages() {}
59
60    std::vector<message_header> headers;
61    buffer_type                 buffer;
62    std::vector<std::vector<message_header>::iterator> next_header;
63  };
64
65  struct batch_request {
66    MPI_Request request;
67    buffer_type buffer;
68  };
69
  // Send a batch once we have accumulated a certain number of messages or
  // bytes in the buffer. These thresholds need to be tuned; we keep them
  // small at first for testing.
72  std::size_t batch_header_number;
73  std::size_t batch_buffer_size;
74  std::size_t batch_message_size;
75
76  /**
77   * The actual MPI communicator used to transmit data.
78   */
79  boost::mpi::communicator             comm;
80
81  /**
82   * The MPI communicator used to transmit out-of-band replies.
83   */
84  boost::mpi::communicator             oob_reply_comm;
85
86  /// Outgoing message information, indexed by destination processor.
87  std::vector<outgoing_messages> outgoing;
88
89  /// Incoming message information, indexed by source processor.
90  std::vector<incoming_messages> incoming;
91
  /// The number of processors that have entered each synchronization stage
93  std::vector<int> processors_synchronizing_stage;
94
95  /// The synchronization stage of a processor
96  std::vector<int> synchronizing_stage;
97
98  /// Number of processors still sending messages
99  std::vector<int> synchronizing_unfinished;
100
101  /// Number of batches sent since last synchronization stage
102  std::vector<int> number_sent_batches;
103
104  /// Number of batches received minus number of expected batches
105  std::vector<int> number_received_batches;
106
107
108  /// The context of the currently-executing trigger, or @c trc_none
109  /// if no trigger is executing.
110  trigger_receive_context trigger_context;
111
  /// Non-zero indicates that we're processing batches.
  /// Increment this when processing batches;
  /// decrement it when you're done.
115  int processing_batches;
116
117  /**
118   * Contains all of the active blocks corresponding to attached
119   * distributed data structures.
120   */
121  blocks_type blocks;
122
123  /// Whether we are currently synchronizing
124  bool synchronizing;
125
  /// The MPI requests for posted sends and irecvs of oob messages
127  std::vector<MPI_Request> requests;
128
129  /// The MPI buffers for posted irecvs of oob messages
130  std::map<int,buffer_type> buffers;
131
132  /// Queue for message batches received while already processing messages
133  std::queue<std::pair<int,outgoing_messages> > new_batches;
134  /// Maximum encountered size of the new_batches queue
135  std::size_t max_received;
136
  /// The MPI requests and buffers for batches being sent
138  std::list<batch_request> sent_batches;
139  /// Maximum encountered size of the sent_batches list
140  std::size_t max_sent;
141
142  /// Pre-allocated requests in a pool
143  std::vector<batch_request> batch_pool;
144  /// A stack controlling which batches are available
145  std::stack<std::size_t> free_batches;
146
147  void free_sent_batches();
148
149  // Tag allocator
150  detail::tag_allocator allocated_tags;
151
152  impl(std::size_t num_headers, std::size_t buffers_size,
153       communicator_type parent_comm);
154  ~impl();
155
156private:
157  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
158};
159
160inline trigger_receive_context mpi_process_group::trigger_context() const
161{
162  return impl_->trigger_context;
163}
164
165template<typename T>
166void
167mpi_process_group::send_impl(int dest, int tag, const T& value,
168                             mpl::true_ /*is_mpi_datatype*/) const
169{
170  BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);
171
172  impl::outgoing_messages& outgoing = impl_->outgoing[dest];
173
174  // Start constructing the message header
175  impl::message_header header;
176  header.source = process_id(*this);
177  header.tag = tag;
178  header.offset = outgoing.buffer.size();
179
180  boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
181  oa << value;
182
  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
191
192  maybe_send_batch(dest);
193}
194
195
196template<typename T>
197void
198mpi_process_group::send_impl(int dest, int tag, const T& value,
199                             mpl::false_ /*is_mpi_datatype*/) const
200{
201  BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);
202
203  impl::outgoing_messages& outgoing = impl_->outgoing[dest];
204
205  // Start constructing the message header
206  impl::message_header header;
207  header.source = process_id(*this);
208  header.tag = tag;
209  header.offset = outgoing.buffer.size();
210
211  // Serialize into the buffer
212  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
213  out << value;
214
215  // Store the header
216  header.bytes = outgoing.buffer.size() - header.offset;
217  outgoing.headers.push_back(header);
218  maybe_send_batch(dest);
219
220#ifdef PBGL_PROCESS_GROUP_DEBUG
221  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
222            << tag << ", bytes = " << header.bytes << std::endl;
223#endif
224}
225
226template<typename T>
227inline void
228send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
229     int tag, const T& value)
230{
231  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
232               boost::mpi::is_mpi_datatype<T>());
233}
234
235template<typename T>
236typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
237send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
238     int tag, const T values[], std::size_t n)
239{
240  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
241                 boost::serialization::make_array(values,n),
242                 boost::mpl::true_());
243}
244
245template<typename T>
246typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
247mpi_process_group::
248array_send_impl(int dest, int tag, const T values[], std::size_t n) const
249{
250  BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);
251
252  impl::outgoing_messages& outgoing = impl_->outgoing[dest];
253
254  // Start constructing the message header
255  impl::message_header header;
256  header.source = process_id(*this);
257  header.tag = tag;
258  header.offset = outgoing.buffer.size();
259
260  // Serialize into the buffer
261  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
262  out << n;
263
264  for (std::size_t i = 0; i < n; ++i)
265    out << values[i];
266
267  // Store the header
268  header.bytes = outgoing.buffer.size() - header.offset;
269  outgoing.headers.push_back(header);
270  maybe_send_batch(dest);
271
272#ifdef PBGL_PROCESS_GROUP_DEBUG
273  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
274            << tag << ", bytes = " << header.bytes << std::endl;
275#endif
276}
277
278template<typename T>
279typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
280send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
281     int tag, const T values[], std::size_t n)
282{
283  pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
284                     values, n);
285}
286
287template<typename InputIterator>
288void
289send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
290     int tag, InputIterator first, InputIterator last)
291{
292  typedef typename std::iterator_traits<InputIterator>::value_type value_type;
293  std::vector<value_type> values(first, last);
294  if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
295  else send(pg, dest, tag, &values[0], values.size());
296}
297
298template<typename T>
299bool
300mpi_process_group::receive_impl(int source, int tag, T& value,
301                                mpl::true_ /*is_mpi_datatype*/) const
302{
303#ifdef PBGL_PROCESS_GROUP_DEBUG
304  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
305            << tag << std::endl;
306#endif
307
308  impl::incoming_messages& incoming = impl_->incoming[source];
309
310  // Find the next header with the right tag
311  std::vector<impl::message_header>::iterator header =
312    incoming.next_header[my_block_number()];
313  while (header != incoming.headers.end() && header->tag != tag) ++header;
314
315  // If no header is found, notify the caller
316  if (header == incoming.headers.end()) return false;
317
318  // Unpack the data
319  if (header->bytes > 0) {
320    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
321                                   archive::no_header, header->offset);
322    ia >> value;
323  }
324
325  // Mark this message as received
326  header->tag = -1;
327
328  // Move the "next header" indicator to the next unreceived message
329  while (incoming.next_header[my_block_number()] != incoming.headers.end()
330         && incoming.next_header[my_block_number()]->tag == -1)
331    ++incoming.next_header[my_block_number()];
332
333  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
334    bool finished = true;
335    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
336      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
337    }
338
339    if (finished) {
340      std::vector<impl::message_header> no_headers;
341      incoming.headers.swap(no_headers);
342      buffer_type empty_buffer;
343      incoming.buffer.swap(empty_buffer);
344      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
345        incoming.next_header[i] = incoming.headers.end();
346    }
347  }
348
349  return true;
350}
351
352template<typename T>
353bool
354mpi_process_group::receive_impl(int source, int tag, T& value,
355                                mpl::false_ /*is_mpi_datatype*/) const
356{
357  impl::incoming_messages& incoming = impl_->incoming[source];
358
359  // Find the next header with the right tag
360  std::vector<impl::message_header>::iterator header =
361    incoming.next_header[my_block_number()];
362  while (header != incoming.headers.end() && header->tag != tag) ++header;
363
364  // If no header is found, notify the caller
365  if (header == incoming.headers.end()) return false;
366
367  // Deserialize the data
368  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
369                                 archive::no_header, header->offset);
370  in >> value;
371
372  // Mark this message as received
373  header->tag = -1;
374
375  // Move the "next header" indicator to the next unreceived message
376  while (incoming.next_header[my_block_number()] != incoming.headers.end()
377         && incoming.next_header[my_block_number()]->tag == -1)
378    ++incoming.next_header[my_block_number()];
379
380  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
381    bool finished = true;
382    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
383      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
384    }
385
386    if (finished) {
387      std::vector<impl::message_header> no_headers;
388      incoming.headers.swap(no_headers);
389      buffer_type empty_buffer;
390      incoming.buffer.swap(empty_buffer);
391      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
392        incoming.next_header[i] = incoming.headers.end();
393    }
394  }
395
396  return true;
397}
398
399template<typename T>
400typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
401mpi_process_group::
402array_receive_impl(int source, int tag, T* values, std::size_t& n) const
403{
404  impl::incoming_messages& incoming = impl_->incoming[source];
405
406  // Find the next header with the right tag
407  std::vector<impl::message_header>::iterator header =
408    incoming.next_header[my_block_number()];
409  while (header != incoming.headers.end() && header->tag != tag) ++header;
410
411  // If no header is found, notify the caller
412  if (header == incoming.headers.end()) return false;
413
414  // Deserialize the data
415  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
416                                 archive::no_header, header->offset);
417  std::size_t num_sent;
418  in >> num_sent;
  if (num_sent > n)
    std::cerr << "ERROR: Have " << num_sent << " items but only space for "
              << n << " items\n";

  // Unpack only as many items as the caller has room for; drain (and
  // discard) any excess so the archive stays consistent and we do not
  // overrun the values array.
  for (std::size_t i = 0; i < num_sent; ++i) {
    if (i < n) in >> values[i];
    else { T dummy; in >> dummy; }
  }
  n = (std::min)(num_sent, n);
426
427  // Mark this message as received
428  header->tag = -1;
429
430  // Move the "next header" indicator to the next unreceived message
431  while (incoming.next_header[my_block_number()] != incoming.headers.end()
432         && incoming.next_header[my_block_number()]->tag == -1)
433    ++incoming.next_header[my_block_number()];
434
435  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
436    bool finished = true;
437    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
438      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
439    }
440
441    if (finished) {
442      std::vector<impl::message_header> no_headers;
443      incoming.headers.swap(no_headers);
444      buffer_type empty_buffer;
445      incoming.buffer.swap(empty_buffer);
446      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
447        incoming.next_header[i] = incoming.headers.end();
448    }
449  }
450
451  return true;
452}
453
454// Construct triggers
455template<typename Type, typename Handler>
456void mpi_process_group::trigger(int tag, const Handler& handler)
457{
458  BOOST_ASSERT(block_num);
459  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
460    new trigger_launcher<Type, Handler>(*this, tag, handler)));
461}
462
463template<typename Type, typename Handler>
464void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
465{
466  BOOST_ASSERT(block_num);
467  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
468    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
469}
470
471template<typename Type, typename Handler>
472void mpi_process_group::global_trigger(int tag, const Handler& handler,
473      std::size_t sz)
474{
475  if (sz==0) // normal trigger
476    install_trigger(tag,0,shared_ptr<trigger_base>(
477      new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
478  else // trigger with irecv
479    install_trigger(tag,0,shared_ptr<trigger_base>(
480      new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));
481
482}
483
484namespace detail {
485
486template<typename Type>
487void  do_oob_receive(mpi_process_group const& self,
488    int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/)
489{
490  using boost::mpi::get_mpi_datatype;
491
492  //self.impl_->comm.recv(source,tag,data);
493  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
494           MPI_STATUS_IGNORE);
495}
496
497template<typename Type>
498void do_oob_receive(mpi_process_group const& self,
499    int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/)
500{
501  //  self.impl_->comm.recv(source,tag,data);
502  // Receive the size of the data packet
503  boost::mpi::status status;
504  status = self.impl_->comm.probe(source, tag);
505
506#if BOOST_VERSION >= 103600
507  int size = status.count<boost::mpi::packed>().get();
508#else
509  int size;
510  MPI_Status& mpi_status = status;
511  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
512#endif
513
  // Receive the packed data itself
515  boost::mpi::packed_iarchive in(self.impl_->comm);
516  in.resize(size);
517  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
518       MPI_STATUS_IGNORE);
519
520  // Deserialize the data
521  in >> data;
522}
523
524template<typename Type>
525void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data)
526{
527  do_oob_receive(self, source, tag, data,
528                           boost::mpi::is_mpi_datatype<Type>());
529}
530
531
532} // namespace detail
533
534
535template<typename Type, typename Handler>
536void
537mpi_process_group::trigger_launcher<Type, Handler>::
538receive(mpi_process_group const&, int source, int tag,
539        trigger_receive_context context, int block) const
540{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (context == trc_out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif
546
547  Type data;
548
549  if (context == trc_out_of_band) {
550    // Receive the message directly off the wire
551    int realtag  = self.encode_tag(
552      block == -1 ? self.my_block_number() : block, tag);
553    detail::do_oob_receive(self,source,realtag,data);
554  }
555  else
556    // Receive the message out of the local buffer
557    boost::graph::distributed::receive(self, source, tag, data);
558
559  // Pass the message off to the handler
560  handler(source, tag, data, context);
561}
562
563template<typename Type, typename Handler>
564void
565mpi_process_group::reply_trigger_launcher<Type, Handler>::
566receive(mpi_process_group const&, int source, int tag,
567        trigger_receive_context context, int block) const
568{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (context == trc_out_of_band ? "OOB reply trigger" : "Reply trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif
574  BOOST_ASSERT(context == trc_out_of_band);
575
576  boost::parallel::detail::untracked_pair<int, Type> data;
577
578  // Receive the message directly off the wire
579  int realtag  = self.encode_tag(block == -1 ? self.my_block_number() : block,
580                                 tag);
581  detail::do_oob_receive(self, source, realtag, data);
582
583  // Pass the message off to the handler and send the result back to
584  // the source.
585  send_oob(self, source, data.first,
586           handler(source, tag, data.second, context), -2);
587}
588
589template<typename Type, typename Handler>
590void
591mpi_process_group::global_trigger_launcher<Type, Handler>::
592receive(mpi_process_group const& self, int source, int tag,
593        trigger_receive_context context, int block) const
594{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (context == trc_out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif
600
601  Type data;
602
603  if (context == trc_out_of_band) {
604    // Receive the message directly off the wire
605    int realtag  = self.encode_tag(
606      block == -1 ? self.my_block_number() : block, tag);
607    detail::do_oob_receive(self,source,realtag,data);
608  }
609  else
610    // Receive the message out of the local buffer
611    boost::graph::distributed::receive(self, source, tag, data);
612
613  // Pass the message off to the handler
614  handler(self, source, tag, data, context);
615}
616
617
618template<typename Type, typename Handler>
619void
620mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
621receive(mpi_process_group const& self, int source, int tag,
622        trigger_receive_context context, int block) const
623{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (context == trc_out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif
629
630  Type data;
631
632  if (context == trc_out_of_band) {
633    return;
634  }
635  BOOST_ASSERT (context == trc_irecv_out_of_band);
636
  // Deserialize the data from the buffer filled by the completed MPI_Irecv
  boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
  ia >> data;
  // Repost the receive: force a new MPI_Irecv even though the buffer is
  // already allocated
  prepare_receive(self,tag,true);
  // Pass the message off to the handler
  handler(self, source, tag, data, context);
644}
645
646
647template<typename Type, typename Handler>
648void
649mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
650prepare_receive(mpi_process_group const& self, int tag, bool force) const
651{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "Posting Irecv for trigger"
            << " receive with tag " << tag << std::endl;
#endif
656  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
657    self.impl_->buffers[tag].resize(buffer_size);
658    force = true;
659  }
660  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);
661
662  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
663  if (force) {
664    self.impl_->requests.push_back(MPI_Request());
665    MPI_Request* request = &self.impl_->requests.back();
666    MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
667               MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
668  }
669}
670
671
672template<typename T>
673inline mpi_process_group::process_id_type
674receive(const mpi_process_group& pg, int tag, T& value)
675{
676  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
677    if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
678                        value, boost::mpi::is_mpi_datatype<T>()))
679      return source;
680  }
  BOOST_ASSERT(false);
  abort();
682}
683
684template<typename T>
685typename
686  enable_if<boost::mpi::is_mpi_datatype<T>,
687            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
688receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
689{
690  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
691    bool result =
692      pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
693                 boost::serialization::make_array(values,n),
694                 boost::mpl::true_());
695    if (result)
696      return std::make_pair(source, n);
697  }
  BOOST_ASSERT(false);
  abort();
699}
700
701template<typename T>
702typename
703  disable_if<boost::mpi::is_mpi_datatype<T>,
704             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
705receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
706{
707  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
708    if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
709                              values, n))
710      return std::make_pair(source, n);
711  }
  BOOST_ASSERT(false);
  abort();
713}
714
715template<typename T>
716mpi_process_group::process_id_type
717receive(const mpi_process_group& pg,
718        mpi_process_group::process_id_type source, int tag, T& value)
719{
720  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
721                      value, boost::mpi::is_mpi_datatype<T>()))
722    return source;
723  else {
724    fprintf(stderr,
725            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
726            process_id(pg), source, tag, pg.my_block_number());
727
728    BOOST_ASSERT(false);
729    abort();
730  }
731}
732
733template<typename T>
734typename
735  enable_if<boost::mpi::is_mpi_datatype<T>,
736            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
737receive(const mpi_process_group& pg, int source, int tag, T values[],
738        std::size_t n)
739{
740  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
741                      boost::serialization::make_array(values,n),
742                      boost::mpl::true_()))
743    return std::make_pair(source,n);
744  else {
745    fprintf(stderr,
746            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
747            process_id(pg), source, tag, pg.my_block_number());
748
749    BOOST_ASSERT(false);
750    abort();
751  }
752}
753
754template<typename T>
755typename
756  disable_if<boost::mpi::is_mpi_datatype<T>,
757             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
758receive(const mpi_process_group& pg, int source, int tag, T values[],
759        std::size_t n)
760{
761  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
762                        values, n);
763
764  return std::make_pair(source, n);
765}
766
767template<typename T, typename BinaryOperation>
768T*
769all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
770           BinaryOperation bin_op)
771{
772  synchronize(pg);
773
774  bool inplace = first == out;
775
776  if (inplace) out = new T [last-first];
777
778  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
779                                                  boost::mpi::comm_attach),
780                         first, last-first, out, bin_op);
781
782  if (inplace) {
783    std::copy(out, out + (last-first), first);
784    delete [] out;
785    return last;
786  }
787
788  return out;
789}
790
791template<typename T>
792void
793broadcast(const mpi_process_group& pg, T& val,
794          mpi_process_group::process_id_type root)
795{
  // Attach to the underlying communicator and broadcast the value
797  boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);
798  boost::mpi::broadcast(comm,val,root);
799}
800
801
802template<typename T, typename BinaryOperation>
803T*
804scan(const mpi_process_group& pg, T* first, T* last, T* out,
805           BinaryOperation bin_op)
806{
807  synchronize(pg);
808
809  bool inplace = first == out;
810
811  if (inplace) out = new T [last-first];
812
813  boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);
814
815  if (inplace) {
816    std::copy(out, out + (last-first), first);
817    delete [] out;
818    return last;
819  }
820
821  return out;
822}
823
824
825template<typename InputIterator, typename T>
826void
827all_gather(const mpi_process_group& pg, InputIterator first,
828           InputIterator last, std::vector<T>& out)
829{
830  synchronize(pg);
831
  // Stick a copy of the local values into a vector, so we can gather them
  std::vector<T> local_values(first, last);

  // Collect the number of elements stored by each process
  int size = local_values.size();
837  std::vector<int> sizes(num_processes(pg));
838  int result = MPI_Allgather(&size, 1, MPI_INT,
839                             &sizes[0], 1, MPI_INT,
840                             communicator(pg));
841  BOOST_ASSERT(result == MPI_SUCCESS);
842  (void)result;
843
  // Scale the element counts to byte counts. (This was previously done with
  // std::transform and std::bind2nd, but std::bind2nd was removed in C++17.)
  for (std::size_t i = 0, n = sizes.size(); i < n; ++i)
    sizes[i] *= sizeof(T);
855
856  // Compute displacements
857  std::vector<int> displacements;
858  displacements.reserve(sizes.size() + 1);
859  displacements.push_back(0);
860  std::partial_sum(sizes.begin(), sizes.end(),
861                   std::back_inserter(displacements));
862
863  // Gather all of the values
864  out.resize(displacements.back() / sizeof(T));
865  if (!out.empty()) {
    result = MPI_Allgatherv(local_values.empty()
                              ? (void*)&local_values    // dummy non-null pointer; send count is 0
                              : (void*)&local_values[0] /* local results */,
                            local_values.size() * sizeof(T),
                            MPI_BYTE,
                            &out[0], &sizes[0], &displacements[0], MPI_BYTE,
                            communicator(pg));
872  }
873  BOOST_ASSERT(result == MPI_SUCCESS);
874}
875
876template<typename InputIterator>
877mpi_process_group
878process_subgroup(const mpi_process_group& pg,
879                 InputIterator first, InputIterator last)
880{
881/*
882  boost::mpi::group current_group = communicator(pg).group();
883  boost::mpi::group new_group = current_group.include(first,last);
884  boost::mpi::communicator new_comm(communicator(pg),new_group);
885  return mpi_process_group(new_comm);
886*/
887  std::vector<int> ranks(first, last);
888
889  MPI_Group current_group;
890  int result = MPI_Comm_group(communicator(pg), &current_group);
891  BOOST_ASSERT(result == MPI_SUCCESS);
892  (void)result;
893
894  MPI_Group new_group;
895  result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
896  BOOST_ASSERT(result == MPI_SUCCESS);
897
898  MPI_Comm new_comm;
899  result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
900  BOOST_ASSERT(result == MPI_SUCCESS);
901
902  result = MPI_Group_free(&new_group);
903  BOOST_ASSERT(result == MPI_SUCCESS);
904  result = MPI_Group_free(&current_group);
905  BOOST_ASSERT(result == MPI_SUCCESS);
906
907  if (new_comm != MPI_COMM_NULL) {
908    mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));
909    result = MPI_Comm_free(&new_comm);
    BOOST_ASSERT(result == MPI_SUCCESS);
911    return result_pg;
912  } else {
913    return mpi_process_group(mpi_process_group::create_empty());
914  }
915
916}
917
918
919template<typename Receiver>
920Receiver* mpi_process_group::get_receiver()
921{
922  return impl_->blocks[my_block_number()]->on_receive
923           .template target<Receiver>();
924}
925
926template<typename T>
927typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
928receive_oob(const mpi_process_group& pg,
929            mpi_process_group::process_id_type source, int tag, T& value, int block)
930{
931  using boost::mpi::get_mpi_datatype;
932
  // Determine the actual message we expect to receive and the
  // communicator on which it will arrive.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Post a non-blocking receive, then poll until the request completes.
939  MPI_Request request;
940  MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
941            source, actual.second, actual.first, &request);
942
943  int done = 0;
944  do {
945    MPI_Test(&request, &done, MPI_STATUS_IGNORE);
946    if (!done)
947      pg.poll(/*wait=*/false, block);
948  } while (!done);
949}
950
951template<typename T>
952typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
953receive_oob(const mpi_process_group& pg,
954            mpi_process_group::process_id_type source, int tag, T& value, int block)
955{
  // Determine the actual message we expect to receive and the
  // communicator on which it will arrive.
958  std::pair<boost::mpi::communicator, int> actual
959    = pg.actual_communicator_and_tag(tag, block);
960
961  boost::optional<boost::mpi::status> status;
962  do {
963    status = actual.first.iprobe(source, actual.second);
964    if (!status)
965      pg.poll();
966  } while (!status);
967
968  //actual.first.recv(status->source(), status->tag(),value);
969
970  // Allocate the receive buffer
971  boost::mpi::packed_iarchive in(actual.first);
972
973#if BOOST_VERSION >= 103600
974  in.resize(status->count<boost::mpi::packed>().get());
975#else
976  int size;
977  MPI_Status mpi_status = *status;
978  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
979  in.resize(size);
980#endif
981
982  // Receive the message data
983  MPI_Recv(in.address(), in.size(), MPI_PACKED,
984           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);
985
986  // Unpack the message data
987  in >> value;
988}
989
990
991template<typename SendT, typename ReplyT>
992typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
993send_oob_with_reply(const mpi_process_group& pg,
994                    mpi_process_group::process_id_type dest,
995                    int tag, const SendT& send_value, ReplyT& reply_value,
996                    int block)
997{
998  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
999  send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
1000        (int)reply_tag, send_value), block);
1001  receive_oob(pg, dest, reply_tag, reply_value);
1002}
1003
1004template<typename SendT, typename ReplyT>
1005typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
1006send_oob_with_reply(const mpi_process_group& pg,
1007                    mpi_process_group::process_id_type dest,
1008                    int tag, const SendT& send_value, ReplyT& reply_value,
1009                    int block)
1010{
1011  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
1012  send_oob(pg, dest, tag,
1013           boost::parallel::detail::make_untracked_pair((int)reply_tag,
1014                                                        send_value), block);
1015  receive_oob(pg, dest, reply_tag, reply_value);
1016}
1017
1018} } } // end namespace boost::graph::distributed
1019