// Copyright (C) 2005-2006 Matthias Troyer

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// An example of a parallel Monte Carlo simulation using some nodes to produce
// data and others to aggregate the data
#include <iostream>

#include <boost/mpi.hpp>
#include <boost/random/parallel.hpp>
#include <boost/random.hpp>
#include <boost/foreach.hpp>
#include <cstdlib>

namespace mpi = boost::mpi;

enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};
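
// Message protocol used below:
//  - sample_skeleton_tag: the calculation master sends the skeleton of the
//    sample vector (its shape, not its data) to the accumulation master,
//    over the world communicator
//  - sample_tag: calculation ranks send the sample contents to the
//    accumulation master, over the world communicator
//  - sample_broadcast_tag: the accumulation master tells the other
//    accumulation ranks to join a broadcast of the sample contents
//  - quit_tag: tells the accumulation ranks to shut down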

void calculate_samples(int sample_length)
{
  int num_samples = 100;
  std::vector<double> sample(sample_length);

  // set up the communicator of the calculation ranks by splitting

  mpi::communicator world;
  mpi::communicator calculate_communicator = world.split(0);
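
  // ranks that call split() with the same color end up in the same new
  // communicator: the calculation ranks pass color 0 here, while the
  // accumulation ranks pass color 1 below, partitioning the world
  // communicator into two disjoint groups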

  unsigned int num_calculate_ranks = calculate_communicator.size();

  // the master of the accumulation ranks is the first of them, hence
  // its world rank is just one after the last calculation rank
  int master_accumulate_rank = num_calculate_ranks;

  // the master of the calculation ranks sends the skeleton of the sample
  // to the master of the accumulation ranks

  if (world.rank()==0)
    world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));

  // next we extract the content of the sample vector, to be used in sending
  // the content later on

  mpi::content sample_content = mpi::get_content(sample);
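
  // in Boost.MPI, mpi::skeleton(sample) transmits only the structure of the
  // vector (here, its size), while the matching mpi::content refers to just
  // the raw data; sending the skeleton once and reusing the content makes
  // the repeated sends in the sampling loop below cheap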

  // now initialize the parallel random number generator

  boost::lcg64 engine(
        boost::random::stream_number = calculate_communicator.rank(),
        boost::random::total_streams = calculate_communicator.size()
      );

  boost::variate_generator<boost::lcg64&,boost::uniform_real<> >
    rng(engine,boost::uniform_real<>());
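
  // rng() now yields doubles uniformly distributed in [0,1), with each
  // calculation rank drawing from its own stream of the parallel generator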

  for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) {

    // calculate a sample by filling the vector with random numbers
    // note that std::generate will not work since it takes the generator
    // by value, and boost::ref cannot be used as a generator.
    // boost::ref should be fixed so that it can be used as a generator

    BOOST_FOREACH(double& x, sample)
      x = rng();

    // send the sample to the accumulation ranks
    // Ideally we would do this as a broadcast over an inter-communicator
    // between the calculation and accumulation ranks. MPI-2 supports this,
    // but here we present an MPI-1 compatible solution.

    // send the content of the sample to the first (master) accumulation process

    world.send(master_accumulate_rank,sample_tag,sample_content);

    // gather some results from all calculation ranks

    double local_result = sample[0];
    std::vector<double> gathered_results(calculate_communicator.size());
    mpi::all_gather(calculate_communicator,local_result,gathered_results);
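
    // (the gathered results are not used further in this example; a real
    // simulation would typically process them here)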
  }

  // we are done: the master tells the accumulation ranks to quit
  if (world.rank()==0)
    world.send(master_accumulate_rank,quit_tag);
}

void accumulate_samples()
{
  std::vector<double> sample;

  // set up the communicator of the accumulation ranks by splitting

  mpi::communicator world;
  mpi::communicator accumulate_communicator = world.split(1);

  bool is_master_accumulate_rank = accumulate_communicator.rank()==0;

  // the master receives the sample skeleton

  if (is_master_accumulate_rank)
    world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));

  // and broadcasts it to all accumulation ranks
  mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);
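
  // after this broadcast every accumulation rank holds a vector of the
  // right size, so the contents extracted below all have matching layouts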

  // next we extract the content of the sample vector, to be used in receiving
  // the content later on

  mpi::content sample_content = mpi::get_content(sample);

  // accumulate until a quit message arrives
  double sum=0.;
  while (true) {

    // the accumulation master checks whether we should quit
    if (world.iprobe(0,quit_tag)) {
      world.recv(0,quit_tag);
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // the other accumulation ranks check whether we should quit
    if (accumulate_communicator.iprobe(0,quit_tag)) {
      accumulate_communicator.recv(0,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // check whether the master accumulation rank has received a sample
    if (world.iprobe(mpi::any_source,sample_tag)) {
      BOOST_ASSERT(is_master_accumulate_rank);

      // receive the content
      world.recv(mpi::any_source,sample_tag,sample_content);

      // now we need to broadcast:
      // the problem is that we do not have a non-blocking broadcast that we
      // could abort if we receive a quit message from the master. We thus
      // need to first tell all accumulation ranks to start a broadcast. If
      // the sample is small, we could just send the sample in this message,
      // but here we optimize the code for large samples, so that the overhead
      // of these sends can be ignored, and we count on an optimized broadcast
      // implementation with O(log N) complexity

      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,sample_broadcast_tag);

      // now broadcast the contents of the sample to all accumulation ranks
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[0];
    }

    // the other accumulation ranks wait for a message to start the broadcast
    if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {
      BOOST_ASSERT(!is_master_accumulate_rank);

      accumulate_communicator.recv(0,sample_broadcast_tag);

      // receive the broadcast of the sample contents
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[accumulate_communicator.rank()];
    }
  }
}

int main(int argc, char** argv)
{
  mpi::environment env(argc, argv);
  mpi::communicator world;

  // half of the processes generate samples, the others accumulate them;
  // the sample length is just the number of accumulation ranks
  if (world.rank() < world.size()/2)
    calculate_samples(world.size()-world.size()/2);
  else
    accumulate_samples();

  return 0;
}
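
// Example invocation (the binary name here is just a placeholder), e.g.:
//   mpiexec -n 4 ./monte_carlo_example
// With four processes, ranks 0 and 1 calculate, ranks 2 and 3 accumulate,
// and each sample holds 2 doubles.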