1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #include "mpiimpl.h"
7
8 /*
9 * Ring Algorithm:
10 *
11 * In the first step, each process i sends its contribution to process
12 * i+1 and receives the contribution from process i-1 (with
13 * wrap-around). From the second step onwards, each process i
14 * forwards to process i+1 the data it received from process i-1 in
15 * the previous step. This takes a total of p-1 steps.
16 *
17 * Cost = (p-1).alpha + n.((p-1)/p).beta
18 *
19 * This algorithm is preferred to recursive doubling for long messages
20 * because we find that this communication pattern (nearest neighbor)
21 * performs twice as fast as recursive doubling for long messages (on
22 * Myrinet and IBM SP).
23 */
24
/*
 * MPIR_Allgatherv_intra_ring - allgatherv using a nearest-neighbor ring.
 *
 * Every process repeatedly sends a block (or a chunk of a block) stored in
 * recvbuf to its right neighbor, (rank + 1) % comm_size, and receives a block
 * (or chunk) from its left neighbor, (rank - 1) % comm_size, until it has
 * forwarded every block except the right neighbor's own and received every
 * block except its own.
 *
 * Parameters:
 *   sendbuf    - local contribution, or MPI_IN_PLACE, in which case the
 *                contribution is taken to be already in recvbuf and the
 *                initial local copy is skipped
 *   sendcount  - number of sendtype elements in sendbuf
 *   sendtype   - datatype of the elements in sendbuf
 *   recvbuf    - destination buffer holding the blocks of all processes
 *   recvcounts - per-process element counts (indexed by rank)
 *   displs     - per-process displacements into recvbuf, in units of the
 *                extent of recvtype (indexed by rank)
 *   recvtype   - datatype of the elements in recvbuf
 *   comm_ptr   - communicator; local_size and rank are read from it
 *   errflag    - in/out error flag; set when a communication call fails
 *
 * Returns MPI_SUCCESS, or an MPI error code.  Communication failures are
 * recorded and the loop keeps running; the accumulated error is returned at
 * the end.  A failure of the initial local copy returns immediately.
 */
int MPIR_Allgatherv_intra_ring(const void *sendbuf,
                               int sendcount,
                               MPI_Datatype sendtype,
                               void *recvbuf,
                               const int *recvcounts,
                               const int *displs,
                               MPI_Datatype recvtype,
                               MPIR_Comm * comm_ptr, MPIR_Errflag_t * errflag)
{
    int comm_size, rank, i, left, right;
    int mpi_errno = MPI_SUCCESS;
    int mpi_errno_ret = MPI_SUCCESS;
    MPI_Status status;
    MPI_Aint recvtype_extent;
    /* The sum of all counts may exceed INT_MAX even though every single
     * count fits in an int, so accumulate it in MPI_Aint. */
    MPI_Aint total_count;

    comm_size = comm_ptr->local_size;
    rank = comm_ptr->rank;

    total_count = 0;
    for (i = 0; i < comm_size; i++)
        total_count += recvcounts[i];

    /* Nothing to exchange at all. */
    if (total_count == 0)
        goto fn_exit;

    MPIR_Datatype_get_extent_macro(recvtype, recvtype_extent);

    char *sbuf = NULL, *rbuf = NULL;
    int soffset, roffset;
    /* Remaining element totals; same width as total_count. */
    MPI_Aint torecv, tosend;
    int max, chunk_count = 0;
    int sendnow, recvnow;
    int sidx, ridx;

    if (sendbuf != MPI_IN_PLACE) {
        /* First, load the "local" version in the recvbuf. */
        mpi_errno = MPIR_Localcopy(sendbuf, sendcount, sendtype,
                                   ((char *) recvbuf + displs[rank] * recvtype_extent),
                                   recvcounts[rank], recvtype);
        MPIR_ERR_CHECK(mpi_errno);
    }

    left = (comm_size + rank - 1) % comm_size;
    right = (rank + 1) % comm_size;

    /* We receive everything but our own block and send everything but the
     * right neighbor's block. */
    torecv = total_count - recvcounts[rank];
    tosend = total_count - recvcounts[right];

    /* Largest single contribution; used to size the pipeline chunks. */
    max = recvcounts[0];
    for (i = 1; i < comm_size; i++)
        if (max < recvcounts[i])
            max = recvcounts[i];
    if (MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE > 0 &&
        max * recvtype_extent > MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE) {
        chunk_count = MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE / recvtype_extent;
        /* Handle the case where the datatype extent is larger than
         * the pipeline size. */
        if (!chunk_count)
            chunk_count = 1;
    }
    /* pipeline is disabled */
    if (!chunk_count)
        chunk_count = max;

    /* Start by sending our own block and receiving the left neighbor's
     * block; both indices then walk backwards around the ring. */
    sidx = rank;
    ridx = left;
    soffset = 0;
    roffset = 0;
    while (tosend || torecv) {  /* While we have data to send or receive */
        sendnow = ((recvcounts[sidx] - soffset) >
                   chunk_count) ? chunk_count : (recvcounts[sidx] - soffset);
        recvnow = ((recvcounts[ridx] - roffset) >
                   chunk_count) ? chunk_count : (recvcounts[ridx] - roffset);
        /* Widen before adding so displacement + offset cannot overflow int. */
        sbuf = (char *) recvbuf + (((MPI_Aint) displs[sidx] + soffset) * recvtype_extent);
        rbuf = (char *) recvbuf + (((MPI_Aint) displs[ridx] + roffset) * recvtype_extent);

        /* Protect against wrap-around of indices */
        if (!tosend)
            sendnow = 0;
        if (!torecv)
            recvnow = 0;

        /* Communicate.  mpi_errno is reset so that an iteration which does
         * not communicate cannot re-report an earlier failure below. */
        mpi_errno = MPI_SUCCESS;
        if (!sendnow && !recvnow) {
            /* Don't do anything. This case is possible if two
             * consecutive processes contribute 0 bytes each. */
        } else if (!sendnow) {  /* If there's no data to send, just do a recv call */
            mpi_errno =
                MPIC_Recv(rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG, comm_ptr, &status,
                          errflag);
        } else if (!recvnow) {  /* If there's no data to receive, just do a send call */
            mpi_errno =
                MPIC_Send(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG, comm_ptr, errflag);
        } else {        /* There's data to be sent and received */
            mpi_errno = MPIC_Sendrecv(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG,
                                      rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG,
                                      comm_ptr, &status, errflag);
        }
        if (mpi_errno) {
            /* for communication errors, just record the error but continue */
            *errflag =
                MPIX_ERR_PROC_FAILED ==
                MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
            MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
            MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
        }

        /* A direction that did not communicate has a zero count here, so
         * the unconditional updates leave it untouched. */
        tosend -= sendnow;
        torecv -= recvnow;

        /* Advance within the current block, and move on to the previous
         * rank's block once the current one is complete. */
        soffset += sendnow;
        roffset += recvnow;
        if (soffset == recvcounts[sidx]) {
            soffset = 0;
            sidx = (sidx + comm_size - 1) % comm_size;
        }
        if (roffset == recvcounts[ridx]) {
            roffset = 0;
            ridx = (ridx + comm_size - 1) % comm_size;
        }
    }

  fn_exit:
    /* Prefer the accumulated communication error; otherwise report a
     * collective failure if the error flag is set. */
    if (mpi_errno_ret)
        mpi_errno = mpi_errno_ret;
    else if (*errflag != MPIR_ERR_NONE)
        MPIR_ERR_SET(mpi_errno, *errflag, "**coll_fail");

    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
174