1 /*********************************************************************
2  *
3  *  Copyright (C) 2012, Northwestern University and Argonne National Laboratory
4  *  See COPYRIGHT notice in top-level directory.
5  *
6  *********************************************************************/
7 /* $Id: nonblocking_write.c 2717 2016-12-18 01:20:47Z wkliao $ */
8 
9 /*    This example is similar to collective_write.c but using nonblocking APIs.
 *    It creates a netcdf file in CDF-5 format and writes a number of
11  *    3D integer non-record variables. The measured write bandwidth is reported
12  *    at the end. Usage: (for example)
13  *    To compile:
14  *        mpicc -O2 nonblocking_write.c -o nonblocking_write -lpnetcdf
15  *    To run:
16  *        mpiexec -n num_processes ./nonblocking_write [filename] [len]
17  *    where len decides the size of each local array, which is len x len x len.
18  *    So, each non-record variable is of size len*len*len * nprocs * sizeof(int)
19  *    All variables are partitioned among all processes in a 3D
20  *    block-block-block fashion. Below is an example standard output from
21  *    command:
22  *        mpiexec -n 32 ./nonblocking_write /pvfs2/wkliao/testfile.nc 100
23  *
24  *    MPI hint: cb_nodes        = 2
25  *    MPI hint: cb_buffer_size  = 16777216
26  *    MPI hint: striping_factor = 32
27  *    MPI hint: striping_unit   = 1048576
28  *    Local array size 100 x 100 x 100 integers, size = 3.81 MB
29  *    Global array size 400 x 400 x 200 integers, write size = 0.30 GB
30  *     procs    Global array size  exec(sec)  write(MB/s)
31  *     -------  ------------------  ---------  -----------
32  *        32     400 x  400 x  200     6.67       45.72
33  */
34 
#include <limits.h> /* INT_MAX */
#include <stdio.h>
#include <stdlib.h>
#include <string.h> /* strcpy(), strncpy() */
#include <unistd.h> /* getopt() */
#include <mpi.h>
#include <pnetcdf.h>
41 
42 #ifndef MPI_OFFSET
43 #define MPI_OFFSET MPI_LONG_LONG_INT
44 #endif
45 
46 #define NDIMS    3
47 #define NUM_VARS 10
48 
/* Check the PnetCDF status held in the local variable `err`: when it is not
 * NC_NOERR, print the source line and the error message, and increment the
 * local counter `nerrs`. Both `err` and `nerrs` must be in scope at the point
 * of use. The macro expands to a complete brace-enclosed block, so it is
 * written without a trailing semicolon. */
#define ERR {                                                     \
    if (err != NC_NOERR) {                                        \
        printf("Error at line=%d: %s\n", __LINE__,                \
               ncmpi_strerror(err));                              \
        nerrs++;                                                  \
    }                                                             \
}
50 
/*
 * Print the command-line synopsis of this program to stderr.
 *   argv0 - program name substituted into the "Usage:" line
 */
static void
usage(char *argv0)
{
    /* the format is passed as a literal so the compiler can check it
     * against the argument list */
    fprintf(stderr,
            "Usage: %s [-h] | [-q] [file_name] [len]\n"
            "       [-h] Print help\n"
            "       [-q] Quiet mode (reports when fail)\n"
            "       [filename] output netCDF file name\n"
            "       [len] size of each dimension of the local array\n",
            argv0);
}
62 
63 /*----< print_info() >------------------------------------------------------*/
64 static
print_info(MPI_Info * info_used)65 void print_info(MPI_Info *info_used)
66 {
67     int     flag;
68     char    info_cb_nodes[64], info_cb_buffer_size[64];
69     char    info_striping_factor[64], info_striping_unit[64];
70 
71     strcpy(info_cb_nodes,        "undefined");
72     strcpy(info_cb_buffer_size,  "undefined");
73     strcpy(info_striping_factor, "undefined");
74     strcpy(info_striping_unit,   "undefined");
75 
76     MPI_Info_get(*info_used, "cb_nodes", 64, info_cb_nodes, &flag);
77     MPI_Info_get(*info_used, "cb_buffer_size", 64, info_cb_buffer_size, &flag);
78     MPI_Info_get(*info_used, "striping_factor", 64, info_striping_factor, &flag);
79     MPI_Info_get(*info_used, "striping_unit", 64, info_striping_unit, &flag);
80 
81     printf("MPI hint: cb_nodes        = %s\n", info_cb_nodes);
82     printf("MPI hint: cb_buffer_size  = %s\n", info_cb_buffer_size);
83     printf("MPI hint: striping_factor = %s\n", info_striping_factor);
84     printf("MPI hint: striping_unit   = %s\n", info_striping_unit);
85 }
86 
87 /*----< main() >------------------------------------------------------------*/
main(int argc,char ** argv)88 int main(int argc, char **argv)
89 {
90     extern int optind;
91     int i, j, verbose=1, err, nerrs=0;
92     int nprocs, len, *buf[NUM_VARS], bufsize, rank;
93     int gsizes[NDIMS], psizes[NDIMS];
94     double write_timing, max_write_timing, write_bw;
95     char filename[256], str[512];
96     int ncid, dimids[NDIMS], varids[NUM_VARS], req[NUM_VARS], st[NUM_VARS];
97     MPI_Offset starts[NDIMS], counts[NDIMS], write_size, sum_write_size;
98     MPI_Offset bbufsize;
99     MPI_Info info, info_used;
100 
101     MPI_Init(&argc,&argv);
102     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
103     MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
104 
105     /* get command-line arguments */
106     while ((i = getopt(argc, argv, "hq")) != EOF)
107         switch(i) {
108             case 'q': verbose = 0;
109                       break;
110             case 'h':
111             default:  if (rank==0) usage(argv[0]);
112                       MPI_Finalize();
113                       return 0;
114         }
115     argc -= optind;
116     argv += optind;
117     if (argc >= 1) snprintf(filename, 256, "%s", argv[0]);
118     else           strcpy(filename, "testfile.nc");
119 
120     len = 10;
121     if (argc >= 2) len = (int)strtol(argv[1],NULL,10); /* optional argument */
122     len = (len <= 0) ? 10 : len;
123 
124     for (i=0; i<NDIMS; i++) psizes[i] = 0;
125 
126     MPI_Dims_create(nprocs, NDIMS, psizes);
127     starts[0] = rank % psizes[0];
128     starts[1] = (rank / psizes[1]) % psizes[1];
129     starts[2] = (rank / (psizes[0] * psizes[1])) % psizes[2];
130 
131     bufsize = 1;
132     for (i=0; i<NDIMS; i++) {
133         gsizes[i] = len * psizes[i];
134         starts[i] *= len;
135         counts[i]  = len;
136         bufsize   *= len;
137     }
138 
139     /* allocate buffer and initialize with some non-zero numbers */
140     for (i=0; i<NUM_VARS; i++) {
141         buf[i] = (int *) malloc(bufsize * sizeof(int));
142         for (j=0; j<bufsize; j++) buf[i][j] = rank * i + 123 + j;
143     }
144 
145     MPI_Barrier(MPI_COMM_WORLD);
146     write_timing = MPI_Wtime();
147 
148     /* set an MPI-IO hint to disable file offset alignment for fixed-size
149      * variables */
150     MPI_Info_create(&info);
151     MPI_Info_set(info, "nc_var_align_size", "1");
152 
153     /* create the file */
154     err = ncmpi_create(MPI_COMM_WORLD, filename, NC_CLOBBER|NC_64BIT_DATA,
155                        info, &ncid);
156     if (err != NC_NOERR) {
157         printf("Error: ncmpi_create() file %s (%s)\n",filename,ncmpi_strerror(err));
158         MPI_Abort(MPI_COMM_WORLD, -1);
159         exit(1);
160     }
161 
162     MPI_Info_free(&info);
163 
164     /* define dimensions */
165     for (i=0; i<NDIMS; i++) {
166         sprintf(str, "%c", 'x'+i);
167         err = ncmpi_def_dim(ncid, str, gsizes[i], &dimids[i]);
168         ERR
169     }
170 
171     /* define variables */
172     for (i=0; i<NUM_VARS; i++) {
173         sprintf(str, "var%d", i);
174         err = ncmpi_def_var(ncid, str, NC_INT, NDIMS, dimids, &varids[i]);
175         ERR
176     }
177 
178     /* exit the define mode */
179     err = ncmpi_enddef(ncid);
180     ERR
181 
182     /* get all the hints used */
183     err = ncmpi_inq_file_info(ncid, &info_used);
184     ERR
185 
186     /* write one variable at a time using iput */
187     for (i=0; i<NUM_VARS; i++) {
188         err = ncmpi_iput_vara_int(ncid, varids[i], starts, counts, buf[i], &req[i]);
189         ERR
190     }
191 
192     /* wait for the nonblocking I/O to complete */
193     err = ncmpi_wait_all(ncid, NUM_VARS, req, st);
194     ERR
195     for (i=0; i<NUM_VARS; i++) {
196         if (st[i] != NC_NOERR)
197             printf("Error: nonblocking write fails on request %d (%s)\n",
198                    i, ncmpi_strerror(st[i]));
199     }
200 
201     /* write one variable at a time using bput */
202 
203     /* bbufsize must be max of data type converted before and after */
204     bbufsize = bufsize * NUM_VARS * sizeof(int);
205     err = ncmpi_buffer_attach(ncid, bbufsize);
206     ERR
207 
208     for (i=0; i<NUM_VARS; i++) {
209         err = ncmpi_bput_vara_int(ncid, varids[i], starts, counts, buf[i], &req[i]);
210         ERR
211         /* can safely change contents or free up the buf[i] here */
212     }
213 
214     /* wait for the nonblocking I/O to complete */
215     err = ncmpi_wait_all(ncid, NUM_VARS, req, st);
216     ERR
217     for (i=0; i<NUM_VARS; i++) {
218         if (st[i] != NC_NOERR)
219             printf("Error: nonblocking write fails on request %d (%s)\n",
220                    i, ncmpi_strerror(st[i]));
221     }
222 
223     /* detach the temporary buffer */
224     err = ncmpi_buffer_detach(ncid);
225     ERR
226 
227     MPI_Offset put_size;
228     ncmpi_inq_put_size(ncid, &put_size);
229     MPI_Allreduce(MPI_IN_PLACE, &put_size, 1, MPI_OFFSET, MPI_SUM, MPI_COMM_WORLD);
230 
231     /* close the file */
232     err = ncmpi_close(ncid);
233     ERR
234 
235     write_timing = MPI_Wtime() - write_timing;
236 
237     write_size = bufsize * NUM_VARS * sizeof(int);
238     for (i=0; i<NUM_VARS; i++) free(buf[i]);
239 
240     MPI_Reduce(&write_size,   &sum_write_size,   1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
241     MPI_Reduce(&write_timing, &max_write_timing, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
242 
243     if (rank == 0 && verbose) {
244         printf("\n");
245         printf("Total amount writes to variables only   (exclude header) = %lld bytes\n", sum_write_size);
246         printf("Total amount writes reported by pnetcdf (include header) = %lld bytes\n", put_size);
247         printf("\n");
248         float subarray_size = (float)bufsize*sizeof(int)/1048576.0;
249         print_info(&info_used);
250         printf("Local array size %d x %d x %d integers, size = %.2f MB\n",len,len,len,subarray_size);
251         sum_write_size /= 1048576.0;
252         printf("Global array size %d x %d x %d integers, write size = %.2f GB\n",
253                gsizes[0], gsizes[1], gsizes[2], sum_write_size/1024.0);
254 
255         write_bw = sum_write_size/max_write_timing;
256         printf(" procs    Global array size  exec(sec)  write(MB/s)\n");
257         printf("-------  ------------------  ---------  -----------\n");
258         printf(" %4d    %4d x %4d x %4d %8.2f  %10.2f\n", nprocs,
259                gsizes[0], gsizes[1], gsizes[2], max_write_timing, write_bw);
260     }
261     MPI_Info_free(&info_used);
262 
263     /* check if there is any PnetCDF internal malloc residue */
264     MPI_Offset malloc_size, sum_size;
265     err = ncmpi_inq_malloc_size(&malloc_size);
266     if (err == NC_NOERR) {
267         MPI_Reduce(&malloc_size, &sum_size, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
268         if (rank == 0 && sum_size > 0)
269             printf("heap memory allocated by PnetCDF internally has %lld bytes yet to be freed\n",
270                    sum_size);
271     }
272 
273     MPI_Finalize();
274     return nerrs;
275 }
276 
277