1// -*- C++ -*- 2 3// Copyright (C) 2004-2008 The Trustees of Indiana University. 4// Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com> 5// Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com> 6 7// Use, modification and distribution is subject to the Boost Software 8// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at 9// http://www.boost.org/LICENSE_1_0.txt) 10 11// Authors: Douglas Gregor 12// Andrew Lumsdaine 13// Matthias Troyer 14 15//#define PBGL_PROCESS_GROUP_DEBUG 16 17#ifndef BOOST_GRAPH_USE_MPI 18#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" 19#endif 20 21#include <boost/assert.hpp> 22#include <algorithm> 23#include <boost/graph/parallel/detail/untracked_pair.hpp> 24#include <numeric> 25#include <iterator> 26#include <functional> 27#include <vector> 28#include <queue> 29#include <stack> 30#include <boost/graph/distributed/detail/tag_allocator.hpp> 31#include <stdio.h> 32 33// #define PBGL_PROCESS_GROUP_DEBUG 34 35#ifdef PBGL_PROCESS_GROUP_DEBUG 36# include <iostream> 37#endif 38 39namespace boost { namespace graph { namespace distributed { 40 41struct mpi_process_group::impl 42{ 43 44 typedef mpi_process_group::message_header message_header; 45 typedef mpi_process_group::outgoing_messages outgoing_messages; 46 47 /** 48 * Stores the incoming messages from a particular processor. 49 * 50 * @todo Evaluate whether we should use a deque instance, which 51 * would reduce could reduce the cost of "receiving" messages and 52 allow us to deallocate memory earlier, but increases the time 53 spent in the synchronization step. 54 */ 55 struct incoming_messages { 56 incoming_messages(); 57 ~incoming_messages() {} 58 59 std::vector<message_header> headers; 60 buffer_type buffer; 61 std::vector<std::vector<message_header>::iterator> next_header; 62 }; 63 64 struct batch_request { 65 MPI_Request request; 66 buffer_type buffer; 67 }; 68 69 // send once we have a certain number of messages or bytes in the buffer 70 // these numbers need to be tuned, we keep them small at first for testing 71 std::size_t batch_header_number; 72 std::size_t batch_buffer_size; 73 std::size_t batch_message_size; 74 75 /** 76 * The actual MPI communicator used to transmit data. 77 */ 78 boost::mpi::communicator comm; 79 80 /** 81 * The MPI communicator used to transmit out-of-band replies. 82 */ 83 boost::mpi::communicator oob_reply_comm; 84 85 /// Outgoing message information, indexed by destination processor. 86 std::vector<outgoing_messages> outgoing; 87 88 /// Incoming message information, indexed by source processor. 89 std::vector<incoming_messages> incoming; 90 91 /// The numbers of processors that have entered a synchronization stage 92 std::vector<int> processors_synchronizing_stage; 93 94 /// The synchronization stage of a processor 95 std::vector<int> synchronizing_stage; 96 97 /// Number of processors still sending messages 98 std::vector<int> synchronizing_unfinished; 99 100 /// Number of batches sent since last synchronization stage 101 std::vector<int> number_sent_batches; 102 103 /// Number of batches received minus number of expected batches 104 std::vector<int> number_received_batches; 105 106 107 /// The context of the currently-executing trigger, or @c trc_none 108 /// if no trigger is executing. 109 trigger_receive_context trigger_context; 110 111 /// Non-zero indicates that we're processing batches 112 /// Increment this when processing patches, 113 /// decrement it when you're done. 114 int processing_batches; 115 116 /** 117 * Contains all of the active blocks corresponding to attached 118 * distributed data structures. 119 */ 120 blocks_type blocks; 121 122 /// Whether we are currently synchronizing 123 bool synchronizing; 124 125 /// The MPI requests for posted sends of oob messages 126 std::vector<MPI_Request> requests; 127 128 /// The MPI buffers for posted irecvs of oob messages 129 std::map<int,buffer_type> buffers; 130 131 /// Queue for message batches received while already processing messages 132 std::queue<std::pair<int,outgoing_messages> > new_batches; 133 /// Maximum encountered size of the new_batches queue 134 std::size_t max_received; 135 136 /// The MPI requests and buffers for batchess being sent 137 std::list<batch_request> sent_batches; 138 /// Maximum encountered size of the sent_batches list 139 std::size_t max_sent; 140 141 /// Pre-allocated requests in a pool 142 std::vector<batch_request> batch_pool; 143 /// A stack controlling which batches are available 144 std::stack<std::size_t> free_batches; 145 146 void free_sent_batches(); 147 148 // Tag allocator 149 detail::tag_allocator allocated_tags; 150 151 impl(std::size_t num_headers, std::size_t buffers_size, 152 communicator_type parent_comm); 153 ~impl(); 154 155private: 156 void set_batch_size(std::size_t header_num, std::size_t buffer_sz); 157}; 158 159inline trigger_receive_context mpi_process_group::trigger_context() const 160{ 161 return impl_->trigger_context; 162} 163 164template<typename T> 165void 166mpi_process_group::send_impl(int dest, int tag, const T& value, 167 mpl::true_ /*is_mpi_datatype*/) const 168{ 169 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); 170 171 impl::outgoing_messages& outgoing = impl_->outgoing[dest]; 172 173 // Start constructing the message header 174 impl::message_header header; 175 header.source = process_id(*this); 176 header.tag = tag; 177 header.offset = outgoing.buffer.size(); 178 179 boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer); 180 oa << value; 181 182#ifdef PBGL_PROCESS_GROUP_DEBUG 183 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " 184 << tag << ", bytes = " << packed_size << std::endl; 185#endif 186 187 // Store the header 188 header.bytes = outgoing.buffer.size() - header.offset; 189 outgoing.headers.push_back(header); 190 191 maybe_send_batch(dest); 192} 193 194 195template<typename T> 196void 197mpi_process_group::send_impl(int dest, int tag, const T& value, 198 mpl::false_ /*is_mpi_datatype*/) const 199{ 200 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); 201 202 impl::outgoing_messages& outgoing = impl_->outgoing[dest]; 203 204 // Start constructing the message header 205 impl::message_header header; 206 header.source = process_id(*this); 207 header.tag = tag; 208 header.offset = outgoing.buffer.size(); 209 210 // Serialize into the buffer 211 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); 212 out << value; 213 214 // Store the header 215 header.bytes = outgoing.buffer.size() - header.offset; 216 outgoing.headers.push_back(header); 217 maybe_send_batch(dest); 218 219#ifdef PBGL_PROCESS_GROUP_DEBUG 220 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " 221 << tag << ", bytes = " << header.bytes << std::endl; 222#endif 223} 224 225template<typename T> 226inline void 227send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, 228 int tag, const T& value) 229{ 230 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value, 231 boost::mpi::is_mpi_datatype<T>()); 232} 233 234template<typename T> 235typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type 236send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, 237 int tag, const T values[], std::size_t n) 238{ 239 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), 240 boost::serialization::make_array(values,n), 241 boost::mpl::true_()); 242} 243 244template<typename T> 245typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type 246mpi_process_group:: 247array_send_impl(int dest, int tag, const T values[], std::size_t n) const 248{ 249 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); 250 251 impl::outgoing_messages& outgoing = impl_->outgoing[dest]; 252 253 // Start constructing the message header 254 impl::message_header header; 255 header.source = process_id(*this); 256 header.tag = tag; 257 header.offset = outgoing.buffer.size(); 258 259 // Serialize into the buffer 260 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); 261 out << n; 262 263 for (std::size_t i = 0; i < n; ++i) 264 out << values[i]; 265 266 // Store the header 267 header.bytes = outgoing.buffer.size() - header.offset; 268 outgoing.headers.push_back(header); 269 maybe_send_batch(dest); 270 271#ifdef PBGL_PROCESS_GROUP_DEBUG 272 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " 273 << tag << ", bytes = " << header.bytes << std::endl; 274#endif 275} 276 277template<typename T> 278typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type 279send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, 280 int tag, const T values[], std::size_t n) 281{ 282 pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), 283 values, n); 284} 285 286template<typename InputIterator> 287void 288send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, 289 int tag, InputIterator first, InputIterator last) 290{ 291 typedef typename std::iterator_traits<InputIterator>::value_type value_type; 292 std::vector<value_type> values(first, last); 293 if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0); 294 else send(pg, dest, tag, &values[0], values.size()); 295} 296 297template<typename T> 298bool 299mpi_process_group::receive_impl(int source, int tag, T& value, 300 mpl::true_ /*is_mpi_datatype*/) const 301{ 302#ifdef PBGL_PROCESS_GROUP_DEBUG 303 std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = " 304 << tag << std::endl; 305#endif 306 307 impl::incoming_messages& incoming = impl_->incoming[source]; 308 309 // Find the next header with the right tag 310 std::vector<impl::message_header>::iterator header = 311 incoming.next_header[my_block_number()]; 312 while (header != incoming.headers.end() && header->tag != tag) ++header; 313 314 // If no header is found, notify the caller 315 if (header == incoming.headers.end()) return false; 316 317 // Unpack the data 318 if (header->bytes > 0) { 319 boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer, 320 archive::no_header, header->offset); 321 ia >> value; 322 } 323 324 // Mark this message as received 325 header->tag = -1; 326 327 // Move the "next header" indicator to the next unreceived message 328 while (incoming.next_header[my_block_number()] != incoming.headers.end() 329 && incoming.next_header[my_block_number()]->tag == -1) 330 ++incoming.next_header[my_block_number()]; 331 332 if (incoming.next_header[my_block_number()] == incoming.headers.end()) { 333 bool finished = true; 334 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { 335 if (incoming.next_header[i] != incoming.headers.end()) finished = false; 336 } 337 338 if (finished) { 339 std::vector<impl::message_header> no_headers; 340 incoming.headers.swap(no_headers); 341 buffer_type empty_buffer; 342 incoming.buffer.swap(empty_buffer); 343 for (std::size_t i = 0; i < incoming.next_header.size(); ++i) 344 incoming.next_header[i] = incoming.headers.end(); 345 } 346 } 347 348 return true; 349} 350 351template<typename T> 352bool 353mpi_process_group::receive_impl(int source, int tag, T& value, 354 mpl::false_ /*is_mpi_datatype*/) const 355{ 356 impl::incoming_messages& incoming = impl_->incoming[source]; 357 358 // Find the next header with the right tag 359 std::vector<impl::message_header>::iterator header = 360 incoming.next_header[my_block_number()]; 361 while (header != incoming.headers.end() && header->tag != tag) ++header; 362 363 // If no header is found, notify the caller 364 if (header == incoming.headers.end()) return false; 365 366 // Deserialize the data 367 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, 368 archive::no_header, header->offset); 369 in >> value; 370 371 // Mark this message as received 372 header->tag = -1; 373 374 // Move the "next header" indicator to the next unreceived message 375 while (incoming.next_header[my_block_number()] != incoming.headers.end() 376 && incoming.next_header[my_block_number()]->tag == -1) 377 ++incoming.next_header[my_block_number()]; 378 379 if (incoming.next_header[my_block_number()] == incoming.headers.end()) { 380 bool finished = true; 381 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { 382 if (incoming.next_header[i] != incoming.headers.end()) finished = false; 383 } 384 385 if (finished) { 386 std::vector<impl::message_header> no_headers; 387 incoming.headers.swap(no_headers); 388 buffer_type empty_buffer; 389 incoming.buffer.swap(empty_buffer); 390 for (std::size_t i = 0; i < incoming.next_header.size(); ++i) 391 incoming.next_header[i] = incoming.headers.end(); 392 } 393 } 394 395 return true; 396} 397 398template<typename T> 399typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type 400mpi_process_group:: 401array_receive_impl(int source, int tag, T* values, std::size_t& n) const 402{ 403 impl::incoming_messages& incoming = impl_->incoming[source]; 404 405 // Find the next header with the right tag 406 std::vector<impl::message_header>::iterator header = 407 incoming.next_header[my_block_number()]; 408 while (header != incoming.headers.end() && header->tag != tag) ++header; 409 410 // If no header is found, notify the caller 411 if (header == incoming.headers.end()) return false; 412 413 // Deserialize the data 414 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, 415 archive::no_header, header->offset); 416 std::size_t num_sent; 417 in >> num_sent; 418 if (num_sent > n) 419 std::cerr << "ERROR: Have " << num_sent << " items but only space for " 420 << n << " items\n"; 421 422 for (std::size_t i = 0; i < num_sent; ++i) 423 in >> values[i]; 424 n = num_sent; 425 426 // Mark this message as received 427 header->tag = -1; 428 429 // Move the "next header" indicator to the next unreceived message 430 while (incoming.next_header[my_block_number()] != incoming.headers.end() 431 && incoming.next_header[my_block_number()]->tag == -1) 432 ++incoming.next_header[my_block_number()]; 433 434 if (incoming.next_header[my_block_number()] == incoming.headers.end()) { 435 bool finished = true; 436 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { 437 if (incoming.next_header[i] != incoming.headers.end()) finished = false; 438 } 439 440 if (finished) { 441 std::vector<impl::message_header> no_headers; 442 incoming.headers.swap(no_headers); 443 buffer_type empty_buffer; 444 incoming.buffer.swap(empty_buffer); 445 for (std::size_t i = 0; i < incoming.next_header.size(); ++i) 446 incoming.next_header[i] = incoming.headers.end(); 447 } 448 } 449 450 return true; 451} 452 453// Construct triggers 454template<typename Type, typename Handler> 455void mpi_process_group::trigger(int tag, const Handler& handler) 456{ 457 BOOST_ASSERT(block_num); 458 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>( 459 new trigger_launcher<Type, Handler>(*this, tag, handler))); 460} 461 462template<typename Type, typename Handler> 463void mpi_process_group::trigger_with_reply(int tag, const Handler& handler) 464{ 465 BOOST_ASSERT(block_num); 466 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>( 467 new reply_trigger_launcher<Type, Handler>(*this, tag, handler))); 468} 469 470template<typename Type, typename Handler> 471void mpi_process_group::global_trigger(int tag, const Handler& handler, 472 std::size_t sz) 473{ 474 if (sz==0) // normal trigger 475 install_trigger(tag,0,shared_ptr<trigger_base>( 476 new global_trigger_launcher<Type, Handler>(*this, tag, handler))); 477 else // trigger with irecv 478 install_trigger(tag,0,shared_ptr<trigger_base>( 479 new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz))); 480 481} 482 483namespace detail { 484 485template<typename Type> 486void do_oob_receive(mpi_process_group const& self, 487 int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/) 488{ 489 using boost::mpi::get_mpi_datatype; 490 491 //self.impl_->comm.recv(source,tag,data); 492 MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm, 493 MPI_STATUS_IGNORE); 494} 495 496template<typename Type> 497void do_oob_receive(mpi_process_group const& self, 498 int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/) 499{ 500 // self.impl_->comm.recv(source,tag,data); 501 // Receive the size of the data packet 502 boost::mpi::status status; 503 status = self.impl_->comm.probe(source, tag); 504 505#if BOOST_VERSION >= 103600 506 int size = status.count<boost::mpi::packed>().get(); 507#else 508 int size; 509 MPI_Status& mpi_status = status; 510 MPI_Get_count(&mpi_status, MPI_PACKED, &size); 511#endif 512 513 // Receive the data packed itself 514 boost::mpi::packed_iarchive in(self.impl_->comm); 515 in.resize(size); 516 MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm, 517 MPI_STATUS_IGNORE); 518 519 // Deserialize the data 520 in >> data; 521} 522 523template<typename Type> 524void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data) 525{ 526 do_oob_receive(self, source, tag, data, 527 boost::mpi::is_mpi_datatype<Type>()); 528} 529 530 531} // namespace detail 532 533 534template<typename Type, typename Handler> 535void 536mpi_process_group::trigger_launcher<Type, Handler>:: 537receive(mpi_process_group const&, int source, int tag, 538 trigger_receive_context context, int block) const 539{ 540#ifdef PBGL_PROCESS_GROUP_DEBUG 541 std::cerr << (out_of_band? "OOB trigger" : "Trigger") 542 << " receive from source " << source << " and tag " << tag 543 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; 544#endif 545 546 Type data; 547 548 if (context == trc_out_of_band) { 549 // Receive the message directly off the wire 550 int realtag = self.encode_tag( 551 block == -1 ? self.my_block_number() : block, tag); 552 detail::do_oob_receive(self,source,realtag,data); 553 } 554 else 555 // Receive the message out of the local buffer 556 boost::graph::distributed::receive(self, source, tag, data); 557 558 // Pass the message off to the handler 559 handler(source, tag, data, context); 560} 561 562template<typename Type, typename Handler> 563void 564mpi_process_group::reply_trigger_launcher<Type, Handler>:: 565receive(mpi_process_group const&, int source, int tag, 566 trigger_receive_context context, int block) const 567{ 568#ifdef PBGL_PROCESS_GROUP_DEBUG 569 std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger") 570 << " receive from source " << source << " and tag " << tag 571 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; 572#endif 573 BOOST_ASSERT(context == trc_out_of_band); 574 575 boost::parallel::detail::untracked_pair<int, Type> data; 576 577 // Receive the message directly off the wire 578 int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block, 579 tag); 580 detail::do_oob_receive(self, source, realtag, data); 581 582 // Pass the message off to the handler and send the result back to 583 // the source. 584 send_oob(self, source, data.first, 585 handler(source, tag, data.second, context), -2); 586} 587 588template<typename Type, typename Handler> 589void 590mpi_process_group::global_trigger_launcher<Type, Handler>:: 591receive(mpi_process_group const& self, int source, int tag, 592 trigger_receive_context context, int block) const 593{ 594#ifdef PBGL_PROCESS_GROUP_DEBUG 595 std::cerr << (out_of_band? "OOB trigger" : "Trigger") 596 << " receive from source " << source << " and tag " << tag 597 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; 598#endif 599 600 Type data; 601 602 if (context == trc_out_of_band) { 603 // Receive the message directly off the wire 604 int realtag = self.encode_tag( 605 block == -1 ? self.my_block_number() : block, tag); 606 detail::do_oob_receive(self,source,realtag,data); 607 } 608 else 609 // Receive the message out of the local buffer 610 boost::graph::distributed::receive(self, source, tag, data); 611 612 // Pass the message off to the handler 613 handler(self, source, tag, data, context); 614} 615 616 617template<typename Type, typename Handler> 618void 619mpi_process_group::global_irecv_trigger_launcher<Type, Handler>:: 620receive(mpi_process_group const& self, int source, int tag, 621 trigger_receive_context context, int block) const 622{ 623#ifdef PBGL_PROCESS_GROUP_DEBUG 624 std::cerr << (out_of_band? "OOB trigger" : "Trigger") 625 << " receive from source " << source << " and tag " << tag 626 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl; 627#endif 628 629 Type data; 630 631 if (context == trc_out_of_band) { 632 return; 633 } 634 BOOST_ASSERT (context == trc_irecv_out_of_band); 635 636 // force posting of new MPI_Irecv, even though buffer is already allocated 637 boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]); 638 ia >> data; 639 // Start a new receive 640 prepare_receive(self,tag,true); 641 // Pass the message off to the handler 642 handler(self, source, tag, data, context); 643} 644 645 646template<typename Type, typename Handler> 647void 648mpi_process_group::global_irecv_trigger_launcher<Type, Handler>:: 649prepare_receive(mpi_process_group const& self, int tag, bool force) const 650{ 651#ifdef PBGL_PROCESS_GROUP_DEBUG 652 std::cerr << ("Posting Irecv for trigger") 653 << " receive with tag " << tag << std::endl; 654#endif 655 if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) { 656 self.impl_->buffers[tag].resize(buffer_size); 657 force = true; 658 } 659 BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size); 660 661 //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >); 662 if (force) { 663 self.impl_->requests.push_back(MPI_Request()); 664 MPI_Request* request = &self.impl_->requests.back(); 665 MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size, 666 MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request); 667 } 668} 669 670 671template<typename T> 672inline mpi_process_group::process_id_type 673receive(const mpi_process_group& pg, int tag, T& value) 674{ 675 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { 676 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 677 value, boost::mpi::is_mpi_datatype<T>())) 678 return source; 679 } 680 BOOST_ASSERT (false); 681} 682 683template<typename T> 684typename 685 enable_if<boost::mpi::is_mpi_datatype<T>, 686 std::pair<mpi_process_group::process_id_type, std::size_t> >::type 687receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) 688{ 689 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { 690 bool result = 691 pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 692 boost::serialization::make_array(values,n), 693 boost::mpl::true_()); 694 if (result) 695 return std::make_pair(source, n); 696 } 697 BOOST_ASSERT(false); 698} 699 700template<typename T> 701typename 702 disable_if<boost::mpi::is_mpi_datatype<T>, 703 std::pair<mpi_process_group::process_id_type, std::size_t> >::type 704receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) 705{ 706 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { 707 if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 708 values, n)) 709 return std::make_pair(source, n); 710 } 711 BOOST_ASSERT(false); 712} 713 714template<typename T> 715mpi_process_group::process_id_type 716receive(const mpi_process_group& pg, 717 mpi_process_group::process_id_type source, int tag, T& value) 718{ 719 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 720 value, boost::mpi::is_mpi_datatype<T>())) 721 return source; 722 else { 723 fprintf(stderr, 724 "Process %d failed to receive a message from process %d with tag %d in block %d.\n", 725 process_id(pg), source, tag, pg.my_block_number()); 726 727 BOOST_ASSERT(false); 728 exit(1); 729 } 730} 731 732template<typename T> 733typename 734 enable_if<boost::mpi::is_mpi_datatype<T>, 735 std::pair<mpi_process_group::process_id_type, std::size_t> >::type 736receive(const mpi_process_group& pg, int source, int tag, T values[], 737 std::size_t n) 738{ 739 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 740 boost::serialization::make_array(values,n), 741 boost::mpl::true_())) 742 return std::make_pair(source,n); 743 else { 744 fprintf(stderr, 745 "Process %d failed to receive a message from process %d with tag %d in block %d.\n", 746 process_id(pg), source, tag, pg.my_block_number()); 747 748 BOOST_ASSERT(false); 749 exit(1); 750 } 751} 752 753template<typename T> 754typename 755 disable_if<boost::mpi::is_mpi_datatype<T>, 756 std::pair<mpi_process_group::process_id_type, std::size_t> >::type 757receive(const mpi_process_group& pg, int source, int tag, T values[], 758 std::size_t n) 759{ 760 pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), 761 values, n); 762 763 return std::make_pair(source, n); 764} 765 766template<typename T, typename BinaryOperation> 767T* 768all_reduce(const mpi_process_group& pg, T* first, T* last, T* out, 769 BinaryOperation bin_op) 770{ 771 synchronize(pg); 772 773 bool inplace = first == out; 774 775 if (inplace) out = new T [last-first]; 776 777 boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg), 778 boost::mpi::comm_attach), 779 first, last-first, out, bin_op); 780 781 if (inplace) { 782 std::copy(out, out + (last-first), first); 783 delete [] out; 784 return last; 785 } 786 787 return out; 788} 789 790template<typename T> 791void 792broadcast(const mpi_process_group& pg, T& val, 793 mpi_process_group::process_id_type root) 794{ 795 // broadcast the seed 796 boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach); 797 boost::mpi::broadcast(comm,val,root); 798} 799 800 801template<typename T, typename BinaryOperation> 802T* 803scan(const mpi_process_group& pg, T* first, T* last, T* out, 804 BinaryOperation bin_op) 805{ 806 synchronize(pg); 807 808 bool inplace = first == out; 809 810 if (inplace) out = new T [last-first]; 811 812 boost::mpi::scan(communicator(pg), first, last-first, out, bin_op); 813 814 if (inplace) { 815 std::copy(out, out + (last-first), first); 816 delete [] out; 817 return last; 818 } 819 820 return out; 821} 822 823 824template<typename InputIterator, typename T> 825void 826all_gather(const mpi_process_group& pg, InputIterator first, 827 InputIterator last, std::vector<T>& out) 828{ 829 synchronize(pg); 830 831 // Stick a copy of the local values into a vector, so we can broadcast it 832 std::vector<T> local_values(first, last); 833 834 // Collect the number of vertices stored in each process 835 int size = local_values.size(); 836 std::vector<int> sizes(num_processes(pg)); 837 int result = MPI_Allgather(&size, 1, MPI_INT, 838 &sizes[0], 1, MPI_INT, 839 communicator(pg)); 840 BOOST_ASSERT(result == MPI_SUCCESS); 841 842 // Adjust sizes based on the number of bytes 843 std::transform(sizes.begin(), sizes.end(), sizes.begin(), 844 std::bind2nd(std::multiplies<int>(), sizeof(T))); 845 846 // Compute displacements 847 std::vector<int> displacements; 848 displacements.reserve(sizes.size() + 1); 849 displacements.push_back(0); 850 std::partial_sum(sizes.begin(), sizes.end(), 851 std::back_inserter(displacements)); 852 853 // Gather all of the values 854 out.resize(displacements.back() / sizeof(T)); 855 if (!out.empty()) { 856 result = MPI_Allgatherv(local_values.empty()? (void*)&local_values 857 /* local results */: (void*)&local_values[0], 858 local_values.size() * sizeof(T), 859 MPI_BYTE, 860 &out[0], &sizes[0], &displacements[0], MPI_BYTE, 861 communicator(pg)); 862 } 863 BOOST_ASSERT(result == MPI_SUCCESS); 864} 865 866template<typename InputIterator> 867mpi_process_group 868process_subgroup(const mpi_process_group& pg, 869 InputIterator first, InputIterator last) 870{ 871/* 872 boost::mpi::group current_group = communicator(pg).group(); 873 boost::mpi::group new_group = current_group.include(first,last); 874 boost::mpi::communicator new_comm(communicator(pg),new_group); 875 return mpi_process_group(new_comm); 876*/ 877 std::vector<int> ranks(first, last); 878 879 MPI_Group current_group; 880 int result = MPI_Comm_group(communicator(pg), ¤t_group); 881 BOOST_ASSERT(result == MPI_SUCCESS); 882 883 MPI_Group new_group; 884 result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group); 885 BOOST_ASSERT(result == MPI_SUCCESS); 886 887 MPI_Comm new_comm; 888 result = MPI_Comm_create(communicator(pg), new_group, &new_comm); 889 BOOST_ASSERT(result == MPI_SUCCESS); 890 891 result = MPI_Group_free(&new_group); 892 BOOST_ASSERT(result == MPI_SUCCESS); 893 result = MPI_Group_free(¤t_group); 894 BOOST_ASSERT(result == MPI_SUCCESS); 895 896 if (new_comm != MPI_COMM_NULL) { 897 mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach)); 898 result = MPI_Comm_free(&new_comm); 899 BOOST_ASSERT(result == 0); 900 return result_pg; 901 } else { 902 return mpi_process_group(mpi_process_group::create_empty()); 903 } 904 905} 906 907 908template<typename Receiver> 909Receiver* mpi_process_group::get_receiver() 910{ 911 return impl_->blocks[my_block_number()]->on_receive 912 .template target<Receiver>(); 913} 914 915template<typename T> 916typename enable_if<boost::mpi::is_mpi_datatype<T> >::type 917receive_oob(const mpi_process_group& pg, 918 mpi_process_group::process_id_type source, int tag, T& value, int block) 919{ 920 using boost::mpi::get_mpi_datatype; 921 922 // Determine the actual message we expect to receive, and which 923 // communicator it will come by. 924 std::pair<boost::mpi::communicator, int> actual 925 = pg.actual_communicator_and_tag(tag, block); 926 927 // Post a non-blocking receive that waits until we complete this request. 928 MPI_Request request; 929 MPI_Irecv(&value, 1, get_mpi_datatype<T>(value), 930 source, actual.second, actual.first, &request); 931 932 int done = 0; 933 do { 934 MPI_Test(&request, &done, MPI_STATUS_IGNORE); 935 if (!done) 936 pg.poll(/*wait=*/false, block); 937 } while (!done); 938} 939 940template<typename T> 941typename disable_if<boost::mpi::is_mpi_datatype<T> >::type 942receive_oob(const mpi_process_group& pg, 943 mpi_process_group::process_id_type source, int tag, T& value, int block) 944{ 945 // Determine the actual message we expect to receive, and which 946 // communicator it will come by. 947 std::pair<boost::mpi::communicator, int> actual 948 = pg.actual_communicator_and_tag(tag, block); 949 950 boost::optional<boost::mpi::status> status; 951 do { 952 status = actual.first.iprobe(source, actual.second); 953 if (!status) 954 pg.poll(); 955 } while (!status); 956 957 //actual.first.recv(status->source(), status->tag(),value); 958 959 // Allocate the receive buffer 960 boost::mpi::packed_iarchive in(actual.first); 961 962#if BOOST_VERSION >= 103600 963 in.resize(status->count<boost::mpi::packed>().get()); 964#else 965 int size; 966 MPI_Status mpi_status = *status; 967 MPI_Get_count(&mpi_status, MPI_PACKED, &size); 968 in.resize(size); 969#endif 970 971 // Receive the message data 972 MPI_Recv(in.address(), in.size(), MPI_PACKED, 973 status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE); 974 975 // Unpack the message data 976 in >> value; 977} 978 979 980template<typename SendT, typename ReplyT> 981typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type 982send_oob_with_reply(const mpi_process_group& pg, 983 mpi_process_group::process_id_type dest, 984 int tag, const SendT& send_value, ReplyT& reply_value, 985 int block) 986{ 987 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); 988 send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair( 989 (int)reply_tag, send_value), block); 990 receive_oob(pg, dest, reply_tag, reply_value); 991} 992 993template<typename SendT, typename ReplyT> 994typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type 995send_oob_with_reply(const mpi_process_group& pg, 996 mpi_process_group::process_id_type dest, 997 int tag, const SendT& send_value, ReplyT& reply_value, 998 int block) 999{ 1000 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); 1001 send_oob(pg, dest, tag, 1002 boost::parallel::detail::make_untracked_pair((int)reply_tag, 1003 send_value), block); 1004 receive_oob(pg, dest, reply_tag, reply_value); 1005} 1006 1007} } } // end namespace boost::graph::distributed 1008