1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #include "adio.h"
7 #include "adio_extern.h"
8 #include "adio_cb_config_list.h"
9 
10 #include "mpio.h"
11 static int is_aggregator(int rank, ADIO_File fd);
12 static int uses_generic_read(ADIO_File fd);
13 static int uses_generic_write(ADIO_File fd);
14 static int build_cb_config_list(ADIO_File fd,
15                                 MPI_Comm orig_comm, MPI_Comm comm,
16                                 int rank, int procs, int *error_code);
17 
ADIO_Open(MPI_Comm orig_comm,MPI_Comm comm,const char * filename,int file_system,ADIOI_Fns * ops,int access_mode,ADIO_Offset disp,MPI_Datatype etype,MPI_Datatype filetype,MPI_Info info,int perm,int * error_code)18 MPI_File ADIO_Open(MPI_Comm orig_comm,
19                    MPI_Comm comm, const char *filename, int file_system,
20                    ADIOI_Fns * ops,
21                    int access_mode, ADIO_Offset disp, MPI_Datatype etype,
22                    MPI_Datatype filetype, MPI_Info info, int perm, int *error_code)
23 {
24     MPI_File mpi_fh;
25     ADIO_File fd;
26     int err, rank, procs;
27     static char myname[] = "ADIO_OPEN";
28     int max_error_code;
29     MPI_Info dupinfo;
30     int syshints_processed, can_skip;
31     char *p;
32 
33     *error_code = MPI_SUCCESS;
34 
35     /* obtain MPI_File handle */
36     mpi_fh = MPIO_File_create(sizeof(struct ADIOI_FileD));
37     if (mpi_fh == MPI_FILE_NULL) {
38         fd = MPI_FILE_NULL;
39         *error_code = MPIO_Err_create_code(*error_code,
40                                            MPIR_ERR_RECOVERABLE,
41                                            myname, __LINE__, MPI_ERR_OTHER, "**nomem2", 0);
42         goto fn_exit;
43 
44     }
45     fd = MPIO_File_resolve(mpi_fh);
46 
47     fd->cookie = ADIOI_FILE_COOKIE;
48     fd->fp_ind = disp;
49     fd->fp_sys_posn = 0;
50     fd->comm = comm;    /* dup'ed in MPI_File_open */
51     fd->filename = ADIOI_Strdup(filename);
52     fd->file_system = file_system;
53     fd->fs_ptr = NULL;
54 
55     fd->fns = ops;
56 
57     fd->disp = disp;
58     fd->split_coll_count = 0;
59     fd->shared_fp_fd = ADIO_FILE_NULL;
60     fd->atomicity = 0;
61     fd->etype = etype;  /* MPI_BYTE by default */
62     fd->filetype = filetype;    /* MPI_BYTE by default */
63     fd->etype_size = 1; /* default etype is MPI_BYTE */
64 
65     fd->file_realm_st_offs = NULL;
66     fd->file_realm_types = NULL;
67 
68     fd->perm = perm;
69 
70     fd->async_count = 0;
71 
72     fd->fortran_handle = -1;
73 
74     fd->err_handler = ADIOI_DFLT_ERR_HANDLER;
75 
76     fd->io_buf_window = MPI_WIN_NULL;
77     fd->io_buf_put_amounts_window = MPI_WIN_NULL;
78 
79     MPI_Comm_rank(comm, &rank);
80     MPI_Comm_size(comm, &procs);
81 /* create and initialize info object */
82     fd->hints = (ADIOI_Hints *) ADIOI_Calloc(1, sizeof(struct ADIOI_Hints_struct));
83     if (fd->hints == NULL) {
84         *error_code = MPIO_Err_create_code(*error_code,
85                                            MPIR_ERR_RECOVERABLE,
86                                            myname, __LINE__, MPI_ERR_OTHER, "**nomem2", 0);
87         goto fn_exit;
88     }
89     fd->hints->cb_config_list = NULL;
90     fd->hints->ranklist = NULL;
91     fd->hints->initialized = 0;
92     fd->info = MPI_INFO_NULL;
93 
94     /* move system-wide hint processing *back* into open, but this time the
95      * hintfile reader will do a scalable read-and-broadcast.  The global
96      * ADIOI_syshints will get initialized at first open.  subsequent open
97      * calls will just use result from first open.
98      *
99      * We have two goals here:
100      * 1: avoid processing the hintfile multiple times
101      * 2: have all processes participate in hintfile processing (so we can read-and-broadcast)
102      *
103      * a code might do an "initialize from 0", so we can only skip hint
104      * processing once everyone has particpiated in hint processing */
105     if (ADIOI_syshints == MPI_INFO_NULL)
106         syshints_processed = 0;
107     else
108         syshints_processed = 1;
109 
110     MPI_Allreduce(&syshints_processed, &can_skip, 1, MPI_INT, MPI_MIN, fd->comm);
111     if (!can_skip) {
112         if (ADIOI_syshints == MPI_INFO_NULL)
113             MPI_Info_create(&ADIOI_syshints);
114         ADIOI_process_system_hints(fd, ADIOI_syshints);
115     }
116 
117     ADIOI_incorporate_system_hints(info, ADIOI_syshints, &dupinfo);
118     ADIO_SetInfo(fd, dupinfo, &err);
119     if (dupinfo != MPI_INFO_NULL) {
120         *error_code = MPI_Info_free(&dupinfo);
121         if (*error_code != MPI_SUCCESS)
122             goto fn_exit;
123     }
124     ADIOI_Info_set(fd->info, "romio_filesystem_type", fd->fns->fsname);
125 
126     /* Instead of repeatedly allocating this buffer in collective read/write,
127      * allocating up-front might make memory management on small platforms
128      * (e.g. Blue Gene) more efficent */
129 
130     fd->io_buf = ADIOI_Malloc(fd->hints->cb_buffer_size);
131     /* deferred open:
132      * we can only do this optimization if 'fd->hints->deferred_open' is set
133      * (which means the user hinted 'no_indep_rw' and collective buffering).
134      * Furthermore, we only do this if our collective read/write routines use
135      * our generic function, and not an fs-specific routine (we can defer opens
136      * only if we use our aggreagation code). */
137     if (fd->hints->deferred_open && !(uses_generic_read(fd)
138                                       && uses_generic_write(fd))) {
139         fd->hints->deferred_open = 0;
140     }
141     if (ADIO_Feature(fd, ADIO_SCALABLE_OPEN))
142         /* disable deferred open on these fs so that scalable broadcast
143          * will always use the propper communicator */
144         fd->hints->deferred_open = 0;
145 
146 
147     /* on BlueGene, the cb_config_list is built when hints are processed. No
148      * one else does that right now */
149     if (fd->hints->ranklist == NULL) {
150         build_cb_config_list(fd, orig_comm, comm, rank, procs, error_code);
151         if (*error_code != MPI_SUCCESS)
152             goto fn_exit;
153     }
154     fd->is_open = 0;
155     fd->my_cb_nodes_index = -2;
156     fd->is_agg = is_aggregator(rank, fd);
157     /* deferred open used to split the communicator to create an "aggregator
158      * communicator", but we only used it as a way to indicate that deferred
159      * open happened.  fd->is_open and fd->is_agg are sufficient */
160 
161     /* actual opens start here */
162     /* generic open: one process opens to create the file, all others open */
163     /* nfs open: everybody opens or else you'll end up with "file not found"
164      * due to stupid nfs consistency semantics */
165     /* scalable open: one process opens and broadcasts results to everyone */
166 
167     ADIOI_OpenColl(fd, rank, access_mode, error_code);
168 
169     /* deferred open consideration: if an independent process lied about
170      * "no_indep_rw" and opens the file later (example: HDF5 uses independent
171      * i/o for metadata), that deferred open will use the access_mode provided
172      * by the user.  CREATE|EXCL only makes sense here -- exclusive access in
173      * the deferred open case is going to fail and surprise the user.  Turn off
174      * the excl amode bit. Save user's ammode for MPI_FILE_GET_AMODE */
175     fd->orig_access_mode = access_mode;
176     if (fd->access_mode & ADIO_EXCL)
177         fd->access_mode ^= ADIO_EXCL;
178 
179 
180     /* for debugging, it can be helpful to see the hints selected. Some file
181      * systes set up the hints in the open call (e.g. lustre) */
182     p = getenv("ROMIO_PRINT_HINTS");
183     if (rank == 0 && p != NULL) {
184         ADIOI_Info_print_keyvals(fd->info);
185     }
186 
187   fn_exit:
188     MPI_Allreduce(error_code, &max_error_code, 1, MPI_INT, MPI_MAX, comm);
189     if (max_error_code != MPI_SUCCESS) {
190 
191         /* If the file was successfully opened, close it */
192         if (*error_code == MPI_SUCCESS) {
193 
194             /* in the deferred open case, only those who have actually
195              * opened the file should close it */
196             if (fd->hints->deferred_open) {
197                 if (fd->is_agg) {
198                     (*(fd->fns->ADIOI_xxx_Close)) (fd, error_code);
199                 }
200             } else {
201                 (*(fd->fns->ADIOI_xxx_Close)) (fd, error_code);
202             }
203         }
204         ADIOI_Free(fd->filename);
205         if (fd->hints->ranklist != NULL)
206             ADIOI_Free(fd->hints->ranklist);
207         if (fd->hints->cb_config_list != NULL)
208             ADIOI_Free(fd->hints->cb_config_list);
209         ADIOI_Free(fd->hints);
210         if (fd->info != MPI_INFO_NULL)
211             MPI_Info_free(&(fd->info));
212         ADIOI_Free(fd->io_buf);
213         ADIOI_Free(fd);
214         fd = ADIO_FILE_NULL;
215         if (*error_code == MPI_SUCCESS) {
216             *error_code = MPIO_Err_create_code(MPI_SUCCESS,
217                                                MPIR_ERR_RECOVERABLE, myname,
218                                                __LINE__, MPI_ERR_IO, "**oremote_fail", 0);
219         }
220     }
221 
222     return fd;
223 }
224 
/* a simple linear search. possible enhancement: add a my_cb_nodes_index member
 * (index into cb_nodes, else -1 if not aggregator) for faster lookups
 *
 * fd->hints->cb_nodes is the number of aggregators
 * fd->hints->ranklist[] is an array of the ranks of aggregators
 *
 * might want to move this to adio/common/cb_config_list.c
 */
int is_aggregator(int rank, ADIO_File fd)
{
    int pos;

    /* fd->my_cb_nodes_index caches the outcome of an earlier search:
     *   -2   not searched yet
     *   -1   searched, rank is not in the aggregator list
     *   else position of rank in fd->hints->ranklist[] */
    if (fd->my_cb_nodes_index != -2)
        return (fd->my_cb_nodes_index != -1) ? 1 : 0;

    /* first call: scan the aggregator list for rank and remember the result */
    pos = 0;
    while (pos < fd->hints->cb_nodes) {
        if (fd->hints->ranklist[pos] == rank) {
            fd->my_cb_nodes_index = pos;
            return 1;
        }
        pos++;
    }

    fd->my_cb_nodes_index = -1;
    return 0;
}
250 
/*
 * If file system implements some version of two-phase -- doesn't have to be
 * generic -- we can still carry out the deferred open optimization
 */
static int uses_generic_read(ADIO_File fd)
{
    /* 1 when the file system reports the ADIO_TWO_PHASE feature, else 0 */
    return (ADIO_Feature(fd, ADIO_TWO_PHASE)) ? 1 : 0;
}
261 
/* Write-side counterpart of uses_generic_read(): the same ADIO_TWO_PHASE
 * feature test decides the answer. */
static int uses_generic_write(ADIO_File fd)
{
    /* 1 when the file system reports the ADIO_TWO_PHASE feature, else 0 */
    return (ADIO_Feature(fd, ADIO_TWO_PHASE)) ? 1 : 0;
}
268 
/*
 * build_cb_config_list - build the aggregator rank list for an open file.
 *
 * Rank 0 parses fd->hints->cb_config_list against the gathered processor
 * name array, stores the resulting ranks in fd->hints->ranklist, updates
 * fd->hints->cb_nodes and the "cb_nodes" info key, and then
 * ADIOI_cb_bcast_rank_map() is called on every rank.
 *
 * Parameters:
 *   fd         - file handle whose hints are filled in
 *   orig_comm  - original communicator (the name array is cached on it too)
 *   comm       - communicator the file is being opened on
 *   rank       - rank of the calling process
 *   procs      - number of processes; size of the temporary rank list
 *   error_code - out: set to an MPI error code on failure, untouched otherwise
 *
 * Always returns 0; failures are reported through error_code only.
 */
static int build_cb_config_list(ADIO_File fd,
                                MPI_Comm orig_comm, MPI_Comm comm,
                                int rank, int procs, int *error_code)
{
    ADIO_cb_name_array array;
    int *tmp_ranklist;
    int rank_ct;
    char value[32];             /* ample room for a decimal int */
    static char myname[] = "ADIO_OPEN cb_config_list";

    /* gather the processor name array if we don't already have it */
    /* this has to be done early in ADIO_Open so that we can cache the name
     * array in both the dup'd communicator (in case we want it later) and the
     * original communicator */
    ADIOI_cb_gather_name_array(orig_comm, comm, &array);

    /* parse the cb_config_list and create a rank map on rank 0 */
    if (rank == 0) {
        tmp_ranklist = (int *) ADIOI_Malloc(sizeof(int) * procs);
        if (tmp_ranklist == NULL) {
            /* NOTE(review): returning here means rank 0 never reaches
             * ADIOI_cb_bcast_rank_map() below while the other ranks do;
             * confirm the other ranks cannot block waiting for it. */
            *error_code = MPIO_Err_create_code(*error_code,
                                               MPIR_ERR_RECOVERABLE,
                                               myname, __LINE__, MPI_ERR_OTHER, "**nomem2", 0);
            return 0;
        }

        rank_ct = ADIOI_cb_config_list_parse(fd->hints->cb_config_list,
                                             array, tmp_ranklist, fd->hints->cb_nodes);

        /* store the ranklist using the minimum amount of memory */
        if (rank_ct > 0) {
            fd->hints->ranklist = (int *) ADIOI_Malloc(sizeof(int) * rank_ct);
            if (fd->hints->ranklist == NULL) {
                /* same handling as the tmp_ranklist failure above, instead
                 * of copying into a null pointer */
                ADIOI_Free(tmp_ranklist);
                *error_code = MPIO_Err_create_code(*error_code,
                                                   MPIR_ERR_RECOVERABLE,
                                                   myname, __LINE__, MPI_ERR_OTHER,
                                                   "**nomem2", 0);
                return 0;
            }
            memcpy(fd->hints->ranklist, tmp_ranklist, sizeof(int) * rank_ct);
        }
        ADIOI_Free(tmp_ranklist);
        fd->hints->cb_nodes = rank_ct;
        /* TEMPORARY -- REMOVE WHEN NO LONGER UPDATING INFO FOR FS-INDEP. */
        /* the info object keeps its own copy of the value, so a small stack
         * buffer is enough and removes one unchecked allocation */
        MPL_snprintf(value, sizeof(value), "%d", rank_ct);
        ADIOI_Info_set(fd->info, "cb_nodes", value);
    }

    ADIOI_cb_bcast_rank_map(fd);
    if (fd->hints->cb_nodes <= 0) {
        /* no aggregator matched the configuration: report it to the caller,
         * which is responsible for tearing the handle down */
        *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                           myname, __LINE__, MPI_ERR_IO, "**ioagnomatch", 0);
    }
    return 0;
}
320