/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *
 *  (C) 2009 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"
#include "topo.h"

/* -- Begin Profiling Symbol Block for routine MPI_Dist_graph_create */
#if defined(HAVE_PRAGMA_WEAK)
#pragma weak MPI_Dist_graph_create = PMPI_Dist_graph_create
#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
#pragma _HP_SECONDARY_DEF PMPI_Dist_graph_create  MPI_Dist_graph_create
#elif defined(HAVE_PRAGMA_CRI_DUP)
#pragma _CRI duplicate MPI_Dist_graph_create as PMPI_Dist_graph_create
#endif
/* -- End Profiling Symbol Block */

/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
   the MPI routines */
#ifndef MPICH_MPI_FROM_PMPI
#undef MPI_Dist_graph_create
#define MPI_Dist_graph_create PMPI_Dist_graph_create
/* any utility functions should go here, usually prefixed with PMPI_LOCAL to
 * correctly handle weak symbols and the profiling interface */
#endif

#undef FUNCNAME
#define FUNCNAME MPI_Dist_graph_create
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*@
MPI_Dist_graph_create - MPI_DIST_GRAPH_CREATE returns a handle to a new
communicator to which the distributed graph topology information is
attached.

Input Parameters:
+ comm_old - input communicator (handle)
. n - number of source nodes for which this process specifies edges
  (non-negative integer)
. sources - array containing the n source nodes for which this process
  specifies edges (array of non-negative integers)
. degrees - array specifying the number of destinations for each source node
  in the source node array (array of non-negative integers)
. destinations - destination nodes for the source nodes in the source node
  array (array of non-negative integers)
. weights - weights for source to destination edges (array of non-negative
  integers or MPI_UNWEIGHTED)
. info - hints on optimization and interpretation of weights (handle)
- reorder - ranks may be reordered (true) or not (false) (logical)

Output Parameter:
. comm_dist_graph - communicator with distributed graph topology added (handle)

.N ThreadSafe

.N Fortran

.N Errors
.N MPI_SUCCESS
.N MPI_ERR_ARG
.N MPI_ERR_OTHER
@*/
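/* A minimal usage sketch (illustrative only; the ring pattern and variable
 * names are made up for this example and are not part of this file).  Each
 * process contributes one source (its own rank) with a single unweighted
 * edge to the next rank, describing a directed ring:
 *
 *     int rank, size;
 *     MPI_Comm ring_comm;
 *     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 *     MPI_Comm_size(MPI_COMM_WORLD, &size);
 *     int src[1] = { rank };
 *     int deg[1] = { 1 };
 *     int dst[1] = { (rank + 1) % size };
 *     MPI_Dist_graph_create(MPI_COMM_WORLD, 1, src, deg, dst, MPI_UNWEIGHTED,
 *                           MPI_INFO_NULL, 0, &ring_comm);
 */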
int MPI_Dist_graph_create(MPI_Comm comm_old, int n, MPICH2_CONST int sources[],
                          MPICH2_CONST int degrees[], MPICH2_CONST int destinations[],
                          MPICH2_CONST int weights[],
                          MPI_Info info, int reorder, MPI_Comm *comm_dist_graph)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Comm *comm_ptr = NULL;
    MPID_Comm *comm_dist_graph_ptr = NULL;
    MPI_Request *reqs = NULL;
    MPIR_Topology *topo_ptr = NULL;
    MPIR_Dist_graph_topology *dist_graph_ptr = NULL;
    int i;
    int j;
    int idx;
    int comm_size = 0;
    int in_capacity;
    int out_capacity;
    int **rout = NULL;
    int **rin = NULL;
    int *rin_sizes;
    int *rout_sizes;
    int *rin_idx;
    int *rout_idx;
    int *rs;
    int in_out_peers[2] = {-1, -1};
    int errflag = FALSE;
    MPIU_CHKLMEM_DECL(9);
    MPIU_CHKPMEM_DECL(1);
    MPID_MPI_STATE_DECL(MPID_STATE_MPI_DIST_GRAPH_CREATE);

    MPIR_ERRTEST_INITIALIZED_ORDIE();

    MPIU_THREAD_CS_ENTER(ALLFUNC,);
    MPID_MPI_FUNC_ENTER(MPID_STATE_MPI_DIST_GRAPH_CREATE);

    /* Validate parameters, especially handles needing to be converted */
#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS;
        {
            MPIR_ERRTEST_COMM(comm_old, mpi_errno);
            MPIR_ERRTEST_INFO_OR_NULL(info, mpi_errno);
            if (mpi_errno != MPI_SUCCESS) goto fn_fail;
        }
        MPID_END_ERROR_CHECKS;
    }
#   endif

    /* Convert MPI object handles to object pointers */
    MPID_Comm_get_ptr(comm_old, comm_ptr);

    /* Validate parameters and objects (post conversion) */
#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS;
        {
            /* Validate comm_ptr */
            MPID_Comm_valid_ptr(comm_ptr, mpi_errno);
            /* If comm_ptr is not valid, it will be reset to null */
            if (comm_ptr) {
                MPIR_ERRTEST_COMM_INTRA(comm_ptr, mpi_errno);
            }

            MPIR_ERRTEST_ARGNEG(n, "n", mpi_errno);
            if (n > 0) {
                int have_degrees = 0;
                MPIR_ERRTEST_ARGNULL(sources, "sources", mpi_errno);
                MPIR_ERRTEST_ARGNULL(degrees, "degrees", mpi_errno);
                for (i = 0; i < n; ++i) {
                    if (degrees[i]) {
                        have_degrees = 1;
                        break;
                    }
                }
                if (have_degrees) {
                    MPIR_ERRTEST_ARGNULL(destinations, "destinations", mpi_errno);
                    if (weights != MPI_UNWEIGHTED)
                        MPIR_ERRTEST_ARGNULL(weights, "weights", mpi_errno);
                }
            }

            if (mpi_errno != MPI_SUCCESS) goto fn_fail;
        }
        MPID_END_ERROR_CHECKS;
    }
#   endif /* HAVE_ERROR_CHECKING */


    /* ... body of routine ...  */
    /* Implementation based on Torsten Hoefler's reference implementation
     * attached to MPI-2.2 ticket #33. */
    *comm_dist_graph = MPI_COMM_NULL;

    comm_size = comm_ptr->local_size;

    /* following the spirit of the old topo interface, attributes do not
     * propagate to the new communicator (see MPI-2.1 pp. 243 line 11) */
    mpi_errno = MPIR_Comm_copy(comm_ptr, comm_size, &comm_dist_graph_ptr);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    MPIU_Assert(comm_dist_graph_ptr != NULL);

    /* rin is an array of comm_size pointers; rin[x] points to an array of
     * rin_sizes[x] ints holding the locally known edges *into* rank x,
     * stored as (source, weight) pairs.
     *
     * rout is an array of comm_size pointers; rout[x] points to an array of
     * rout_sizes[x] ints holding the locally known edges *out of* rank x,
     * stored as (destination, weight) pairs. */
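    /* Worked example (hypothetical inputs, for illustration only): if this
     * process specifies n=1, sources={0}, degrees={2}, destinations={1,2},
     * weights={5,7}, then after the populate loop below:
     *     rout[0] = {1,5, 2,7}   -- rank 0's outgoing (dest, weight) pairs
     *     rin[1]  = {0,5}        -- edges into rank 1 as (source, weight)
     *     rin[2]  = {0,7}        -- edges into rank 2 as (source, weight)
     * Each list is later shipped to the rank it describes. */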
    MPIU_CHKLMEM_MALLOC(rout,       int **, comm_size*sizeof(int*), mpi_errno, "rout");
    MPIU_CHKLMEM_MALLOC(rin,        int **, comm_size*sizeof(int*), mpi_errno, "rin");
    MPIU_CHKLMEM_MALLOC(rin_sizes,  int *, comm_size*sizeof(int), mpi_errno, "rin_sizes");
    MPIU_CHKLMEM_MALLOC(rout_sizes, int *, comm_size*sizeof(int), mpi_errno, "rout_sizes");
    MPIU_CHKLMEM_MALLOC(rin_idx,    int *, comm_size*sizeof(int), mpi_errno, "rin_idx");
    MPIU_CHKLMEM_MALLOC(rout_idx,   int *, comm_size*sizeof(int), mpi_errno, "rout_idx");

    memset(rout,       0, comm_size*sizeof(int*));
    memset(rin,        0, comm_size*sizeof(int*));
    memset(rin_sizes,  0, comm_size*sizeof(int));
    memset(rout_sizes, 0, comm_size*sizeof(int));
    memset(rin_idx,    0, comm_size*sizeof(int));
    memset(rout_idx,   0, comm_size*sizeof(int));

    /* compute array sizes */
    idx = 0;
    for (i = 0; i < n; ++i) {
        MPIU_Assert(sources[i] < comm_size);
        for (j = 0; j < degrees[i]; ++j) {
            MPIU_Assert(destinations[idx] < comm_size);
            /* each edge is stored as an (endpoint, weight) pair, so every
             * edge adds two ints to the buffer destined for each endpoint */
            rout_sizes[sources[i]] += 2;
            rin_sizes[destinations[idx]] += 2;
            ++idx;
        }
    }

    /* allocate arrays */
    for (i = 0; i < comm_size; ++i) {
        /* can't use CHKLMEM macros b/c we are in a loop */
        if (rin_sizes[i]) {
            rin[i] = MPIU_Malloc(rin_sizes[i] * sizeof(int));
            MPIU_ERR_CHKANDJUMP(!rin[i], mpi_errno, MPIR_ERR_RECOVERABLE, "**nomem");
        }
        if (rout_sizes[i]) {
            rout[i] = MPIU_Malloc(rout_sizes[i] * sizeof(int));
            MPIU_ERR_CHKANDJUMP(!rout[i], mpi_errno, MPIR_ERR_RECOVERABLE, "**nomem");
        }
    }

    /* populate arrays */
    idx = 0;
    for (i = 0; i < n; ++i) {
        /* TODO add this assert as proper error checking above */
        int s_rank = sources[i];
        MPIU_Assert(s_rank < comm_size);
        MPIU_Assert(s_rank >= 0);

        for (j = 0; j < degrees[i]; ++j) {
            int d_rank = destinations[idx];
            int weight = (weights == MPI_UNWEIGHTED ? 0 : weights[idx]);
            /* TODO add this assert as proper error checking above */
            MPIU_Assert(d_rank < comm_size);
            MPIU_Assert(d_rank >= 0);

            /* XXX DJG what about self-edges? do we need to drop one of these
             * cases when there is a self-edge to avoid double-counting? */

            /* rout[s][2*x] is the value of d for the x'th edge between (s,d)
             * with weight rout[s][2*x+1], where x is the current end of the
             * outgoing edge list for s.  x==(rout_idx[s]/2) */
            rout[s_rank][rout_idx[s_rank]++] = d_rank;
            rout[s_rank][rout_idx[s_rank]++] = weight;

            /* rin[d][2*x] is the value of s for the x'th edge between (s,d)
             * with weight rin[d][2*x+1], where x is the current end of the
             * incoming edge list for d.  x==(rin_idx[d]/2) */
            rin[d_rank][rin_idx[d_rank]++] = s_rank;
            rin[d_rank][rin_idx[d_rank]++] = weight;

            ++idx;
        }
    }

    for (i = 0; i < comm_size; ++i) {
        /* sanity check that all arrays are fully populated */
        MPIU_Assert(rin_idx[i] == rin_sizes[i]);
        MPIU_Assert(rout_idx[i] == rout_sizes[i]);
    }

    MPIU_CHKLMEM_MALLOC(rs, int *, 2*comm_size*sizeof(int), mpi_errno, "red-scat source buffer");
    for (i = 0; i < comm_size; ++i) {
        rs[2*i]   = (rin_sizes[i]  ? 1 : 0);
        rs[2*i+1] = (rout_sizes[i] ? 1 : 0);
    }
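
    /* With a block size of 2, the reduce-scatter below leaves rank i the
     * elementwise sum over all processes of rs[2*i] and rs[2*i+1]:
     * in_out_peers[0] is the number of ranks that will send this process
     * inbound-edge data (TOPO_A) and in_out_peers[1] the number that will
     * send it outbound-edge data (TOPO_B). */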

    /* compute the number of peers I will recv from */
    mpi_errno = MPIR_Reduce_scatter_block_impl(rs, in_out_peers, 2, MPI_INT, MPI_SUM, comm_ptr, &errflag);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");

    MPIU_Assert(in_out_peers[0] <= comm_size && in_out_peers[0] >= 0);
    MPIU_Assert(in_out_peers[1] <= comm_size && in_out_peers[1] >= 0);

    idx = 0;
    /* must be 2*comm_size requests because we will possibly send inbound and
     * outbound edges to everyone in our communicator */
    MPIU_CHKLMEM_MALLOC(reqs, MPI_Request *, 2*comm_size*sizeof(MPI_Request), mpi_errno, "temp request array");
    for (i = 0; i < comm_size; ++i) {
        if (rin_sizes[i]) {
            /* send edges where i is a destination to process i */
            mpi_errno = MPIC_Isend(&rin[i][0], rin_sizes[i], MPI_INT, i, MPIR_TOPO_A_TAG, comm_old, &reqs[idx++]);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }
        if (rout_sizes[i]) {
            /* send edges where i is a source to process i */
            mpi_errno = MPIC_Isend(&rout[i][0], rout_sizes[i], MPI_INT, i, MPIR_TOPO_B_TAG, comm_old, &reqs[idx++]);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }
    }
    MPIU_Assert(idx <= (2 * comm_size));
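    /* The sends are nonblocking so that every process can proceed to post
     * the matching probes/receives below without deadlock; the Waitall after
     * the receive loops completes them. */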

    /* Create the topology structure */
    MPIU_CHKPMEM_MALLOC(topo_ptr, MPIR_Topology *, sizeof(MPIR_Topology), mpi_errno, "topo_ptr");
    topo_ptr->kind = MPI_DIST_GRAPH;
    dist_graph_ptr = &topo_ptr->topo.dist_graph;
    dist_graph_ptr->indegree = 0;
    dist_graph_ptr->in = NULL;
    dist_graph_ptr->in_weights = NULL;
    dist_graph_ptr->outdegree = 0;
    dist_graph_ptr->out = NULL;
    dist_graph_ptr->out_weights = NULL;
    dist_graph_ptr->is_weighted = (weights != MPI_UNWEIGHTED);

    /* can't use CHKPMEM macros for this b/c we need to realloc */
    in_capacity = 10; /* arbitrary */
    dist_graph_ptr->in = MPIU_Malloc(in_capacity*sizeof(int));
    if (dist_graph_ptr->is_weighted)
        dist_graph_ptr->in_weights = MPIU_Malloc(in_capacity*sizeof(int));
    out_capacity = 10; /* arbitrary */
    dist_graph_ptr->out = MPIU_Malloc(out_capacity*sizeof(int));
    if (dist_graph_ptr->is_weighted)
        dist_graph_ptr->out_weights = MPIU_Malloc(out_capacity*sizeof(int));
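    /* These arrays grow by doubling in the receive loops below and are
     * trimmed back to the exact in/out degree by the reallocs near the end
     * of the routine, so the initial capacity only affects realloc traffic. */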

    for (i = 0; i < in_out_peers[0]; ++i) {
        MPI_Status status;
        int count;
        int *buf;
        /* receive inbound edges */
        mpi_errno = MPIC_Probe(MPI_ANY_SOURCE, MPIR_TOPO_A_TAG, comm_old, &status);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        MPIR_Get_count_impl(&status, MPI_INT, &count);
        /* can't use CHKLMEM macros b/c we are in a loop */
        buf = MPIU_Malloc(count*sizeof(int));
        MPIU_ERR_CHKANDJUMP(!buf, mpi_errno, MPIR_ERR_RECOVERABLE, "**nomem");

        mpi_errno = MPIC_Recv(buf, count, MPI_INT, MPI_ANY_SOURCE, MPIR_TOPO_A_TAG, comm_old, MPI_STATUS_IGNORE);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);

        for (j = 0; j < count/2; ++j) {
            int deg = dist_graph_ptr->indegree++;
            if (deg >= in_capacity) {
                in_capacity *= 2;
                MPIU_REALLOC_ORJUMP(dist_graph_ptr->in, in_capacity*sizeof(int), mpi_errno);
                if (dist_graph_ptr->is_weighted)
                    MPIU_REALLOC_ORJUMP(dist_graph_ptr->in_weights, in_capacity*sizeof(int), mpi_errno);
            }
            dist_graph_ptr->in[deg] = buf[2*j];
            if (dist_graph_ptr->is_weighted)
                dist_graph_ptr->in_weights[deg] = buf[2*j+1];
        }
        MPIU_Free(buf);
    }

    for (i = 0; i < in_out_peers[1]; ++i) {
        MPI_Status status;
        int count;
        int *buf;
        /* receive outbound edges */
        mpi_errno = MPIC_Probe(MPI_ANY_SOURCE, MPIR_TOPO_B_TAG, comm_old, &status);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        MPIR_Get_count_impl(&status, MPI_INT, &count);
        /* can't use CHKLMEM macros b/c we are in a loop */
        buf = MPIU_Malloc(count*sizeof(int));
        MPIU_ERR_CHKANDJUMP(!buf, mpi_errno, MPIR_ERR_RECOVERABLE, "**nomem");

        mpi_errno = MPIC_Recv(buf, count, MPI_INT, MPI_ANY_SOURCE, MPIR_TOPO_B_TAG, comm_old, MPI_STATUS_IGNORE);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);

        for (j = 0; j < count/2; ++j) {
            int deg = dist_graph_ptr->outdegree++;
            if (deg >= out_capacity) {
                out_capacity *= 2;
                MPIU_REALLOC_ORJUMP(dist_graph_ptr->out, out_capacity*sizeof(int), mpi_errno);
                if (dist_graph_ptr->is_weighted)
                    MPIU_REALLOC_ORJUMP(dist_graph_ptr->out_weights, out_capacity*sizeof(int), mpi_errno);
            }
            dist_graph_ptr->out[deg] = buf[2*j];
            if (dist_graph_ptr->is_weighted)
                dist_graph_ptr->out_weights[deg] = buf[2*j+1];
        }
        MPIU_Free(buf);
    }

    mpi_errno = MPIR_Waitall_impl(idx, reqs, MPI_STATUSES_IGNORE);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    /* remove any excess memory allocation */
    MPIU_REALLOC_ORJUMP(dist_graph_ptr->in, dist_graph_ptr->indegree*sizeof(int), mpi_errno);
    MPIU_REALLOC_ORJUMP(dist_graph_ptr->out, dist_graph_ptr->outdegree*sizeof(int), mpi_errno);
    if (dist_graph_ptr->is_weighted) {
        MPIU_REALLOC_ORJUMP(dist_graph_ptr->in_weights, dist_graph_ptr->indegree*sizeof(int), mpi_errno);
        MPIU_REALLOC_ORJUMP(dist_graph_ptr->out_weights, dist_graph_ptr->outdegree*sizeof(int), mpi_errno);
    }

    mpi_errno = MPIR_Topology_put(comm_dist_graph_ptr, topo_ptr);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    MPIU_CHKPMEM_COMMIT();

    MPIU_OBJ_PUBLISH_HANDLE(*comm_dist_graph, comm_dist_graph_ptr->handle);

    /* ... end of body of routine ... */

  fn_exit:
    for (i = 0; i < comm_size; ++i) {
        /* rin/rout may not have been allocated yet if we failed early */
        if (rin && rin[i])
            MPIU_Free(rin[i]);
        if (rout && rout[i])
            MPIU_Free(rout[i]);
    }

    MPIU_CHKLMEM_FREEALL();

    MPID_MPI_FUNC_EXIT(MPID_STATE_MPI_DIST_GRAPH_CREATE);
    MPIU_THREAD_CS_EXIT(ALLFUNC,);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    if (dist_graph_ptr && dist_graph_ptr->in)
        MPIU_Free(dist_graph_ptr->in);
    if (dist_graph_ptr && dist_graph_ptr->in_weights)
        MPIU_Free(dist_graph_ptr->in_weights);
    if (dist_graph_ptr && dist_graph_ptr->out)
        MPIU_Free(dist_graph_ptr->out);
    if (dist_graph_ptr && dist_graph_ptr->out_weights)
        MPIU_Free(dist_graph_ptr->out_weights);
    MPIU_CHKPMEM_REAP();
#ifdef HAVE_ERROR_CHECKING
    mpi_errno = MPIR_Err_create_code(
        mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER,
        "**mpi_dist_graph_create", "**mpi_dist_graph_create %C %d %p %p %p %p %I %d %p",
        comm_old, n, sources, degrees, destinations, weights, info, reorder, comm_dist_graph);
#endif
    mpi_errno = MPIR_Err_return_comm(comm_ptr, FCNAME, mpi_errno);
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}