1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #include "mpiimpl.h"
7 
8 /*
9  * Ring Algorithm:
10  *
11  * In the first step, each process i sends its contribution to process
12  * i+1 and receives the contribution from process i-1 (with
13  * wrap-around).  From the second step onwards, each process i
14  * forwards to process i+1 the data it received from process i-1 in
15  * the previous step.  This takes a total of p-1 steps.
16  *
17  * Cost = (p-1).alpha + n.((p-1)/p).beta
18  *
19  * This algorithm is preferred to recursive doubling for long messages
20  * because we find that this communication pattern (nearest neighbor)
21  * performs twice as fast as recursive doubling for long messages (on
22  * Myrinet and IBM SP).
23  */
24 
/*
 * Allgatherv over an intracommunicator using the ring algorithm, with
 * optional pipelining of large blocks.
 *
 * sendbuf    - this process's contribution, or MPI_IN_PLACE when the
 *              contribution already sits in recvbuf at displs[rank]
 * sendcount, sendtype - description of sendbuf (unused for MPI_IN_PLACE)
 * recvbuf    - result buffer; block i lives at displs[i] * extent(recvtype)
 * recvcounts, displs - per-rank element counts and displacements (in units
 *              of the extent of recvtype); comm_size entries each
 * recvtype   - element datatype of recvbuf; also used for every send,
 *              because all forwarded data is read back out of recvbuf
 * comm_ptr   - intracommunicator
 * errflag    - set when a point-to-point operation fails; the error is
 *              recorded and the algorithm keeps going
 *
 * When MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE is positive and the largest
 * block exceeds it (in bytes), blocks are forwarded in chunks of
 * MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE / extent elements (at least one),
 * so the number of communication steps can exceed p-1.
 *
 * Returns MPI_SUCCESS, the accumulated communication error, or a
 * "**coll_fail" error when *errflag was set.
 */
int MPIR_Allgatherv_intra_ring(const void *sendbuf,
                               int sendcount,
                               MPI_Datatype sendtype,
                               void *recvbuf,
                               const int *recvcounts,
                               const int *displs,
                               MPI_Datatype recvtype,
                               MPIR_Comm * comm_ptr, MPIR_Errflag_t * errflag)
{
    int comm_size, rank, i, left, right;
    int mpi_errno = MPI_SUCCESS;
    int mpi_errno_ret = MPI_SUCCESS;
    MPI_Status status;
    MPI_Aint recvtype_extent;
    /* The sum of comm_size int counts can exceed INT_MAX, so accumulate it
     * in MPI_Aint to avoid signed overflow. */
    MPI_Aint total_count;

    comm_size = comm_ptr->local_size;
    rank = comm_ptr->rank;

    total_count = 0;
    for (i = 0; i < comm_size; i++)
        total_count += recvcounts[i];

    /* Nothing to gather anywhere: skip all communication. */
    if (total_count == 0)
        goto fn_exit;

    MPIR_Datatype_get_extent_macro(recvtype, recvtype_extent);

    char *sbuf = NULL, *rbuf = NULL;
    int soffset, roffset;
    /* Remaining elements to send/receive; derived from total_count, so they
     * share its wider type. */
    MPI_Aint torecv, tosend;
    int max, chunk_count = 0;
    int sendnow, recvnow;
    int sidx, ridx;

    if (sendbuf != MPI_IN_PLACE) {
        /* First, load the "local" version in the recvbuf. */
        mpi_errno = MPIR_Localcopy(sendbuf, sendcount, sendtype,
                                   ((char *) recvbuf + displs[rank] * recvtype_extent),
                                   recvcounts[rank], recvtype);
        MPIR_ERR_CHECK(mpi_errno);
    }

    left = (comm_size + rank - 1) % comm_size;
    right = (rank + 1) % comm_size;

    /* We receive every block except our own, and send every block except
     * the right neighbor's (it already has that one). */
    torecv = total_count - recvcounts[rank];
    tosend = total_count - recvcounts[right];

    max = recvcounts[0];
    for (i = 1; i < comm_size; i++)
        if (max < recvcounts[i])
            max = recvcounts[i];
    if (MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE > 0 &&
        max * recvtype_extent > MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE) {
        chunk_count = MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE / recvtype_extent;
        /* Handle the case where the datatype extent is larger than
         * the pipeline size. */
        if (!chunk_count)
            chunk_count = 1;
    }
    /* pipeline is disabled */
    if (!chunk_count)
        chunk_count = max;

    /* Send blocks rank, rank-1, ... and receive blocks left, left-1, ...,
     * so each step forwards data received in an earlier step. */
    sidx = rank;
    ridx = left;
    soffset = 0;
    roffset = 0;
    while (tosend || torecv) {  /* While we have data to send or receive */
        sendnow = ((recvcounts[sidx] - soffset) >
                   chunk_count) ? chunk_count : (recvcounts[sidx] - soffset);
        recvnow = ((recvcounts[ridx] - roffset) >
                   chunk_count) ? chunk_count : (recvcounts[ridx] - roffset);
        /* Widen before adding so displs + offset cannot overflow int. */
        sbuf = (char *) recvbuf + ((displs[sidx] + (MPI_Aint) soffset) * recvtype_extent);
        rbuf = (char *) recvbuf + ((displs[ridx] + (MPI_Aint) roffset) * recvtype_extent);

        /* Protect against wrap-around of indices */
        if (!tosend)
            sendnow = 0;
        if (!torecv)
            recvnow = 0;

        /* Communicate */
        if (!sendnow && !recvnow) {
            /* Don't do anything. This case is possible if two
             * consecutive processes contribute 0 bytes each. */
        } else if (!sendnow) {  /* If there's no data to send, just do a recv call */
            mpi_errno =
                MPIC_Recv(rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG, comm_ptr, &status,
                          errflag);
            if (mpi_errno) {
                /* for communication errors, just record the error but continue */
                *errflag =
                    MPIX_ERR_PROC_FAILED ==
                    MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
                MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
                MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
            }
            torecv -= recvnow;
        } else if (!recvnow) {  /* If there's no data to receive, just do a send call */
            mpi_errno =
                MPIC_Send(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG, comm_ptr, errflag);
            if (mpi_errno) {
                /* for communication errors, just record the error but continue */
                *errflag =
                    MPIX_ERR_PROC_FAILED ==
                    MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
                MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
                MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
            }
            tosend -= sendnow;
        } else {        /* There's data to be sent and received */
            mpi_errno = MPIC_Sendrecv(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG,
                                      rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG,
                                      comm_ptr, &status, errflag);
            if (mpi_errno) {
                /* for communication errors, just record the error but continue */
                *errflag =
                    MPIX_ERR_PROC_FAILED ==
                    MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
                MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
                MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
            }
            tosend -= sendnow;
            torecv -= recvnow;
        }

        /* Advance within the current block; once a block is finished
         * (including empty blocks, which still consume a step so that
         * sender and receiver stay in lock-step), move to the previous rank's
         * block. */
        soffset += sendnow;
        roffset += recvnow;
        if (soffset == recvcounts[sidx]) {
            soffset = 0;
            sidx = (sidx + comm_size - 1) % comm_size;
        }
        if (roffset == recvcounts[ridx]) {
            roffset = 0;
            ridx = (ridx + comm_size - 1) % comm_size;
        }
    }

  fn_exit:
    if (mpi_errno_ret)
        mpi_errno = mpi_errno_ret;
    else if (*errflag != MPIR_ERR_NONE)
        MPIR_ERR_SET(mpi_errno, *errflag, "**coll_fail");

    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
174