1 /*
2  *  Copyright (C) 2013, Northwestern University and Argonne National Laboratory
3  *  See COPYRIGHT notice in top-level directory.
4  */
5 /* $Id: test_subfile.c 2744 2016-12-28 16:25:22Z wkliao $ */
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <libgen.h> /* basename() */
10 #include <unistd.h>
11 #include <fcntl.h>
12 #include <dirent.h>
13 #include <mpi.h>
14 #include <pnetcdf.h>
15 
16 #include <testutils.h>
17 
18 #define MAXLINE 128
19 
20 /* The file name is taken as a command-line argument. */
21 
22 /* Measures the I/O bandwidth for writing/reading a 3D
23    block-distributed array to a file corresponding to the global array
24    in row-major (C) order.
25    Note that the file access pattern is noncontiguous.
26 
27    Array size 128^3. For other array sizes, change array_of_gsizes below.*/
28 #define TEST_HANDLE_ERR(status)                         \
29         if ((status) != NC_NOERR) {                     \
30             printf("Error at line %d (%s)\n", __LINE__, \
31                    ncmpi_strerror((status)) );          \
32             nerrs++;                                     \
33         }                                               \
34 
main(int argc,char ** argv)35 int main(int argc, char **argv)
36 {
37     int opt, verbose=0;
38     extern char *optarg;
39     extern int optind;
40     int i, j, array_of_gsizes[3];
41     int nprocs, len, **buf, rank;
42     MPI_Offset bufcount;
43     int array_of_psizes[3];
44     int status;
45     MPI_Offset array_of_starts[3];
46     char *fbasename = NULL, *fbasename1 = NULL, filename[256];
47     char dimname[20], varname[20];
48     int ncid, dimids0[3], rank_dim[3], *varid=NULL;
49     MPI_Info info=MPI_INFO_NULL, info_used=MPI_INFO_NULL;
50     MPI_Offset **starts_list, **count_list;
51     MPI_Offset *bufcount_list;
52     int ndims=3, nvars=1, ngatts, unlimdimid;
53     MPI_Datatype *datatype_list;
54     int length = 128; /* 8MB per proc */
55     double stim, write_tim, new_write_tim, write_bw;
56     double read_tim, new_read_tim, read_bw;
57     double open_tim, new_open_tim;
58     double close_tim, new_close_tim;
59     int num_sf = 2;
60     int par_dim_id = 0; /* default is 0 */
61     int do_read = 0;
62     int nerrs=0;
63 
64     MPI_Init(&argc,&argv);
65     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
66     MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
67 
68     /* process 0 takes the file name as a command-line argument and
69        broadcasts it to other processes */
70     if (!rank) {
71 	while ((opt = getopt(argc, argv, "f:s:rp:n:l:")) != EOF) {
72 	    switch (opt) {
73 	    case 'f': fbasename = optarg;
74 		break;
75 	    case 's': num_sf = (int)strtol(optarg,NULL,10);
76 		break;
77 	    case 'r': do_read = 1;
78 		break;
79             case 'p': par_dim_id = (int)strtol(optarg,NULL,10);
80                 break;
81             case 'n': nvars = (int)strtol(optarg,NULL,10);
82                 break;
83             case 'l': length = (int)strtol(optarg,NULL,10);
84                 break;
85 	    default:
86 		break;
87 	    }
88 	}
89 	if (fbasename == NULL) {
90 	    fprintf(stderr, "\n*#  Usage: test_subfile -f pathname -s num_sf -p par_dim_id \n\n");
91 	    MPI_Abort(MPI_COMM_WORLD, 1);
92 	}
93 
94 	fbasename1 = (char *) malloc (MAXLINE);
95 	sprintf(fbasename1, "%s", fbasename);
96 	len = strlen(fbasename1);
97 	MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
98 	MPI_Bcast(fbasename, len+1, MPI_CHAR, 0, MPI_COMM_WORLD);
99         MPI_Bcast(&num_sf, 1, MPI_INT, 0, MPI_COMM_WORLD);
100         MPI_Bcast(&par_dim_id, 1, MPI_INT, 0, MPI_COMM_WORLD);
101         MPI_Bcast(&nvars, 1, MPI_INT, 0, MPI_COMM_WORLD);
102         MPI_Bcast(&do_read, 1, MPI_INT, 0, MPI_COMM_WORLD);
103         MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
104     }
105     else {
106 	fbasename1 = (char *) malloc (MAXLINE);
107 	MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
108 	MPI_Bcast(fbasename1, len+1, MPI_CHAR, 0, MPI_COMM_WORLD);
109         MPI_Bcast(&num_sf, 1, MPI_INT, 0, MPI_COMM_WORLD);
110         MPI_Bcast(&par_dim_id, 1, MPI_INT, 0, MPI_COMM_WORLD);
111         MPI_Bcast(&nvars, 1, MPI_INT, 0, MPI_COMM_WORLD);
112         MPI_Bcast(&do_read, 1, MPI_INT, 0, MPI_COMM_WORLD);
113         MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
114     }
115 
116     if (rank == 0) {
117         char *cmd_str = (char*)malloc(strlen(argv[0]) + 256);
118         sprintf(cmd_str, "*** TESTING C   %s for subfiling", basename(argv[0]));
119         printf("%-66s ------ ", cmd_str);
120         free(cmd_str);
121     }
122 
123     array_of_gsizes[0] = array_of_gsizes[1] = array_of_gsizes[2] = length;
124 
125     buf = (int **)malloc(nvars*sizeof(int*));
126     if (buf == NULL){
127         printf("buf malloc error\n");
128         return 0;
129     }
130     bufcount_list = (MPI_Offset *)malloc(nvars*sizeof(MPI_Offset));
131     if (bufcount_list == NULL){
132         printf("bufcount_list malloc error\n");
133         return 0;
134     }
135     starts_list = (MPI_Offset **)malloc(nvars*sizeof(MPI_Offset *));
136     if (starts_list== NULL){
137         printf("starts_list malloc error\n");
138         return 0;
139     }
140     count_list = (MPI_Offset **)malloc(nvars*sizeof(MPI_Offset *));
141     if (count_list == NULL){
142         printf("count_list malloc error\n");
143         return 0;
144     }
145     datatype_list = (MPI_Datatype*)malloc(nvars*sizeof(MPI_Datatype));
146     if (datatype_list == NULL){
147         printf("count_list malloc error\n");
148         return 0;
149     }
150 
151     for (i=0; i<nvars; i++) {
152 	starts_list[i] = (MPI_Offset *)malloc(ndims*sizeof(MPI_Offset));
153 	if (starts_list[i] == NULL){
154 	    printf("starts_list[%d] malloc error\n", i);
155 	    return 0;
156 	}
157 	count_list[i] = (MPI_Offset *)malloc(ndims*sizeof(MPI_Offset));
158 	if (count_list[i] == NULL){
159 	    printf("count_list[%d] malloc error\n", i);
160 	    return 0;
161 	}
162     }
163 
164     bufcount = 1;
165     for (i=0; i<ndims; i++) {
166         array_of_psizes[i] = 0;
167         bufcount *= length;
168     }
169     MPI_Dims_create(nprocs, ndims, array_of_psizes);
170     if (verbose)
171         for(i=0; i<ndims&&rank==0; i++)
172 	    printf("array_of_psizes[%d]=%d\n", i, array_of_psizes[i]);
173 
174     /* subarray in each process is len x len x len */
175     for (i=0; i<ndims; i++)
176         array_of_gsizes[i] = length * array_of_psizes[i];
177 
178     /* mynd's process rank in each dimension (in MPI_ORDER_C) */
179     rank_dim[2] =  rank %  array_of_psizes[2];
180     rank_dim[1] = (rank /  array_of_psizes[2]) % array_of_psizes[1];
181     rank_dim[0] =  rank / (array_of_psizes[2]  * array_of_psizes[1]);
182 
183     /* starting coordinates of the subarray in each dimension */
184     for (i=0; i<ndims; i++)
185         array_of_starts[i] = length * rank_dim[i];
186 
187     for (i=0; i<nvars; i++) {
188 	for (j=0; j<ndims; j++) {
189 	    starts_list[i][j] = array_of_starts[j];
190 	    count_list[i][j]  = length;
191 	}
192         bufcount_list[i] = bufcount;
193         datatype_list[i] = MPI_INT;
194     }
195 
196     for (i=0; i<nvars; i++) {
197 	buf[i] = (int *) malloc(bufcount * sizeof(int));
198 	if (buf[i] == NULL){
199 	    printf("buf[i]malloc error\n");
200 	    return 0;
201 	}
202 
203 	for (j=0; j<bufcount; j++)
204 	    buf[i][j]=rank+1;
205     }
206 
207     MPI_Info_create(&info);
208     /* set all non-record variable to be subfiled */
209     char tmp[10];
210     sprintf(tmp, "%d", num_sf);
211     MPI_Info_set(info, "nc_num_subfiles", tmp);
212 
213     sprintf(filename, "%s.%d.%d.%d.nc", fbasename1, length, 1, 0);
214 
215     if (do_read == 1) goto read;
216 
217     stim = MPI_Wtime();
218     status = ncmpi_create(MPI_COMM_WORLD, filename, NC_CLOBBER|NC_64BIT_DATA,
219                           info, &ncid);
220     TEST_HANDLE_ERR(status)
221 
222     open_tim = MPI_Wtime() - stim;
223 
224     MPI_Allreduce(&open_tim, &new_open_tim, 1, MPI_DOUBLE, MPI_MAX,
225                   MPI_COMM_WORLD);
226     if (verbose && rank == 0)
227         printf("create time = %f sec\n", new_open_tim);
228 
229     /* define dimensions */
230     for (i=0; i<ndims; i++){
231         sprintf(dimname, "dim0_%d", i);
232         status = ncmpi_def_dim(ncid, dimname, array_of_gsizes[i], &dimids0[i]);
233         TEST_HANDLE_ERR(status)
234     }
235 
236     /* define variables */
237     varid = (int *)malloc(nvars*sizeof(int));
238     for (i=0; i<nvars; i++) {
239 	sprintf(varname, "var0_%d", i);
240 	status = ncmpi_def_var(ncid, varname, NC_INT, ndims, dimids0, &varid[i]);
241 	TEST_HANDLE_ERR(status)
242     }
243 
244     if (par_dim_id != 0) {
245         for (i=0; i<nvars; i++) {
246             status = ncmpi_put_att_int(ncid, varid[i], "par_dim_id",
247                                        NC_INT, 1, &dimids0[par_dim_id]);
248 	    TEST_HANDLE_ERR(status)
249         }
250     }
251 
252     /* set all non-record variable to be subfiled */
253     /*
254     MPI_Info_set(info, "nc_num_subfiles", "2");
255     status = ncmpi_set_var_info(ncid, varid, info);
256     TEST_HANDLE_ERR(status);
257     */
258 
259     status = ncmpi_enddef(ncid);
260     TEST_HANDLE_ERR(status)
261 
262     /* test ncmpi_inq_var() */
263     for (i=0; i<nvars; i++) {
264         char name[128];
265         nc_type typep;
266         int ndimsp, dimids[3], nattsp;
267 
268         status = ncmpi_inq_var(ncid, varid[i], name, &typep, &ndimsp, dimids,
269                                &nattsp);
270         TEST_HANDLE_ERR(status)
271 
272 	sprintf(varname, "var0_%d", i);
273         if (strcmp(name, varname)) {
274             printf("Error: unexpected var[%d] name %s, should be %s\n",i,name,varname);
275             nerrs++;
276             continue;
277         }
278         if (typep != NC_INT) {
279             printf("Error: unexpected var[%d] type %d, should be %d\n",i,typep,NC_INT);
280             nerrs++;
281             continue;
282         }
283         if (ndimsp != ndims) {
284             printf("Error: unexpected var[%d] ndims %d, should be %d\n",i,ndimsp,ndims);
285             nerrs++;
286             continue;
287         }
288         for (j=0; j<ndims; j++) {
289             if (dimids[j] != dimids0[j]) {
290                 printf("Error: unexpected var[%d] dimids[%d] %d, should be %d\n",i,j,dimids0[j],dimids[j]);
291                 nerrs++;
292                 continue;
293             }
294         }
295         /*
296         printf("var[%d] %s has %d attributes\n",i,name,nattsp);
297         */
298     }
299 
300 #if 0
301     if (rank == 0)
302         printf("*** Testing to write 1 non-record variable by using ncmpi_put_vara_all() ...");
303 #endif
304     stim = MPI_Wtime();
305     for (i=0; i<nvars; i++) {
306         status = ncmpi_put_vara_all(ncid, varid[i],
307                                     starts_list[i], count_list[i],
308                                     buf[i],
309                                     bufcount_list[i], MPI_INT);
310         TEST_HANDLE_ERR(status)
311     }
312     write_tim = MPI_Wtime() - stim;
313 
314     MPI_Allreduce(&write_tim, &new_write_tim, 1, MPI_DOUBLE, MPI_MAX,
315                   MPI_COMM_WORLD);
316 
317     if (verbose && rank == 0) {
318         write_bw = ((double)array_of_gsizes[0]*(double)array_of_gsizes[1]*(double)array_of_gsizes[2]*(double)sizeof(int)*(double)nvars)/(new_write_tim*1024.0*1024.0);
319         printf("Global array size %d x %d x %d integers\n", array_of_gsizes[0], array_of_gsizes[1], array_of_gsizes[2]);
320         printf("Collective write time = %f sec, Collective write bandwidth = %f Mbytes/sec\n", new_write_tim, write_bw);
321     }
322 
323     status = ncmpi_inq_file_info(ncid, &info_used);
324     TEST_HANDLE_ERR(status)
325 
326     stim = MPI_Wtime();
327     status = ncmpi_close(ncid);
328     TEST_HANDLE_ERR(status)
329     close_tim = MPI_Wtime() - stim;
330 
331     MPI_Allreduce(&close_tim, &new_close_tim, 1, MPI_DOUBLE, MPI_MAX,
332                   MPI_COMM_WORLD);
333 
334     if (verbose && rank == 0) {
335         fprintf(stderr, "close time = %f sec\n", new_close_tim);
336     }
337 
338     goto end;
339 
340 read:
341     status = ncmpi_open(MPI_COMM_WORLD, filename, NC_NOWRITE, MPI_INFO_NULL, &ncid);
342     TEST_HANDLE_ERR(status)
343 
344     stim = MPI_Wtime();
345 
346     /**
347      * Inquire the dataset definitions of input dataset AND
348      * Add dataset definitions for output dataset.
349      */
350 
351     status = ncmpi_inq(ncid, &ndims, &nvars, &ngatts, &unlimdimid);
352     TEST_HANDLE_ERR(status)
353 
354     for (i=0; i<nvars; i++) {
355         status = ncmpi_get_vara_all(ncid, i,
356                                     starts_list[i], count_list[i],
357                                     buf[i], bufcount_list[i], MPI_INT);
358         TEST_HANDLE_ERR(status)
359     }
360     read_tim = MPI_Wtime() - stim;
361 
362     MPI_Allreduce(&read_tim, &new_read_tim, 1, MPI_DOUBLE, MPI_MAX,
363                   MPI_COMM_WORLD);
364 
365     if (verbose && rank == 0) {
366         read_bw = ((double)array_of_gsizes[0]*(double)array_of_gsizes[1]*(double)array_of_gsizes[2]*sizeof(int)*(double)nvars)/(new_read_tim*1024.0*1024.0);
367         printf("Collective read time = %f sec, Collective read bandwidth = %f Mbytes/sec\n", new_read_tim, read_bw);
368     }
369 
370     status = ncmpi_inq_file_info(ncid, &info_used);
371     TEST_HANDLE_ERR(status)
372 
373     status = ncmpi_close(ncid);
374     TEST_HANDLE_ERR(status)
375 
376 end:
377     if (info      != MPI_INFO_NULL) MPI_Info_free(&info);
378     if (info_used != MPI_INFO_NULL) MPI_Info_free(&info_used);
379 
380     for (i=0; i<nvars; i++){
381         free(buf[i]);
382         free(starts_list[i]);
383         free(count_list[i]);
384     }
385     free(buf);
386     free(bufcount_list);
387     free(datatype_list);
388     if (!do_read) free(varid);
389     free(starts_list);
390     free(count_list);
391     free(fbasename1);
392 
393     MPI_Offset malloc_size, sum_size;
394     int err, nfiles, ncids[10];
395 
396     /* check if there are files still left opened */
397     err = ncmpi_inq_files_opened(&nfiles, ncids);
398     TEST_HANDLE_ERR(err)
399     if (nfiles > 0) printf("nfiles %d still opened\n",nfiles);
400 
401     /* check for any PnetCDF internal malloc residues */
402     err = ncmpi_inq_malloc_size(&malloc_size);
403     if (err == NC_NOERR) {
404         MPI_Reduce(&malloc_size, &sum_size, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
405         if (rank == 0 && sum_size > 0)
406             printf("heap memory allocated by PnetCDF internally has %lld bytes yet to be freed\n",
407                    sum_size);
408     }
409 
410     MPI_Allreduce(MPI_IN_PLACE, &nerrs, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
411     if (rank == 0) {
412         if (nerrs) printf(FAIL_STR,nerrs);
413         else       printf(PASS_STR);
414     }
415 
416     MPI_Finalize();
417 
418     return 0;
419 }
420