1// -*- C++ -*- 2 3// Copyright (C) 2004-2008 The Trustees of Indiana University. 4// Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com> 5// Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com> 6 7// Use, modification and distribution is subject to the Boost Software 8// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at 9// http://www.boost.org/LICENSE_1_0.txt) 10 11// Authors: Douglas Gregor 12// Andrew Lumsdaine 13// Matthias Troyer 14 15//#define PBGL_PROCESS_GROUP_DEBUG 16 17#ifndef BOOST_GRAPH_USE_MPI 18#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" 19#endif 20 21#include <boost/assert.hpp> 22#include <algorithm> 23#include <boost/graph/parallel/detail/untracked_pair.hpp> 24#include <numeric> 25#include <iterator> 26#include <functional> 27#include <vector> 28#include <queue> 29#include <stack> 30#include <list> 31#include <boost/graph/distributed/detail/tag_allocator.hpp> 32#include <stdio.h> 33 34// #define PBGL_PROCESS_GROUP_DEBUG 35 36#ifdef PBGL_PROCESS_GROUP_DEBUG 37# include <iostream> 38#endif 39 40namespace boost { namespace graph { namespace distributed { 41 42struct mpi_process_group::impl 43{ 44 45 typedef mpi_process_group::message_header message_header; 46 typedef mpi_process_group::outgoing_messages outgoing_messages; 47 48 /** 49 * Stores the incoming messages from a particular processor. 50 * 51 * @todo Evaluate whether we should use a deque instance, which 52 * would reduce could reduce the cost of "receiving" messages and 53 allow us to deallocate memory earlier, but increases the time 54 spent in the synchronization step. 55 */ 56 struct incoming_messages { 57 incoming_messages(); 58 ~incoming_messages() {} 59 60 std::vector<message_header> headers; 61 buffer_type buffer; 62 std::vector<std::vector<message_header>::iterator> next_header; 63 }; 64 65 struct batch_request { 66 MPI_Request request; 67 buffer_type buffer; 68 }; 69 70 // send once we have a certain number of messages or bytes in the buffer 71 // these numbers need to be tuned, we keep them small at first for testing 72 std::size_t batch_header_number; 73 std::size_t batch_buffer_size; 74 std::size_t batch_message_size; 75 76 /** 77 * The actual MPI communicator used to transmit data. 78 */ 79 boost::mpi::communicator comm; 80 81 /** 82 * The MPI communicator used to transmit out-of-band replies. 83 */ 84 boost::mpi::communicator oob_reply_comm; 85 86 /// Outgoing message information, indexed by destination processor. 87 std::vector<outgoing_messages> outgoing; 88 89 /// Incoming message information, indexed by source processor. 90 std::vector<incoming_messages> incoming; 91 92 /// The numbers of processors that have entered a synchronization stage 93 std::vector<int> processors_synchronizing_stage; 94 95 /// The synchronization stage of a processor 96 std::vector<int> synchronizing_stage; 97 98 /// Number of processors still sending messages 99 std::vector<int> synchronizing_unfinished; 100 101 /// Number of batches sent since last synchronization stage 102 std::vector<int> number_sent_batches; 103 104 /// Number of batches received minus number of expected batches 105 std::vector<int> number_received_batches; 106 107 108 /// The context of the currently-executing trigger, or @c trc_none 109 /// if no trigger is executing. 110 trigger_receive_context trigger_context; 111 112 /// Non-zero indicates that we're processing batches 113 /// Increment this when processing patches, 114 /// decrement it when you're done. 115 int processing_batches; 116 117 /** 118 * Contains all of the active blocks corresponding to attached 119 * distributed data structures. 120 */ 121 blocks_type blocks; 122 123 /// Whether we are currently synchronizing 124 bool synchronizing; 125 126 /// The MPI requests for posted sends of oob messages 127 std::vector<MPI_Request> requests; 128 129 /// The MPI buffers for posted irecvs of oob messages 130 std::map<int,buffer_type> buffers; 131 132 /// Queue for message batches received while already processing messages 133 std::queue<std::pair<int,outgoing_messages> > new_batches; 134 /// Maximum encountered size of the new_batches queue 135 std::size_t max_received; 136 137 /// The MPI requests and buffers for batchess being sent 138 std::list<batch_request> sent_batches; 139 /// Maximum encountered size of the sent_batches list 140 std::size_t max_sent; 141 142 /// Pre-allocated requests in a pool 143 std::vector<batch_request> batch_pool; 144 /// A stack controlling which batches are available 145 std::stack<std::size_t> free_batches; 146 147 void free_sent_batches(); 148 149 // Tag allocator 150 detail::tag_allocator allocated_tags; 151 152 impl(std::size_t num_headers, std::size_t buffers_size, 153 communicator_type parent_comm); 154 ~impl(); 155 156private: 157 void set_batch_size(std::size_t header_num, std::size_t buffer_sz); 158}; 159 160inline trigger_receive_context mpi_process_group::trigger_context() const 161{ 162 return impl_->trigger_context; 163} 164 165template<typename T> 166void 167mpi_process_group::send_impl(int dest, int tag, const T& value, 168 mpl::true_ /*is_mpi_datatype*/) const 169{ 170 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); 171 172 impl::outgoing_messages& outgoing = impl_->outgoing[dest]; 173 174 // Start constructing the message header 175 impl::message_header header; 176 header.source = process_id(*this); 177 header.tag = tag; 178 header.offset = outgoing.buffer.size(); 179 180 boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer); 181 oa << value; 182 183#ifdef PBGL_PROCESS_GROUP_DEBUG 184 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " 185 << tag << ", bytes = " << packed_size << std::endl; 186#endif 187 188 // Store the header 189 header.bytes = outgoing.buffer.size() - header.offset; 190 outgoing.headers.push_back(header); 191 192 maybe_send_batch(dest); 193} 194 195 196template<typename T> 197void 198mpi_process_group::send_impl(int dest, int tag, const T& value, 199 mpl::false_ /*is_mpi_datatype*/) const 200{ 201 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); 202 203 impl::outgoing_messages& outgoing = impl_->outgoing[dest]; 204 205 // Start constructing the message header 206 impl::message_header header; 207 header.source = process_id(*this); 208 header.tag = tag; 209 header.offset = outgoing.buffer.size(); 210 211 // Serialize into the buffer 212 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); 213 out << value; 214 215 // Store the header 216 header.bytes = outgoing.buffer.size() - header.offset; 217 outgoing.headers.push_back(header); 218 maybe_send_batch(dest); 219 220#ifdef PBGL_PROCESS_GROUP_DEBUG 221 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " 222 << tag << ", bytes = " << header.bytes << std::endl; 223#endif 224} 225 226template<typename T> 227inline void 228send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, 229 int tag, const T& value) 230{ 231 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value, 232 boost::mpi::is_mpi_datatype<T>()); 233} 234 235template<typename T> 236typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type 237send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, 238 int tag, const T values[], std::size_t n) 239{ 240 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), 241 boost::serialization::make_array(values,n), 242 boost::mpl::true_()); 243} 244 245template<typename T> 246typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type 247mpi_process_group:: 248array_send_impl(int dest, int tag, const T values[], std::size_t n) const 249{ 250 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); 251 252 impl::outgoing_messages& outgoing = impl_->outgoing[dest]; 253 254 // Start constructing the message header 255 impl::message_header header; 256 header.source = process_id(*this); 257 header.tag = tag; 258 header.offset = outgoing.buffer.size(); 259 260 // Serialize into the buffer 261 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); 262 out << n; 263 264 for (std::size_t i = 0; i < n; ++i) 265 out << values[i]; 266 267 // Store the header 268 header.bytes = outgoing.buffer.size() - header.offset; 269 outgoing.headers.push_back(header); 270 maybe_send_batch(dest); 271 272#ifdef PBGL_PROCESS_GROUP_DEBUG 273 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " 274 << tag << ", bytes = " << header.bytes << std::endl; 275#endif 276} 277 278template<typename T> 279typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type 280send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, 281 int tag, const T values[], std::size_t n) 282{ 283 pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), 284 values, n); 285} 286 287template<typename InputIterator> 288void 289send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, 290 int tag, InputIterator first, InputIterator last) 291{ 292 typedef typename std::iterator_traits<InputIterator>::value_type value_type; 293 std::vector<value_type> values(first, last); 294 if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0); 295 else send(pg, dest, tag, &values[0], values.size()); 296} 297 298template<typename T> 299bool 300mpi_process_group::receive_impl(int source, int tag, T& value, 301 mpl::true_ /*is_mpi_datatype*/) const 302{ 303#ifdef PBGL_PROCESS_GROUP_DEBUG 304 std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = " 305 << tag << std::endl; 306#endif 307 308 impl::incoming_messages& incoming = impl_->incoming[source]; 309 310 // Find the next header with the right tag 311 std::vector<impl::message_header>::iterator header = 312 incoming.next_header[my_block_number()]; 313 while (header != incoming.headers.end() && header->tag != tag) ++header; 314 315 // If no header is found, notify the caller 316 if (header == incoming.headers.end()) return false; 317 318 // Unpack the data 319 if (header->bytes > 0) { 320 boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer, 321 archive::no_header, header->offset); 322 ia >> value; 323 } 324 325 // Mark this message as received 326 header->tag = -1; 327 328 // Move the "next header" indicator to the next unreceived message 329 while (incoming.next_header[my_block_number()] != incoming.headers.end() 330 && incoming.next_header[my_block_number()]->tag == -1) 331 ++incoming.next_header[my_block_number()]; 332 333 if (incoming.next_header[my_block_number()] == incoming.headers.end()) { 334 bool finished = true; 335 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { 336 if (incoming.next_header[i] != incoming.headers.end()) finished = false; 337 } 338 339 if (finished) { 340 std::vector<impl::message_header> no_headers; 341 incoming.headers.swap(no_headers); 342 buffer_type empty_buffer; 343 incoming.buffer.swap(empty_buffer); 344 for (std::size_t i = 0; i < incoming.next_header.size(); ++i) 345 incoming.next_header[i] = incoming.headers.end(); 346 } 347 } 348 349 return true; 350} 351 352template<typename T> 353bool 354mpi_process_group::receive_impl(int source, int tag, T& value, 355 mpl::false_ /*is_mpi_datatype*/) const 356{ 357 impl::incoming_messages& incoming = impl_->incoming[source]; 358 359 // Find the next header with the right tag 360 std::vector<impl::message_header>::iterator header = 361 incoming.next_header[my_block_number()]; 362 while (header != incoming.headers.end() && header->tag != tag) ++header; 363 364 // If no header is found, notify the caller 365 if (header == incoming.headers.end()) return false; 366 367 // Deserialize the data 368 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, 369 archive::no_header, header->offset); 370 in >> value; 371 372 // Mark this message as received 373 header->tag = -1; 374 375 // Move the "next header" indicator to the next unreceived message 376 while (incoming.next_header[my_block_number()] != incoming.headers.end() 377 && incoming.next_header[my_block_number()]->tag == -1) 378 ++incoming.next_header[my_block_number()]; 379 380 if (incoming.next_header[my_block_number()] == incoming.headers.end()) { 381 bool finished = true; 382 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { 383 if (incoming.next_header[i] != incoming.headers.end()) finished = false; 384 } 385 386 if (finished) { 387 std::vector<impl::message_header> no_headers; 388 incoming.headers.swap(no_headers); 389 buffer_type empty_buffer; 390 incoming.buffer.swap(empty_buffer); 391 for (std::size_t i = 0; i < incoming.next_header.size(); ++i) 392 incoming.next_header[i] = incoming.headers.end(); 393 } 394 } 395 396 return true; 397} 398 399template<typename T> 400typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type 401mpi_process_group:: 402array_receive_impl(int source, int tag, T* values, std::size_t& n) const 403{ 404 impl::incoming_messages& incoming = impl_->incoming[source]; 405 406 // Find the next header with the right tag 407 std::vector<impl::message_header>::iterator header = 408 incoming.next_header[my_block_number()]; 409 while (header != incoming.headers.end() && header->tag != tag) ++header; 410 411 // If no header is found, notify the caller 412 if (header == incoming.headers.end()) return false; 413 414 // Deserialize the data 415 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, 416 archive::no_header, header->offset); 417 std::size_t num_sent; 418 in >> num_sent; 419 if (num_sent > n) 420 std::cerr << "ERROR: Have " << num_sent << " items but only space for " 421 << n << " items\n"; 422 423 for (std::size_t i = 0; i < num_sent; ++i) 424 in >> values[i]; 425 n = num_sent; 426 427 // Mark this message as received 428 header->tag = -1; 429 430 // Move the "next header" indicator to the next unreceived message 431 while (incoming.next_header[my_block_number()] != incoming.headers.end() 432 && incoming.next_header[my_block_number()]->tag == -1) 433 ++incoming.next_header[my_block_number()]; 434 435 if (incoming.next_header[my_block_number()] == incoming.headers.end()) { 436 bool finished = true; 437 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { 438 if (incoming.next_header[i] != incoming.headers.end()) finished = false; 439 } 440 441 if (finished) { 442 std::vector<impl::message_header> no_headers; 443 incoming.headers.swap(no_headers); 444 buffer_type empty_buffer; 445 incoming.buffer.swap(empty_buffer); 446 for (std::size_t i = 0; i < incoming.next_header.size(); ++i) 447 incoming.next_header[i] = incoming.headers.end(); 448 } 449 } 450 451 return true; 452} 453 454// Construct triggers 455template<typename Type, typename Handler> 456void mpi_process_group::trigger(int tag, const Handler& handler) 457{ 458 BOOST_ASSERT(block_num); 459 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>( 460 new trigger_launcher<Type, Handler>(*this, tag, handler))); 461} 462 463template<typename Type, typename Handler> 464void mpi_process_group::trigger_with_reply(int tag, const Handler& handler) 465{ 466 BOOST_ASSERT(block_num); 467 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>( 468 new reply_trigger_launcher<Type, Handler>(*this, tag, handler))); 469} 470 471template<typename Type, typename Handler> 472void mpi_process_group::global_trigger(int tag, const Handler& handler, 473 std::size_t sz) 474{ 475 if (sz==0) // normal trigger 476 install_trigger(tag,0,shared_ptr<trigger_base>( 477 new global_trigger_launcher<Type, Handler>(*this, tag, handler))); 478 else // trigger with irecv 479 install_trigger(tag,0,shared_ptr<trigger_base>( 480 new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz))); 481 482} 483 484namespace detail { 485 486template<typename Type> 487void do_oob_receive(mpi_process_group const& self, 488 int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/) 489{ 490 using boost::mpi::get_mpi_datatype; 491 492 //self.impl_->comm.recv(source,tag,data); 493 MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm, 494 MPI_STATUS_IGNORE); 495} 496 497template<typename Type> 498void do_oob_receive(mpi_process_group const& self, 499 int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/) 500{ 501 // self.impl_->comm.recv(source,tag,data); 502 // Receive the size of the data packet 503 boost::mpi::status status; 504 status = self.impl_->comm.probe(source, tag); 505 506#if BOOST_VERSION >= 103600 507 int size = status.count<boost::mpi::packed>().get(); 508#else 509 int size; 510 MPI_Status& mpi_status = status; 511 MPI_Get_count(&mpi_status, MPI_PACKED, &size); 512#endif 513 514 // Receive the data packed itself 515 boost::mpi::packed_iarchive in(self.impl_->comm); 516 in.resize(size); 517 MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm, 518 MPI_STATUS_IGNORE); 519 520 // Deserialize the data 521 in >> data; 522} 523 524template<typename Type> 525void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data) 526{ 527 do_oob_receive(self, source, tag, data, 528 boost::mpi::is_mpi_datatype<Type>()); 529} 530 531 532} // namespace detail 533 534 535template<typename Type, typename Handler> 536void 537mpi_process_group::trigger_launcher<Type, Handler>:: 538receive(mpi_process_group const&, int source, int tag, 539 trigger_receive_context context, int block) const 540{ 541#ifdef PBGL_PROCESS_GROUP_DEBUG 542 std::cerr << (out_of_band? "OOB trigger" : "Trigger") 543 << " receive from source " << source << " and tag " << tag 544 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; 545#endif 546 547 Type data; 548 549 if (context == trc_out_of_band) { 550 // Receive the message directly off the wire 551 int realtag = self.encode_tag( 552 block == -1 ? self.my_block_number() : block, tag); 553 detail::do_oob_receive(self,source,realtag,data); 554 } 555 else 556 // Receive the message out of the local buffer 557 boost::graph::distributed::receive(self, source, tag, data); 558 559 // Pass the message off to the handler 560 handler(source, tag, data, context); 561} 562 563template<typename Type, typename Handler> 564void 565mpi_process_group::reply_trigger_launcher<Type, Handler>:: 566receive(mpi_process_group const&, int source, int tag, 567 trigger_receive_context context, int block) const 568{ 569#ifdef PBGL_PROCESS_GROUP_DEBUG 570 std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger") 571 << " receive from source " << source << " and tag " << tag 572 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; 573#endif 574 BOOST_ASSERT(context == trc_out_of_band); 575 576 boost::parallel::detail::untracked_pair<int, Type> data; 577 578 // Receive the message directly off the wire 579 int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block, 580 tag); 581 detail::do_oob_receive(self, source, realtag, data); 582 583 // Pass the message off to the handler and send the result back to 584 // the source. 585 send_oob(self, source, data.first, 586 handler(source, tag, data.second, context), -2); 587} 588 589template<typename Type, typename Handler> 590void 591mpi_process_group::global_trigger_launcher<Type, Handler>:: 592receive(mpi_process_group const& self, int source, int tag, 593 trigger_receive_context context, int block) const 594{ 595#ifdef PBGL_PROCESS_GROUP_DEBUG 596 std::cerr << (out_of_band? "OOB trigger" : "Trigger") 597 << " receive from source " << source << " and tag " << tag 598 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; 599#endif 600 601 Type data; 602 603 if (context == trc_out_of_band) { 604 // Receive the message directly off the wire 605 int realtag = self.encode_tag( 606 block == -1 ? self.my_block_number() : block, tag); 607 detail::do_oob_receive(self,source,realtag,data); 608 } 609 else 610 // Receive the message out of the local buffer 611 boost::graph::distributed::receive(self, source, tag, data); 612 613 // Pass the message off to the handler 614 handler(self, source, tag, data, context); 615} 616 617 618template<typename Type, typename Handler> 619void 620mpi_process_group::global_irecv_trigger_launcher<Type, Handler>:: 621receive(mpi_process_group const& self, int source, int tag, 622 trigger_receive_context context, int block) const 623{ 624#ifdef PBGL_PROCESS_GROUP_DEBUG 625 std::cerr << (out_of_band? "OOB trigger" : "Trigger") 626 << " receive from source " << source << " and tag " << tag 627 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; 628#endif 629 630 Type data; 631 632 if (context == trc_out_of_band) { 633 return; 634 } 635 BOOST_ASSERT (context == trc_irecv_out_of_band); 636 637 // force posting of new MPI_Irecv, even though buffer is already allocated 638 boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]); 639 ia >> data; 640 // Start a new receive 641 prepare_receive(self,tag,true); 642 // Pass the message off to the handler 643 handler(self, source, tag, data, context); 644} 645 646 647template<typename Type, typename Handler> 648void 649mpi_process_group::global_irecv_trigger_launcher<Type, Handler>:: 650prepare_receive(mpi_process_group const& self, int tag, bool force) const 651{ 652#ifdef PBGL_PROCESS_GROUP_DEBUG 653 std::cerr << ("Posting Irecv for trigger") 654 << " receive with tag " << tag << std::endl; 655#endif 656 if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) { 657 self.impl_->buffers[tag].resize(buffer_size); 658 force = true; 659 } 660 BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size); 661 662 //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >); 663 if (force) { 664 self.impl_->requests.push_back(MPI_Request()); 665 MPI_Request* request = &self.impl_->requests.back(); 666 MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size, 667 MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request); 668 } 669} 670 671 672template<typename T> 673inline mpi_process_group::process_id_type 674receive(const mpi_process_group& pg, int tag, T& value) 675{ 676 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { 677 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 678 value, boost::mpi::is_mpi_datatype<T>())) 679 return source; 680 } 681 BOOST_ASSERT (false); 682} 683 684template<typename T> 685typename 686 enable_if<boost::mpi::is_mpi_datatype<T>, 687 std::pair<mpi_process_group::process_id_type, std::size_t> >::type 688receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) 689{ 690 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { 691 bool result = 692 pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 693 boost::serialization::make_array(values,n), 694 boost::mpl::true_()); 695 if (result) 696 return std::make_pair(source, n); 697 } 698 BOOST_ASSERT(false); 699} 700 701template<typename T> 702typename 703 disable_if<boost::mpi::is_mpi_datatype<T>, 704 std::pair<mpi_process_group::process_id_type, std::size_t> >::type 705receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) 706{ 707 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { 708 if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 709 values, n)) 710 return std::make_pair(source, n); 711 } 712 BOOST_ASSERT(false); 713} 714 715template<typename T> 716mpi_process_group::process_id_type 717receive(const mpi_process_group& pg, 718 mpi_process_group::process_id_type source, int tag, T& value) 719{ 720 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 721 value, boost::mpi::is_mpi_datatype<T>())) 722 return source; 723 else { 724 fprintf(stderr, 725 "Process %d failed to receive a message from process %d with tag %d in block %d.\n", 726 process_id(pg), source, tag, pg.my_block_number()); 727 728 BOOST_ASSERT(false); 729 abort(); 730 } 731} 732 733template<typename T> 734typename 735 enable_if<boost::mpi::is_mpi_datatype<T>, 736 std::pair<mpi_process_group::process_id_type, std::size_t> >::type 737receive(const mpi_process_group& pg, int source, int tag, T values[], 738 std::size_t n) 739{ 740 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 741 boost::serialization::make_array(values,n), 742 boost::mpl::true_())) 743 return std::make_pair(source,n); 744 else { 745 fprintf(stderr, 746 "Process %d failed to receive a message from process %d with tag %d in block %d.\n", 747 process_id(pg), source, tag, pg.my_block_number()); 748 749 BOOST_ASSERT(false); 750 abort(); 751 } 752} 753 754template<typename T> 755typename 756 disable_if<boost::mpi::is_mpi_datatype<T>, 757 std::pair<mpi_process_group::process_id_type, std::size_t> >::type 758receive(const mpi_process_group& pg, int source, int tag, T values[], 759 std::size_t n) 760{ 761 pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 762 values, n); 763 764 return std::make_pair(source, n); 765} 766 767template<typename T, typename BinaryOperation> 768T* 769all_reduce(const mpi_process_group& pg, T* first, T* last, T* out, 770 BinaryOperation bin_op) 771{ 772 synchronize(pg); 773 774 bool inplace = first == out; 775 776 if (inplace) out = new T [last-first]; 777 778 boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg), 779 boost::mpi::comm_attach), 780 first, last-first, out, bin_op); 781 782 if (inplace) { 783 std::copy(out, out + (last-first), first); 784 delete [] out; 785 return last; 786 } 787 788 return out; 789} 790 791template<typename T> 792void 793broadcast(const mpi_process_group& pg, T& val, 794 mpi_process_group::process_id_type root) 795{ 796 // broadcast the seed 797 boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach); 798 boost::mpi::broadcast(comm,val,root); 799} 800 801 802template<typename T, typename BinaryOperation> 803T* 804scan(const mpi_process_group& pg, T* first, T* last, T* out, 805 BinaryOperation bin_op) 806{ 807 synchronize(pg); 808 809 bool inplace = first == out; 810 811 if (inplace) out = new T [last-first]; 812 813 boost::mpi::scan(communicator(pg), first, last-first, out, bin_op); 814 815 if (inplace) { 816 std::copy(out, out + (last-first), first); 817 delete [] out; 818 return last; 819 } 820 821 return out; 822} 823 824 825template<typename InputIterator, typename T> 826void 827all_gather(const mpi_process_group& pg, InputIterator first, 828 InputIterator last, std::vector<T>& out) 829{ 830 synchronize(pg); 831 832 // Stick a copy of the local values into a vector, so we can broadcast it 833 std::vector<T> local_values(first, last); 834 835 // Collect the number of vertices stored in each process 836 int size = local_values.size(); 837 std::vector<int> sizes(num_processes(pg)); 838 int result = MPI_Allgather(&size, 1, MPI_INT, 839 &sizes[0], 1, MPI_INT, 840 communicator(pg)); 841 BOOST_ASSERT(result == MPI_SUCCESS); 842 (void)result; 843 844 // Adjust sizes based on the number of bytes 845 // 846 // std::transform(sizes.begin(), sizes.end(), sizes.begin(), 847 // std::bind2nd(std::multiplies<int>(), sizeof(T))); 848 // 849 // std::bind2nd has been removed from C++17 850 851 for( std::size_t i = 0, n = sizes.size(); i < n; ++i ) 852 { 853 sizes[ i ] *= sizeof( T ); 854 } 855 856 // Compute displacements 857 std::vector<int> displacements; 858 displacements.reserve(sizes.size() + 1); 859 displacements.push_back(0); 860 std::partial_sum(sizes.begin(), sizes.end(), 861 std::back_inserter(displacements)); 862 863 // Gather all of the values 864 out.resize(displacements.back() / sizeof(T)); 865 if (!out.empty()) { 866 result = MPI_Allgatherv(local_values.empty()? (void*)&local_values 867 /* local results */: (void*)&local_values[0], 868 local_values.size() * sizeof(T), 869 MPI_BYTE, 870 &out[0], &sizes[0], &displacements[0], MPI_BYTE, 871 communicator(pg)); 872 } 873 BOOST_ASSERT(result == MPI_SUCCESS); 874} 875 876template<typename InputIterator> 877mpi_process_group 878process_subgroup(const mpi_process_group& pg, 879 InputIterator first, InputIterator last) 880{ 881/* 882 boost::mpi::group current_group = communicator(pg).group(); 883 boost::mpi::group new_group = current_group.include(first,last); 884 boost::mpi::communicator new_comm(communicator(pg),new_group); 885 return mpi_process_group(new_comm); 886*/ 887 std::vector<int> ranks(first, last); 888 889 MPI_Group current_group; 890 int result = MPI_Comm_group(communicator(pg), ¤t_group); 891 BOOST_ASSERT(result == MPI_SUCCESS); 892 (void)result; 893 894 MPI_Group new_group; 895 result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group); 896 BOOST_ASSERT(result == MPI_SUCCESS); 897 898 MPI_Comm new_comm; 899 result = MPI_Comm_create(communicator(pg), new_group, &new_comm); 900 BOOST_ASSERT(result == MPI_SUCCESS); 901 902 result = MPI_Group_free(&new_group); 903 BOOST_ASSERT(result == MPI_SUCCESS); 904 result = MPI_Group_free(¤t_group); 905 BOOST_ASSERT(result == MPI_SUCCESS); 906 907 if (new_comm != MPI_COMM_NULL) { 908 mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach)); 909 result = MPI_Comm_free(&new_comm); 910 BOOST_ASSERT(result == 0); 911 return result_pg; 912 } else { 913 return mpi_process_group(mpi_process_group::create_empty()); 914 } 915 916} 917 918 919template<typename Receiver> 920Receiver* mpi_process_group::get_receiver() 921{ 922 return impl_->blocks[my_block_number()]->on_receive 923 .template target<Receiver>(); 924} 925 926template<typename T> 927typename enable_if<boost::mpi::is_mpi_datatype<T> >::type 928receive_oob(const mpi_process_group& pg, 929 mpi_process_group::process_id_type source, int tag, T& value, int block) 930{ 931 using boost::mpi::get_mpi_datatype; 932 933 // Determine the actual message we expect to receive, and which 934 // communicator it will come by. 935 std::pair<boost::mpi::communicator, int> actual 936 = pg.actual_communicator_and_tag(tag, block); 937 938 // Post a non-blocking receive that waits until we complete this request. 939 MPI_Request request; 940 MPI_Irecv(&value, 1, get_mpi_datatype<T>(value), 941 source, actual.second, actual.first, &request); 942 943 int done = 0; 944 do { 945 MPI_Test(&request, &done, MPI_STATUS_IGNORE); 946 if (!done) 947 pg.poll(/*wait=*/false, block); 948 } while (!done); 949} 950 951template<typename T> 952typename disable_if<boost::mpi::is_mpi_datatype<T> >::type 953receive_oob(const mpi_process_group& pg, 954 mpi_process_group::process_id_type source, int tag, T& value, int block) 955{ 956 // Determine the actual message we expect to receive, and which 957 // communicator it will come by. 958 std::pair<boost::mpi::communicator, int> actual 959 = pg.actual_communicator_and_tag(tag, block); 960 961 boost::optional<boost::mpi::status> status; 962 do { 963 status = actual.first.iprobe(source, actual.second); 964 if (!status) 965 pg.poll(); 966 } while (!status); 967 968 //actual.first.recv(status->source(), status->tag(),value); 969 970 // Allocate the receive buffer 971 boost::mpi::packed_iarchive in(actual.first); 972 973#if BOOST_VERSION >= 103600 974 in.resize(status->count<boost::mpi::packed>().get()); 975#else 976 int size; 977 MPI_Status mpi_status = *status; 978 MPI_Get_count(&mpi_status, MPI_PACKED, &size); 979 in.resize(size); 980#endif 981 982 // Receive the message data 983 MPI_Recv(in.address(), in.size(), MPI_PACKED, 984 status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE); 985 986 // Unpack the message data 987 in >> value; 988} 989 990 991template<typename SendT, typename ReplyT> 992typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type 993send_oob_with_reply(const mpi_process_group& pg, 994 mpi_process_group::process_id_type dest, 995 int tag, const SendT& send_value, ReplyT& reply_value, 996 int block) 997{ 998 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); 999 send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair( 1000 (int)reply_tag, send_value), block); 1001 receive_oob(pg, dest, reply_tag, reply_value); 1002} 1003 1004template<typename SendT, typename ReplyT> 1005typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type 1006send_oob_with_reply(const mpi_process_group& pg, 1007 mpi_process_group::process_id_type dest, 1008 int tag, const SendT& send_value, ReplyT& reply_value, 1009 int block) 1010{ 1011 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); 1012 send_oob(pg, dest, tag, 1013 boost::parallel::detail::make_untracked_pair((int)reply_tag, 1014 send_value), block); 1015 receive_oob(pg, dest, reply_tag, reply_value); 1016} 1017 1018} } } // end namespace boost::graph::distributed 1019