1 /*
2 * Copyright (C) 2013, Northwestern University and Argonne National Laboratory
3 * See COPYRIGHT notice in top-level directory.
4 */
5 /* $Id: test_subfile.c 2744 2016-12-28 16:25:22Z wkliao $ */
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <libgen.h> /* basename() */
10 #include <unistd.h>
11 #include <fcntl.h>
12 #include <dirent.h>
13 #include <mpi.h>
14 #include <pnetcdf.h>
15
16 #include <testutils.h>
17
18 #define MAXLINE 128
19
20 /* The file name is taken as a command-line argument. */
21
22 /* Measures the I/O bandwidth for writing/reading a 3D
23 block-distributed array to a file corresponding to the global array
24 in row-major (C) order.
25 Note that the file access pattern is noncontiguous.
26
27 Array size 128^3. For other array sizes, change array_of_gsizes below.*/
/* Report a PnetCDF error and bump the global error counter `nerrs`.
 * NOTE: deliberately a bare if-statement (not do{}while(0)) because every
 * call site in this file invokes it WITHOUT a trailing semicolon, e.g.
 *     TEST_HANDLE_ERR(status)
 * Wrapping it in do{}while(0) would require a semicolon and break callers.
 * It must only be used where `nerrs` is in scope. */
#define TEST_HANDLE_ERR(status) \
    if ((status) != NC_NOERR) { \
        printf("Error at line %d (%s)\n", __LINE__, \
               ncmpi_strerror((status)) ); \
        nerrs++; \
    } \

main(int argc,char ** argv)35 int main(int argc, char **argv)
36 {
37 int opt, verbose=0;
38 extern char *optarg;
39 extern int optind;
40 int i, j, array_of_gsizes[3];
41 int nprocs, len, **buf, rank;
42 MPI_Offset bufcount;
43 int array_of_psizes[3];
44 int status;
45 MPI_Offset array_of_starts[3];
46 char *fbasename = NULL, *fbasename1 = NULL, filename[256];
47 char dimname[20], varname[20];
48 int ncid, dimids0[3], rank_dim[3], *varid=NULL;
49 MPI_Info info=MPI_INFO_NULL, info_used=MPI_INFO_NULL;
50 MPI_Offset **starts_list, **count_list;
51 MPI_Offset *bufcount_list;
52 int ndims=3, nvars=1, ngatts, unlimdimid;
53 MPI_Datatype *datatype_list;
54 int length = 128; /* 8MB per proc */
55 double stim, write_tim, new_write_tim, write_bw;
56 double read_tim, new_read_tim, read_bw;
57 double open_tim, new_open_tim;
58 double close_tim, new_close_tim;
59 int num_sf = 2;
60 int par_dim_id = 0; /* default is 0 */
61 int do_read = 0;
62 int nerrs=0;
63
64 MPI_Init(&argc,&argv);
65 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
66 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
67
68 /* process 0 takes the file name as a command-line argument and
69 broadcasts it to other processes */
70 if (!rank) {
71 while ((opt = getopt(argc, argv, "f:s:rp:n:l:")) != EOF) {
72 switch (opt) {
73 case 'f': fbasename = optarg;
74 break;
75 case 's': num_sf = (int)strtol(optarg,NULL,10);
76 break;
77 case 'r': do_read = 1;
78 break;
79 case 'p': par_dim_id = (int)strtol(optarg,NULL,10);
80 break;
81 case 'n': nvars = (int)strtol(optarg,NULL,10);
82 break;
83 case 'l': length = (int)strtol(optarg,NULL,10);
84 break;
85 default:
86 break;
87 }
88 }
89 if (fbasename == NULL) {
90 fprintf(stderr, "\n*# Usage: test_subfile -f pathname -s num_sf -p par_dim_id \n\n");
91 MPI_Abort(MPI_COMM_WORLD, 1);
92 }
93
94 fbasename1 = (char *) malloc (MAXLINE);
95 sprintf(fbasename1, "%s", fbasename);
96 len = strlen(fbasename1);
97 MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
98 MPI_Bcast(fbasename, len+1, MPI_CHAR, 0, MPI_COMM_WORLD);
99 MPI_Bcast(&num_sf, 1, MPI_INT, 0, MPI_COMM_WORLD);
100 MPI_Bcast(&par_dim_id, 1, MPI_INT, 0, MPI_COMM_WORLD);
101 MPI_Bcast(&nvars, 1, MPI_INT, 0, MPI_COMM_WORLD);
102 MPI_Bcast(&do_read, 1, MPI_INT, 0, MPI_COMM_WORLD);
103 MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
104 }
105 else {
106 fbasename1 = (char *) malloc (MAXLINE);
107 MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
108 MPI_Bcast(fbasename1, len+1, MPI_CHAR, 0, MPI_COMM_WORLD);
109 MPI_Bcast(&num_sf, 1, MPI_INT, 0, MPI_COMM_WORLD);
110 MPI_Bcast(&par_dim_id, 1, MPI_INT, 0, MPI_COMM_WORLD);
111 MPI_Bcast(&nvars, 1, MPI_INT, 0, MPI_COMM_WORLD);
112 MPI_Bcast(&do_read, 1, MPI_INT, 0, MPI_COMM_WORLD);
113 MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
114 }
115
116 if (rank == 0) {
117 char *cmd_str = (char*)malloc(strlen(argv[0]) + 256);
118 sprintf(cmd_str, "*** TESTING C %s for subfiling", basename(argv[0]));
119 printf("%-66s ------ ", cmd_str);
120 free(cmd_str);
121 }
122
123 array_of_gsizes[0] = array_of_gsizes[1] = array_of_gsizes[2] = length;
124
125 buf = (int **)malloc(nvars*sizeof(int*));
126 if (buf == NULL){
127 printf("buf malloc error\n");
128 return 0;
129 }
130 bufcount_list = (MPI_Offset *)malloc(nvars*sizeof(MPI_Offset));
131 if (bufcount_list == NULL){
132 printf("bufcount_list malloc error\n");
133 return 0;
134 }
135 starts_list = (MPI_Offset **)malloc(nvars*sizeof(MPI_Offset *));
136 if (starts_list== NULL){
137 printf("starts_list malloc error\n");
138 return 0;
139 }
140 count_list = (MPI_Offset **)malloc(nvars*sizeof(MPI_Offset *));
141 if (count_list == NULL){
142 printf("count_list malloc error\n");
143 return 0;
144 }
145 datatype_list = (MPI_Datatype*)malloc(nvars*sizeof(MPI_Datatype));
146 if (datatype_list == NULL){
147 printf("count_list malloc error\n");
148 return 0;
149 }
150
151 for (i=0; i<nvars; i++) {
152 starts_list[i] = (MPI_Offset *)malloc(ndims*sizeof(MPI_Offset));
153 if (starts_list[i] == NULL){
154 printf("starts_list[%d] malloc error\n", i);
155 return 0;
156 }
157 count_list[i] = (MPI_Offset *)malloc(ndims*sizeof(MPI_Offset));
158 if (count_list[i] == NULL){
159 printf("count_list[%d] malloc error\n", i);
160 return 0;
161 }
162 }
163
164 bufcount = 1;
165 for (i=0; i<ndims; i++) {
166 array_of_psizes[i] = 0;
167 bufcount *= length;
168 }
169 MPI_Dims_create(nprocs, ndims, array_of_psizes);
170 if (verbose)
171 for(i=0; i<ndims&&rank==0; i++)
172 printf("array_of_psizes[%d]=%d\n", i, array_of_psizes[i]);
173
174 /* subarray in each process is len x len x len */
175 for (i=0; i<ndims; i++)
176 array_of_gsizes[i] = length * array_of_psizes[i];
177
178 /* mynd's process rank in each dimension (in MPI_ORDER_C) */
179 rank_dim[2] = rank % array_of_psizes[2];
180 rank_dim[1] = (rank / array_of_psizes[2]) % array_of_psizes[1];
181 rank_dim[0] = rank / (array_of_psizes[2] * array_of_psizes[1]);
182
183 /* starting coordinates of the subarray in each dimension */
184 for (i=0; i<ndims; i++)
185 array_of_starts[i] = length * rank_dim[i];
186
187 for (i=0; i<nvars; i++) {
188 for (j=0; j<ndims; j++) {
189 starts_list[i][j] = array_of_starts[j];
190 count_list[i][j] = length;
191 }
192 bufcount_list[i] = bufcount;
193 datatype_list[i] = MPI_INT;
194 }
195
196 for (i=0; i<nvars; i++) {
197 buf[i] = (int *) malloc(bufcount * sizeof(int));
198 if (buf[i] == NULL){
199 printf("buf[i]malloc error\n");
200 return 0;
201 }
202
203 for (j=0; j<bufcount; j++)
204 buf[i][j]=rank+1;
205 }
206
207 MPI_Info_create(&info);
208 /* set all non-record variable to be subfiled */
209 char tmp[10];
210 sprintf(tmp, "%d", num_sf);
211 MPI_Info_set(info, "nc_num_subfiles", tmp);
212
213 sprintf(filename, "%s.%d.%d.%d.nc", fbasename1, length, 1, 0);
214
215 if (do_read == 1) goto read;
216
217 stim = MPI_Wtime();
218 status = ncmpi_create(MPI_COMM_WORLD, filename, NC_CLOBBER|NC_64BIT_DATA,
219 info, &ncid);
220 TEST_HANDLE_ERR(status)
221
222 open_tim = MPI_Wtime() - stim;
223
224 MPI_Allreduce(&open_tim, &new_open_tim, 1, MPI_DOUBLE, MPI_MAX,
225 MPI_COMM_WORLD);
226 if (verbose && rank == 0)
227 printf("create time = %f sec\n", new_open_tim);
228
229 /* define dimensions */
230 for (i=0; i<ndims; i++){
231 sprintf(dimname, "dim0_%d", i);
232 status = ncmpi_def_dim(ncid, dimname, array_of_gsizes[i], &dimids0[i]);
233 TEST_HANDLE_ERR(status)
234 }
235
236 /* define variables */
237 varid = (int *)malloc(nvars*sizeof(int));
238 for (i=0; i<nvars; i++) {
239 sprintf(varname, "var0_%d", i);
240 status = ncmpi_def_var(ncid, varname, NC_INT, ndims, dimids0, &varid[i]);
241 TEST_HANDLE_ERR(status)
242 }
243
244 if (par_dim_id != 0) {
245 for (i=0; i<nvars; i++) {
246 status = ncmpi_put_att_int(ncid, varid[i], "par_dim_id",
247 NC_INT, 1, &dimids0[par_dim_id]);
248 TEST_HANDLE_ERR(status)
249 }
250 }
251
252 /* set all non-record variable to be subfiled */
253 /*
254 MPI_Info_set(info, "nc_num_subfiles", "2");
255 status = ncmpi_set_var_info(ncid, varid, info);
256 TEST_HANDLE_ERR(status);
257 */
258
259 status = ncmpi_enddef(ncid);
260 TEST_HANDLE_ERR(status)
261
262 /* test ncmpi_inq_var() */
263 for (i=0; i<nvars; i++) {
264 char name[128];
265 nc_type typep;
266 int ndimsp, dimids[3], nattsp;
267
268 status = ncmpi_inq_var(ncid, varid[i], name, &typep, &ndimsp, dimids,
269 &nattsp);
270 TEST_HANDLE_ERR(status)
271
272 sprintf(varname, "var0_%d", i);
273 if (strcmp(name, varname)) {
274 printf("Error: unexpected var[%d] name %s, should be %s\n",i,name,varname);
275 nerrs++;
276 continue;
277 }
278 if (typep != NC_INT) {
279 printf("Error: unexpected var[%d] type %d, should be %d\n",i,typep,NC_INT);
280 nerrs++;
281 continue;
282 }
283 if (ndimsp != ndims) {
284 printf("Error: unexpected var[%d] ndims %d, should be %d\n",i,ndimsp,ndims);
285 nerrs++;
286 continue;
287 }
288 for (j=0; j<ndims; j++) {
289 if (dimids[j] != dimids0[j]) {
290 printf("Error: unexpected var[%d] dimids[%d] %d, should be %d\n",i,j,dimids0[j],dimids[j]);
291 nerrs++;
292 continue;
293 }
294 }
295 /*
296 printf("var[%d] %s has %d attributes\n",i,name,nattsp);
297 */
298 }
299
300 #if 0
301 if (rank == 0)
302 printf("*** Testing to write 1 non-record variable by using ncmpi_put_vara_all() ...");
303 #endif
304 stim = MPI_Wtime();
305 for (i=0; i<nvars; i++) {
306 status = ncmpi_put_vara_all(ncid, varid[i],
307 starts_list[i], count_list[i],
308 buf[i],
309 bufcount_list[i], MPI_INT);
310 TEST_HANDLE_ERR(status)
311 }
312 write_tim = MPI_Wtime() - stim;
313
314 MPI_Allreduce(&write_tim, &new_write_tim, 1, MPI_DOUBLE, MPI_MAX,
315 MPI_COMM_WORLD);
316
317 if (verbose && rank == 0) {
318 write_bw = ((double)array_of_gsizes[0]*(double)array_of_gsizes[1]*(double)array_of_gsizes[2]*(double)sizeof(int)*(double)nvars)/(new_write_tim*1024.0*1024.0);
319 printf("Global array size %d x %d x %d integers\n", array_of_gsizes[0], array_of_gsizes[1], array_of_gsizes[2]);
320 printf("Collective write time = %f sec, Collective write bandwidth = %f Mbytes/sec\n", new_write_tim, write_bw);
321 }
322
323 status = ncmpi_inq_file_info(ncid, &info_used);
324 TEST_HANDLE_ERR(status)
325
326 stim = MPI_Wtime();
327 status = ncmpi_close(ncid);
328 TEST_HANDLE_ERR(status)
329 close_tim = MPI_Wtime() - stim;
330
331 MPI_Allreduce(&close_tim, &new_close_tim, 1, MPI_DOUBLE, MPI_MAX,
332 MPI_COMM_WORLD);
333
334 if (verbose && rank == 0) {
335 fprintf(stderr, "close time = %f sec\n", new_close_tim);
336 }
337
338 goto end;
339
340 read:
341 status = ncmpi_open(MPI_COMM_WORLD, filename, NC_NOWRITE, MPI_INFO_NULL, &ncid);
342 TEST_HANDLE_ERR(status)
343
344 stim = MPI_Wtime();
345
346 /**
347 * Inquire the dataset definitions of input dataset AND
348 * Add dataset definitions for output dataset.
349 */
350
351 status = ncmpi_inq(ncid, &ndims, &nvars, &ngatts, &unlimdimid);
352 TEST_HANDLE_ERR(status)
353
354 for (i=0; i<nvars; i++) {
355 status = ncmpi_get_vara_all(ncid, i,
356 starts_list[i], count_list[i],
357 buf[i], bufcount_list[i], MPI_INT);
358 TEST_HANDLE_ERR(status)
359 }
360 read_tim = MPI_Wtime() - stim;
361
362 MPI_Allreduce(&read_tim, &new_read_tim, 1, MPI_DOUBLE, MPI_MAX,
363 MPI_COMM_WORLD);
364
365 if (verbose && rank == 0) {
366 read_bw = ((double)array_of_gsizes[0]*(double)array_of_gsizes[1]*(double)array_of_gsizes[2]*sizeof(int)*(double)nvars)/(new_read_tim*1024.0*1024.0);
367 printf("Collective read time = %f sec, Collective read bandwidth = %f Mbytes/sec\n", new_read_tim, read_bw);
368 }
369
370 status = ncmpi_inq_file_info(ncid, &info_used);
371 TEST_HANDLE_ERR(status)
372
373 status = ncmpi_close(ncid);
374 TEST_HANDLE_ERR(status)
375
376 end:
377 if (info != MPI_INFO_NULL) MPI_Info_free(&info);
378 if (info_used != MPI_INFO_NULL) MPI_Info_free(&info_used);
379
380 for (i=0; i<nvars; i++){
381 free(buf[i]);
382 free(starts_list[i]);
383 free(count_list[i]);
384 }
385 free(buf);
386 free(bufcount_list);
387 free(datatype_list);
388 if (!do_read) free(varid);
389 free(starts_list);
390 free(count_list);
391 free(fbasename1);
392
393 MPI_Offset malloc_size, sum_size;
394 int err, nfiles, ncids[10];
395
396 /* check if there are files still left opened */
397 err = ncmpi_inq_files_opened(&nfiles, ncids);
398 TEST_HANDLE_ERR(err)
399 if (nfiles > 0) printf("nfiles %d still opened\n",nfiles);
400
401 /* check for any PnetCDF internal malloc residues */
402 err = ncmpi_inq_malloc_size(&malloc_size);
403 if (err == NC_NOERR) {
404 MPI_Reduce(&malloc_size, &sum_size, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
405 if (rank == 0 && sum_size > 0)
406 printf("heap memory allocated by PnetCDF internally has %lld bytes yet to be freed\n",
407 sum_size);
408 }
409
410 MPI_Allreduce(MPI_IN_PLACE, &nerrs, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
411 if (rank == 0) {
412 if (nerrs) printf(FAIL_STR,nerrs);
413 else printf(PASS_STR);
414 }
415
416 MPI_Finalize();
417
418 return 0;
419 }
420