1 /*********************************************************************
2 *
3 * Copyright (C) 2012, Northwestern University and Argonne National Laboratory
4 * See COPYRIGHT notice in top-level directory.
5 *
6 *********************************************************************/
7 /* $Id: nonblocking_write.c 2717 2016-12-18 01:20:47Z wkliao $ */
9 /* This example is similar to collective_write.c but using nonblocking APIs.
10 * It creates a netcdf file in CD-5 format and writes a number of
11 * 3D integer non-record variables. The measured write bandwidth is reported
12 * at the end. Usage: (for example)
13 * To compile:
14 * mpicc -O2 nonblocking_write.c -o nonblocking_write -lpnetcdf
15 * To run:
16 * mpiexec -n num_processes ./nonblocking_write [filename] [len]
17 * where len decides the size of each local array, which is len x len x len.
18 * So, each non-record variable is of size len*len*len * nprocs * sizeof(int)
19 * All variables are partitioned among all processes in a 3D
20 * block-block-block fashion. Below is an example standard output from
21 * command:
22 * mpiexec -n 32 ./nonblocking_write /pvfs2/wkliao/testfile.nc 100
23 *
24 * MPI hint: cb_nodes = 2
25 * MPI hint: cb_buffer_size = 16777216
26 * MPI hint: striping_factor = 32
27 * MPI hint: striping_unit = 1048576
28 * Local array size 100 x 100 x 100 integers, size = 3.81 MB
29 * Global array size 400 x 400 x 200 integers, write size = 0.30 GB
30 * procs Global array size exec(sec) write(MB/s)
31 * ------- ------------------ --------- -----------
32 * 32 400 x 400 x 200 6.67 45.72
33 */
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h> /* strcpy(), strncpy() */
38 #include <unistd.h> /* getopt() */
39 #include <mpi.h>
40 #include <pnetcdf.h>
42 #ifndef MPI_OFFSET
44 #endif
46 #define NDIMS 3
47 #define NUM_VARS 10
49 #define ERR {if(err!=NC_NOERR){printf("Error at line=%d: %s\n", __LINE__, ncmpi_strerror(err));nerrs++;}}
51 static void
usage(char * argv0)52 usage(char *argv0)
53 {
54 char *help =
55 "Usage: %s [-h] | [-q] [file_name] [len]\n"
56 " [-h] Print help\n"
57 " [-q] Quiet mode (reports when fail)\n"
58 " [filename] output netCDF file name\n"
59 " [len] size of each dimension of the local array\n";
60 fprintf(stderr, help, argv0);
61 }
63 /*----< print_info() >------------------------------------------------------*/
64 static
print_info(MPI_Info * info_used)65 void print_info(MPI_Info *info_used)
66 {
67 int flag;
68 char info_cb_nodes[64], info_cb_buffer_size[64];
69 char info_striping_factor[64], info_striping_unit[64];
71 strcpy(info_cb_nodes, "undefined");
72 strcpy(info_cb_buffer_size, "undefined");
73 strcpy(info_striping_factor, "undefined");
74 strcpy(info_striping_unit, "undefined");
76 MPI_Info_get(*info_used, "cb_nodes", 64, info_cb_nodes, &flag);
77 MPI_Info_get(*info_used, "cb_buffer_size", 64, info_cb_buffer_size, &flag);
78 MPI_Info_get(*info_used, "striping_factor", 64, info_striping_factor, &flag);
79 MPI_Info_get(*info_used, "striping_unit", 64, info_striping_unit, &flag);
81 printf("MPI hint: cb_nodes = %s\n", info_cb_nodes);
82 printf("MPI hint: cb_buffer_size = %s\n", info_cb_buffer_size);
83 printf("MPI hint: striping_factor = %s\n", info_striping_factor);
84 printf("MPI hint: striping_unit = %s\n", info_striping_unit);
85 }
87 /*----< main() >------------------------------------------------------------*/
main(int argc,char ** argv)88 int main(int argc, char **argv)
89 {
90 extern int optind;
91 int i, j, verbose=1, err, nerrs=0;
92 int nprocs, len, *buf[NUM_VARS], bufsize, rank;
93 int gsizes[NDIMS], psizes[NDIMS];
94 double write_timing, max_write_timing, write_bw;
95 char filename[256], str[512];
96 int ncid, dimids[NDIMS], varids[NUM_VARS], req[NUM_VARS], st[NUM_VARS];
97 MPI_Offset starts[NDIMS], counts[NDIMS], write_size, sum_write_size;
98 MPI_Offset bbufsize;
99 MPI_Info info, info_used;
101 MPI_Init(&argc,&argv);
102 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
103 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
105 /* get command-line arguments */
106 while ((i = getopt(argc, argv, "hq")) != EOF)
107 switch(i) {
108 case 'q': verbose = 0;
109 break;
110 case 'h':
111 default: if (rank==0) usage(argv[0]);
112 MPI_Finalize();
113 return 0;
114 }
115 argc -= optind;
116 argv += optind;
117 if (argc >= 1) snprintf(filename, 256, "%s", argv[0]);
118 else strcpy(filename, "testfile.nc");
120 len = 10;
121 if (argc >= 2) len = (int)strtol(argv[1],NULL,10); /* optional argument */
122 len = (len <= 0) ? 10 : len;
124 for (i=0; i<NDIMS; i++) psizes[i] = 0;
126 MPI_Dims_create(nprocs, NDIMS, psizes);
127 starts[0] = rank % psizes[0];
128 starts[1] = (rank / psizes[1]) % psizes[1];
129 starts[2] = (rank / (psizes[0] * psizes[1])) % psizes[2];
131 bufsize = 1;
132 for (i=0; i<NDIMS; i++) {
133 gsizes[i] = len * psizes[i];
134 starts[i] *= len;
135 counts[i] = len;
136 bufsize *= len;
137 }
139 /* allocate buffer and initialize with some non-zero numbers */
140 for (i=0; i<NUM_VARS; i++) {
141 buf[i] = (int *) malloc(bufsize * sizeof(int));
142 for (j=0; j<bufsize; j++) buf[i][j] = rank * i + 123 + j;
143 }
145 MPI_Barrier(MPI_COMM_WORLD);
146 write_timing = MPI_Wtime();
148 /* set an MPI-IO hint to disable file offset alignment for fixed-size
149 * variables */
150 MPI_Info_create(&info);
151 MPI_Info_set(info, "nc_var_align_size", "1");
153 /* create the file */
154 err = ncmpi_create(MPI_COMM_WORLD, filename, NC_CLOBBER|NC_64BIT_DATA,
155 info, &ncid);
156 if (err != NC_NOERR) {
157 printf("Error: ncmpi_create() file %s (%s)\n",filename,ncmpi_strerror(err));
158 MPI_Abort(MPI_COMM_WORLD, -1);
159 exit(1);
160 }
162 MPI_Info_free(&info);
164 /* define dimensions */
165 for (i=0; i<NDIMS; i++) {
166 sprintf(str, "%c", 'x'+i);
167 err = ncmpi_def_dim(ncid, str, gsizes[i], &dimids[i]);
168 ERR
169 }
171 /* define variables */
172 for (i=0; i<NUM_VARS; i++) {
173 sprintf(str, "var%d", i);
174 err = ncmpi_def_var(ncid, str, NC_INT, NDIMS, dimids, &varids[i]);
175 ERR
176 }
178 /* exit the define mode */
179 err = ncmpi_enddef(ncid);
180 ERR
182 /* get all the hints used */
183 err = ncmpi_inq_file_info(ncid, &info_used);
184 ERR
186 /* write one variable at a time using iput */
187 for (i=0; i<NUM_VARS; i++) {
188 err = ncmpi_iput_vara_int(ncid, varids[i], starts, counts, buf[i], &req[i]);
189 ERR
190 }
192 /* wait for the nonblocking I/O to complete */
193 err = ncmpi_wait_all(ncid, NUM_VARS, req, st);
194 ERR
195 for (i=0; i<NUM_VARS; i++) {
196 if (st[i] != NC_NOERR)
197 printf("Error: nonblocking write fails on request %d (%s)\n",
198 i, ncmpi_strerror(st[i]));
199 }
201 /* write one variable at a time using bput */
203 /* bbufsize must be max of data type converted before and after */
204 bbufsize = bufsize * NUM_VARS * sizeof(int);
205 err = ncmpi_buffer_attach(ncid, bbufsize);
206 ERR
208 for (i=0; i<NUM_VARS; i++) {
209 err = ncmpi_bput_vara_int(ncid, varids[i], starts, counts, buf[i], &req[i]);
210 ERR
211 /* can safely change contents or free up the buf[i] here */
212 }
214 /* wait for the nonblocking I/O to complete */
215 err = ncmpi_wait_all(ncid, NUM_VARS, req, st);
216 ERR
217 for (i=0; i<NUM_VARS; i++) {
218 if (st[i] != NC_NOERR)
219 printf("Error: nonblocking write fails on request %d (%s)\n",
220 i, ncmpi_strerror(st[i]));
221 }
223 /* detach the temporary buffer */
224 err = ncmpi_buffer_detach(ncid);
225 ERR
227 MPI_Offset put_size;
228 ncmpi_inq_put_size(ncid, &put_size);
229 MPI_Allreduce(MPI_IN_PLACE, &put_size, 1, MPI_OFFSET, MPI_SUM, MPI_COMM_WORLD);
231 /* close the file */
232 err = ncmpi_close(ncid);
233 ERR
235 write_timing = MPI_Wtime() - write_timing;
237 write_size = bufsize * NUM_VARS * sizeof(int);
238 for (i=0; i<NUM_VARS; i++) free(buf[i]);
240 MPI_Reduce(&write_size, &sum_write_size, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
241 MPI_Reduce(&write_timing, &max_write_timing, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
243 if (rank == 0 && verbose) {
244 printf("\n");
245 printf("Total amount writes to variables only (exclude header) = %lld bytes\n", sum_write_size);
246 printf("Total amount writes reported by pnetcdf (include header) = %lld bytes\n", put_size);
247 printf("\n");
248 float subarray_size = (float)bufsize*sizeof(int)/1048576.0;
249 print_info(&info_used);
250 printf("Local array size %d x %d x %d integers, size = %.2f MB\n",len,len,len,subarray_size);
251 sum_write_size /= 1048576.0;
252 printf("Global array size %d x %d x %d integers, write size = %.2f GB\n",
253 gsizes[0], gsizes[1], gsizes[2], sum_write_size/1024.0);
255 write_bw = sum_write_size/max_write_timing;
256 printf(" procs Global array size exec(sec) write(MB/s)\n");
257 printf("------- ------------------ --------- -----------\n");
258 printf(" %4d %4d x %4d x %4d %8.2f %10.2f\n", nprocs,
259 gsizes[0], gsizes[1], gsizes[2], max_write_timing, write_bw);
260 }
261 MPI_Info_free(&info_used);
263 /* check if there is any PnetCDF internal malloc residue */
264 MPI_Offset malloc_size, sum_size;
265 err = ncmpi_inq_malloc_size(&malloc_size);
266 if (err == NC_NOERR) {
267 MPI_Reduce(&malloc_size, &sum_size, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
268 if (rank == 0 && sum_size > 0)
269 printf("heap memory allocated by PnetCDF internally has %lld bytes yet to be freed\n",
270 sum_size);
271 }
273 MPI_Finalize();
274 return nerrs;
275 }