/*
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "coll_spacc.h"

#include <assert.h>
#include <stdlib.h>

#include "mpi.h"
#include "ompi/constants.h"
#include "opal/util/bit_ops.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"

/*
 * mca_coll_spacc_allreduce_intra_redscat_allgather
 *
 * Function:  Allreduce using Rabenseifner's algorithm.
 * Accepts:   Same arguments as MPI_Allreduce
 * Returns:   MPI_SUCCESS or error code
 *
 * Description: an implementation of Rabenseifner's allreduce algorithm [1, 2].
 *   [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
 *       Optimization of Collective Communication Operations in MPICH.
 *       The Int. Journal of High Performance Computing Applications, Vol 19,
 *       Issue 1, pp. 49--66.
 *   [2] http://www.hlrs.de/mpi/myreduce.html
 *
 * This algorithm is a combination of a reduce-scatter implemented with
 * recursive vector halving and recursive distance doubling, followed by
 * an allgather implemented with recursive doubling [1].
 *
 * Step 1. If the number of processes is not a power of two, reduce it to
 * the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
 * by removing r = p - p' extra processes as follows. In the first 2r processes
 * (ranks 0 to 2r - 1), all the even ranks send the second half of the input
 * vector to their right neighbor (rank + 1), and all the odd ranks send
 * the first half of the input vector to their left neighbor (rank - 1).
 * The even ranks compute the reduction on the first half of the vector and
 * the odd ranks compute the reduction on the second half. The odd ranks then
 * send the result to their left neighbors (the even ranks). As a result,
 * the even ranks among the first 2r processes now contain the reduction with
 * the input vector on their right neighbors (the odd ranks). These odd ranks
 * do not participate in the rest of the algorithm, which leaves behind
 * a power-of-two number of processes. The first r even-ranked processes and
 * the last p - 2r processes are now renumbered from 0 to p' - 1.
 *
 * Step 2. The remaining processes now perform a reduce-scatter by using
 * recursive vector halving and recursive distance doubling. The even-ranked
 * processes send the second half of their buffer to rank + 1 and the odd-ranked
 * processes send the first half of their buffer to rank - 1. All processes
 * then compute the reduction between the local buffer and the received buffer.
 * In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
 * distance is doubled. At the end, each of the p' processes has 1 / p' of the
 * total reduction result.
 *
 * Step 3. An allgather is performed by using recursive vector doubling and
 * distance halving. All exchanges are executed in reverse order relative
 * to recursive doubling in the previous step. If the number of processes is not
 * a power of two, the total result vector must be sent to the r processes
 * that were removed in the first step.
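 *
 * Illustration (numbers chosen here for exposition, not taken from [1] or [2]):
 * with p = 6 we have p' = 4 and r = 2. In step 1 the pairs (0,1) and (2,3)
 * exchange halves of their input vectors; ranks 0 and 2 reduce the first half,
 * ranks 1 and 3 reduce the second half and send it back, so ranks 0 and 2 end
 * up with the reduction of their pair's full vector. Ranks 1 and 3 then drop
 * out, the survivors {0, 2, 4, 5} are renumbered 0..3 and perform the
 * reduce-scatter and allgather among themselves, and finally ranks 0 and 2
 * send the complete result back to ranks 1 and 3 (step 4 in the code below).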
 *
 * Limitations:
 *   count >= 2^{\floor{\log_2 p}}
 *   commutative operations only
 *   intra-communicators only
 *
 * Memory requirements (per process):
 *   count * typesize + 4 * log_2(p) * sizeof(int) = O(count)
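 *   (count * typesize accounts for the temporary receive buffer; the four
 *   bookkeeping arrays rindex, sindex, rcount and scount each hold
 *   \floor{\log_2 p} integers, which gives the 4 * log_2(p) * sizeof(int) term)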
 */
int mca_coll_spacc_allreduce_intra_redscat_allgather(
    const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
    struct ompi_op_t *op, struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;

    int comm_size = ompi_comm_size(comm);
    int rank = ompi_comm_rank(comm);

    opal_output_verbose(30, mca_coll_spacc_stream,
                        "coll:spacc:allreduce_intra_redscat_allgather: rank %d/%d",
                        rank, comm_size);

    /* Find nearest power-of-two less than or equal to comm_size */
    int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1);   /* ilog2(comm_size) */
    assert(nsteps >= 0);
    int nprocs_pof2 = 1 << nsteps;                              /* flp2(comm_size) */

    if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
        opal_output_verbose(20, mca_coll_spacc_stream,
                            "coll:spacc:allreduce_intra_redscat_allgather: rank %d/%d count %d switching to base allreduce",
                            rank, comm_size, count);
        return ompi_coll_base_allreduce_intra_basic_linear(sbuf, rbuf, count, dtype,
                                                           op, comm, module);
    }

    int err = MPI_SUCCESS;

    ptrdiff_t lb, extent, dsize, gap = 0;
    ompi_datatype_get_extent(dtype, &lb, &extent);
    dsize = opal_datatype_span(&dtype->super, count, &gap);

    /*
     * Temporary buffer for receiving messages; dsize and gap come from
     * opal_datatype_span(), so the buffer covers the true extent of
     * count elements.
     */
    char *tmp_buf = NULL;
    char *tmp_buf_raw = (char *)malloc(dsize);
    if (NULL == tmp_buf_raw)
        return OMPI_ERR_OUT_OF_RESOURCE;
    tmp_buf = tmp_buf_raw - gap;

    if (sbuf != MPI_IN_PLACE) {
        /* Copy sbuf to rbuf */
        err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
                                                  (char *)sbuf);
        if (MPI_SUCCESS != err) { goto cleanup_and_return; }
    }

    /*
     * Step 1. Reduce the number of processes to the nearest lower power of two
     * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
     * 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
     *    the second half of the input vector to their right neighbor (rank + 1)
     *    and all the odd ranks send the first half of the input vector to their
     *    left neighbor (rank - 1).
     * 2. All 2r processes compute the reduction on their half.
     * 3. The odd ranks then send the result to their left neighbors
     *    (the even ranks).
     *
     * The even ranks among the first 2r processes now contain the reduction
     * with the input vector on their right neighbors (the odd ranks). The
     * first r even-ranked processes and the last p - 2r processes are
     * renumbered from 0 to 2^{\floor{\log_2 p}} - 1.
     */

    int vrank, step, wsize;
    int nprocs_rem = comm_size - nprocs_pof2;

    if (rank < 2 * nprocs_rem) {
        int count_lhalf = count / 2;
        int count_rhalf = count - count_lhalf;

        if (rank % 2 != 0) {
            /*
             * Odd process -- exchange with rank - 1
             * Send the left half of the input vector to the left neighbor,
             * Recv the right half of the input vector from the left neighbor
             */
            err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
                                          MCA_COLL_BASE_TAG_ALLREDUCE,
                                          (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
                                          count_rhalf, dtype, rank - 1,
                                          MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Reduce on the right half of the buffers (result in rbuf) */
            ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
                           (char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                           count_rhalf, dtype);

            /* Send the right half to the left neighbor */
            err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                    count_rhalf, dtype, rank - 1,
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* This process does not participate in the recursive doubling phase */
            vrank = -1;

        } else {
            /*
             * Even process -- exchange with rank + 1
             * Send the right half of the input vector to the right neighbor,
             * Recv the left half of the input vector from the right neighbor
             */
            err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                          count_rhalf, dtype, rank + 1,
                                          MCA_COLL_BASE_TAG_ALLREDUCE,
                                          tmp_buf, count_lhalf, dtype, rank + 1,
                                          MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Reduce on the left half of the buffers (result in rbuf) */
            ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);

            /* Recv the right half from the right neighbor */
            err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                    count_rhalf, dtype, rank + 1,
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            vrank = rank / 2;
        }
    } else { /* rank >= 2 * nprocs_rem */
        vrank = rank - nprocs_rem;
    }
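
    /*
     * The surviving processes now carry virtual ranks 0 .. p' - 1: an even rank
     * below 2 * nprocs_rem maps to vrank = rank / 2, and a rank of at least
     * 2 * nprocs_rem maps to vrank = rank - nprocs_rem. The reverse mapping
     * used below, (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem,
     * recovers the real rank of a peer from its virtual rank.
     */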

    /*
     * Step 2. Reduce-scatter implemented with recursive vector halving and
     * recursive distance doubling. We now have a power-of-two number of
     * processes, p' = 2^{\floor{\log_2 p}}, with new ranks (vrank) and the
     * partial result in rbuf.
     *
     * The even-ranked processes send the right half of their buffer to rank + 1
     * and the odd-ranked processes send the left half of their buffer to
     * rank - 1. All processes then compute the reduction between the local
     * buffer and the received buffer. In the next \log_2(p') - 1 steps, the
     * buffers are recursively halved, and the distance is doubled. At the end,
     * each of the p' processes has 1 / p' of the total reduction result.
     */
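
    /*
     * rindex[i]/rcount[i] describe the slice of rbuf that is received (via
     * tmp_buf) and reduced at step i; sindex[i]/scount[i] describe the slice
     * that is sent to the peer. As a purely illustrative example, with
     * count = 8 and p' = 4 a process keeps a window of 4 elements after
     * step 0 and of 2 elements after step 1, i.e. its 1 / p' share.
     */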
    rindex = malloc(sizeof(*rindex) * nsteps);
    sindex = malloc(sizeof(*sindex) * nsteps);
    rcount = malloc(sizeof(*rcount) * nsteps);
    scount = malloc(sizeof(*scount) * nsteps);
    if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
        err = OMPI_ERR_OUT_OF_RESOURCE;
        goto cleanup_and_return;
    }

    if (vrank != -1) {
        step = 0;
        wsize = count;
        sindex[0] = rindex[0] = 0;

        for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
            /*
             * On each iteration: rindex[step] = sindex[step] -- beginning of the
             * current window. The length of the current window is stored in wsize.
             */
            int vdest = vrank ^ mask;
            /* Translate vdest virtual rank to real rank */
            int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;

            if (rank < dest) {
                /*
                 * Recv into the left half of the current window, send the right
                 * half of the window to the peer (perform reduce on the left
                 * half of the current window)
                 */
                rcount[step] = wsize / 2;
                scount[step] = wsize - rcount[step];
                sindex[step] = rindex[step] + rcount[step];
            } else {
                /*
                 * Recv into the right half of the current window, send the left
                 * half of the window to the peer (perform reduce on the right
                 * half of the current window)
                 */
                scount[step] = wsize / 2;
                rcount[step] = wsize - scount[step];
                rindex[step] = sindex[step] + scount[step];
            }

            /* Send part of the data from rbuf, recv into tmp_buf */
            err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
                                          scount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_ALLREDUCE,
                                          (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
                                          rcount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
            ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
                           (char *)rbuf + (ptrdiff_t)rindex[step] * extent,
                           rcount[step], dtype);

            /* Move the current window to the received message */
            if (step + 1 < nsteps) {
                rindex[step + 1] = rindex[step];
                sindex[step + 1] = rindex[step];
                wsize = rcount[step];
                step++;
            }
        }
        /*
         * Assertion: each process has 1 / p' of the total reduction result:
         * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
         */

        /*
         * Step 3. Allgather by the recursive doubling algorithm.
         * Each process has 1 / p' of the total reduction result:
         * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
         * All exchanges are executed in reverse order relative
         * to recursive doubling (previous step).
         */
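        /*
         * Walking the recorded windows backwards: at each step a process sends
         * the slice it kept during the corresponding reduce-scatter step
         * (rindex[step]/rcount[step]) and receives the slice it gave away
         * (sindex[step]/scount[step]), so its owned portion of the result
         * grows until it holds all count elements.
         */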

        step--;

        for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
            int vdest = vrank ^ mask;
            /* Translate vdest virtual rank to real rank */
            int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;

            /*
             * Send rcount[step] elements from rbuf[rindex[step]...]
             * Recv scount[step] elements to rbuf[sindex[step]...]
             */
            err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
                                          rcount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_ALLREDUCE,
                                          (char *)rbuf + (ptrdiff_t)sindex[step] * extent,
                                          scount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
            step--;
        }
    }

    /*
     * Step 4. Send total result to excluded odd ranks.
     */
    if (rank < 2 * nprocs_rem) {
        if (rank % 2 != 0) {
            /* Odd process -- recv result from rank - 1 */
            err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (OMPI_SUCCESS != err) { goto cleanup_and_return; }

        } else {
            /* Even process -- send result to rank + 1 */
            err = MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
        }
    }

  cleanup_and_return:
    if (NULL != tmp_buf_raw)
        free(tmp_buf_raw);
    if (NULL != rindex)
        free(rindex);
    if (NULL != sindex)
        free(sindex);
    if (NULL != rcount)
        free(rcount);
    if (NULL != scount)
        free(scount);

    return err;
}