1 /*
2 * $COPYRIGHT$
3 *
4 * Additional copyrights may follow
5 *
6 * $HEADER$
7 */
8
9 #include "ompi_config.h"
10 #include "coll_spacc.h"
11
12 #include "mpi.h"
13 #include "ompi/constants.h"
14 #include "opal/util/bit_ops.h"
15 #include "ompi/datatype/ompi_datatype.h"
16 #include "ompi/communicator/communicator.h"
17 #include "ompi/mca/coll/coll.h"
18 #include "ompi/mca/coll/base/coll_base_functions.h"
19 #include "ompi/mca/coll/base/coll_tags.h"
20 #include "ompi/mca/coll/base/coll_base_util.h"
21 #include "ompi/mca/pml/pml.h"
22 #include "ompi/op/op.h"
23
24 /*
25 * mca_coll_spacc_allreduce_intra_redscat_gather
26 *
27 * Function: Allreduce using Rabenseifner's algorithm.
28 * Accepts: Same arguments as MPI_Allreduce
29 * Returns: MPI_SUCCESS or error code
30 *
31 * Description: an implementation of Rabenseifner's allreduce algorithm [1, 2].
32 * [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
33 * Optimization of Collective Communication Operations in MPICH //
34 * The Int. Journal of High Performance Computing Applications. Vol 19,
35 * Issue 1, pp. 49--66.
36 * [2] http://www.hlrs.de/mpi/myreduce.html.
37 *
38 * This algorithm is a combination of a reduce-scatter implemented with
39 * recursive vector halving and recursive distance doubling, followed either
40 * by an allgather implemented with recursive doubling [1].
41 *
42 * Step 1. If the number of processes is not a power of two, reduce it to
43 * the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
44 * by removing r = p - p' extra processes as follows. In the first 2r processes
45 * (ranks 0 to 2r - 1), all the even ranks send the second half of the input
46 * vector to their right neighbor (rank + 1), and all the odd ranks send
47 * the first half of the input vector to their left neighbor (rank - 1).
48 * The even ranks compute the reduction on the first half of the vector and
49 * the odd ranks compute the reduction on the second half. The odd ranks then
50 * send the result to their left neighbors (the even ranks). As a result,
51 * the even ranks among the first 2r processes now contain the reduction with
52 * the input vector on their right neighbors (the odd ranks). These odd ranks
53 * do not participate in the rest of the algorithm, which leaves behind
54 * a power-of-two number of processes. The first r even-ranked processes and
55 * the last p - 2r processes are now renumbered from 0 to p' - 1.
56 *
57 * Step 2. The remaining processes now perform a reduce-scatter by using
58 * recursive vector halving and recursive distance doubling. The even-ranked
59 * processes send the second half of their buffer to rank + 1 and the odd-ranked
60 * processes send the first half of their buffer to rank - 1. All processes
61 * then compute the reduction between the local buffer and the received buffer.
62 * In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
63 * distance is doubled. At the end, each of the p' processes has 1 / p' of the
64 * total reduction result.
65 *
66 * Step 3. An allgather is performed by using recursive vector doubling and
67 * distance halving. All exchanges are executed in reverse order relative
68 * to recursive doubling on previous step. If the number of processes is not
69 * a power of two, the total result vector must be sent to the r processes
70 * that were removed in the first step.
71 *
72 * Limitations:
73 * count >= 2^{\floor{\log_2 p}}
74 * commutative operations only
75 * intra-communicators only
76 *
77 * Memory requirements (per process):
78 * count * typesize + 4 * log_2(p) * sizeof(int) = O(count)
79 */
mca_coll_spacc_allreduce_intra_redscat_allgather(const void * sbuf,void * rbuf,int count,struct ompi_datatype_t * dtype,struct ompi_op_t * op,struct ompi_communicator_t * comm,mca_coll_base_module_t * module)80 int mca_coll_spacc_allreduce_intra_redscat_allgather(
81 const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
82 struct ompi_op_t *op, struct ompi_communicator_t *comm,
83 mca_coll_base_module_t *module)
84 {
85 int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
86
87 int comm_size = ompi_comm_size(comm);
88 int rank = ompi_comm_rank(comm);
89
90 opal_output_verbose(30, mca_coll_spacc_stream,
91 "coll:spacc:allreduce_intra_redscat_allgather: rank %d/%d",
92 rank, comm_size);
93
94 /* Find nearest power-of-two less than or equal to comm_size */
95 int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1); /* ilog2(comm_size) */
96 assert(nsteps >= 0);
97 int nprocs_pof2 = 1 << nsteps; /* flp2(comm_size) */
98
99 if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
100 opal_output_verbose(20, mca_coll_spacc_stream,
101 "coll:spacc:allreduce_intra_redscat_allgather: rank %d/%d count %d switching to base allreduce",
102 rank, comm_size, count);
103 return ompi_coll_base_allreduce_intra_basic_linear(sbuf, rbuf, count, dtype,
104 op, comm, module);
105 }
106
107 int err = MPI_SUCCESS;
108
109 ptrdiff_t lb, extent, dsize, gap = 0;
110 ompi_datatype_get_extent(dtype, &lb, &extent);
111 dsize = opal_datatype_span(&dtype->super, count, &gap);
112
113 /* Temporary buffer for receiving messages */
114 char *tmp_buf = NULL;
115 char *tmp_buf_raw = (char *)malloc(dsize);
116 if (NULL == tmp_buf_raw)
117 return OMPI_ERR_OUT_OF_RESOURCE;
118 tmp_buf = tmp_buf_raw - gap;
119
120 if (sbuf != MPI_IN_PLACE) {
121 /* Copy sbuf to rbuf */
122 err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
123 (char *)sbuf);
124 }
125
126 /*
127 * Step 1. Reduce the number of processes to the nearest lower power of two
128 * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
129 * 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
130 * the second half of the input vector to their right neighbor (rank + 1)
131 * and all the odd ranks send the first half of the input vector to their
132 * left neighbor (rank - 1).
133 * 2. All 2r processes compute the reduction on their half.
134 * 3. The odd ranks then send the result to their left neighbors
135 * (the even ranks).
136 *
137 * The even ranks (0 to 2r - 1) now contain the reduction with the input
138 * vector on their right neighbors (the odd ranks). The first r even
139 * processes and the p - 2r last processes are renumbered from
140 * 0 to 2^{\floor{\log_2 p}} - 1.
141 */
142
143 int vrank, step, wsize;
144 int nprocs_rem = comm_size - nprocs_pof2;
145
146 if (rank < 2 * nprocs_rem) {
147 int count_lhalf = count / 2;
148 int count_rhalf = count - count_lhalf;
149
150 if (rank % 2 != 0) {
151 /*
152 * Odd process -- exchange with rank - 1
153 * Send the left half of the input vector to the left neighbor,
154 * Recv the right half of the input vector from the left neighbor
155 */
156 err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
157 MCA_COLL_BASE_TAG_ALLREDUCE,
158 (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
159 count_rhalf, dtype, rank - 1,
160 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
161 MPI_STATUS_IGNORE, rank);
162 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
163
164 /* Reduce on the right half of the buffers (result in rbuf) */
165 ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
166 (char *)rbuf + count_lhalf * extent, count_rhalf, dtype);
167
168 /* Send the right half to the left neighbor */
169 err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
170 count_rhalf, dtype, rank - 1,
171 MCA_COLL_BASE_TAG_ALLREDUCE,
172 MCA_PML_BASE_SEND_STANDARD, comm));
173 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
174
175 /* This process does not pariticipate in recursive doubling phase */
176 vrank = -1;
177
178 } else {
179 /*
180 * Even process -- exchange with rank + 1
181 * Send the right half of the input vector to the right neighbor,
182 * Recv the left half of the input vector from the right neighbor
183 */
184 err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
185 count_rhalf, dtype, rank + 1,
186 MCA_COLL_BASE_TAG_ALLREDUCE,
187 tmp_buf, count_lhalf, dtype, rank + 1,
188 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
189 MPI_STATUS_IGNORE, rank);
190 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
191
192 /* Reduce on the right half of the buffers (result in rbuf) */
193 ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);
194
195 /* Recv the right half from the right neighbor */
196 err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
197 count_rhalf, dtype, rank + 1,
198 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
199 MPI_STATUS_IGNORE));
200 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
201
202 vrank = rank / 2;
203 }
204 } else { /* rank >= 2 * nprocs_rem */
205 vrank = rank - nprocs_rem;
206 }
207
208 /*
209 * Step 2. Reduce-scatter implemented with recursive vector halving and
210 * recursive distance doubling. We have p' = 2^{\floor{\log_2 p}}
211 * power-of-two number of processes with new ranks (vrank) and result in rbuf.
212 *
213 * The even-ranked processes send the right half of their buffer to rank + 1
214 * and the odd-ranked processes send the left half of their buffer to
215 * rank - 1. All processes then compute the reduction between the local
216 * buffer and the received buffer. In the next \log_2(p') - 1 steps, the
217 * buffers are recursively halved, and the distance is doubled. At the end,
218 * each of the p' processes has 1 / p' of the total reduction result.
219 */
220 rindex = malloc(sizeof(*rindex) * nsteps);
221 sindex = malloc(sizeof(*sindex) * nsteps);
222 rcount = malloc(sizeof(*rcount) * nsteps);
223 scount = malloc(sizeof(*scount) * nsteps);
224 if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
225 err = OMPI_ERR_OUT_OF_RESOURCE;
226 goto cleanup_and_return;
227 }
228
229 if (vrank != -1) {
230 step = 0;
231 wsize = count;
232 sindex[0] = rindex[0] = 0;
233
234 for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
235 /*
236 * On each iteration: rindex[step] = sindex[step] -- begining of the
237 * current window. Length of the current window is storded in wsize.
238 */
239 int vdest = vrank ^ mask;
240 /* Translate vdest virtual rank to real rank */
241 int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
242
243 if (rank < dest) {
244 /*
245 * Recv into the left half of the current window, send the right
246 * half of the window to the peer (perform reduce on the left
247 * half of the current window)
248 */
249 rcount[step] = wsize / 2;
250 scount[step] = wsize - rcount[step];
251 sindex[step] = rindex[step] + rcount[step];
252 } else {
253 /*
254 * Recv into the right half of the current window, send the left
255 * half of the window to the peer (perform reduce on the right
256 * half of the current window)
257 */
258 scount[step] = wsize / 2;
259 rcount[step] = wsize - scount[step];
260 rindex[step] = sindex[step] + scount[step];
261 }
262
263 /* Send part of data from the rbuf, recv into the tmp_buf */
264 err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
265 scount[step], dtype, dest,
266 MCA_COLL_BASE_TAG_ALLREDUCE,
267 (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
268 rcount[step], dtype, dest,
269 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
270 MPI_STATUS_IGNORE, rank);
271 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
272
273 /* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
274 ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
275 (char *)rbuf + (ptrdiff_t)rindex[step] * extent,
276 rcount[step], dtype);
277
278 /* Move the current window to the received message */
279 if (step + 1 < nsteps) {
280 rindex[step + 1] = rindex[step];
281 sindex[step + 1] = rindex[step];
282 wsize = rcount[step];
283 step++;
284 }
285 }
286 /*
287 * Assertion: each process has 1 / p' of the total reduction result:
288 * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
289 */
290
291 /*
292 * Step 3. Allgather by the recursive doubling algorithm.
293 * Each process has 1 / p' of the total reduction result:
294 * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
295 * All exchanges are executed in reverse order relative
296 * to recursive doubling (previous step).
297 */
298
299 step--;
300
301 for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
302 int vdest = vrank ^ mask;
303 /* Translate vdest virtual rank to real rank */
304 int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
305
306 /*
307 * Send rcount[step] elements from rbuf[rindex[step]...]
308 * Recv scount[step] elements to rbuf[sindex[step]...]
309 */
310 err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
311 rcount[step], dtype, dest,
312 MCA_COLL_BASE_TAG_ALLREDUCE,
313 (char *)rbuf + (ptrdiff_t)sindex[step] * extent,
314 scount[step], dtype, dest,
315 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
316 MPI_STATUS_IGNORE, rank);
317 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
318 step--;
319 }
320 }
321
322 /*
323 * Step 4. Send total result to excluded odd ranks.
324 */
325 if (rank < 2 * nprocs_rem) {
326 if (rank % 2 != 0) {
327 /* Odd process -- recv result from rank - 1 */
328 err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
329 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
330 MPI_STATUS_IGNORE));
331 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
332
333 } else {
334 /* Even process -- send result to rank + 1 */
335 err = MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
336 MCA_COLL_BASE_TAG_ALLREDUCE,
337 MCA_PML_BASE_SEND_STANDARD, comm));
338 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
339 }
340 }
341
342 cleanup_and_return:
343 if (NULL != tmp_buf_raw)
344 free(tmp_buf_raw);
345 if (NULL != rindex)
346 free(rindex);
347 if (NULL != sindex)
348 free(sindex);
349 if (NULL != rcount)
350 free(rcount);
351 if (NULL != scount)
352 free(scount);
353
354 return err;
355 }
356