1 #if HAVE_CONFIG_H
2 #   include "config.h"
3 #endif
4 /** @file
5  * DRA operations with a buffer manager layer, modified by Bilash
6  * The buffer manager provides functionalities related to buffers
7  *
8  * $Id: disk.arrays.c,v 1.79.2.2 2007-03-24 01:19:28 manoj Exp $
9  *
10  ************************** DISK ARRAYS **************************************
11  *         Jarek Nieplocha, Fri May 12 11:26:38 PDT 1995                     *
12  *****************************************************************************
13  *
14  * DISCLAIMER
15  *
16  * This material was prepared as an account of work sponsored by an
17  * agency of the United States Government.  Neither the United States
18  * Government nor the United States Department of Energy, nor Battelle,
19  * nor any of their employees, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
20  * ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE ACCURACY,
21  * COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, APPARATUS, PRODUCT,
22  * SOFTWARE, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT
23  * INFRINGE PRIVATELY OWNED RIGHTS.
24  *
25  *
26  * ACKNOWLEDGMENT
27  *
28  * This software and its documentation were produced with United States
29  * Government support under Contract Number DE-AC06-76RLO-1830 awarded by
30  * the United States Department of Energy.  The United States Government
31  * retains a paid-up non-exclusive, irrevocable worldwide license to
32  * reproduce, prepare derivative works, perform publicly and display
33  * publicly by or for the US Government, including the right to
34  * distribute to other US Government contractors.
35  */
36 #if HAVE_MATH_H
37 #   include <math.h>
38 #endif
39 #if HAVE_STDLIB_H
40 #   include <stdlib.h>
41 #endif
42 #if HAVE_STDIO_H
43 #   include <stdio.h>
44 #endif
45 
46 #include "buffers.h"
47 #include "dra.h"
48 #include "draf2c.h"
49 #include "drap.h"
50 #include "global.h"
51 #include "ga-papi.h"
52 #include "macdecls.h"
53 
54 #define WALLTIME 0
55 #if WALLTIME
56 #   include "walltime.c"
57 #endif
58 
59 /************************** constants ****************************************/
60 
61 #define DRA_FAIL  (Integer)1
62 #define COLUMN    1
63 #define ROW       0
64 #define ON        1
65 #define OFF       0
66 
67 #define ILO       0
68 #define IHI       1
69 #define JLO       2
70 #define JHI       3
71 
72 #define DRA_OP_WRITE 777
73 #define DRA_OP_READ  888
74 #define PROBE 111
75 #define WAIT 222
76 
77 #define MAX_REQ   5
78 
79 /** message type/tag used by DRA */
80 #define  DRA_TYPE_GSM 32760 - 6
81 #define  DRA_TYPE_GOP 32760 - 7
82 
83 #define INFINITE_NUM_PROCS  8094
84 
85 #define CLIENT_TO_SERVER 2
86 
87 /* #define DRA_DBLE_BUFFER */
88 
89 #if defined(SP1)|| defined(SP) || defined(LAPI)
90 #   define DRA_NUM_IOPROCS 8
91 #else
92 #   define DRA_NUM_IOPROCS 1
93 #endif
94 
95 #ifndef DRA_NUM_FILE_MGR
96 #  define DRA_NUM_FILE_MGR DRA_NUM_IOPROCS
97 #endif
98 
99 #define DRA_BUF_SIZE BUF_SIZE
100 
101 #define DEF_MAX_ARRAYS 16
102 #define DRA_MAX_ARRAYS 1024
103 #define EPS_SEARCH 100
104 #define LOAD 1
105 #define STORE 2
106 #define TRANS 1
107 #define NOTRANS 0
108 /*#define DEBUG 1*/
109 /*#define CLEAR_BUF 1*/
110 
111 
112 /***************************** Global Data ***********************************/
113 
114 /** Default number of IO procs */
115 Integer _dra_io_procs;
116 /** Default number of files to use on open file systems */
117 Integer _dra_number_of_files;
118 /** array of struct for basic info about DRA arrays*/
119 disk_array_t *DRA;
120 
121 buf_context_t buf_ctxt; /**< buffer context handle */
122 int nbuf = 4; /**< number of buffers to be used */
123 
124 Integer _max_disk_array; /**< max number of disk arrays open at a time */
125 logical dra_debug_flag;  /**< globally defined debug parameter */
126 
127 request_t Requests[MAX_REQ];
128 int num_pending_requests=0;
129 Integer _dra_turn=0;
130 int Dra_num_serv=DRA_NUM_IOPROCS;
131 
132 
133 /****************************** Macros ***************************************/
134 
135 #define dai_sizeofM(_type) MA_sizeof(_type, 1, MT_C_CHAR)
136 
137 #define dai_check_typeM(_type)  if (_type != C_DBL && _type != C_INT \
138      && _type != C_LONG && _type != C_DCPL && _type != C_FLOAT && _type != C_SCPL) \
139                                   dai_error("invalid type ",_type)
140 #define dai_check_handleM(_handle, msg)                                    \
141 {\
142         if((_handle+DRA_OFFSET)>=_max_disk_array || (_handle+DRA_OFFSET)<0) \
143         {fprintf(stderr,"%s, %ld --",msg, (long)_max_disk_array);\
144         dai_error("invalid DRA handle",_handle);}                           \
145         if( DRA[(_handle+DRA_OFFSET)].actv == 0)                            \
146         {fprintf(stderr,"%s:",msg);\
147         dai_error("disk array not active",_handle);}                       \
148 }
149 
150 #define dai_check_rangeM(_lo, _hi, _dim, _err_msg)                         \
151         if(_lo < (Integer)1   || _lo > _dim ||_hi < _lo || _hi > _dim)     \
152         dai_error(_err_msg, _dim)
153 
154 #define ga_get_sectM(sect, _buf, _ld)\
155    pnga_get(sect.handle, sect.lo, sect.hi, _buf, &_ld)
156 
157 #define ga_put_sectM(sect, _buf, _ld)\
158    pnga_put(sect.handle, sect.lo, sect.hi, _buf, &_ld)
159 
160 #define fill_sectionM(sect, _hndl, _ilo, _ihi, _jlo, _jhi) \
161 { \
162         sect.handle = _hndl;\
163         sect.ndim   = 2; \
164         sect.lo[0]    = _ilo;\
165         sect.hi[0]    = _ihi;\
166         sect.lo[1]    = _jlo;\
167         sect.hi[1]    = _jhi;\
168 }
169 
170 #define sect_to_blockM(ds_a, CR)\
171 {\
172       Integer   hndl = (ds_a).handle+DRA_OFFSET;\
173       Integer   br   = ((ds_a).lo[0]-1)/DRA[hndl].chunk[0];\
174       Integer   bc   = ((ds_a).lo[1]-1)/DRA[hndl].chunk[1];\
175       Integer   R    = (DRA[hndl].dims[0] + DRA[hndl].chunk[0] -1)/DRA[hndl].chunk[0];\
176                *(CR) = bc * R + br;\
177 }
178 
179 #define block_to_sectM(ds_a, CR)\
180 {\
181       Integer   hndl = (ds_a)->handle+DRA_OFFSET;\
182       Integer   R    = (DRA[hndl].dims[0] + DRA[hndl].chunk[0]-1)/DRA[hndl].chunk[0];\
183       Integer   br = (CR)%R;\
184       Integer   bc = ((CR) - br)/R;\
185       (ds_a)->  lo[0]= br * DRA[hndl].chunk[0] +1;\
186       (ds_a)->  lo[1]= bc * DRA[hndl].chunk[1] +1;\
187       (ds_a)->  hi[0]= (ds_a)->lo[0] + DRA[hndl].chunk[0];\
188       (ds_a)->  hi[1]= (ds_a)->lo[1] + DRA[hndl].chunk[1];\
189       if( (ds_a)->hi[0] > DRA[hndl].dims[0]) (ds_a)->hi[0] = DRA[hndl].dims[0];\
190       if( (ds_a)->hi[1] > DRA[hndl].dims[1]) (ds_a)->hi[1] = DRA[hndl].dims[1];\
191 }
192 
193 #define INDEPFILES(x) (DRA[(x)+DRA_OFFSET].indep)
194 
195 #define ITERATOR_2D(i,j, base, ds_chunk)\
196     for(j = ds_chunk.lo[1], base=0, jj=0; j<= ds_chunk.hi[1]; j++,jj++)\
197         for(i = ds_chunk.lo[0], ii=0; i<= ds_chunk.hi[0]; i++,ii++,base++)
198 
199 #define COPY_SCATTER(ADDR_BASE, TYPE, ds_chunk)\
200     ITERATOR_2D(i,j, base, ds_chunk) \
201     ADDR_BASE[base+vindex] = ((TYPE*)buffer)[ldb*jj + ii]
202 
203 #define COPY_GATHER(ADDR_BASE, TYPE, ds_chunk)\
204     for(i=0; i< nelem; i++){\
205         Integer ldc = ds_chunk.hi[0] - ds_chunk.lo[0]+1;\
206         base = INT_MB[pindex+i]; jj = base/ldc; ii = base%ldc;\
207         ((TYPE*)buffer)[ldb*jj + ii] = ADDR_BASE[i+vindex];\
208     }
209 
210 #define COPY_TYPE(OPERATION, MATYPE, ds_chunk)\
211     switch(MATYPE){\
212         case C_DBL:   COPY_ ## OPERATION(DBL_MB,double,ds_chunk);break;\
213         case C_INT:   COPY_ ## OPERATION(INT_MB,int,ds_chunk);break;\
214         case C_DCPL:  COPY_ ## OPERATION(DCPL_MB,DoubleComplex,ds_chunk);break;\
215         case C_SCPL:  COPY_ ## OPERATION(SCPL_MB,SingleComplex,ds_chunk);break;\
216         case C_FLOAT: COPY_ ## OPERATION(FLT_MB,float,ds_chunk);\
217     }
218 
219 #define nsect_to_blockM(ds_a, CR) \
220 { \
221   Integer hndl = (ds_a).handle+DRA_OFFSET;\
222   Integer _i, _ndim = DRA[hndl].ndim; \
223   Integer _R, _b; \
224   *(CR) = 0; \
225   _R = 0; \
226   for (_i=_ndim-1; _i >= 0; _i--) { \
227     _b = ((ds_a).lo[_i]-1)/DRA[hndl].chunk[_i]; \
228     _R = (DRA[hndl].dims[_i]+DRA[hndl].chunk[_i]-1)/DRA[hndl].chunk[_i];\
229     *(CR) = *(CR) * _R + _b; \
230   } \
231 }
232 
233 
/* Map a 1-D linear element offset to destination (row,col) indices.
 * NOTE(review): despite the parameter list, the expansion reads `is` and
 * `ilos` from the caller's scope and never uses the `index` parameter --
 * every expansion site must declare `is`/`ilos`; TODO confirm and clean up
 * the parameter list together with the call sites. */
#define dai_dest_indices_1d_M(index, id, jd, ilod, jlod, ldd) \
{ \
    Integer _index_;\
    _index_ = (is)-(ilos);\
    *(id) = (_index_)%(ldd) + (ilod);\
    *(jd) = (_index_)/(ldd) + (jlod);\
}
241 
242 
243 #define dai_dest_indicesM(is, js, ilos, jlos, lds, id, jd, ilod, jlod, ldd)\
244 { \
245     Integer _index_;\
246     _index_ = (lds)*((js)-(jlos)) + (is)-(ilos);\
247     *(id) = (_index_)%(ldd) + (ilod);\
248     *(jd) = (_index_)/(ldd) + (jlod);\
249 }
250 
251 
252 #define nga_get_sectM(sect, _buf, _ld, hdl)\
253  if (hdl != NULL)\
254  pnga_nbget(sect.handle, sect.lo, sect.hi, _buf, _ld, hdl);\
255  else\
256  pnga_get(sect.handle, sect.lo, sect.hi, _buf, _ld);
257 
258 
259 #define nga_put_sectM(sect, _buf, _ld, hdl)\
260  if (hdl != NULL)\
261  pnga_nbput(sect.handle, sect.lo, sect.hi, _buf, _ld, hdl);\
262  else\
263  pnga_put(sect.handle, sect.lo, sect.hi, _buf, _ld);
264 
265 
266 #define ndai_dest_indicesM(ds_chunk, ds_a, gs_chunk, gs_a)   \
267 {\
268   Integer _i; \
269   Integer _ndim = ds_a.ndim; \
270   for (_i=0; _i<_ndim; _i++) { \
271     gs_chunk.lo[_i] = gs_a.lo[_i] + ds_chunk.lo[_i]- ds_a.lo[_i]; \
272     gs_chunk.hi[_i] = gs_a.lo[_i] + ds_chunk.hi[_i]- ds_a.lo[_i]; \
273   } \
274 }
275 
276 
277 #define ndai_trnsp_dest_indicesM(ds_chunk, ds_a, gs_chunk, gs_a)   \
278 {\
279   Integer _i; \
280   Integer _ndim = ds_a.ndim; \
281   for (_i=0; _i<_ndim; _i++) { \
282     gs_chunk.lo[_ndim-1-_i] = gs_a.lo[_ndim-1-_i] \
283                             + ds_chunk.lo[_i]- ds_a.lo[_i]; \
284     gs_chunk.hi[_ndim-1-_i] = gs_a.lo[_ndim-1-_i] \
285                             + ds_chunk.hi[_i]- ds_a.lo[_i]; \
286   } \
287 }
288 
289 
290 /** Simple sort using straight insertion */
291 #define block_sortM(_ndim, _block_orig, _block_map) \
292 {\
293   Integer _i,_j,_it,_bt; \
294   Integer _block_tmp[MAXDIM]; \
295   for (_i=0; _i < (_ndim); _i++) { \
296     _block_map[_i] = _i; \
297     _block_tmp[_i] = _block_orig[_i]; \
298   } \
299   for (_j=(_ndim)-2; _j >= 0; _j--) { \
300     _i = _j + 1; \
301     _bt = _block_tmp[_j]; \
302     _it = _block_map[_j]; \
303     while (_i < (_ndim) && _bt < _block_tmp[_i]) { \
304       _block_tmp[_i-1] = _block_tmp[_i]; \
305       _block_map[_i-1] = _block_map[_i]; \
306       _i++; \
307     }\
308     _block_tmp[_i-1] = _bt; \
309     _block_map[_i-1] = _it; \
310   }\
311 }
312 
313 
314 #define nfill_sectionM(sect, _hndl, _ndim, _lo, _hi) \
315 { \
316   Integer _i; \
317   sect.handle = _hndl; \
318   sect.ndim = _ndim; \
319   for (_i=0; _i<_ndim; _i++) { \
320     sect.lo[_i]    = _lo[_i]; \
321     sect.hi[_i]    = _hi[_i]; \
322   } \
323 }
324 
325 
326 #define nblock_to_sectM(ds_a, _CR) \
327 {\
328   Integer _i, _b[MAXDIM], _C = (_CR); \
329   Integer _hndl = (ds_a)->handle+DRA_OFFSET; \
330   Integer _R = (DRA[_hndl].dims[0]+DRA[_hndl].chunk[0]-1)/DRA[_hndl].chunk[0]; \
331   (ds_a)->ndim = DRA[_hndl].ndim; \
332   _b[0] = _C%_R; \
333   for (_i=1; _i<DRA[_hndl].ndim; _i++) { \
334     _C = (_C - _b[_i-1])/_R; \
335     _R = (DRA[_hndl].dims[_i]+DRA[_hndl].chunk[_i]-1)/DRA[_hndl].chunk[_i]; \
336     _b[_i] = (_C)%_R; \
337   } \
338   for (_i=0; _i<DRA[_hndl].ndim; _i++) { \
339     (ds_a)->lo[_i] = _b[_i]*DRA[_hndl].chunk[_i] + 1; \
340     (ds_a)->hi[_i] = (ds_a)->lo[_i] + DRA[_hndl].chunk[_i] - 1; \
341     if ((ds_a)->hi[_i] > DRA[_hndl].dims[_i]) \
342       (ds_a)->hi[_i] = DRA[_hndl].dims[_i]; \
343   } \
344 }
345 
346 
347 #define nblock_to_indicesM(_index,_ndim,_block_dims,_CC) \
348 { \
349   Integer _i, _C=(_CC); \
350   _index[0] = _C%_block_dims[0]; \
351   for (_i=1; _i<(_ndim); _i++) { \
352     _C = (_C - _index[_i-1])/_block_dims[_i-1]; \
353     _index[_i] = _C%_block_dims[_i]; \
354   } \
355 }
356 
357 
/* Validate an n-dim section [_lo.._hi] against 1-based array bounds _dims.
 * NOTE(review): the failure branch passes `_dim` (no trailing `s`), which is
 * not a parameter of this macro -- it resolves to whatever `_dim` variable
 * exists at the expansion site; verify each caller provides one, or change
 * the report to a well-defined value such as `_dims[0]`. */
#define ndai_check_rangeM(_lo, _hi, _ndim, _dims, _err_msg) \
{ \
  int _range_ok = 1, _i; \
  for (_i=0; _i < (_ndim); _i++) { \
    if (_lo[_i] < 1 || _lo[_i] > _dims[_i] || _hi[_i] < _lo[_i] \
                || _hi[_i] > _dims[_i] ) _range_ok = 0; \
  } \
  if(!_range_ok) dai_error(_err_msg, _dim); \
}
367 
368 
369 char dummy_fname[DRA_MAX_FNAME];
370 
371 /*****************************************************************************/
372 
373 /**
374  * determines if write operation to a disk array is allowed
375  */
int dai_write_allowed(Integer d_a)
{
    /* writing requires the array to be open in W or RW mode */
    int mode = DRA[d_a + DRA_OFFSET].mode;
    return (mode == DRA_W || mode == DRA_RW) ? 1 : 0;
}
382 
383 
384 /**
385  * determines if read operation from a disk array is allowed
386  */
int dai_read_allowed(Integer d_a)
{
    /* reading requires the array to be open in R or RW mode */
    int mode = DRA[d_a + DRA_OFFSET].mode;
    return (mode == DRA_R || mode == DRA_RW) ? 1 : 0;
}
393 
394 
395 /**
396  * number of processes that could perform I/O
397  */
Integer dai_io_procs(Integer d_a)
{
    /* Number of processes eligible for I/O: one per cluster node when
     * each node keeps an independent file, otherwise whatever the array
     * was configured with -- capped by the total process count.
     * (Other policies are possible; this one is system-dependent.) */
    Integer handle = d_a + DRA_OFFSET;
    Integer num;

    if (INDEPFILES(d_a)) {
        num = pnga_cluster_nnodes();
    } else {
        num = DRA[handle].ioprocs;
    }

    return PARIO_MIN(pnga_nnodes(), num);
}
419 
420 /**
421  * Translation of DRA create/opening modes to ELIO create/open
422  * mode. DRA modes map directly to ELIO modes, except that write-only
423  * DRAs are backed by read-write ELIO files.
424  */
dai_elio_mode(int dra_mode)425 int dai_elio_mode(int dra_mode) {
426   int emode = dra_mode; /* dra modes map to elio mode*/
427   if(dra_mode == DRA_W) {
428     /*except W, which translate to read-write files*/
429     emode = ELIO_RW;
430   }
431   return emode;
432 }
433 
434 
435 /**
436  * rank of calling process in group of processes that could perform I/O
437  * a negative value means that this process doesn't do I/O
438  */
Integer dai_io_nodeid(Integer d_a)
{
    Integer handle = d_a+DRA_OFFSET;
    Integer me = pnga_nodeid();
    Integer pid, id, nid, nnodes,nprocs;
    Integer nodeid = pnga_cluster_nodeid();
    Integer zero = 0;

    /* again, one of many possibilities:
     * if proc id beyond I/O procs number, negate it
     */

    if (INDEPFILES(d_a)) {
        /* independent files: the first process on each node does I/O,
         * and its I/O rank is its node id */
        if(me == pnga_cluster_procid(nodeid, zero)) me = nodeid;
        else me = -1;
    } else {
        if (DRA[handle].ioprocs == 1) {
            /* single I/O process: only global process 0 qualifies */
            if (me == 0) return me;
            else return -1;
        } else {
            /* spread I/O ranks round-robin across nodes: local rank pid
             * on node nid maps to id = pid*nnodes + nid, and the first
             * DRA[handle].ioprocs ids perform I/O */
            nnodes = pnga_cluster_nnodes();
            nprocs = pnga_cluster_nprocs(nodeid);
            pid = me % nprocs;
            nid = (me - pid)/nprocs;
            id = pid * nnodes + nid;
            if (id < DRA[handle].ioprocs) return id;
            else return -1;
        }
    }

    /*        if (me >= dai_io_procs(d_a)) me = -me;*/
    return (me);
}
472 
473 
474 /**
475  * determines if I/O process participates in file management (create/delete)
476  */
Integer dai_io_manage(Integer d_a)
{
    /* a process participates in file management iff it is an I/O process,
     * i.e. it has a non-negative I/O rank */
    return (dai_io_nodeid(d_a) >= 0) ? 1 : 0;
}
486 
487 
488 /**
489  * select one master process for each file associated with d_a
490  */
Integer dai_file_master(Integer d_a)
{
    Integer handle = d_a+DRA_OFFSET;

    /* non-I/O processes never master a file */
    if (dai_io_nodeid(d_a) < 0) return 0;

    /* with independent or multiple files, every I/O process owns its own
     * file; with one shared file, I/O rank 0 is the master */
    if (INDEPFILES(d_a) || DRA[handle].numfiles > 1) return 1;

    return (dai_io_nodeid(d_a) == 0) ? 1 : 0;
}
506 
507 
/**
 * Arm a deferred GA-update callback in the header of an I/O buffer.
 *
 * Stores the operation descriptor (op code, transpose flag, the global-array
 * section, the disk sections, and the leading dimensions) in the buf_info
 * header at the start of `buf`, and marks the callback pending.  It is an
 * error to arm a buffer whose previous callback has not been cleared.
 */
void dai_callback(int op, int transp, section_t gs_a, section_t ds_a, section_t ds_chunk, Integer ld[], char *buf, Integer req)
{
    int i;
    buf_info *bi;

    bi = (buf_info*) buf;
    if (bi->callback==ON)
        dai_error("DRA: callback not cleared for a buffer",0);

    bi->callback = ON;
    bi->args.op = op;
    bi->args.transp = transp;
    bi->args.gs_a = gs_a;
    bi->args.ds_a = ds_a;
    bi->args.ds_chunk = ds_chunk;
    /* an ndim-dimensional section carries ndim-1 leading dimensions */
    for (i=0; i<gs_a.ndim-1; i++)
        bi->args.ld[i] = ld[i];

    /* NOTE(review): the `req` argument is currently unused here --
     * confirm whether it should be recorded with the other args */
}
528 
529 
530 /**
531  * function to release buffers by completing the transfers
532  * this function will be passed on as a parameter to a buffer management layer
533  */
534 void wait_buf(char *buf);
535 
536 
537 /**
538  * INITIALIZE DISK ARRAY DATA STRUCTURES
539  *
540  * @param max_arrays[in]
541  * @param max_array_size[in]
542  * @param tot_disk_space[in]
543  * @param max_memory[in]
544  */
dra_init_(Integer * max_arrays,DoublePrecision * max_array_size,DoublePrecision * tot_disk_space,DoublePrecision * max_memory)545 Integer FATR dra_init_(
546         Integer *max_arrays,
547         DoublePrecision *max_array_size,
548         DoublePrecision *tot_disk_space,
549         DoublePrecision *max_memory)
550 {
551     int i, buf_size;
552     pnga_sync();
553 
554     if(*max_arrays<-1 || *max_arrays> DRA_MAX_ARRAYS)
555         dai_error("dra_init: incorrect max number of arrays",*max_arrays);
556     _max_disk_array = (*max_arrays==-1) ? DEF_MAX_ARRAYS: *max_arrays;
557 
558     Dra_num_serv = drai_get_num_serv();
559 
560     DRA = (disk_array_t*)malloc(sizeof(disk_array_t)* (int)*max_arrays);
561     if(!DRA) dai_error("dra_init: memory alocation failed\n",0);
562     for(i=0; i<_max_disk_array ; i++)DRA[i].actv=0;
563 
564     for(i=0; i<MAX_REQ; i++) Requests[i].num_pending=0;
565 
566     dra_debug_flag = FALSE;
567     _dra_io_procs = pnga_cluster_nnodes();
568     _dra_number_of_files = pnga_cluster_nnodes();
569 
570     /* initialize Buffer Manager */
571     buf_size = sizeof (buf_info) + (int) DBL_BUF_SIZE;
572     buffer_init(&buf_ctxt, nbuf, buf_size, &wait_buf);
573 
574     pnga_sync();
575 
576     return(ELIO_OK);
577 }
578 
579 
580 /**
581  * correct chunk size to fit the buffer and introduce allignment
582  *
583  * @param a[in,out] Initial guess for larger chunk
584  * @param b[in,out] Initial guess for smaller chunk
585  * @param prod[in]  Total number of elements that will fit in buffer [input]
586  * @param ratio[in] Ratio of size of current patch formed by a and b
587  *                  to size of I/O buffer
588  */
/* Shrink (a,b) so that a*b <= prod while roughly preserving the a:b aspect
 * ratio; `prod` is the number of elements that fit in the I/O buffer.
 * NOTE: the `ratio` parameter is currently unused -- kept for interface
 * compatibility. */
void dai_correct_chunking(Integer* a, Integer* b, Integer prod, double ratio)
{
    Integer b0, bt, eps0, eps1, trial;
    double  da = (double)*a, db = (double)*b;
    double  ch_ratio = da/db;

    /* ideal smaller chunk preserving the aspect ratio within prod */
    db = sqrt(prod /ch_ratio);
    trial = b0 = (Integer)db; eps0 = prod%b0;
    trial = PARIO_MAX(trial,EPS_SEARCH); /******/
    /* search in the neighborhood for a better solution.
     * BUG FIX: the lower bound must be PARIO_MAX(1, trial-EPS_SEARCH);
     * with PARIO_MIN the loop scanned all the way down to bt=1, far
     * outside the intended +/-EPS_SEARCH neighborhood, which could pick a
     * tiny divisor and produce an extremely skewed chunk shape. */
    for (bt = trial+ EPS_SEARCH; bt> PARIO_MAX(1,trial-EPS_SEARCH); bt--){
        eps1 =  prod%bt;
        if(eps1 < eps0){
            /* better solution found */
            b0 = bt; eps0 = eps1;
        }
    }
    *a = prod/b0;
    *b = b0;
}
609 
610 
611 /**
612  * compute chunk parameters for layout of arrays on the disk
613  *   ---- a very simple algorithm to be refined later ----
614  * @param elem_size[in] Size of stored data
615  * @param block1[in]    Estimated size of request in dimension 1
616  * @param block2[in]    Estimated size of request in dimension 2
617  * @param dim1[in]      Size of DRA in dimension 1
618  * @param dim2[in]      Size of DRA in dimension 2
619  * @param chunk1[out]   Data block size in dimension 1?
620  * @param chunk2[out]   Data block size in dimension 2?
621  */
void dai_chunking(Integer elem_size, Integer block1, Integer block2,
        Integer dim1, Integer dim2, Integer *chunk1, Integer *chunk2)
{
    Integer patch_size;

    *chunk1 = *chunk2 =0;
    if(block1 <= 0 && block2 <= 0){
        /* no request hints at all: chunk the whole array */
        *chunk1 = dim1;
        *chunk2 = dim2;

    }else if(block1 <= 0){
        /* only dim-2 hinted: size dim-1 so one chunk fills the I/O buffer */
        *chunk2 = block2;
        *chunk1 = PARIO_MAX(1, DRA_BUF_SIZE/(elem_size* (*chunk2)));
    }else if(block2 <= 0){
        /* only dim-1 hinted: size dim-2 so one chunk fills the I/O buffer */
        *chunk1 = block1;
        *chunk2 = PARIO_MAX(1, DRA_BUF_SIZE/(elem_size* (*chunk1)));
    }else{
        /* both hinted: take the request sizes (may be shrunk below) */
        *chunk1 = block1;
        *chunk2 = block2;
    }

    /* need to correct chunk size to fit chunk1 x chunk2 request in buffer*/
    patch_size = (*chunk1)* (*chunk2)*elem_size;

    if (patch_size > ((Integer)DRA_BUF_SIZE)){

        /* degenerate 1-wide chunks: just cap the other dimension */
        if( *chunk1 == 1) *chunk2  = DRA_BUF_SIZE/elem_size;
        else if( *chunk2 == 1) *chunk1  = DRA_BUF_SIZE/elem_size;
        else {
            double  ratio = ((double)patch_size)/((double)DRA_BUF_SIZE);
            /* smaller chunk to be scaled first */
            if(*chunk1 < *chunk2){
                dai_correct_chunking(chunk2,chunk1,DRA_BUF_SIZE/elem_size,ratio);
            }else{
                dai_correct_chunking(chunk1,chunk2,DRA_BUF_SIZE/elem_size,ratio);
            }
        }
    }
#ifdef DEBUG
    /* NOTE(review): DRA_DBL_BUF_SIZE is not defined in this file; this
     * debug printf likely predates a rename -- confirm before enabling */
    printf("\n%d:CREATE chunk=(%d,%d) elem_size=%d req=(%d,%d) buf=%d\n",
            pnga_nodeid(),*chunk1, *chunk2, elem_size, block1, block2,
            DRA_DBL_BUF_SIZE);
    fflush(stdout);
#endif
}
668 
669 
670 /**
671  * get a new handle for disk array
672  */
dai_get_handle(void)673 Integer dai_get_handle(void)
674 {
675     Integer dra_handle =-1, candidate = 0;
676 
677     do{
678         if(!DRA[candidate].actv){
679             dra_handle=candidate;
680             DRA[candidate].actv =1;
681         }
682         candidate++;
683     }while(candidate < _max_disk_array && dra_handle == -1);
684     return(dra_handle);
685 }
686 
687 
688 /**
689  * release handle -- makes array inactive
690  */
void dai_release_handle(Integer *handle)
{
    /* deactivate the slot, then invalidate the caller's handle */
    DRA[(*handle) + DRA_OFFSET].actv = 0;
    *handle = 0;
}
696 
697 
698 /**
699  * find offset in file for (ilo,ihi) element
700  */
void dai_file_location(section_t ds_a, Off_t* offset)
{
    Integer row_blocks, handle=ds_a.handle+DRA_OFFSET, offelem, cur_ld, part_chunk1;

    /* sections handed to this routine must start on a chunk boundary
     * in dimension 0 */
    if((ds_a.lo[0]-1)%DRA[handle].chunk[0])
        dai_error("dai_file_location: not alligned ??",ds_a.lo[0]);

    row_blocks  = (ds_a.lo[0]-1)/DRA[handle].chunk[0];/* # row blocks from top*/
    part_chunk1 = DRA[handle].dims[0]%DRA[handle].chunk[0];/*dim1 in part block*/
    /* leading dimension of the current block: shorter if it is the
     * partial block at the bottom edge of the array */
    cur_ld      = (row_blocks == DRA[handle].dims[0] / DRA[handle].chunk[0]) ?
        part_chunk1: DRA[handle].chunk[0];

    /* compute offset (in elements) */

    if(INDEPFILES(ds_a.handle) || DRA[handle].numfiles > 1) {
        /* blocks are dealt round-robin over several files: count how many
         * blocks before this one landed in THIS process's file, and how
         * many of those are short bottom-edge blocks */
        Integer   CR, R;
        Integer   i, num_part_block = 0;
        Integer   ioprocs=dai_io_procs(ds_a.handle);
        Integer   iome = dai_io_nodeid(ds_a.handle);

        sect_to_blockM(ds_a, &CR);  /* linear block number of this section */

        R    = (DRA[handle].dims[0] + DRA[handle].chunk[0]-1)/DRA[handle].chunk[0];
        /* blocks R-1, 2R-1, ... are the bottom-edge (possibly partial) ones */
        for(i = R -1; i< CR; i+=R) if(i%ioprocs == iome)num_part_block++;

        if(!part_chunk1) part_chunk1=DRA[handle].chunk[0];
        offelem = ((CR/ioprocs - num_part_block)*DRA[handle].chunk[0] +
                num_part_block * part_chunk1 ) * DRA[handle].chunk[1];

        /* add offset within block */
        offelem += ((ds_a.lo[1]-1) %DRA[handle].chunk[1])*cur_ld;
    } else {
        /* single shared file: whole rows of blocks precede this one */
        offelem = row_blocks  * DRA[handle].dims[1] * DRA[handle].chunk[0];
        offelem += (ds_a.lo[1]-1)*cur_ld;

    }

    /* convert elements to bytes */
    *offset = offelem* dai_sizeofM(DRA[handle].type);
}
742 
743 
744 /**
745  * write aligned block of data from memory buffer to d_a
746  */
void dai_put(
        section_t    ds_a,
        void         *buf,
        Integer      ld,
        io_request_t *id)
{
    Integer handle = ds_a.handle + DRA_OFFSET;
    Integer rows, cols, nelem;
    Off_t   offset;
    Size_t  bytes;

    /* locate the section in the file */
    dai_file_location(ds_a, &offset);

    rows = ds_a.hi[0] - ds_a.lo[0] + 1;
    cols = ds_a.hi[1] - ds_a.lo[1] + 1;
    if (rows != ld) dai_error("dai_put: bad ld", ld);

    /* the section is aligned, so one contiguous asynchronous write does it */
    nelem = rows * cols;
    bytes = (Size_t)nelem * dai_sizeofM(DRA[handle].type);
    if (ELIO_OK != elio_awrite(DRA[handle].fd, offset, buf, bytes, id))
        dai_error("dai_put failed", ds_a.handle);
}
768 
769 
770 /**
771  * write zero at EOF
772  */
void dai_zero_eof(Integer d_a)
{
    Integer handle = d_a+DRA_OFFSET, nelem;
    char byte;
    Off_t offset;

    byte = (char)0;

    if(INDEPFILES(d_a) || DRA[handle].numfiles > 1) {

        Integer   CR=0, i=0, nblocks=0;
        section_t ds_a;
        /* number of processors that do io */
        Integer   ioprocs=dai_io_procs(d_a);
        /* node id of current process (if it does io) */
        Integer   iome = dai_io_nodeid(d_a);

        /* total number of blocks in the disk resident array */
        nblocks = ((DRA[handle].dims[0]
                    + DRA[handle].chunk[0]-1)/DRA[handle].chunk[0])
            * ((DRA[handle].dims[1]
                        + DRA[handle].chunk[1]-1)/DRA[handle].chunk[1]);
        fill_sectionM(ds_a, d_a, 0, 0, 0, 0);

        /* search for the last block for each I/O processor */
        for(i = 0; i <ioprocs; i++){
            CR = nblocks - 1 -i;
            if(CR % ioprocs == iome) break;
        }
        if(CR<0) return; /* no blocks owned */

        block_to_sectM(&ds_a, CR); /* convert block number to section */
        dai_file_location(ds_a, &offset);
        /* offset of the end of this process's last block, in this file */
        nelem = (ds_a.hi[0] - ds_a.lo[0] +1)*(ds_a.hi[1] - ds_a.lo[1] +1);
        offset += ((Off_t)nelem) * dai_sizeofM(DRA[handle].type);

#ifdef DEBUG
        printf("me=%d zeroing EOF (%d) at %ld bytes \n",iome,CR,offset);
#endif
    } else {

        /* single shared file: EOF is simply past the whole array */
        nelem = DRA[handle].dims[0]*DRA[handle].dims[1];
        offset = ((Off_t)nelem) * dai_sizeofM(DRA[handle].type);
    }

    /* write a single zero byte at the last position to extend the file */
    if(elio_write(DRA[handle].fd, offset-1, &byte, 1) != (Size_t)1)
        dai_error("dai_zero_eof: write error ",0);
}
821 
822 
#ifdef CLEAR_BUF
/** Debug aid: zero out the I/O buffers (body currently disabled). */
static void dai_clear_buffer()
{
    /* int i, j; */
    /*
       for (j = 0; j < DRA_NBUF; j++)
       for (i=0;i<DRA_DBL_BUF_SIZE;i++)
       ((double*)_dra_buffer_state[j].buffer)[i]=0.;
       */
}
#endif
834 
835 
836 /**
837  * read aligned block of data from d_a to memory buffer
838  */
void dai_get(section_t ds_a, void *buf, Integer ld, io_request_t *id)
{
    Integer handle = ds_a.handle + DRA_OFFSET;
    Integer rows, cols, nelem, status;
    Off_t   offset;
    Size_t  bytes;

    /* locate the section in the file */
    dai_file_location(ds_a, &offset);

#       ifdef CLEAR_BUF
    dai_clear_buffer();
#       endif

    rows = ds_a.hi[0] - ds_a.lo[0] + 1;
    cols = ds_a.hi[1] - ds_a.lo[1] + 1;
    if (rows != ld) dai_error("dai_get: bad ld", ld);

    /* the section is aligned, so one contiguous asynchronous read does it */
    nelem = rows * cols;
    bytes = (Size_t)nelem * dai_sizeofM(DRA[handle].type);
    status = elio_aread(DRA[handle].fd, offset, buf, bytes, id);
    if (status != ELIO_OK) dai_error("dai_get failed", status);
}
859 
860 
/**
 * Assign a free slot in the request table to the caller.
 *
 * @param request[out] index of the claimed slot; the call aborts via
 *                     dai_error if all MAX_REQ slots are pending.
 */
void dai_assign_request_handle(Integer* request)
{
    int i;

    /* find the first slot with no pending I/O */
    *request = -1;
    for (i = 0; i < MAX_REQ; i++) {
        if (Requests[i].num_pending == 0) {
            *request = i;
            break;
        }
    }

    if (*request == -1)
        dai_error("DRA: number of pending I/O requests exceeded", MAX_REQ);

    /* initialize the slot (duplicate call_id assignment removed) */
    Requests[*request].na = 0;
    Requests[*request].nu = 0;
    Requests[*request].call_id = *request;
    Requests[*request].num_pending = ON;
}
880 
881 
882 /**
883  * OPEN AN ARRAY THAT EXISTS ON THE DISK
884  *
885  * @param filename[in]
886  * @param mode[in]
887  * @param d_a[out]
888  */
drai_open(char * filename,Integer * mode,Integer * d_a)889 Integer drai_open(char *filename, Integer *mode, Integer *d_a)
890 {
891     Integer handle;
892     int emode;
893 
894     pnga_sync();
895 
896     /*** Get next free DRA handle ***/
897     if( (handle = dai_get_handle()) == -1)
898         dai_error("dra_open: too many disk arrays ", _max_disk_array);
899     *d_a = handle - DRA_OFFSET;
900 
901     DRA[handle].mode = (int)*mode;
902     strncpy (DRA[handle].fname, filename,  DRA_MAX_FNAME);
903 
904     /*translate DRA mode into ELIO mode*/
905     emode = dai_elio_mode((int)*mode);
906 
907     if(dai_read_param(DRA[handle].fname, *d_a))return((Integer)-1);
908 
909     DRA[handle].indep = dai_file_config(filename); /*check file configuration*/
910 
911     if(dai_io_manage(*d_a)){
912 
913         if (INDEPFILES(*d_a) || DRA[handle].numfiles > 1) {
914 
915             sprintf(dummy_fname,"%s.%ld",DRA[handle].fname,(long)dai_io_nodeid(*d_a));
916             DRA[handle].fd = elio_open(dummy_fname,emode, ELIO_PRIVATE);
917 
918         }else{
919 
920             DRA[handle].fd = elio_open(DRA[handle].fname,emode, ELIO_SHARED);
921         }
922 
923         if(DRA[handle].fd ==NULL)dai_error("dra_open failed (null)",
924                 pnga_nodeid());
925         if(DRA[handle].fd->fd ==-1)dai_error("dra_open failed (-1)",
926                 pnga_nodeid());
927     }
928 
929 
930 #ifdef DEBUG
931     printf("\n%d:OPEN chunking=(%d,%d) type=%d buf=%d\n",
932             pnga_nodeid(),DRA[handle].chunk[0], DRA[handle].chunk[1],
933             DRA[handle].type, DRA_DBL_BUF_SIZE);
934     fflush(stdout);
935 #endif
936 
937     pnga_sync();
938 
939     /* printf("FILE OPENED!!\n"); */
940     return(ELIO_OK);
941 }
942 
943 
944 /**
945  * CLOSE AN ARRAY AND SAVE IT ON THE DISK
946  */
dra_close_(Integer * d_a)947 Integer FATR dra_close_(Integer* d_a) /* input:DRA handle*/
948 {
949     Integer handle = *d_a+DRA_OFFSET;
950     int rc;
951 
952     pnga_sync();
953 
954     dai_check_handleM(*d_a, "dra_close");
955     if(dai_io_manage(*d_a)) if(ELIO_OK != (rc=elio_close(DRA[handle].fd)))
956         dai_error("dra_close: close failed",rc);
957     dai_release_handle(d_a);
958 
959     pnga_sync();
960 
961     return(ELIO_OK);
962 }
963 
964 
965 /**
966  * decompose [ilo:ihi, jlo:jhi] into aligned and unaligned DRA subsections
967  *
968  *
969  * section [ilo:ihi, jlo:jhi] is decomposed into a number of
 * 'aligned' and 'unaligned' (on chunk1/chunk2 boundary) subsections
971  * depending on the layout of the 2D array on the disk;
972  *
973  * 'cover' subsections correspond to 'unaligned' subsections and
974  * extend them to aligned on chunk1/chunk2 boundaries;
975  *
976  * disk I/O will be actually performed on 'aligned' and
977  * 'cover' instead of 'unaligned' subsections
978  */
void dai_decomp_section(
        section_t ds_a,                /* [in]  2D DRA section to decompose */
        Integer aligned[][2*MAXDIM],   /* [out] chunk-aligned subsections */
        int *na,                       /* [out] number of aligned subsections */
        Integer cover[][2*MAXDIM],     /* [out] aligned covers of unaligned parts */
        Integer unaligned[][2*MAXDIM], /* [out] unaligned subsections */
        int *nu)                       /* [out] number of unaligned subsections */
{
    Integer a=0, u=0, handle = ds_a.handle+DRA_OFFSET, off, chunk_units, algn_flag;

    /* start by assuming the entire section is a single aligned subsection;
       the partial top/bottom rows are carved off below */
    aligned[a][ ILO ] = ds_a.lo[0]; aligned[a][ IHI ] = ds_a.hi[0];
    aligned[a][ JLO ] = ds_a.lo[1]; aligned[a][ JHI ] = ds_a.hi[1];

    switch   (DRA[handle].layout){
        case COLUMN : /* need to check row alignment only */

            algn_flag = ON; /* has at least one aligned subsection */

            /* top of section: does ilo fall inside a chunk? */
            off = (ds_a.lo[0] -1) % DRA[handle].chunk[0];
            if(off){
                /* ilo is not on a chunk boundary: emit an unaligned
                   subsection for the partial top rows, plus the full
                   chunk-row that covers it */
                if(MAX_UNLG<= u)
                    dai_error("dai_decomp_sect:insufficient nu",u);

                chunk_units = (ds_a.lo[0] -1) / DRA[handle].chunk[0];

                /* cover starts at the chunk boundary at or below ilo and
                   extends one full chunk (clipped to the array edge) */
                cover[u][ ILO ] = chunk_units*DRA[handle].chunk[0] + 1;
                cover[u][ IHI ] = PARIO_MIN(cover[u][ ILO ] + DRA[handle].chunk[0]-1,
                        DRA[handle].dims[0]);

                unaligned[u][ ILO ] = ds_a.lo[0];
                unaligned[u][ IHI ] = PARIO_MIN(ds_a.hi[0],cover[u][ IHI ]);
                unaligned[u][ JLO ] = cover[u][ JLO ] = ds_a.lo[1];
                unaligned[u][ JHI ] = cover[u][ JHI ] = ds_a.hi[1];

                if(cover[u][ IHI ] < ds_a.hi[0]){
                    /* cover subsection ends above ihi */
                    if(MAX_ALGN<=a)
                        dai_error("dai_decomp_sect: na too small",a);
                    aligned[a][ ILO ] = cover[u][ IHI ]+1;
                }else{
                    /* cover subsection includes ihi */
                    algn_flag = OFF;
                }
                u++;
            }

            /* bottom of section: does ihi fall inside a chunk?
               (skip if ihi coincides with the array edge, or the top
               cover already swallowed the whole section) */
            off = ds_a.hi[0] % DRA[handle].chunk[0];
            if(off && (ds_a.hi[0] != DRA[handle].dims[0]) && (algn_flag == ON)){

                if(MAX_UNLG<=u)
                    dai_error("dai_decomp_sect:insufficient nu",u);
                chunk_units = ds_a.hi[0] / DRA[handle].chunk[0];

                cover[u][ ILO ] = chunk_units*DRA[handle].chunk[0] + 1;
                cover[u][ IHI ] = PARIO_MIN(cover[u][ ILO ] + DRA[handle].chunk[0]-1,
                        DRA[handle].dims[0]);

                unaligned[u][ ILO ] = cover[u][ ILO ];
                unaligned[u][ IHI ] = ds_a.hi[0];
                unaligned[u][ JLO ] = cover[u][ JLO ] = ds_a.lo[1];
                unaligned[u][ JHI ] = cover[u][ JHI ] = ds_a.hi[1];

                /* shrink the aligned subsection so it ends just above the
                   bottom cover */
                aligned[a][ IHI ] = PARIO_MAX(1,unaligned[u][ ILO ]-1);
                algn_flag=(DRA[handle].chunk[0] == DRA[handle].dims[0])?OFF:ON;
                u++;
            }
            *nu = (int)u;
            /* aligned piece may have shrunk to nothing */
            if(aligned[0][ IHI ]-aligned[0][ ILO ] < 0) algn_flag= OFF;
            *na = (algn_flag== OFF)? 0: 1;
            break;

        case ROW : /* we need to check column alignment only */
            /* fallthrough: ROW layout is not implemented */

        default: dai_error("dai_decomp_sect: ROW layout not yet implemented",
                         DRA[handle].layout);
    }
}
1060 
1061 
1062 /**
1063  * given current (i,j) compute (ni, nj) - next loop index
1064  * o - outermost loop, i- innermost loop
1065  * iinc increment for i
1066  * oinc increment for o
1067  */
/**
 * Advance a 2D loop nest to the next index pair.
 *
 * @param i[in,out]  innermost loop index (0 means "not yet started")
 * @param imin,imax  inner loop bounds (inclusive)
 * @param iinc       inner loop increment
 * @param o[in,out]  outermost loop index (0 means "not yet started")
 * @param omin,omax  outer loop bounds (inclusive)
 * @param oinc       outer loop increment
 * @return nonzero while the outer index is still within range
 */
int dai_next2d(Integer* i, Integer imin, Integer imax, Integer iinc,
        Integer* o, Integer omin, Integer omax, Integer oinc)
{
    if (*o == 0 || *i == 0) {
        /* zero indices mark the initial out-of-range state:
           start both loops at their minima */
        *i = imin;
        *o = omin;
    } else {
        *i += iinc;
    }

    /* inner index ran past its bound: wrap it and step the outer index */
    if (*i > imax) {
        *i = imin;
        *o += oinc;
    }

    return *o <= omax;
}
1086 
1087 
1088 /**
1089  * compute next chunk of array to process
1090  */
/**
 * Compute the next chunk of the DRA section to process.
 *
 * @param req[in]       request id (unused here; kept for interface symmetry)
 * @param list[in]      section bounds as [ILO,IHI,JLO,JHI]
 * @param ds_chunk[in,out] chunk cursor; lo[] holds the current position,
 *                      updated in place to the next chunk
 * @return nonzero if a next chunk exists, 0 when iteration is exhausted
 */
int dai_next_chunk(Integer req, Integer* list, section_t* ds_chunk)
{
    Integer   handle = ds_chunk->handle+DRA_OFFSET;
    int       retval;

    /* for multiple-file layouts, snap the column cursor back to the
       nearest chunk boundary before stepping */
    if(INDEPFILES(ds_chunk->handle) || DRA[handle].numfiles > 1)
        if(ds_chunk->lo[1] && DRA[handle].chunk[1]>1)
            ds_chunk->lo[1] -= (ds_chunk->lo[1] -1) % DRA[handle].chunk[1];

    /* advance (lo[0], lo[1]) by one chunk within the section bounds */
    retval = dai_next2d(&ds_chunk->lo[0], list[ ILO ], list[ IHI ],
            DRA[handle].chunk[0],
            &ds_chunk->lo[1], list[ JLO ], list[ JHI ],
            DRA[handle].chunk[1]);
    if(!retval) return(retval);

    /* chunk upper corner: one chunk away, clipped to the section */
    ds_chunk->hi[0] = PARIO_MIN(list[ IHI ], ds_chunk->lo[0] + DRA[handle].chunk[0] -1);
    ds_chunk->hi[1] = PARIO_MIN(list[ JHI ], ds_chunk->lo[1] + DRA[handle].chunk[1] -1);

    if(INDEPFILES(ds_chunk->handle) || DRA[handle].numfiles > 1) {
        /* keep hi[1] within the chunk containing lo[1] so a chunk never
           straddles a file boundary */
        Integer jhi_temp =  ds_chunk->lo[1] + DRA[handle].chunk[1] -1;
        jhi_temp -= jhi_temp % DRA[handle].chunk[1];
        ds_chunk->hi[1] = PARIO_MIN(ds_chunk->hi[1], jhi_temp);

        /*this line was absent from older version on bonnie that worked */
        if(ds_chunk->lo[1] < list[ JLO ]) ds_chunk->lo[1] = list[ JLO ];
    }

    return 1;
}
1120 
1121 
/**
 * Decide whether the calling I/O process handles this chunk.
 * Chunks are dealt out round-robin over the I/O processes, keyed on the
 * chunk's cardinal block number (stored in the global _dra_turn).
 *
 * @param ds_chunk[in] DRA chunk under consideration
 * @return nonzero if it is this process's turn
 */
int dai_myturn(section_t ds_chunk)
{
    Integer nproc = dai_io_procs(ds_chunk.handle);
    Integer me    = dai_io_nodeid(ds_chunk.handle);

    /* compute cardinal number for the current chunk */
    nsect_to_blockM(ds_chunk, &_dra_turn);

    return (_dra_turn % nproc) == me;
}
1139 
1140 
1141 /**
1142  * print routine for debugging purposes only (double)
1143  */
/**
 * Debugging helper: print a rows x cols patch of doubles stored
 * column-major with leading dimension ld.
 *
 * @param buf[in]  buffer holding the patch
 * @param ld[in]   leading dimension (column stride) of buf
 * @param rows[in] number of rows to print
 * @param cols[in] number of columns to print
 */
void dai_print_buf(double *buf, Integer ld, Integer rows, Integer cols)
{
    int r, c;

    printf("\n ld=%ld rows=%ld cols=%ld\n", (long)ld, (long)rows, (long)cols);

    for (r = 0; r < rows; r++) {
        for (c = 0; c < cols; c++) {
            printf("%f ", buf[c*ld + r]);
        }
        printf("\n");
    }
}
1155 
1156 
/**
 * Set the DRA operation mode.
 *
 * Intentionally a no-op in this implementation: the routine is retained
 * for API/ABI compatibility and the requested mode is ignored.
 *
 * @param val[in] requested mode (unused)
 */
void dra_set_mode_(Integer* val)
{
    (void)val; /* silence unused-parameter warnings; mode is not supported */
}
1160 
1161 
1162 
/**
 * Move a 1D (single-column) DRA chunk between a global array section and
 * an I/O buffer, splitting the transfer into up to three GA rectangles.
 *
 * @param op[in]       LOAD (GA get) or otherwise store (GA put)
 * @param gs_a[in]     complete global array section
 * @param ds_a[in]     complete DRA section
 * @param ds_chunk[in] DRA chunk held in the buffer
 * @param buffer[in]   I/O buffer holding the chunk data
 * @param ldb[in]      leading dimension of the buffer (unused here)
 */
void ga_move_1d(int op, section_t gs_a, section_t ds_a,
        section_t ds_chunk, void* buffer, Integer ldb)
{
    Integer index, ldd = gs_a.hi[0] - gs_a.lo[0] + 1, one=1;
    Integer atype, cols, rows, elemsize, ilo, ihi;
    Integer istart, iend, jstart, jend;
    void  (*f)(Integer,Integer*,Integer*,void*,Integer*);
    char *buf = (char*)buffer;

    /* choose GA transfer primitive: get for LOAD, put for store */
    if(op==LOAD) f = pnga_get;
    else f = pnga_put;

    pnga_inquire(gs_a.handle, &atype, &rows, &cols);
    elemsize = MA_sizeof(atype, 1, MT_C_CHAR);

    /* find where in global array the first dra chunk element in buffer goes*/
    index = ds_chunk.lo[0] - ds_a.lo[0];
    istart = index%ldd + gs_a.lo[0];
    jstart = index/ldd + gs_a.lo[1];

    /* find where in global array the last dra chunk element in buffer goes*/
    index = ds_chunk.hi[0] - ds_a.lo[0];
    iend = index%ldd + gs_a.lo[0];
    jend = index/ldd + gs_a.lo[1];

    /* we have up to 3 rectangular chunks corresponding to gs_chunk:
       an incomplete first column, complete middle columns, and an
       incomplete last column */
    if(istart != gs_a.lo[0] || jstart==jend ){
        /* partial first column (or the whole transfer fits in one column) */
        Integer lo[2], hi[2];
        ilo = istart;
        ihi = gs_a.hi[0];
        if(jstart==jend) ihi=iend;
        lo[0] = ilo; lo[1] = jstart;
        hi[0] = ihi; hi[1] = jstart;
        f(gs_a.handle, lo, hi, buf, &one);
        buf += elemsize*(ihi -ilo+1);
        if(jstart==jend) return;
        jstart++;
    }

    /* if the last column is partial, exclude it from the middle block */
    if(iend != gs_a.hi[0]) jend--;

    /* full middle columns, moved in one rectangular transfer */
    if(jstart <= jend) {
        Integer lo[2], hi[2];
        lo[0] = gs_a.lo[0]; lo[1] = jstart;
        hi[0] = gs_a.hi[0]; hi[1] = jend;
        f(gs_a.handle, lo, hi, buf, &ldd);
        buf += elemsize*ldd*(jend-jstart+1);
    }

    /* partial last column */
    if(iend != gs_a.hi[0]){
        Integer lo[2], hi[2];
        jend++; /* Since decremented above */
        lo[0] = gs_a.lo[0]; lo[1] = jend;
        hi[0] = iend;       hi[1] = jend;
        f(gs_a.handle, lo, hi, buf, &one);
    }
}
1222 
1223 
/**
 * Move data between a global array section and an I/O buffer (2D path).
 * Uses a straight block copy when shapes match and no transpose is
 * requested; otherwise falls back to element-wise scatter/gather.
 *
 * @param op[in]       LOAD (GA -> buffer) or store (buffer -> GA)
 * @param trans[in]    TRANS to transpose during the move
 * @param gs_a[in]     complete global array section
 * @param ds_a[in]     complete DRA section
 * @param ds_chunk[in] DRA chunk held in the buffer
 * @param buffer[in]   I/O buffer
 * @param ldb[in]      leading dimension of the buffer
 */
void ga_move(int op, int trans, section_t gs_a, section_t ds_a,
        section_t ds_chunk, void* buffer, Integer ldb)
{
    if(!trans && (gs_a.lo[0]- gs_a.hi[0] ==  ds_a.lo[0]- ds_a.hi[0]) ){
        /*** straight copy possible if there's no reshaping or transpose ***/

        /* determine gs_chunk corresponding to ds_chunk */
        section_t gs_chunk = gs_a;
        dai_dest_indicesM(ds_chunk.lo[0], ds_chunk.lo[1], ds_a.lo[0], ds_a.lo[1],
                ds_a.hi[0]-ds_a.lo[0]+1, &gs_chunk.lo[0], &gs_chunk.lo[1],
                gs_a.lo[0], gs_a.lo[1],   gs_a.hi[0] - gs_a.lo[0] + 1);
        dai_dest_indicesM(ds_chunk.hi[0], ds_chunk.hi[1], ds_a.lo[0], ds_a.lo[1],
                ds_a.hi[0]-ds_a.lo[0]+1, &gs_chunk.hi[0], &gs_chunk.hi[1],
                gs_a.lo[0], gs_a.lo[1],  gs_a.hi[0] - gs_a.lo[0] + 1);

        /* move data */
        if(op==LOAD) ga_get_sectM(gs_chunk, buffer, ldb);
        else         ga_put_sectM(gs_chunk, buffer, ldb);

#ifdef MOVE1D_ENABLED
    }else if(!trans && (ds_a.lo[1]==ds_a.hi[1]) ){

        /* for a 1-dim section (column) some optimization possible */
        ga_move_1d(op, gs_a, ds_a, ds_chunk, buffer, ldb);
#endif
    }else{
        /** due to generality of this transformation scatter/gather is required **/

        MA_AccessIndex iindex, jindex, vindex, pindex;
        Integer ihandle, jhandle, vhandle, phandle;
        int type = DRA[ds_a.handle+DRA_OFFSET].type;
        Integer i, j, ii, jj, base,nelem;
        char    *base_addr;

        if(pnga_nodeid()==0) printf("DRA warning: using scatter/gather\n");

        /* MA stack scratch: (i,j) index arrays plus a value array */
        nelem = (ds_chunk.hi[0]-ds_chunk.lo[0]+1)
            * (ds_chunk.hi[1]-ds_chunk.lo[1]+1);
        if(!MA_push_get(C_INT, nelem, "i_", &ihandle, &iindex))
            dai_error("DRA move: MA failed-i ", 0L);
        if(!MA_push_get(C_INT, nelem, "j_", &jhandle, &jindex))
            dai_error("DRA move: MA failed-j ", 0L);
        if(!MA_push_get(type, nelem, "v_", &vhandle, &vindex))
            dai_error("DRA move: MA failed-v ", 0L);
        if(!MA_get_pointer(vhandle, &base_addr))
            dai_error("DRA move: MA get_pointer failed ", 0L);

        /* compute a destination GA index pair for every chunk element,
           swapping (i,j) when transposing */
        if(trans==TRANS)
            ITERATOR_2D(i,j, base, ds_chunk) {
                dai_dest_indicesM(j, i, ds_a.lo[0], ds_a.lo[1], ds_a.hi[0]-ds_a.lo[0]+1,
                        INT_MB+base+iindex, INT_MB+base+jindex,
                        gs_a.lo[0], gs_a.lo[1],  gs_a.hi[0] - gs_a.lo[0] + 1);
            }
        else
            ITERATOR_2D(i,j, base, ds_chunk) {
                dai_dest_indicesM(i, j, ds_a.lo[0], ds_a.lo[1], ds_a.hi[0]-ds_a.lo[0]+1,
                        INT_MB+base+iindex, INT_MB+base+jindex,
                        gs_a.lo[0], gs_a.lo[1],  gs_a.hi[0] - gs_a.lo[0] + 1);
            }

        /* move data */
        if(op==LOAD){

            if(!MA_push_get(C_INT, nelem, "pindex", &phandle, &pindex))
                dai_error("DRA move: MA failed-p ", 0L);
            for(i=0; i< nelem; i++) INT_MB[pindex+i] = i;
            pnga_gather2d(gs_a.handle, base_addr, INT_MB+iindex, INT_MB+jindex, nelem);
            COPY_TYPE(GATHER, type, ds_chunk);
            MA_pop_stack(phandle);

        }else{

            COPY_TYPE(SCATTER, type, ds_chunk);
            pnga_scatter2d(gs_a.handle, base_addr, INT_MB+iindex, INT_MB+jindex, nelem);
        }

        /* pop in reverse order of allocation (MA is a stack) */
        MA_pop_stack(vhandle);
        MA_pop_stack(jhandle);
        MA_pop_stack(ihandle);
    }
}
1305 
1306 
1307 /**
1308  * @param op[in]       flag for read or write
1309  * @param trans[in]    flag for transpose
1310  * @param gs_a[in]     complete section of global array
1311  * @param ds_a[in]     complete section of DRA
1312  * @param ds_chunk[in] actual DRA chunk
1313  * @param buffer[in]   pointer to io buffer containing DRA cover section
1314  * @param ldb
1315  * @param ga_movhdl
1316  */
void nga_move(int op, int trans, section_t gs_a, section_t ds_a,
        section_t ds_chunk, void* buffer, Integer ldb[], Integer *ga_movhdl)
{
    Integer ndim = gs_a.ndim, i;
    logical consistent = TRUE;

#if WALLTIME
    double ss0,tt0;
    walltime_(&ss0,&tt0);
    printf("p[%d] Beginning nga_move: %16.6f\n",pnga_nodeid(),tt0);
#endif
    /* the GA and DRA sections must have matching extents in the first
       ndim-1 dimensions (reversed when transposing) */
    if (!trans) {
        for (i=0; i<ndim-1; i++)
            if (gs_a.lo[i]-gs_a.hi[i] != ds_a.lo[i]-ds_a.hi[i]) consistent = FALSE;
    } else {
        for (i=0; i<ndim-1; i++)
            if (gs_a.lo[ndim-1-i]-gs_a.hi[ndim-1-i]
                    != ds_a.lo[i]-ds_a.hi[i]) consistent = FALSE;
    }
    if (!trans && consistent){

        /*** straight copy possible if there's no reshaping or transpose ***/

        /* determine gs_chunk corresponding to ds_chunk */
        section_t gs_chunk = gs_a;
        ndai_dest_indicesM(ds_chunk, ds_a, gs_chunk, gs_a);
        /* sanity-check the mapped GA chunk; print diagnostics if it is
           degenerate (debug aid, the move is attempted regardless) */
        consistent = TRUE;
        for (i=0; i<ndim; i++) {
            if (gs_chunk.hi[i]<gs_chunk.lo[i] || gs_chunk.lo[i]<0) {
                consistent = FALSE;
            }
        }
        if (!consistent) {
            for(i=0; i<ndim; i++) {
                printf("gs_chunk[%d] %5d:%5d  ds_chunk[%d] %5d:%5d",
                        (int)i,(int)gs_chunk.lo[i],(int)gs_chunk.hi[i],
                        (int)i,(int)ds_chunk.lo[i],(int)ds_chunk.hi[i]);
                printf(" gs_a[%d] %5d:%5d  ds_a[%d] %5d:%5d\n",
                        (int)i,(int)gs_a.lo[i],(int)gs_a.hi[i],
                        (int)i,(int)ds_a.lo[i],(int)ds_a.hi[i]);
            }
        }
        /* move data */
        if (op==LOAD) {
            nga_get_sectM(gs_chunk, buffer, ldb, ga_movhdl);
        } else {
            nga_put_sectM(gs_chunk, buffer, ldb, ga_movhdl);
        }

    } else if (trans && consistent) {
        /* Only transpose is supported, not reshaping, so scatter/gather is not
           required */
        MA_AccessIndex vindex;
        Integer vhandle, index[MAXDIM];
        Integer i, j, itmp, jtmp, nelem, ldt[MAXDIM], ldg[MAXDIM];
        Integer nelem1, nelem2, nelem3;
        int type = DRA[ds_a.handle+DRA_OFFSET].type;
        char    *base_addr;
        section_t gs_chunk = gs_a;

        /* create space to copy transpose of DRA section into */
        nelem = 1;
        for (i=0; i<ndim; i++) {
            nelem *= (ds_chunk.hi[i] - ds_chunk.lo[i] + 1);
        }
        /* nelem1 = product of GA chunk extents beyond the first dim;
           nelem2 = product of buffer leading dimensions (stride between
           consecutive elements of the transposed first dim) */
        nelem1 = 1;
        ndai_trnsp_dest_indicesM(ds_chunk, ds_a, gs_chunk, gs_a);
        for (i=1; i<ndim; i++) nelem1 *= (gs_chunk.hi[i] - gs_chunk.lo[i] + 1);
        nelem2 = 1;
        for (i=0; i<ndim-1; i++) nelem2 *= ldb[i];
        if(!MA_push_get(type, nelem, "v_", &vhandle, &vindex))
            dai_error("DRA move: MA failed-v ", 0L);
        if(!MA_get_pointer(vhandle, &base_addr))
            dai_error("DRA move: MA get_pointer failed ", 0L);

        /* copy and transpose relevant numbers from IO buffer to temporary array */
        for (i=1; i<ndim; i++) ldt[ndim-1-i] = ds_chunk.hi[i] - ds_chunk.lo[i] + 1;
        if (op == LOAD) {
            /* transpose buffer with data from global array */
            for (i=0; i<ndim; i++) ldg[i] = gs_chunk.hi[i] - gs_chunk.lo[i] + 1;
            /* copy data from global array to temporary buffer */
            nga_get_sectM(gs_chunk, base_addr, ldg, ga_movhdl);
            for (i=0; i<nelem1; i++ ) {
                /* find indices of elements in MA buffer */
                if (ndim > 1) {
                    /* decompose flat index i into per-dimension indices
                       using the GA chunk extents ldg[] */
                    itmp = i;
                    index[1] = itmp%ldg[1];
                    for (j=2; j<ndim; j++) {
                        itmp = (itmp-index[j-1])/ldg[j-1];
                        if (j != ndim-1) {
                            index[j] = itmp%ldg[j];
                        } else {
                            index[j] = itmp;
                        }
                    }
                    /* recombine into the transposed flat offset using the
                       buffer leading dimensions ldb[] */
                    nelem3 = index[1];
                    for (j=2; j<ndim; j++) {
                        nelem3 *= ldb[ndim-1-j];
                        nelem3 += index[j];
                    }
                } else {
                    nelem2 = 1;
                    nelem3 = 0;
                }
                /* find corresponding indices of element from IO buffer */
                itmp = ldg[0]*i;
                jtmp = nelem3;
                for (j=0; j<ldg[0]; j++) {
                    /* element-wise copy by data type; complex types copy
                       real and imaginary parts */
                    switch(type){
                        case C_DBL:
                            ((double*)buffer)[jtmp] = ((double*)base_addr)[itmp];
                            break;
                        case C_INT:
                            ((int*)buffer)[jtmp] = ((int*)base_addr)[itmp];
                            break;
                        case C_LONG:
                            ((long*)buffer)[jtmp] = ((long*)base_addr)[itmp];
                            break;
                        case C_DCPL:
                            ((double*)buffer)[2*jtmp] = ((double*)base_addr)[2*itmp];
                            ((double*)buffer)[2*jtmp+1] = ((double*)base_addr)[2*itmp+1];
                            break;
                        case C_SCPL:
                            ((float*)buffer)[2*jtmp] = ((float*)base_addr)[2*itmp];
                            ((float*)buffer)[2*jtmp+1] = ((float*)base_addr)[2*itmp+1];
                            break;
                        case C_FLOAT:
                            ((float*)buffer)[jtmp] = ((float*)base_addr)[itmp];
                            break;
                    }
                    itmp++;
                    jtmp += nelem2;
                }
            }
        } else {
            /* get transposed indices */
            for (i=0; i<ndim; i++) ldg[i] = gs_chunk.hi[i] - gs_chunk.lo[i] + 1;
            for (i=0; i<nelem1; i++ ) {
                /* find indices of elements in MA buffer */
                if (ndim > 1) {
                    itmp = i;
                    index[1] = itmp%ldg[1];
                    for (j=2; j<ndim; j++) {
                        itmp = (itmp-index[j-1])/ldg[j-1];
                        if (j != ndim-1) {
                            index[j] = itmp%ldg[j];
                        } else {
                            index[j] = itmp;
                        }
                    }
                    nelem3 = index[1];
                    for (j=2; j<ndim; j++) {
                        nelem3 *= ldb[ndim-1-j];
                        nelem3 += index[j];
                    }
                } else {
                    nelem2 = 1;
                    nelem3 = 0;
                }
                /* find corresponding indices of element from IO buffer */
                itmp = ldg[0]*i;
                jtmp = nelem3;
                for (j=0; j<ldg[0]; j++) {
                    /* inverse of the LOAD copy: buffer -> temporary array */
                    switch(type){
                        case C_DBL:
                            ((double*)base_addr)[itmp] = ((double*)buffer)[jtmp];
                            break;
                        case C_INT:
                            ((int*)base_addr)[itmp] = ((int*)buffer)[jtmp];
                            break;
                        case C_LONG:
                            ((long*)base_addr)[itmp] = ((long*)buffer)[jtmp];
                            break;
                        case C_DCPL:
                            ((double*)base_addr)[2*itmp] = ((double*)buffer)[2*jtmp];
                            ((double*)base_addr)[2*itmp+1] = ((double*)buffer)[2*jtmp+1];
                            break;
                        case C_SCPL:
                            ((float*)base_addr)[2*itmp] = ((float*)buffer)[2*jtmp];
                            ((float*)base_addr)[2*itmp+1] = ((float*)buffer)[2*jtmp+1];
                            break;
                        case C_FLOAT:
                            ((float*)base_addr)[itmp] = ((float*)buffer)[jtmp];
                            break;
                    }
                    itmp++;
                    jtmp += nelem2;
                }
            }
            nga_put_sectM(gs_chunk, base_addr, ldt, ga_movhdl);
        }
        MA_pop_stack(vhandle);
    } else {
        dai_error("DRA move: Inconsistent dimensions found ", 0L);
    }
#if WALLTIME
    walltime_(&ss0,&tt0);
    printf("p[%d] Ending nga_move: %16.6f\n",pnga_nodeid(),tt0);
#endif
}
1517 
1518 
1519 /**
 * executes callback function associated with completion of asynchronous I/O
1521  */
dai_exec_callback(char * buf,int caller)1522 void dai_exec_callback(char *buf, int caller)
1523 {
1524     args_t   *arg;
1525     char *buffer;
1526     buf_info *bi;
1527 
1528 #if WALLTIME
1529     double ss0,tt0;
1530 #endif
1531 
1532     bi = (buf_info*) buf;
1533     if(bi->callback==OFF)
1534         return;
1535 
1536     bi->callback = OFF;
1537 
1538     arg = &(bi->args);
1539     /* bail if there is no valid global array handle */
1540     if (arg->gs_a.handle == 0)
1541         return;
1542 
1543     buffer = (char*) (buf + sizeof(buf_info));
1544     if (caller == WAIT) {/* call blocking nga_move() */
1545         nga_move(arg->op, arg->transp, arg->gs_a, arg->ds_a, arg->ds_chunk, buffer, arg->ld, NULL);
1546         free_buf(&buf_ctxt, buf);
1547     }
1548     else if (caller == PROBE) /* call non-blocking nga_move() */
1549         nga_move(arg->op, arg->transp, arg->gs_a, arg->ds_a, arg->ds_chunk, buffer, arg->ld, &(bi->ga_movhdl));
1550 
1551 }
1552 
1553 
1554 /**
 * wait until buffer space associated with request is available
1556  */
/* Currently a no-op: per-request buffer completion is handled by the
   buffer manager (see buf_complete_call in dra_wait_). The historical
   pre-buffer-manager implementation is retained below for reference. */
void dai_wait(Integer req0)
{
    /*
    Integer req;
    int ibuf;
    */

    /* wait for all requests to complete on buffer Requests[req].ibuf */

    /*        ibuf = Requests[req0].ibuf;
              for(req=0; req<MAX_REQ; req++)
              if (Requests[req].num_pending && Requests[req].ibuf == ibuf)
              if (elio_wait(&_dra_buffer_state[_dra_cur_buf].id)==ELIO_OK)
              dai_exec_callback(Requests + req);
              else
              dai_error("dai_wait: DRA internal error",0);
              */
}
1575 
1576 
1577 /**
1578  * WAIT FOR COMPLETION OF DRA OPERATION ASSOCIATED WITH request
1579  */
dra_wait_(Integer * request)1580 Integer FATR dra_wait_(Integer* request)
1581 {
1582 #if WALLTIME
1583     double ss0, tt0;
1584 #endif
1585     if(*request == DRA_REQ_INVALID) return(ELIO_OK);
1586 #if WALLTIME
1587     walltime(&ss0,&tt0);
1588     printf("p[%d] executing dra_wait: %16.6f\n",pnga_nodeid(),tt0);
1589 #endif
1590 
1591     /* complete all outstanding operations and release the corresponding buffers invloved with this request */
1592 
1593     buf_complete_call(&buf_ctxt, Requests[*request].call_id);
1594 
1595     /* mark this request to be no longer pending */
1596     Requests[*request].num_pending=0;
1597 
1598     pnga_sync();
1599 
1600     return(ELIO_OK);
1601 
1602 }
1603 
1604 
1605 /**
1606  * TEST FOR COMPLETION OF DRA OPERATION ASSOCIATED WITH request
1607  */
dra_probe_(Integer * request,Integer * status)1608 Integer FATR dra_probe_(
1609         Integer *request, /* [in] */
1610         Integer *status)  /* [out] */
1611 {
1612     Integer done;
1613     int  stat, i, k, call_id, op_code, n_buf, ret;
1614     io_request_t *io_req;
1615     Integer *ga_movhdl;
1616     char *op = "*", *bufs[MAXBUF];
1617     int cb[MAXBUF]; /* will mark id of bufs for which we'll call a callback */
1618     buf_info *bi;
1619 
1620     k = 1;
1621     ret = ELIO_OK;
1622     for (i = 0; i < MAXBUF; i++)
1623     {
1624         cb[i] = 0;
1625         bufs[i] = NULL;
1626     }
1627 
1628     if(*request == DRA_REQ_INVALID || Requests[*request].num_pending ==0) {
1629         *status = ELIO_DONE;
1630         ret = ELIO_OK;
1631     }
1632 
1633     call_id = Requests[*request].call_id;
1634     /* get the buffers associated with this call_id */
1635     if (get_bufs_of_call_id(&buf_ctxt, call_id, &n_buf, bufs) != 0)
1636         ret = ELIO_OK;
1637 
1638     for (i = 0; i < n_buf; i++) {
1639         bi = (buf_info*) bufs[i];
1640 
1641         op_code = bi->op;
1642         io_req = &(bi->io_req);
1643         ga_movhdl = &(bi->ga_movhdl);
1644 
1645         if (op_code == DRA_OP_WRITE) {
1646             /* last op is a disk write */
1647             if(elio_probe(io_req, &stat) != ELIO_OK) {
1648                 ret = DRA_FAIL;
1649                 k = 0;
1650                 break;
1651             }
1652             if (stat != ELIO_DONE) {
1653                 k = 0;
1654             }
1655             else {
1656                 free_buf(&buf_ctxt, bufs[i]);
1657             }
1658         }
1659         else if (op_code == DRA_OP_READ) {
1660             /* last op depends on aligned or unaligned transfer */
1661             if (bi->align == 0) { /* unaligned read */
1662                 /* last op is a ga move */
1663                 if (pnga_nbtest(ga_movhdl) == 0) { /* ga op not complete */
1664                     k = 0;
1665                 }
1666                 else { /* ga op complete, free this buf */
1667                     free_buf(&buf_ctxt, bufs[i]);
1668                 }
1669             }
1670             else { /* if aligned read, last op is a disk read */
1671                 if(elio_probe(io_req, &stat) != ELIO_OK) {
1672                     ret = DRA_FAIL;
1673                     k = 0;
1674                     break;
1675                 }
1676 
1677                 if (stat != ELIO_DONE)
1678                     k = 0;
1679                 else { /* disk read done, initiate/test ga move */
1680                     /* callback=OFF means ga move done/being done */
1681                     if (bi->callback == OFF && pnga_nbtest(ga_movhdl) == 0)
1682                         k = 0;
1683                     else if (bi->callback == OFF && pnga_nbtest(ga_movhdl) ==1) {
1684                         free_buf(&buf_ctxt, bufs[i]);
1685                     }
1686                     else if (bi->callback == ON) {/* need to call callback */
1687                         k = 0;
1688                         cb[i] = 1; /* mark for a ga move */
1689                     }
1690                 }
1691             }
1692         }
1693     }
1694 
1695     done = (Integer) k;
1696 
1697     /* determine global status */
1698     pnga_gop(pnga_type_f2c(MT_F_INT), &done, (Integer)1, op);
1699 
1700     if(done){
1701         *status = ELIO_DONE;
1702         Requests[*request].num_pending = 0;
1703     }
1704     else {
1705         *status = 0;
1706         for (i = 0; i < n_buf; i++)
1707             if (cb[i])
1708                 dai_exec_callback(bufs[i], PROBE);
1709     }
1710     if (ret == DRA_FAIL)
1711         *status = 0; /* basically value of status is irrelevant/undetermined in this case */
1712     return ((Integer) ret);
1713 }
1714 
1715 
1716 /**
1717  * Returns control to DRA for a VERY short time to improve progress
1718  */
dra_flick_()1719 void dra_flick_()
1720 {
1721     Integer req, stat;
1722 
1723     for (req = 0; req < MAX_REQ; req++) {
1724         if (Requests[req].num_pending) {
1725             dra_probe_(&req, &stat);
1726         }
1727     }
1728 }
1729 
1730 
1731 /**
1732  * INQUIRE PARAMETERS OF EXISTING DISK ARRAY
1733  * @param d_a[in] DRA handle
1734  * @param type[out]
1735  * @param dim1[out]
1736  * @param dim2[out]
1737  * @param name[out]
1738  * @param filename[out]
1739  */
/**
 * Inquire parameters of an existing disk array.
 *
 * @param d_a[in]       DRA handle
 * @param type[out]     element type
 * @param dim1[out]     first dimension
 * @param dim2[out]     second dimension
 * @param name[out]     array name (caller-provided buffer)
 * @param filename[out] backing file name (caller-provided buffer)
 * @return ELIO_OK (an invalid handle aborts via dai_check_handleM)
 */
Integer drai_inquire(Integer *d_a, Integer *type, Integer *dim1, Integer *dim2,
        char *name, char *filename)
{
    Integer h = *d_a + DRA_OFFSET;

    dai_check_handleM(*d_a, "dra_inquire");

    *type = (Integer) DRA[h].type;
    *dim1 = DRA[h].dims[0];
    *dim2 = DRA[h].dims[1];
    strcpy(name, DRA[h].name);
    strcpy(filename, DRA[h].fname);

    return ELIO_OK;
}
1755 
1756 
1757 /**
1758  * DELETE DISK ARRAY -- relevant file(s) gone
1759  *
1760  * @param d_a[in] DRA handle
1761  */
dra_delete_(Integer * d_a)1762 Integer FATR dra_delete_(Integer* d_a)
1763 {
1764     Integer handle = *d_a+DRA_OFFSET;
1765     int rc;
1766 
1767     pnga_sync();
1768 
1769     dai_check_handleM(*d_a,"dra_delete");
1770     dai_delete_param(DRA[handle].fname,*d_a);
1771 
1772     if(dai_io_manage(*d_a)) if(ELIO_OK != (rc=elio_close(DRA[handle].fd)))
1773         dai_error("dra_close: close failed",rc);
1774 
1775     if(dai_file_master(*d_a)) {
1776         if(INDEPFILES(*d_a) || DRA[handle].numfiles > 1){
1777             sprintf(dummy_fname,"%s.%ld",DRA[handle].fname,(long)dai_io_nodeid(*d_a));
1778             elio_delete(dummy_fname);
1779         } else {
1780             elio_delete(DRA[handle].fname);
1781         }
1782     }
1783 
1784     dai_release_handle(d_a);
1785 
1786     pnga_sync();
1787     return(ELIO_OK);
1788 }
1789 
1790 
1791 /**
1792  * TERMINATE DRA DRATA STRUCTURES
1793  */
dra_terminate_()1794 Integer FATR dra_terminate_()
1795 {
1796     free(DRA);
1797     buf_terminate(&buf_ctxt);
1798 
1799     pnga_sync();
1800     return(ELIO_OK);
1801 }
1802 
1803 
/**
 * compute chunk parameters for layout of arrays on the disk
 *   ---- a very simple algorithm to be refined later ----
 *
 * Heuristic: honor the caller's per-dimension request sizes where
 * given, fill in unspecified (-1) dimensions largest-first until the
 * I/O buffer is full, then scale/nudge everything down so one chunk
 * fits in DRA_BUF_SIZE bytes.
 *
 * @param elem_size[in]  Size of individual data element in bytes
 * @param ndim[in]       Dimension of DRA
 * @param block_orig[in] Estimated size of request in each coordinate
 *                       direction. If size is unknown then use -1.
 * @param dims[in]       Size of DRA in each coordinate direction
 * @param chunk[out]     Size of data block size (in elements) in each
 *                       coordinate direction
 */
void ndai_chunking(Integer elem_size, Integer ndim, Integer block_orig[],
        Integer dims[], Integer chunk[])
{
    long patch_size, tmp_patch;
    Integer i, j, block[MAXDIM], block_map[MAXDIM];
    double ratio;
    logical full_buf, some_neg, overfull_buf;
    /* copy block_orig so that original guesses are not destroyed */
    for (i=0; i<ndim; i++) block[i] = block_orig[i];

    /* do some preliminary checks on block to make sure initial guesses
       are less than corresponding DRA dimensions */
    for (i=0; i<ndim; i++) {
        if (block[i] > dims[i]) block[i] = dims[i];
    }
    /* do additional adjustments to see if initial guesses are near some
       perfect factors of DRA dimensions */
    for (i=0; i<ndim; i++) {
        if (block[i] > 0 && block[i]<dims[i]) {
            if (dims[i]%block[i] != 0) {
                /* round dims[i]/block[i] to nearest integer; if that
                   divides dims[i] evenly, snap block[i] to the factor */
                ratio = (double)dims[i]/(double)block[i];
                j = (int)(ratio+0.5);
                if (dims[i]%j ==0) block[i] = dims[i]/j;
            }
        }
    }

    /* initialize chunk array to zero and find out how big patch is based
       on specified block dimensions */
    patch_size = 1;
    some_neg = FALSE;
    full_buf = FALSE;
    overfull_buf = FALSE;
    for (i=0; i<ndim; i++) {
        if (block[i] > 0) patch_size *= (long)block[i];
        else some_neg = TRUE;    /* at least one dimension left unspecified */
    }
    if (patch_size*((long)elem_size) > ((long)DRA_BUF_SIZE))
        overfull_buf = TRUE;

    /* map dimension sizes from highest to lowest */
    block_sortM(ndim, dims, block_map);

    /* IO buffer is not full and there are some unspecied chunk dimensions.
       Set unspecified dimensions equal to block dimensions until buffer
       is filled. */
    /* NOTE(review): full_buf is still FALSE at this point (it is only set
       inside this branch), so the !full_buf test is currently always true
       here -- kept for clarity/symmetry with the flags above. */
    if (!full_buf && !overfull_buf && some_neg) {
        for (i=ndim-1; i>=0; i--) {
            if (block[block_map[i]] < 0) {
                tmp_patch = patch_size * ((long)dims[block_map[i]]);
                if (tmp_patch*elem_size < ((long)DRA_BUF_SIZE)) {
                    /* whole dimension still fits: take all of it */
                    patch_size *= (long)dims[block_map[i]];
                    block[block_map[i]] = dims[block_map[i]];
                } else {
                    /* take as much of this dimension as the buffer allows */
                    block[block_map[i]] = (Integer)(DRA_BUF_SIZE/(patch_size*((long)elem_size)));
                    patch_size *= ((long)block[block_map[i]]);
                    full_buf = TRUE;
                }
            }
        }
    }

    /* copy block array to chunk array */
    for (i=0; i<ndim; i++) {
        if (block[i] > 0) chunk[i] = block[i];
        else chunk[i] = 1;
    }

    /* If patch overfills buffer, scale patch down until it fits */
    if (overfull_buf) {
        /* shrink every dimension by the ndim-th root of the overflow ratio */
        ratio = ((double)DRA_BUF_SIZE)
            / ((double)(patch_size*((long)elem_size)));
        ratio = pow(ratio,1.0/((double)ndim));
        patch_size = 1;
        for (i=0; i<ndim; i++) {
            chunk[i] = (int)(((double)chunk[i])*ratio);
            if (chunk[i] < 1) chunk[i] = 1;
            patch_size *= ((long)chunk[i]);
        }
    }

#ifdef DEBUG
    printf("Current patch at 2 is %d\n",(int)patch_size*elem_size);
#endif
    /* set remaining block sizes equal to 1 */
    for (i=0; i<ndim; i++) {
        if (chunk[i] == 0) chunk[i] = 1;
    }
    /* Patch size may be slightly larger than buffer. If so, nudge
       size down until patch is smaller than buffer. */
    if (((long)elem_size)*patch_size > ((long)DRA_BUF_SIZE)) {
        /* map chunks from highest to lowest */
        block_sortM(ndim, chunk, block_map);
        for (i=0; i < ndim; i++) {
            /* decrement the largest chunk dimensions first; the divide/
               multiply pair keeps patch_size consistent with chunk[] */
            while (chunk[block_map[i]] > 1 &&
                    ((long)elem_size)*patch_size > ((long)DRA_BUF_SIZE)) {
                patch_size /= ((long)chunk[block_map[i]]);
                chunk[block_map[i]]--;
                patch_size *= ((long)chunk[block_map[i]]);
            }
        }
    }
}
1919 
1920 
/**
 * find offset in file for (lo,hi) element
 *
 * Computes the byte offset within the backing file at which the
 * section's lower corner ds_a.lo[] is stored. Requires lo[] to be
 * chunk-aligned in all but the last dimension.
 *
 * @param ds_a[in]    chunk-aligned DRA section
 * @param offset[out] byte offset in the file
 */
void ndai_file_location(section_t ds_a, Off_t* offset)
{
    Integer handle=ds_a.handle+DRA_OFFSET, ndim, i, j;
    Integer blocks[MAXDIM], part_chunk[MAXDIM], cur_ld[MAXDIM];
    long par_block[MAXDIM];
    long offelem=0;


    ndim = DRA[handle].ndim;
    /* all dimensions except the last must start on a chunk boundary */
    for (i=0; i<ndim-1; i++) {
        if((ds_a.lo[i]-1)%DRA[handle].chunk[i])
            dai_error("ndai_file_location: not alligned ??",ds_a.lo[i]);
    }

    for (i=0; i<ndim; i++) {
        /* number of blocks from edge */
        blocks[i] = (ds_a.lo[i]-1)/DRA[handle].chunk[i];
        /* size of incomplete chunk */
        part_chunk[i] = DRA[handle].dims[i]%DRA[handle].chunk[i];
        /* stride for this block of data in this direction */
        cur_ld[i] = (blocks[i] == DRA[handle].dims[i]/DRA[handle].chunk[i]) ?
            part_chunk[i]: DRA[handle].chunk[i];
    }

    /* compute offset (in elements) */

    if (INDEPFILES(ds_a.handle) || DRA[handle].numfiles > 1) {
        /* multiple component files: blocks are dealt round-robin over the
           I/O processes, so sum the sizes of all earlier blocks owned by
           this process, then add the partial offset inside the block */
        Integer   CR, block_dims[MAXDIM];
        Integer   index[MAXDIM];
        long      nelem;
        Integer   i, j;   /* NOTE: intentionally shadow outer i, j */
        Integer   ioprocs = dai_io_procs(ds_a.handle);
        Integer   iome = dai_io_nodeid(ds_a.handle);

        /* Find index of current block and find number of chunks in
           each dimension of DRA */
        nsect_to_blockM(ds_a, &CR);
        for (i=0; i<ndim; i++) {
            block_dims[i] = (DRA[handle].dims[i]+DRA[handle].chunk[i]-1)
                / DRA[handle].chunk[i];
        }
        if (iome >= 0) {
            offelem = 0;
            for (i=iome; i<CR; i+=ioprocs) {
                /* Copy i because macro destroys i */
                nblock_to_indicesM(index,ndim,block_dims,i);
                nelem = 1;
                /* size of block i: full chunk in interior, possibly a
                   partial chunk at the upper edge of each dimension */
                for (j=0; j<ndim; j++) {
                    if (index[j]<block_dims[j]-1) {
                        nelem *= (long)DRA[handle].chunk[j];
                    } else {
                        if (part_chunk[j] != 0) {
                            nelem *= (long)part_chunk[j];
                        } else {
                            nelem *= (long)DRA[handle].chunk[j];
                        }
                    }
                }
                offelem += nelem;
            }
            /* add fractional offset for current block */
            nelem = 1;
            nblock_to_indicesM(index,ndim,block_dims,CR);
            for (i=0; i<ndim-1; i++) {
                if (index[i]<block_dims[i]-1) {
                    nelem *= (long)DRA[handle].chunk[i];
                } else {
                    if (part_chunk[i] != 0) {
                        nelem *= (long)part_chunk[i];
                    } else {
                        nelem *= (long)DRA[handle].chunk[i];
                    }
                }
            }
            /* advance inside the block along the last dimension */
            nelem *= (long)(ds_a.lo[ndim-1]-1)%DRA[handle].chunk[ndim-1];
            offelem += (long)nelem;
        }
    } else {
        /* Find offset by calculating the number of chunks that must be
         * traversed to get to the corner of block containing the lower
         * coordinate index ds_a.lo[]. Then move into the block along
         * the last dimension to the point ds_a.lo[ndim-1]. */
        for (i=0; i<ndim; i++) {
            par_block[i] = 1;
            for (j=0; j<ndim; j++) {
                if (j < i) {
                    par_block[i] *= (long)cur_ld[j];
                } else if (j == i) {
                    if (i == ndim-1) {
                        /* special case for last dimension, which may represent
                         * a fraction of a chunk */
                        par_block[i] *= (long)(ds_a.lo[i]-1);
                    } else {
                        par_block[i] *= (long)(blocks[j]*DRA[handle].chunk[j]);
                    }
                } else {
                    par_block[i] *= (long)(DRA[handle].dims[j]);
                }
            }
        }
        offelem = 0;
        for (i=0; i<ndim; i++) offelem += (long)par_block[i];
    }

    /* convert element offset to bytes */
    *offset = (Off_t)offelem * dai_sizeofM(DRA[handle].type);
}
2030 
2031 
/**
 * write zero at EOF for NDRA
 *
 * Stamps a single zero byte at the position one past the last element
 * of the disk array so that subsequent aligned reads never hit EOF.
 * With multiple component files, each I/O process stamps its own file.
 *
 * @param d_a DRA handle (plain value, not a pointer)
 */
void ndai_zero_eof(Integer d_a)
{
    Integer handle = d_a+DRA_OFFSET, nelem, i;
    Integer zero[MAXDIM];
    char byte;
    Off_t offset;

    byte = (char)0;

    if(INDEPFILES(d_a) || DRA[handle].numfiles > 1) {

        Integer   CR=0, i=0, nblocks=0;   /* NOTE: inner i shadows outer i */
        section_t ds_a;
        /* number of processors that do io */
        Integer   ioprocs=dai_io_procs(d_a);
        /* node id of current process (if it does io) */
        Integer   iome = dai_io_nodeid(d_a);

        /* total number of blocks in the disk resident array */
        nblocks = 1;
        for (i=0; i<DRA[handle].ndim; i++) {
            nblocks *= (DRA[handle].dims[i]+DRA[handle].chunk[i]-1)
                / DRA[handle].chunk[i];
            zero[i] = 0;
        }
        nfill_sectionM(ds_a, d_a, DRA[handle].ndim, zero, zero);

        /* search for the last block for each I/O processor */
        for(i = 0; i <ioprocs; i++){
            CR = nblocks - 1 -i;
            if(CR % ioprocs == iome) break;
        }
        /* NOTE(review): CR<0 can only happen when ioprocs > nblocks and no
           match was found for this node -- then it owns no blocks; verify
           C's negative-modulo behavior is intended in that search */
        if(CR<0) return; /* no blocks owned */

        nblock_to_sectM(&ds_a, CR); /* convert block number to section */
        ndai_file_location(ds_a, &offset);
        /* offset of the end of this process' last block = its start plus
           the number of elements it contains */
        nelem = 1;
        for (i=0; i<DRA[handle].ndim; i++) nelem *= (ds_a.hi[i] - ds_a.lo[i] + 1);
        offset += ((Off_t)nelem) * dai_sizeofM(DRA[handle].type);

#         ifdef DEBUG
        printf("me=%d zeroing EOF (%d) at %ld bytes \n",iome,CR,offset);
#         endif
    } else {

        /* single shared file: EOF is simply the full array size in bytes */
        nelem = 1;
        for (i=0; i<DRA[handle].ndim; i++) nelem *= DRA[handle].dims[i];
        offset = ((Off_t)nelem) * dai_sizeofM(DRA[handle].type);
    }

    /* write the final byte at offset-1 so the file extends to 'offset' */
    if(elio_write(DRA[handle].fd, offset-1, &byte, 1) != (Size_t)1)
        dai_error("ndai_zero_eof: write error ",0);

    /* This is a modification added by Sriram. Not sure what it is suppose
     * to do for you so I'm commenting it out for now. This function is
     * strictly an addition to existing code.
     elio_zero_eof(DRA[handle].fd);
     */
}
2094 
2095 
2096 /**
2097  * SET CONFIGURATION FOR HANDLING DRAs STORED ON OPEN FILE SYSTEMS
2098  */
dai_set_config(Integer numfiles,Integer numioprocs,Integer * number_of_files,Integer * io_procs)2099 void dai_set_config(Integer numfiles, Integer numioprocs,
2100         Integer *number_of_files, Integer *io_procs)
2101 {
2102     if (numfiles < 1) {
2103         if (numioprocs > 0) {
2104             numfiles = numioprocs;
2105         } else {
2106             numfiles = pnga_cluster_nnodes();
2107         }
2108     }
2109     if (numioprocs < 1) {
2110         numioprocs = numfiles;
2111     }
2112     *number_of_files = numfiles;
2113     *io_procs = numioprocs;
2114     if (*number_of_files > pnga_nnodes()) {
2115         if (pnga_nodeid() == 0) {
2116             printf("WARNING: Number of files requested exceeds number of\n");
2117             printf("processors. Value is reset to number of processors: %ld\n",
2118                     (long)pnga_nnodes());
2119         }
2120         *number_of_files = pnga_nnodes();
2121     }
2122     if (*io_procs > 1 && *number_of_files > 1) {
2123         if (*io_procs != *number_of_files) {
2124             if (pnga_nodeid() == 0) {
2125                 printf("WARNING: Number of IO processors is not equal to the\n");
2126                 printf("number of files requested. Number of IO processors\n");
2127                 printf("is reset to number of files: %ld\n",(long)*number_of_files);
2128             }
2129             *io_procs = *number_of_files;
2130         }
2131     }
2132     if (*number_of_files == 1) {
2133         if (*io_procs > pnga_nnodes()) {
2134             if (pnga_nodeid() == 0) {
2135                 printf("WARNING: Number of requested IO processors\n");
2136                 printf("exceeds number of available processors. Number of IO\n");
2137                 printf("processors reset to the number of available processors %ld\n",
2138                         (long)pnga_nnodes());
2139             }
2140             *io_procs = pnga_nnodes();
2141         }
2142     }
2143     if (*number_of_files > *io_procs) {
2144         if (pnga_nodeid() == 0) {
2145             printf("WARNING: Number of files is greater than\n");
2146             printf("number of IO processors. Number of files reset to number of\n");
2147             printf("IO processors: %ld",(long)*io_procs);
2148         }
2149         *number_of_files = *io_procs;
2150     }
2151 }
2152 
2153 
/**
 * CREATE AN N-DIMENSIONAL DISK ARRAY WITH USER SPECIFIED IO CONFIGURATION
 *
 * Collective operation: validates arguments, allocates a DRA handle,
 * determines the file/IO-proc configuration and chunked disk layout,
 * creates the backing file(s), and stamps a zero at EOF so reads
 * never run past the end of file.
 *
 * @param type[in]
 * @param ndim[in]       dimension of DRA
 * @param dims[in]       dimensions of DRA
 * @param name[in]
 * @param filename[in]
 * @param mode[in]
 * @param reqdims[in]    dimension of typical request
 * @param numfiles[in]   number of files for DRA
 * @param numioprocs[in] number of IO procs to use
 * @param d_a[out]       DRA handle
 */
Integer ndrai_create_config(Integer *type, Integer *ndim, Integer dims[],
        char *name, char *filename, Integer *mode, Integer reqdims[],
        Integer *numfiles, Integer *numioprocs, Integer *d_a)
{
    Integer handle, elem_size, ctype, i;
    int emode;

    /* convert Fortran to C data type */
    ctype = pnga_type_f2c(*type);
    pnga_sync();

    /* if we have an error here, it is fatal */
    dai_check_typeM(ctype);
    for (i=0; i<*ndim; i++) if (dims[i] <=0)
        dai_error("ndra_create: disk array dimension invalid ", dims[i]);
    if(strlen(filename)>DRA_MAX_FNAME)
        dai_error("ndra_create: filename too long", DRA_MAX_FNAME);

    /*** Get next free DRA handle ***/
    if( (handle = dai_get_handle()) == -1)
        dai_error("ndra_create: too many disk arrays ", _max_disk_array);
    *d_a = handle - DRA_OFFSET;

    /*translate DRA mode into ELIO mode*/
    emode = dai_elio_mode((int)*mode);

    /* Determine array configuration */
    dai_set_config(*numfiles, *numioprocs, &DRA[handle].numfiles,
            &DRA[handle].ioprocs);

    /* determine disk array decomposition */
    elem_size = dai_sizeofM(ctype);
    ndai_chunking( elem_size, *ndim, reqdims, dims, DRA[handle].chunk);

    /* determine layout -- by row or column */
    DRA[handle].layout = COLUMN;

    /* complete initialization */
    for (i=0; i<*ndim; i++) DRA[handle].dims[i] = dims[i];
    DRA[handle].ndim = *ndim;
    DRA[handle].type = ctype;
    DRA[handle].mode = (int)*mode;
    /* NOTE(review): strncpy does not NUL-terminate when strlen(src) equals
       the limit; presumably fname/name buffers hold DRA_MAX_FNAME+1 /
       DRA_MAX_NAME+1 bytes -- verify against the struct definition */
    strncpy (DRA[handle].fname, filename,  DRA_MAX_FNAME);
    strncpy(DRA[handle].name, name, DRA_MAX_NAME );

    dai_write_param(DRA[handle].fname, *d_a);      /* create param file */
    DRA[handle].indep = dai_file_config(filename); /*check file configuration*/

    /* create file */
    if(dai_io_manage(*d_a)){

        if (INDEPFILES(*d_a) || DRA[handle].numfiles > 1) {

            /* one private component file per I/O node */
            sprintf(dummy_fname,"%s.%ld",DRA[handle].fname,(long)dai_io_nodeid(*d_a));
            DRA[handle].fd = elio_open(dummy_fname,emode, ELIO_PRIVATE);
        } else{

            DRA[handle].fd = elio_open(DRA[handle].fname,emode, ELIO_SHARED);
        }

        if(DRA[handle].fd==NULL)dai_error("ndra_create:failed to open file",0);
        if(DRA[handle].fd->fd==-1)dai_error("ndra_create:failed to open file",-1);
    }

    /*
     *  Need to zero the last element in the array on the disk so that
     *  we never read beyond EOF.
     *
     *  For multiple component files will stamp every one of them.
     *
     */
    pnga_sync();

    if(dai_file_master(*d_a) && dai_write_allowed(*d_a)) ndai_zero_eof(*d_a);

    pnga_sync();

    return(ELIO_OK);
}
2247 
2248 
2249 /**
2250  * CREATE AN N-DIMENSIONAL DISK ARRAY
2251  *
2252  * @param type[in]
2253  * @param ndim[in]     dimension of DRA
2254  * @param dims[][in]   dimensions of DRA
2255  * @param name[in]
2256  * @param filename[in]
2257  * @param mode[in]
2258  * @param reqdims[in]  dimension of typical request
2259  * @param d_a[out]     DRA handle
2260  */
ndrai_create(Integer * type,Integer * ndim,Integer dims[],char * name,char * filename,Integer * mode,Integer reqdims[],Integer * d_a)2261 Integer ndrai_create(Integer *type, Integer *ndim, Integer dims[], char *name,
2262         char *filename, Integer *mode, Integer reqdims[], Integer *d_a)
2263 {
2264     Integer ret;
2265     Integer files = _dra_number_of_files;
2266     Integer procs = _dra_io_procs;
2267     ret = ndrai_create_config(type, ndim, dims, name, filename, mode, reqdims,
2268             &files, &procs, d_a);
2269     return ret;
2270 }
2271 
2272 
2273 /**
2274  * CREATE A 2-D DISK ARRAY
2275  *
2276  * @param type[in]
2277  * @param dim1[in]
2278  * @param dim2[in]
2279  * @param name[in]
2280  * @param filename[in]
2281  * @param mode[in]
2282  * @param reqdim1[in] dim1 of typical request
2283  * @param reqdim2[in] dim2 of typical request
2284  * @param d_a[out]    DRA handle
2285  */
drai_create(Integer * type,Integer * dim1,Integer * dim2,char * name,char * filename,Integer * mode,Integer * reqdim1,Integer * reqdim2,Integer * d_a)2286 Integer drai_create(Integer *type, Integer *dim1, Integer *dim2, char *name,
2287         char *filename, Integer *mode, Integer *reqdim1, Integer *reqdim2,
2288         Integer *d_a)
2289 {
2290     Integer ndim = 2;
2291     Integer dims[2], reqdims[2];
2292     dims[0] = *dim1; dims[1] = *dim2;
2293     reqdims[0] = *reqdim1; reqdims[1] = *reqdim2;
2294     return ndrai_create(type, &ndim, dims, name, filename, mode, reqdims, d_a);
2295 
2296 #if 0
2297     Integer handle, elem_size, ctype;
2298     int emode;
2299 
2300     /* convert Fortran to C data type */
2301     ctype = pnga_type_f2c(*type);
2302     pnga_sync();
2303 
2304     /* if we have an error here, it is fatal        */
2305     dai_check_typeM(ctype);
2306     if( *dim1 <= 0 )
2307         dai_error("dra_create: disk array dimension1 invalid ",  *dim1);
2308     else if( *dim2 <= 0)
2309         dai_error("dra_create: disk array dimension2 invalid ",  *dim2);
2310     if(strlen(filename)>DRA_MAX_FNAME)
2311         dai_error("dra_create: filename too long", DRA_MAX_FNAME);
2312 
2313     /*  Get next free DRA handle */
2314     if( (handle = dai_get_handle()) == -1)
2315         dai_error("dai_create: too many disk arrays ", _max_disk_array);
2316     *d_a = handle - DRA_OFFSET;
2317 
2318     /*translate DRA mode into ELIO mode*/
2319     emode = dai_elio_mode((int)*mode);
2320 
2321     /* determine disk array decomposition  */
2322     elem_size = dai_sizeofM(ctype);
2323     dai_chunking( elem_size, *reqdim1, *reqdim2, *dim1, *dim2,
2324             &DRA[handle].chunk[0], &DRA[handle].chunk[1]);
2325 
2326     /* determine layout -- by row or column */
2327     DRA[handle].layout = COLUMN;
2328 
2329     /* complete initialization */
2330     DRA[handle].dims[0] = *dim1;
2331     DRA[handle].dims[1] = *dim2;
2332     DRA[handle].ndim = 2;
2333     DRA[handle].type = ctype;
2334     DRA[handle].mode = (int)*mode;
2335     strncpy (DRA[handle].fname, filename,  DRA_MAX_FNAME);
2336     strncpy(DRA[handle].name, name, DRA_MAX_NAME );
2337 
2338     DRA[handle].ioprocs = _dra_io_procs;
2339     DRA[handle].numfiles = _dra_number_of_files;
2340 
2341     dai_write_param(DRA[handle].fname, *d_a);
2342     DRA[handle].indep = dai_file_config(filename);
2343     /* create file */
2344     if(dai_io_manage(*d_a)){
2345 
2346         if (INDEPFILES(*d_a) || DRA[handle].numfiles > 1) {
2347 
2348             sprintf(dummy_fname,"%s.%ld",DRA[handle].fname,(long)dai_io_nodeid(*d_a));
2349             DRA[handle].fd = elio_open(dummy_fname,emode, ELIO_PRIVATE);
2350         } else {
2351 
2352             DRA[handle].fd = elio_open(DRA[handle].fname,emode, ELIO_SHARED);
2353         }
2354 
2355         if(DRA[handle].fd==NULL)dai_error("dra_create:failed to open file",0);
2356         if(DRA[handle].fd->fd==-1)dai_error("dra_create:failed to open file",0);
2357     }
2358 
2359 
2360     pnga_sync();
2361 
2362     if(dai_file_master(*d_a) && dai_write_allowed(*d_a)) dai_zero_eof(*d_a);
2363 
2364     pnga_sync();
2365 
2366     return(ELIO_OK);
2367 #endif
2368 
2369 }
2370 
2371 
2372 /**
2373  * write N-dimensional aligned block of data from memory buffer to d_a
2374  *
2375  * @param ds_a[in] section of DRA written to disk
2376  * @param buf[in]  pointer to io buffer
2377  * @param ld[in]   array of strides
2378  * @param id
2379  */
ndai_put(section_t ds_a,void * buf,Integer ld[],io_request_t * id)2380 void ndai_put(section_t ds_a, void *buf, Integer ld[], io_request_t *id)
2381 {
2382     Integer handle = ds_a.handle + DRA_OFFSET, elem, i;
2383     Integer ndim = ds_a.ndim;
2384     Off_t   offset;
2385     Size_t  bytes;
2386 #if WALLTIME
2387     double ss0,tt0,tt1;
2388 #endif
2389 
2390     /* find location in a file where data should be written */
2391     ndai_file_location(ds_a, &offset);
2392     for (i=0; i<ndim-1; i++) if ((ds_a.hi[i]-ds_a.lo[i]+1) != ld[i])
2393         dai_error("ndai_put: bad ld",ld[i]);
2394 
2395     /* since everything is aligned, write data to disk */
2396     elem = 1;
2397     for (i=0; i<ndim; i++) elem *= (ds_a.hi[i]-ds_a.lo[i]+1);
2398     bytes= (Size_t) elem * dai_sizeofM(DRA[handle].type);
2399 #if WALLTIME
2400     walltime_(&ss0,&tt0);
2401 #endif
2402     if( ELIO_OK != elio_awrite(DRA[handle].fd, offset, buf, bytes, id ))
2403         dai_error("ndai_put failed", ds_a.handle);
2404 #if WALLTIME
2405     walltime_(&ss0,&tt1);
2406     printf("p[%d] Beginning ndai_put: %16.6f\n",pnga_nodeid(),tt0);
2407     printf("p[%d] Ending ndai_put: %16.6f\n",pnga_nodeid(),tt1);
2408 #endif
2409 }
2410 
2411 
2412 /**
2413  * read N-dimensional aligned block of data from d_a to memory buffer
2414  *
2415  * @param ds_a[in] section of DRA read from disk
2416  * @param buf[in]  pointer to io buffer
2417  * @param ld[in]   array of strides
2418  * @param id
2419  */
ndai_get(section_t ds_a,void * buf,Integer ld[],io_request_t * id)2420 void ndai_get(section_t ds_a, void *buf, Integer ld[], io_request_t *id)
2421 {
2422     Integer handle = ds_a.handle + DRA_OFFSET, elem, rc;
2423     Integer ndim = DRA[handle].ndim, i;
2424     Off_t   offset;
2425     Size_t  bytes;
2426 #if WALLTIME
2427     double ss0,tt0,tt1;
2428 #endif
2429 
2430     /* find location in a file where data should be read from */
2431     ndai_file_location(ds_a, &offset);
2432 
2433 #ifdef CLEAR_BUF
2434     dai_clear_buffer();
2435 #endif
2436 
2437     for (i=0; i<ndim-1; i++) if ((ds_a.hi[i] - ds_a.lo[i] + 1) != ld[i])
2438         dai_error("ndai_get: bad ld",ld[i]);
2439     /* since everything is aligned, read data from disk */
2440     elem = 1;
2441     for (i=0; i<ndim; i++) elem *= (ds_a.hi[i]-ds_a.lo[i]+1);
2442     bytes= (Size_t) elem * dai_sizeofM(DRA[handle].type);
2443 #if WALLTIME
2444     walltime_(&ss0,&tt0);
2445 #endif
2446     rc= elio_aread(DRA[handle].fd, offset, buf, bytes, id );
2447 #if WALLTIME
2448     walltime_(&ss0,&tt1);
2449     printf("p[%d] Beginning ndai_get: %16.6f\n",pnga_nodeid(),tt0);
2450     printf("p[%d] Ending ndai_get:    %16.6f\n",pnga_nodeid(),tt1);
2451 #endif
2452     if(rc !=  ELIO_OK) dai_error("ndai_get failed", rc);
2453 }
2454 
2455 
2456 /**
2457  * decompose section defined by lo and hi into aligned and unaligned DRA
2458  * subsections
2459  *
2460  * @param ds_a[in]
2461  * @param aligned[out]   Indices of aligned subsections.
2462  * @param na[out]        Number of aligned subsections.
2463  * @param cover[out]     Indices of cover subsections.
2464  * @param unaligned[out] Indices of unaligned subsections.
2465  * @param nu[out]        Number of unaligned subsections.
2466  */
ndai_decomp_section(section_t ds_a,Integer aligned[][2* MAXDIM],int * na,Integer cover[][2* MAXDIM],Integer unaligned[][2* MAXDIM],int * nu)2467 void ndai_decomp_section(section_t ds_a, Integer aligned[][2*MAXDIM],
2468         int *na, Integer cover[][2*MAXDIM], Integer unaligned[][2*MAXDIM],
2469         int *nu)
2470 {
2471     Integer a=0, u=0, handle=ds_a.handle+DRA_OFFSET;
2472     Integer chunk_units;
2473     Integer i, j, idir, ndim = DRA[handle].ndim;
2474     Integer off_low[MAXDIM], off_hi[MAXDIM];
2475     Integer cover_lo[MAXDIM], cover_hi[MAXDIM];
2476     Integer check, chunk_lo, chunk_hi;
2477 
2478     /* section[lo,hi] is decomposed into 'aligned' and 'unaligned'
2479      * subsections.  The aligned subsections are aligned on
2480      * chunk[1]..chunk[ndim-1] boundaries. The unaligned subsections are
2481      * not completely covered by chunk[1]..chunk[ndim]-1 boundaries. These
2482      * are subsets of the 'cover' subsections which are aligned on chunk
2483      * boundaries and contain the unaligned subsections. Disk I/O will
2484      * actually be performed on 'aligned' and 'cover' subsections instead
2485      * of 'unaligned' subsections.
2486      *
2487      * The indexing of the aligned[][idir], cover[][idir], and
2488      * unaligned[][idir] arrays is idir = 0,1 corresponds to low and
2489      * high values in the 0 direction, idir = 2,3 corresponds to low and
2490      * high values in the 1 direction and so on up to a value of
2491      * idir = 2*ndim-1.
2492      *
2493      * The strategy for performing the decomposition is to first find the
2494      * coordinates corresponding to an aligned patch that completely covers
2495      * the originally requested section.
2496      *
2497      * Begin by initializing some arrays. */
2498 
2499     for (i=0, j=0; i<ndim; i++) {
2500         aligned[a][j] = ds_a.lo[i];
2501         cover_lo[i] = ds_a.lo[i];
2502         off_low[i] = (ds_a.lo[i] - 1) % DRA[handle].chunk[i];
2503         j++;
2504         aligned[a][j] = ds_a.hi[i];
2505         cover_hi[i] = ds_a.hi[i];
2506         off_hi[i] = ds_a.hi[i] % DRA[handle].chunk[i];
2507         j++;
2508     }
2509     /* Find coordinates of aligned patch that completely covers the first
2510        ndim-1 dimensions of ds_a */
2511     for (i=0; i<ndim-1; i++) {
2512         if (off_low[i] !=0) {
2513             chunk_lo = (ds_a.lo[i] - 1) / DRA[handle].chunk[i];
2514             cover_lo[i] = chunk_lo * DRA[handle].chunk[i] + 1;
2515         }
2516         if (off_hi[i] !=0) {
2517             chunk_hi = ds_a.hi[i] / DRA[handle].chunk[i] + 1;
2518             cover_hi[i] = chunk_hi * DRA[handle].chunk[i];
2519             if (cover_hi[i] > DRA[handle].dims[i])
2520                 cover_hi[i] = DRA[handle].dims[i];
2521         }
2522     }
2523     /* Find coordinates of aligned chunk (if there is one) */
2524     j = 0;
2525     check = 1;
2526     for (i=0; i<ndim-1; i++) {
2527         if (off_low[i] != 0) {
2528             chunk_lo = (ds_a.lo[i] - 1) / DRA[handle].chunk[i] + 1;
2529             aligned[a][j] = chunk_lo * DRA[handle].chunk[i] + 1;
2530         }
2531         j++;
2532         if (off_hi[i] !=0) {
2533             chunk_hi = ds_a.hi[i] / DRA[handle].chunk[i];
2534             aligned[a][j] = chunk_hi * DRA[handle].chunk[i];
2535         }
2536         if (aligned[a][j] < aligned[a][j-1]) check = 0;
2537         j++;
2538     }
2539     *na = (check == 1)  ?1 :0;
2540 
2541     /* evaluate cover sections and unaligned chunk dimensions. We
2542        break the evaluation of chunks into the following two
2543 cases:
2544 1) There is no aligned section
2545 2) There is an aligned section */
2546 
2547     if (*na == 0) {
2548         /* There is no aligned block. Just return with one cover
2549            section */
2550         for (i=0, j=0; i<ndim; i++) {
2551             cover[u][j] = cover_lo[i];
2552             unaligned[u][j] = ds_a.lo[i];
2553             j++;
2554             cover[u][j] = cover_hi[i];
2555             unaligned[u][j] = ds_a.hi[i];
2556             j++;
2557         }
2558         *nu = 1;
2559         return;
2560     }
2561 
2562     /* An aligned chunk exists so we must find cover sections that
2563        surround it. We scan over the coordinate directions idir
2564        and choose cover sections such that if the coordinate
2565        direction of the cover section is less than idir, then the
2566        cover section extends to the boundary of the aligned
2567        section. If the coordinate direction of the cover section
2568        is greater than idir then the cover extends beyond the
2569        dimensions of the aligned chunk (if there is a nonzero
2570        offset). This scheme guarantees that corner pieces of the
2571        sections are picked up once and only once. */
2572 
2573     for (idir=0; idir<ndim-1; idir++) {
2574         check = 1;
2575         /* cover over lower end of patch */
2576         if (off_low[idir] != 0) {
2577             for (i=0, j=0; i<ndim-1; i++) {
2578                 if (i < idir) {
2579                     if (off_low[i] != 0) {
2580                         chunk_units = (ds_a.lo[i] - 1) / DRA[handle].chunk[i];
2581                         cover[u][j] = chunk_units * DRA[handle].chunk[i] + 1;
2582                     } else {
2583                         cover[u][j] = ds_a.lo[i];
2584                     }
2585                     unaligned[u][j] = ds_a.lo[i];
2586                     j++;
2587                     if (off_hi[i] != 0) {
2588                         chunk_units = ds_a.hi[i] / DRA[handle].chunk[i]+1;
2589                         cover[u][j] = PARIO_MIN(chunk_units * DRA[handle].chunk[i],
2590                                 DRA[handle].dims[i]);
2591                     } else {
2592                         cover[u][j] = ds_a.hi[i];
2593                     }
2594                     unaligned[u][j] = ds_a.hi[i];
2595                     j++;
2596                 } else if (i == idir) {
2597                     chunk_units = (ds_a.lo[i] - 1) / DRA[handle].chunk[i];
2598                     cover[u][j] = chunk_units * DRA[handle].chunk[i] + 1;
2599                     unaligned[u][j] = ds_a.lo[i];
2600                     j++;
2601                     cover[u][j] = PARIO_MIN(cover[u][j-1] + DRA[handle].chunk[i]-1,
2602                             DRA[handle].dims[i]);
2603                     unaligned[u][j] = PARIO_MIN(ds_a.hi[i],cover[u][j]);
2604                     j++;
2605                 } else {
2606                     if (off_low[i] != 0) {
2607                         chunk_units = (ds_a.lo[i] - 1) / DRA[handle].chunk[i]+1;
2608                         cover[u][j] = chunk_units * DRA[handle].chunk[i] + 1;
2609                     } else {
2610                         cover[u][j] = ds_a.lo[i];
2611                     }
2612                     unaligned[u][j] = ds_a.lo[i];
2613                     j++;
2614                     if (off_hi[i] != 0) {
2615                         chunk_units = ds_a.hi[i] / DRA[handle].chunk[i];
2616                         cover[u][j] = chunk_units * DRA[handle].chunk[i];
2617                     } else {
2618                         cover[u][j] = ds_a.hi[i];
2619                     }
2620                     unaligned[u][j] = ds_a.hi[i];
2621                     j++;
2622                 }
2623             }
2624             cover[u][j] = ds_a.lo[ndim-1];
2625             unaligned[u][j] = ds_a.lo[ndim-1];
2626             j++;
2627             cover[u][j] = ds_a.hi[ndim-1];
2628             unaligned[u][j] = ds_a.hi[ndim-1];
2629             u++;
2630             check = 1;
2631             aligned[a][2*idir] = cover[u-1][2*idir+1]+1;
2632         }
2633         /* check to see if there is only one unaligned section covering this
2634            dimension */
2635         if (check == 1) {
2636             if (cover[u-1][2*idir+1] >= ds_a.hi[idir]) check = 0;
2637         } else {
2638             check = 1;
2639         }
2640         /* handle cover over upper end of patch */
2641         if (off_hi[idir] != 0 && check == 1) {
2642             for (i=0, j=0; i<ndim-1; i++) {
2643                 if (i < idir) {
2644                     if (off_low[i] != 0) {
2645                         chunk_units = (ds_a.lo[i] - 1) / DRA[handle].chunk[i];
2646                         cover[u][j] = chunk_units * DRA[handle].chunk[i] + 1;
2647                     } else {
2648                         cover[u][j] = ds_a.lo[i];
2649                     }
2650                     unaligned[u][j] = ds_a.lo[i];
2651                     j++;
2652                     if (off_hi[i] != 0) {
2653                         chunk_units = ds_a.hi[i] / DRA[handle].chunk[i]+1;
2654                         cover[u][j] = PARIO_MIN(chunk_units * DRA[handle].chunk[i],
2655                                 DRA[handle].dims[i]);
2656                     } else {
2657                         cover[u][j] = ds_a.hi[i];
2658                     }
2659                     unaligned[u][j] = ds_a.hi[i];
2660                     j++;
2661                 } else if (i == idir) {
2662                     chunk_units = ds_a.hi[i] / DRA[handle].chunk[i];
2663                     cover[u][j] = chunk_units * DRA[handle].chunk[i] + 1;
2664                     unaligned[u][j] = cover[u][j];
2665                     aligned[a][2*i+1] = PARIO_MIN(cover[u][j]-1,ds_a.hi[idir]);
2666                     j++;
2667                     cover[u][j] = PARIO_MIN(cover[u][j-1] + DRA[handle].chunk[i]-1,
2668                             DRA[handle].dims[i]);
2669                     unaligned[u][j] = PARIO_MIN(ds_a.hi[i],cover[u][j]);
2670                     j++;
2671                 } else {
2672                     if (off_low[i] != 0) {
2673                         chunk_units = (ds_a.lo[i] - 1) / DRA[handle].chunk[i]+1;
2674                         cover[u][j] = chunk_units * DRA[handle].chunk[i] + 1;
2675                     } else {
2676                         cover[u][j] = ds_a.lo[i];
2677                     }
2678                     unaligned[u][j] = ds_a.lo[i];
2679                     j++;
2680                     if (off_hi[i] != 0) {
2681                         chunk_units = ds_a.hi[i] / DRA[handle].chunk[i];
2682                         cover[u][j] = chunk_units * DRA[handle].chunk[i];
2683                     } else {
2684                         cover[u][j] = ds_a.hi[i];
2685                     }
2686                     unaligned[u][j] = ds_a.hi[i];
2687                     j++;
2688                 }
2689             }
2690             cover[u][j] = ds_a.lo[ndim-1];
2691             unaligned[u][j] = ds_a.lo[ndim-1];
2692             j++;
2693             cover[u][j] = ds_a.hi[ndim-1];
2694             unaligned[u][j] = ds_a.hi[ndim-1];
2695             u++;
2696             aligned[a][2*idir+1] = cover[u-1][2*idir]-1;
2697         }
2698     }
2699     *nu = (int)u;
2700     return;
2701 }
2702 
2703 
2704 /**
2705  * given the set of indices lo inside the patch cover, find the next
2706  * set of indices assuming that the area represented by cover has been
2707  * divided up into blocks whose size is given in inc
2708  */
int ndai_next(Integer *lo, Integer *cover, Integer *inc, Integer ndim)
{
    Integer dim;
    int started = 1;

    /* A zero anywhere in lo marks an uninitialized iterator: the caller
       zeroes lo before the first call to request a fresh traversal. */
    for (dim = 0; dim < ndim; dim++) {
        if (lo[dim] == 0) {
            started = 0;
        }
    }

    if (!started) {
        /* first call: position lo at the lower corner of the cover */
        for (dim = 0; dim < ndim; dim++) {
            lo[dim] = cover[2*dim];
        }
    } else {
        /* odometer-style advance: bump each index by its block size; an
           index that stays within cover[2*dim+1] stops the carry, one
           that overflows wraps back to cover[2*dim] (except the last
           dimension, which is left past the end to signal completion)
           and the carry propagates to the next dimension */
        for (dim = 0; dim < ndim; dim++) {
            lo[dim] += inc[dim];
            if (lo[dim] <= cover[2*dim+1]) break;
            if (dim < ndim-1) lo[dim] = cover[2*dim];
        }
    }

    /* nonzero while the slowest-varying index is still inside the cover */
    return (lo[ndim-1] <= cover[2*ndim-1]);
}
2739 
2740 
2741 /**
2742  * compute next chunk of array to process
2743  */
/**
 * Compute the next chunk of the disk array to process.
 *
 * Advances ds_chunk->lo via ndai_next() over the cover section held in
 * list (2*ndim lo/hi pairs) and derives the matching ds_chunk->hi,
 * clipped to both the cover and the DRA chunk layout.
 *
 * @param req      request number; retained for interface compatibility
 *                 (only referenced by the commented-out debug tracing)
 * @param list     lower/upper indices of the cover section
 * @param ds_chunk in/out chunk section; lo is advanced, hi recomputed
 * @return nonzero while another chunk remains, 0 when the cover is done
 */
int ndai_next_chunk(Integer req, Integer* list, section_t* ds_chunk)
{
    Integer   handle = ds_chunk->handle+DRA_OFFSET;
    int       retval, ndim = DRA[handle].ndim, i;

    (void)req; /* unused outside disabled debug code; silences -Wunused */

    /* If we are writing out to multiple files then we need to consider
       chunk boundaries along the last dimension: snap lo back to the
       start of its chunk */
    if(ds_chunk->lo[ndim-1] && DRA[handle].chunk[ndim-1]>1)
        ds_chunk->lo[ndim-1] -= (ds_chunk->lo[ndim-1] -1) %
            DRA[handle].chunk[ndim-1];

    /* ds_chunk->lo is set in this call; list contains the lower and
       upper indices of the cover section */
    retval = ndai_next(ds_chunk->lo, list, DRA[handle].chunk, ndim);
    if(!retval) {
        return(retval);
    }

    /* upper corner: one chunk beyond lo, but never outside the cover */
    for (i=0; i<ndim; i++) {
        ds_chunk->hi[i] = PARIO_MIN(list[2*i+1],
                ds_chunk->lo[i]+DRA[handle].chunk[i]-1);
    }

    /* Again, when writing out to multiple files we must respect chunk
       boundaries along the last dimension.  (This used to be guarded by
       an INDEPFILES/numfiles test, later hard-enabled; the former
       "if (1)" wrapper is now a plain block.) */
    {
        Integer nlo;
        Integer hi_temp =  ds_chunk->lo[ndim-1] +
            DRA[handle].chunk[ndim-1] -1;
        hi_temp -= hi_temp % DRA[handle].chunk[ndim-1];
        ds_chunk->hi[ndim-1] = PARIO_MIN(ds_chunk->hi[ndim-1], hi_temp);

        /* this line was absent from an older version that worked:
           keep lo inside the cover along the last dimension */
        nlo = 2*(ndim-1);
        if(ds_chunk->lo[ndim-1] < list[nlo]) ds_chunk->lo[ndim-1] = list[nlo];
    }

    return 1;
}
2795 
2796 
2797 /**
2798  * function to complete an operation and release the buffer associated
2799  * with a buffer id
2800  */
wait_buf(char * buf)2801 void wait_buf(char *buf)
2802 {
2803     Integer *ga_movhdl;
2804     io_request_t *io_req;
2805     int op_code; /* int buf_id = nbuf; */
2806     buf_info *bi;
2807 
2808     if (buf == NULL) return;
2809 
2810     bi = (buf_info*) buf;
2811     op_code = bi->op;
2812     io_req = &(bi->io_req);
2813     ga_movhdl = &(bi->ga_movhdl);
2814 
2815     /*if (buf_id >= nbuf) {
2816       printf("Wait_buf Error: No operation is associated with this buffer\n");
2817       return;
2818       }*/
2819 
2820     switch(op_code) {
2821         case DRA_OP_WRITE:
2822             elio_wait(io_req);
2823             break;
2824 
2825         case DRA_OP_READ:
2826             if (bi->align == 0)
2827                 pnga_nbwait(ga_movhdl);
2828             else {
2829                 elio_wait(io_req);
2830                 dai_exec_callback(buf, WAIT);
2831             }
2832             break;
2833 
2834         default:
2835             return;
2836     }
2837 
2838 #ifdef BUF_DEBUG
2839     printf("Released a buffer\n");
2840 #endif
2841 
2842 }
2843 
2844 
2845 /**
2846  * Write or Read Unaligned Subsections to/from disk:
2847  * always read an aligned extension of a section from disk to local buffer then
2848  * for read : copy requested data from buffer to global array;
2849  * for write: overwrite part of buffer with data from g_a and write
2850  * complete buffer to disk
2851  *
2852  * @param opcode[in] signal for read or write
2853  * @param transp[in] should data be transposed
2854  * @param ds_a[in] section of DRA that is to be read from or written to
2855  * @param gs_a[in] section of GA that is to be read from or written to
2856  * @param req[in] request number
2857  */
void ndai_transfer_unlgn(
        int opcode,
        int transp,
        section_t ds_a,
        section_t gs_a,
        Integer req)
{
    Integer   chunk_ld[MAXDIM],  next, offset, i, j;
    int   type = DRA[ds_a.handle+DRA_OFFSET].type;
    Integer   ndim = DRA[ds_a.handle+DRA_OFFSET].ndim;
    section_t ds_chunk, ds_unlg;
    char      *buf, *buffer;
    Integer *ga_movhdl;
    io_request_t *io_req;
    buf_info *bi;

    /* both the chunk iterator and the unaligned sub-section start as
       copies of the full DRA section */
    ds_chunk =  ds_unlg = ds_a;
    /* debug dump of the cover list (block disabled by the "&& 0") */
    if (dra_debug_flag && 0) {
        for (i=0; i<ndim; i++) {
            printf("ndai_transfer_unlgn: ds_chunk.lo[%ld] = %ld\n",
                    (long)i,(long)ds_chunk.lo[i]);
            printf("ndai_transfer_unlgn: ds_chunk.hi[%ld] = %ld\n",
                    (long)i,(long)ds_chunk.hi[i]);
        }
        printf("ndai_transfer_unlgn: number of unaligned chunks = %d\n",
                Requests[req].nu);
        for (j=0; j<Requests[req].nu; j++) {
            for (i=0; i<ndim; i++) {
                printf("ndai_transfer_unlgn: list_cover[%ld][%ld] = %ld\n",
                        (long)j,(long)2*i,(long)
                        Requests[req].list_cover[j][2*i]);
                printf("ndai_transfer_unlgn: list_cover[%ld][%ld] = %ld\n",
                        (long)j,(long)2*i+1,
                        (long)Requests[req].list_cover[j][2*i+1]);
            }
        }
    }

    /* one pass per unaligned cover section of this request */
    for(next = 0; next < Requests[req].nu; next++){

        for (i=0; i<ndim; i++) ds_chunk.lo[i] = 0;   /* initialize */
        /* walk the cover section chunk by chunk */
        while(ndai_next_chunk(req, Requests[req].list_cover[next],&ds_chunk)){

            /* only the process that owns this chunk touches it */
            if(dai_myturn(ds_chunk)){

                /*find corresponding to chunk of 'cover' unaligned sub-subsection*/
                for (i=0; i<ndim; i++) {
                    ds_unlg.lo[i] = Requests[req].list_unlgn[next][2*i];
                    ds_unlg.hi[i] = Requests[req].list_unlgn[next][2*i+1];
                }

                /* per-chunk debug dump (disabled by the "&& 0") */
                if (dra_debug_flag && 0) {
                    for (i=0; i<ndim; i++) {
                        printf("ndai_transfer_unlgn: ds_chunk.lo[%ld] = %ld\n",
                                (long)i,(long)ds_chunk.lo[i]);
                        printf("ndai_transfer_unlgn: ds_chunk.hi[%ld] = %ld\n",
                                (long)i,(long)ds_chunk.hi[i]);
                    }
                    for (i=0; i<ndim; i++) {
                        printf("ndai_transfer_unlgn: ds_unlg.lo[%ld] = %ld\n",
                                (long)i,(long)ds_unlg.lo[i]);
                        printf("ndai_transfer_unlgn: ds_unlg.hi[%ld] = %ld\n",
                                (long)i,(long)ds_unlg.hi[i]);
                    }
                }

                /* clip ds_unlg to the current chunk; an empty
                   intersection means the cover list is inconsistent */
                if(!dai_section_intersect(ds_chunk, &ds_unlg))
                    dai_error("ndai_transfer_unlgn: inconsistent cover", 0);

                /* copy data from disk to DRA buffer */
                for (i=0; i<ndim-1; i++) chunk_ld[i] = ds_chunk.hi[i] - ds_chunk.lo[i] + 1;
                /* get a free buffer */
                buf = get_buf(&buf_ctxt, Requests[req].call_id);

                /* the buffer starts with a buf_info header; user data
                   follows immediately after it */
                bi = (buf_info*) buf;
                io_req = &(bi->io_req);
                ga_movhdl = &(bi->ga_movhdl);
                bi->align = 0;

                buf = (char*) (buf + sizeof(buf_info));

                /* read the aligned extension of the chunk from disk and
                   block until it is in the buffer */
                ndai_get(ds_chunk, buf, chunk_ld, io_req);
                elio_wait(io_req);
                /* determine location in the buffer where GA data should be:
                   row-major offset of ds_unlg.lo relative to ds_chunk.lo */
                offset = ds_unlg.lo[ndim-1]-ds_chunk.lo[ndim-1];
                for (i=ndim-2; i>=0; i--)  {
                    offset = offset*chunk_ld[i];
                    offset += ds_unlg.lo[i] - ds_chunk.lo[i];
                }
                buffer  = (char*)buf;
                buffer += offset * dai_sizeofM(type);

                switch (opcode){
                    case DRA_OP_WRITE:
                        bi->op = DRA_OP_WRITE;
                        /* overwrite a part of buffer with data from g_a */
                        nga_move(LOAD, transp, gs_a, ds_a, ds_unlg, buffer, chunk_ld, ga_movhdl);
                        pnga_nbwait(ga_movhdl);

                        /* write ENTIRE updated buffer back to disk
                           (asynchronously; completed later in wait_buf) */
                        ndai_put(ds_chunk, buf, chunk_ld, io_req);

                        break;

                    case DRA_OP_READ:
                        bi->op = DRA_OP_READ;
                        /* copy requested data from buffer to g_a
                           (non-blocking; completed later in wait_buf) */
                        nga_move(STORE, transp, gs_a, ds_a, ds_unlg, buffer, chunk_ld, ga_movhdl);

                        break;

                    default:
                        dai_error("dai_transfer_unlg: invalid opcode",(Integer)opcode);
                }

                /* NOTE(review): this block references gs_chunk, which is
                   not declared in this function — it would not compile
                   with DEBUG defined; confirm before enabling */
#       ifdef DEBUG
                fprintf(stderr,"%d transf unlg g[%d:%d,%d:%d]-d[%d:%d,%d:%d]\n",
                        dai_io_nodeid(), gs_chunk.lo[0], gs_chunk.hi[0],
                        gs_chunk.lo[1], gs_chunk.hi[1],
                        ds_unlg.lo[0], ds_unlg.hi[0],
                        ds_unlg.lo[1], ds_unlg.hi[1]);
#       endif

            }
        }
    }
    /*
       returning from this function leaving some outstanding operations,
       so that dra_read()/write() can be non-blocking to some extent. we will
       have to call dra_wait() to make sure these operations are complete.
       */
}
2990 
2991 
2992 /**
2993  * write or read aligned subsections to disk
2994  */
void ndai_transfer_algn(
        int opcode, int transp, section_t ds_a, section_t gs_a, Integer req)
{
    Integer  next, chunk_ld[MAXDIM], ndim = ds_a.ndim;
    Integer i;
    section_t ds_chunk = ds_a;
    char *buf, *buffer;
    Integer *ga_movhdl;
    io_request_t *io_req;
    buf_info *bi;

    /* one pass per aligned section of this request */
    for(next = 0; next < Requests[req].na; next++){
        for (i=0; i<ndim; i++) ds_chunk.lo[i] = 0; /*initialize */

        /* walk the aligned section chunk by chunk */
        while(ndai_next_chunk(req, Requests[req].list_algn[next], &ds_chunk)){
            /* per-chunk debug dump (block disabled by the "&& 0") */
            if (dra_debug_flag && 0) {
                printf("ndai_transfer_algn: Request %ld\n",(long)req);
                for (i=0; i<ndim; i++) {
                    printf("ndai_transfer_algn: ds_chunk.lo[%ld] = %ld\n",
                            (long)i,(long)ds_chunk.lo[i]);
                    printf("ndai_transfer_algn: ds_chunk.hi[%ld] = %ld\n",
                            (long)i,(long)ds_chunk.hi[i]);
                }
            }

            /* only the process that owns this chunk touches it */
            if(dai_myturn(ds_chunk)){

                /* leading dimensions of the chunk as laid out in the buffer */
                for (i=0; i<ndim-1; i++) chunk_ld[i] = ds_chunk.hi[i] - ds_chunk.lo[i] + 1;
                /* get a free buffer */
                buf = get_buf(&buf_ctxt, Requests[req].call_id);

                /* buffer starts with a buf_info header; data follows it */
                bi = (buf_info*) buf;
                io_req = &(bi->io_req);
                ga_movhdl = &(bi->ga_movhdl);
                bi->align = 1;
                bi->callback = OFF;

                buffer = buf;                 /* header pointer, kept for callback */
                buf = buf + sizeof(buf_info); /* start of user data */

                switch (opcode){

                    case DRA_OP_WRITE:
                        bi->op = DRA_OP_WRITE;
                        /* copy data from g_a to DRA buffer */
                        nga_move(LOAD, transp, gs_a, ds_a, ds_chunk, buf, chunk_ld, ga_movhdl);
                        pnga_nbwait(ga_movhdl);

                        /* copy data from DRA buffer to disk
                           (asynchronously; completed later in wait_buf) */
                        ndai_put(ds_chunk, buf, chunk_ld, io_req);

                        break;

                    case DRA_OP_READ:
                        bi->op = DRA_OP_READ;
                        /* copy data from disk to DRA buffer
                           (asynchronously) */
                        ndai_get(ds_chunk, buf, chunk_ld, io_req);

                        /* copy data from DRA buffer to g_a: registered as
                           a deferred callback run once the I/O completes */
                        /* nga_move(STORE, transp, gs_a, ds_a, ds_chunk, buf, chunk_ld, ga_movhdl); */
                        dai_callback(STORE, transp, gs_a, ds_a, ds_chunk, chunk_ld, buffer, req);
                        break;

                    default:
                        dai_error("dai_transfer_algn: invalid opcode",(Integer)opcode);
                }

                /* NOTE(review): this block references gs_chunk, which is
                   not declared in this function — it would not compile
                   with DEBUG defined; confirm before enabling */
#       ifdef DEBUG
                fprintf(stderr,"%d transf algn g[%d:%d,%d:%d]-d[%d:%d,%d:%d]\n",
                        dai_io_nodeid(), gs_chunk.lo[0], gs_chunk.hi[0],
                        gs_chunk.lo[1], gs_chunk.hi[1],
                        ds_chunk.lo[0], ds_chunk.hi[0],
                        ds_chunk.lo[1], ds_chunk.hi[1]);
#       endif

            }
        }
    }
    /*
       returning from this function leaving some outstanding operations,
       so that dra_read()/write() can be non-blocking to some extent. we will
       have to call dra_wait() to make sure these operations are complete.
       */
}
3079 
3080 
3081 /**
3082  * WRITE SECTION g_a[glo:ghi] TO d_a[dlo:dhi]
3083  *
3084  * @param transp[in] transpose operator
3085  * @param g_a[in] GA handle
3086  * @param glo[in]
3087  * @param ghi[in]
3088  * @param d_a[in] DRA handle
3089  * @param dlo[in]
3090  * @param dhi[in]
3091  * @param request[out] async. request id
3092  */
Integer FATR ndra_write_section_(logical *transp,
        Integer *g_a, Integer glo[], Integer ghi[],
        Integer *d_a, Integer dlo[], Integer dhi[],
        Integer *request)
{
    Integer gdims[MAXDIM], gtype, handle=*d_a+DRA_OFFSET;
    Integer i, gelem, delem, ndim;
    section_t d_sect, g_sect;

    pnga_sync();

    /* usual argument/type/range checking stuff */

    dai_check_handleM(*d_a,"ndra_write_sect");
    pnga_inquire(*g_a, &gtype, &ndim, gdims);
    if(!dai_write_allowed(*d_a))dai_error("ndra_write_sect: write not allowed",*d_a);
    if(DRA[handle].type != (int)gtype)dai_error("ndra_write_sect: type mismatch",gtype);
    if(DRA[handle].ndim != ndim)dai_error("ndra_write_sect: dimension mismatch", ndim);
    for (i=0; i<ndim; i++) dai_check_rangeM(glo[i], ghi[i], gdims[i],
            "ndra_write_sect: g_a dim error");
    for (i=0; i<ndim; i++) dai_check_rangeM(dlo[i], dhi[i], DRA[handle].dims[i],
            "ndra_write_sect: d_a dim error");

    /* check if numbers of elements in g_a & d_a sections match */
    gelem = 1;
    delem = 1;
    for (i=0; i<ndim; i++) {
        gelem *= (ghi[i]-glo[i]+1);
        delem *= (dhi[i]-dlo[i]+1);
    }
    if (gelem != delem)
        dai_error("ndra_write_sect: d_a and g_a sections do not match ", 0L);

    dai_assign_request_handle(request);

    /* decompose d_a section into aligned and unaligned subsections
     * -- with respect to underlying array layout on the disk
     */

    /* NOTE(review): nu is seeded with MAX_ALGN and na with MAX_UNLG —
       the names suggest these two constants are swapped; confirm
       against ndai_decomp_section (which overwrites both) and the
       identical pattern in ndra_read_section_ */
    Requests[*request].nu=MAX_ALGN;
    Requests[*request].na=MAX_UNLG;

    /* build section descriptors for the disk and GA patches */
    nfill_sectionM(d_sect, *d_a, DRA[handle].ndim, dlo, dhi);
    nfill_sectionM(g_sect, *g_a, ndim, glo, ghi);

    ndai_decomp_section(d_sect,
            Requests[*request].list_algn,
            &Requests[*request].na,
            Requests[*request].list_cover,
            Requests[*request].list_unlgn,
            &Requests[*request].nu);
    _dra_turn = 0;   /* reset round-robin chunk-ownership counter */

    /* process unaligned subsections */
    ndai_transfer_unlgn(DRA_OP_WRITE, (int)*transp, d_sect, g_sect, *request);

    /* process aligned subsections */
    ndai_transfer_algn (DRA_OP_WRITE, (int)*transp, d_sect, g_sect, *request);

    pnga_sync();

    return(ELIO_OK);
}
3156 
3157 
3158 /**
3159  * WRITE N-dimensional g_a TO d_a
3160  *
3161  * @param g_a[in]      GA handle
3162  * @param d_a[in]      DRA handle
3163  * @param request[out] handle to async oper
3164  */
ndra_write_(Integer * g_a,Integer * d_a,Integer * request)3165 Integer FATR ndra_write_(Integer *g_a, Integer *d_a, Integer *request)
3166 {
3167     Integer gdims[MAXDIM], gtype, handle=*d_a+DRA_OFFSET;
3168     logical transp = FALSE;
3169     Integer lo[MAXDIM], hi[MAXDIM], ndim, i;
3170 
3171     pnga_sync();
3172 
3173     /* usual argument/type/range checking stuff */
3174 
3175     dai_check_handleM(*d_a,"ndra_write");
3176     if( !dai_write_allowed(*d_a))
3177         dai_error("ndra_write: write not allowed to this array",*d_a);
3178 
3179     pnga_inquire(*g_a, &gtype, &ndim, gdims);
3180     if(DRA[handle].type != (int)gtype)dai_error("ndra_write: type mismatch",gtype);
3181     if(DRA[handle].ndim != ndim)dai_error("ndra_write: dimension mismatch",ndim);
3182     for (i=0; i<ndim; i++) {
3183         if(DRA[handle].dims[i] != gdims[i])
3184             dai_error("ndra_write: dims mismatch",gdims[i]);
3185     }
3186 
3187     /* right now, naive implementation just calls ndra_write_section */
3188     for (i=0; i<ndim; i++) {
3189         lo[i] = 1;
3190         hi[i] = DRA[handle].dims[i];
3191     }
3192 
3193     return(ndra_write_section_(&transp, g_a, lo, hi, d_a, lo, hi, request));
3194 }
3195 
3196 
3197 /**
3198  * READ SECTION g_a[glo:ghi] FROM d_a[dlo:dhi]
3199  *
3200  * @param transp[in]   transpose operator
3201  * @param g_a[in]      GA handle
3202  * @param glo[in]
3203  * @param ghi[in]
3204  * @param d_a[in]      DRA handle
3205  * @param dlo[in]
3206  * @param dhi[in]
3207  * @param request[out] request id
3208  */
Integer FATR ndra_read_section_(logical *transp,
        Integer *g_a, Integer glo[], Integer ghi[],
        Integer *d_a, Integer dlo[], Integer dhi[],
        Integer *request)
{
    Integer gdims[MAXDIM], gtype, handle=*d_a+DRA_OFFSET;
    Integer i, gelem, delem, ndim, me;
    section_t d_sect, g_sect;

    pnga_sync();
    me = pnga_nodeid();   /* only used for debug output below */
    /* usual argument/type/range checking stuff */
    dai_check_handleM(*d_a,"ndra_read_sect");
    if(!dai_read_allowed(*d_a))dai_error("ndra_read_sect: read not allowed",*d_a);
    pnga_inquire(*g_a, &gtype, &ndim, gdims);
    if(DRA[handle].type != (int)gtype)dai_error("ndra_read_sect: type mismatch",gtype);
    if(DRA[handle].ndim != ndim)dai_error("ndra_read_sect: dimension mismatch", ndim);

    for (i=0; i<ndim; i++) dai_check_rangeM(glo[i], ghi[i], gdims[i],
            "ndra_read_sect: g_a dim error");
    for (i=0; i<ndim; i++) dai_check_rangeM(dlo[i], dhi[i], DRA[handle].dims[i],
            "ndra_read_sect: d_a dim error");

    /* check if numbers of elements in g_a & d_a sections match */
    gelem = 1;
    delem = 1;
    for (i=0; i<ndim; i++) {
        gelem *= (ghi[i] - glo[i] + 1);
        delem *= (dhi[i] - dlo[i] + 1);
    }
    if (gelem != delem)
        dai_error("ndra_read_sect: d_a and g_a sections do not match ", 0L);

    dai_assign_request_handle(request);

    /* decompose d_a section into aligned and unaligned subsections
     * -- with respect to underlying array layout on the disk
     */

    /* NOTE(review): as in ndra_write_section_, nu is seeded with
       MAX_ALGN and na with MAX_UNLG — the names suggest these are
       swapped; confirm against ndai_decomp_section */
    Requests[*request].nu=MAX_ALGN;
    Requests[*request].na=MAX_UNLG;

    /* dump the requested disk and GA patches when debugging */
    if (dra_debug_flag) {
        for (i=0; i<ndim; i++) {
            printf("proc[%ld] ndra_read_section: dlo[%ld] = %ld\n",
                    (long)me, (long)i, (long)dlo[i]);
            printf("proc[%ld] ndra_read_section: dhi[%ld] = %ld\n",
                    (long)me, (long)i, (long)dhi[i]);
        }
        for (i=0; i<ndim; i++) {
            printf("proc[%ld] ndra_read_section: glo[%ld] = %ld\n",
                    (long)me, (long)i, (long)glo[i]);
            printf("proc[%ld] ndra_read_section: ghi[%ld] = %ld\n",
                    (long)me, (long)i, (long)ghi[i]);
        }
    }

    /* build section descriptors for the disk and GA patches */
    nfill_sectionM(d_sect, *d_a, DRA[handle].ndim, dlo, dhi);
    nfill_sectionM(g_sect, *g_a, ndim, glo, ghi);

    ndai_decomp_section(d_sect,
            Requests[*request].list_algn,
            &Requests[*request].na,
            Requests[*request].list_cover,
            Requests[*request].list_unlgn,
            &Requests[*request].nu);

    _dra_turn = 0;   /* reset round-robin chunk-ownership counter */
    /* dump the decomposition result (block disabled by the "&& 0") */
    if (dra_debug_flag && 0) {
        printf("ndra_read_section: Number of aligned sections %d\n",
                Requests[*request].na);
        printf("ndra_read_section: Number of unaligned sections %d\n",
                Requests[*request].nu);
        for (i=0; i<2*ndim; i++) {
            printf("ndra_read_section: list_algn[%ld] =  %ld\n",
                    (long)i,(long)Requests[*request].list_algn[0][i]);
        }
        for (i=0; i<2*ndim; i++) {
            printf("ndra_read_section: list_cover[%ld] =  %ld\n",
                    (long)i,(long)Requests[*request].list_cover[0][i]);
        }
        for (i=0; i<2*ndim; i++) {
            printf("ndra_read_section: list_unlgn[%ld] =  %ld\n",
                    (long)i,(long)Requests[*request].list_unlgn[0][i]);
        }
    }

    /* process unaligned subsections */
    ndai_transfer_unlgn(DRA_OP_READ, (int)*transp,  d_sect, g_sect, *request);

    /* process aligned subsections */
    ndai_transfer_algn (DRA_OP_READ, (int)*transp,  d_sect, g_sect, *request);

    /* NOTE(review): unlike ndra_write_section_, there is no trailing
       pnga_sync() here — presumably intentional so reads stay
       non-blocking until dra_wait(); confirm */
    return(ELIO_OK);
}
3306 
3307 
3308 /**
3309  * READ N-dimensional g_a FROM d_a
3310  */
ndra_read_(Integer * g_a,Integer * d_a,Integer * request)3311 Integer FATR ndra_read_(Integer* g_a, Integer* d_a, Integer* request)
3312 {
3313     Integer gdims[MAXDIM], gtype, handle=*d_a+DRA_OFFSET;
3314     logical transp = FALSE;
3315     Integer lo[MAXDIM], hi[MAXDIM], ndim, i;
3316 
3317     pnga_sync();
3318     /* printf("%d: CAME AT ndra_read_!!\n", pnga_nodeid()); */
3319     /* usual argument/type/range checking stuff */
3320     dai_check_handleM(*d_a,"ndra_read");
3321     if(!dai_read_allowed(*d_a))dai_error("ndra_read: read not allowed",*d_a);
3322     pnga_inquire(*g_a, &gtype, &ndim, gdims);
3323     /* printf("%d: CAME After pnga_inquire!!\n", pnga_nodeid()); */
3324     if(DRA[handle].type != (int)gtype)dai_error("ndra_read: type mismatch",gtype);
3325     if(DRA[handle].ndim != ndim)dai_error("ndra_read: dimension mismatch",ndim);
3326     for (i=0; i<ndim; i++) {
3327         if(DRA[handle].dims[i] != gdims[i])
3328             dai_error("ndra_read: dims mismatch",gdims[i]);
3329     }
3330 
3331     /* right now, naive implementation just calls ndra_read_section */
3332     for (i=0; i<ndim; i++) {
3333         lo[i] = 1;
3334         hi[i] = DRA[handle].dims[i];
3335     }
3336     return(ndra_read_section_(&transp, g_a, lo, hi, d_a, lo, hi, request));
3337 }
3338 
3339 
3340 /**
3341  * WRITE SECTION g_a[gilo:gihi, gjlo:gjhi] TO d_a[dilo:dihi, djlo:djhi]
3342  *
 * @param[in]  transp   transpose operator
 * @param[in]  g_a      GA handle
 * @param[in]  gilo     lower bound of g_a section, first dimension
 * @param[in]  gihi     upper bound of g_a section, first dimension
 * @param[in]  gjlo     lower bound of g_a section, second dimension
 * @param[in]  gjhi     upper bound of g_a section, second dimension
 * @param[in]  d_a      DRA handle
 * @param[in]  dilo     lower bound of d_a section, first dimension
 * @param[in]  dihi     upper bound of d_a section, first dimension
 * @param[in]  djlo     lower bound of d_a section, second dimension
 * @param[in]  djhi     upper bound of d_a section, second dimension
 * @param[out] request  async. request id
3355  */
dra_write_section_(logical * transp,Integer * g_a,Integer * gilo,Integer * gihi,Integer * gjlo,Integer * gjhi,Integer * d_a,Integer * dilo,Integer * dihi,Integer * djlo,Integer * djhi,Integer * request)3356 Integer FATR dra_write_section_(logical *transp,
3357         Integer *g_a,Integer *gilo,Integer *gihi,Integer *gjlo,Integer *gjhi,
3358         Integer *d_a,Integer *dilo,Integer *dihi,Integer *djlo,Integer *djhi,
3359         Integer *request)
3360 {
3361     Integer glo[2], ghi[2], dlo[2], dhi[2];
3362 
3363     glo[0] = *gilo;
3364     glo[1] = *gjlo;
3365     ghi[0] = *gihi;
3366     ghi[1] = *gjhi;
3367 
3368     dlo[0] = *dilo;
3369     dlo[1] = *djlo;
3370     dhi[0] = *dihi;
3371     dhi[1] = *djhi;
3372 
3373     return (ndra_write_section_(transp, g_a, glo, ghi, d_a, dlo, dhi, request));
3374 
3375     /*
3376        Integer gdim1, gdim2, gtype, handle=*d_a+DRA_OFFSET;
3377        section_t d_sect, g_sect;
3378 
3379        pnga_sync();
3380 
3381        dai_check_handleM(*d_a,"dra_write_sect");
3382        ga_inquire_internal_(g_a, &gtype, &gdim1, &gdim2);
3383        if(!dai_write_allowed(*d_a))dai_error("dra_write_sect: write not allowed",*d_a);
3384        if(DRA[handle].type != (int)gtype)dai_error("dra_write_sect: type mismatch",gtype);
3385        dai_check_rangeM(*gilo,*gihi, gdim1, "dra_write_sect: g_a dim1 error");
3386        dai_check_rangeM(*gjlo,*gjhi, gdim2, "dra_write_sect: g_a dim2 error");
3387        dai_check_rangeM(*dilo,*dihi,DRA[handle].dims[0],"dra_write_sect:d_a dim1 error");
3388        dai_check_rangeM(*djlo,*djhi,DRA[handle].dims[1],"dra_write_sect:d_a dim2 error");
3389 
3390 
3391        if ((*dihi - *dilo + 1) * (*djhi - *djlo + 1) !=
3392        (*gihi - *gilo + 1) * (*gjhi - *gjlo + 1))
3393        dai_error("dra_write_sect: d_a and g_a sections do not match ", 0L);
3394 
3395        dai_assign_request_handle(request);
3396 
3397 
3398        Requests[*request].nu=MAX_ALGN;
3399        Requests[*request].na=MAX_UNLG;
3400 
3401        fill_sectionM(d_sect, *d_a, *dilo, *dihi, *djlo, *djhi);
3402        fill_sectionM(g_sect, *g_a, *gilo, *gihi, *gjlo, *gjhi);
3403 
3404        dai_decomp_section(d_sect,
3405        Requests[*request].list_algn,
3406        &Requests[*request].na,
3407        Requests[*request].list_cover,
3408        Requests[*request].list_unlgn,
3409        &Requests[*request].nu);
3410        _dra_turn = 0;
3411 
3412 
3413        dai_transfer_unlgn(DRA_OP_WRITE, (int)*transp, d_sect, g_sect, *request);
3414 
3415 
3416        dai_transfer_algn (DRA_OP_WRITE, (int)*transp, d_sect, g_sect, *request);
3417 
3418        pnga_sync();
3419 
3420        return(ELIO_OK);
3421        */
3422 }
3423 
3424 
3425 /**
3426  * WRITE g_a TO d_a
3427  *
 * @param[in]  g_a      GA handle
 * @param[in]  d_a      DRA handle
 * @param[out] request  handle to async oper.
3431  */
dra_write_(Integer * g_a,Integer * d_a,Integer * request)3432 Integer FATR dra_write_(Integer *g_a, Integer *d_a, Integer *request)
3433 {
3434     return (ndra_write_(g_a, d_a, request));
3435     /*
3436        Integer gdim1, gdim2, gtype, handle=*d_a+DRA_OFFSET;
3437        logical transp = FALSE;
3438        Integer ilo, ihi, jlo, jhi;
3439 
3440        pnga_sync();
3441 
3442        dai_check_handleM(*d_a,"dra_write");
3443        if( !dai_write_allowed(*d_a))
3444        dai_error("dra_write: write not allowed to this array",*d_a);
3445 
3446        ga_inquire_internal_(g_a, &gtype, &gdim1, &gdim2);
3447        if(DRA[handle].type != (int)gtype)dai_error("dra_write: type mismatch",gtype);
3448        if(DRA[handle].dims[0] != gdim1)dai_error("dra_write: dim1 mismatch",gdim1);
3449        if(DRA[handle].dims[1] != gdim2)dai_error("dra_write: dim2 mismatch",gdim2);
3450 
3451 
3452        ilo = 1; ihi = DRA[handle].dims[0];
3453        jlo = 1; jhi = DRA[handle].dims[1];
3454        return(dra_write_section_(&transp, g_a, &ilo, &ihi, &jlo, &jhi,
3455        d_a, &ilo, &ihi, &jlo, &jhi,request));
3456        */
3457 }
3458 
3459 
3460 /**
3461  * READ SECTION g_a[gilo:gihi, gjlo:gjhi] FROM d_a[dilo:dihi, djlo:djhi]
3462  *
 * @param[in]  transp   transpose operator
 * @param[in]  g_a      GA handle
 * @param[in]  gilo     lower bound of g_a section, first dimension
 * @param[in]  gihi     upper bound of g_a section, first dimension
 * @param[in]  gjlo     lower bound of g_a section, second dimension
 * @param[in]  gjhi     upper bound of g_a section, second dimension
 * @param[in]  d_a      DRA handle
 * @param[in]  dilo     lower bound of d_a section, first dimension
 * @param[in]  dihi     upper bound of d_a section, first dimension
 * @param[in]  djlo     lower bound of d_a section, second dimension
 * @param[in]  djhi     upper bound of d_a section, second dimension
 * @param[out] request  request id
3475  */
dra_read_section_(logical * transp,Integer * g_a,Integer * gilo,Integer * gihi,Integer * gjlo,Integer * gjhi,Integer * d_a,Integer * dilo,Integer * dihi,Integer * djlo,Integer * djhi,Integer * request)3476 Integer FATR dra_read_section_(logical *transp,
3477         Integer *g_a,Integer *gilo,Integer *gihi,Integer *gjlo,Integer *gjhi,
3478         Integer *d_a,Integer *dilo,Integer *dihi,Integer *djlo,Integer *djhi,
3479         Integer *request)
3480 {
3481     Integer glo[2], ghi[2], dlo[2], dhi[2];
3482 
3483     glo[0] = *gilo;
3484     glo[1] = *gjlo;
3485     ghi[0] = *gihi;
3486     ghi[1] = *gjhi;
3487 
3488     dlo[0] = *dilo;
3489     dlo[1] = *djlo;
3490     dhi[0] = *dihi;
3491     dhi[1] = *djhi;
3492 
3493     return (ndra_read_section_(transp, g_a, glo, ghi, d_a, dlo, dhi, request));
3494 
3495     /*
3496        Integer gdim1, gdim2, gtype, handle=*d_a+DRA_OFFSET;
3497        section_t d_sect, g_sect;
3498 
3499        pnga_sync();
3500 
3501        dai_check_handleM(*d_a,"dra_read_sect");
3502        if(!dai_read_allowed(*d_a))dai_error("dra_read_sect: read not allowed",*d_a);
3503        ga_inquire_internal_(g_a, &gtype, &gdim1, &gdim2);
3504        if(DRA[handle].type != (int)gtype)dai_error("dra_read_sect: type mismatch",gtype);
3505        dai_check_rangeM(*gilo, *gihi, gdim1, "dra_read_sect: g_a dim1 error");
3506        dai_check_rangeM(*gjlo, *gjhi, gdim2, "dra_read_sect: g_a dim2 error");
3507        dai_check_rangeM(*dilo, *dihi,DRA[handle].dims[0],"dra_read_sect:d_a dim1 error");
3508        dai_check_rangeM(*djlo, *djhi,DRA[handle].dims[1],"dra_read_sect:d_a dim2 error");
3509 
3510 
3511        if ((*dihi - *dilo + 1) * (*djhi - *djlo + 1) !=
3512        (*gihi - *gilo + 1) * (*gjhi - *gjlo + 1))
3513        dai_error("dra_read_sect: d_a and g_a sections do not match ", 0L);
3514 
3515        dai_assign_request_handle(request);
3516 
3517 
3518        Requests[*request].nu=MAX_ALGN;
3519        Requests[*request].na=MAX_UNLG;
3520 
3521        fill_sectionM(d_sect, *d_a, *dilo, *dihi, *djlo, *djhi);
3522        fill_sectionM(g_sect, *g_a, *gilo, *gihi, *gjlo, *gjhi);
3523 
3524        dai_decomp_section(d_sect,
3525        Requests[*request].list_algn,
3526        &Requests[*request].na,
3527        Requests[*request].list_cover,
3528        Requests[*request].list_unlgn,
3529        &Requests[*request].nu);
3530 
3531        _dra_turn = 0;
3532 
3533 
3534        dai_transfer_unlgn(DRA_OP_READ, (int)*transp,  d_sect, g_sect, *request);
3535 
3536 
3537        dai_transfer_algn (DRA_OP_READ, (int)*transp,  d_sect, g_sect, *request);
3538 
3539        return(ELIO_OK);
3540        */
3541 }
3542 
3543 
3544 /**
3545  * READ g_a FROM d_a
3546  */
dra_read_(Integer * g_a,Integer * d_a,Integer * request)3547 Integer FATR dra_read_(Integer* g_a, Integer* d_a, Integer* request)
3548 {
3549 
3550     return (ndra_read_(g_a, d_a, request));
3551 
3552     /*
3553        Integer gdim1, gdim2, gtype, handle=*d_a+DRA_OFFSET;
3554        logical transp = FALSE;
3555        Integer ilo, ihi, jlo, jhi;
3556 
3557        pnga_sync();
3558 
3559 
3560        dai_check_handleM(*d_a,"dra_read");
3561        if(!dai_read_allowed(*d_a))dai_error("dra_read: read not allowed",*d_a);
3562        ga_inquire_internal_(g_a, &gtype, &gdim1, &gdim2);
3563        if(DRA[handle].type != (int)gtype)dai_error("dra_read: type mismatch",gtype);
3564        if(DRA[handle].dims[0] != gdim1)dai_error("dra_read: dim1 mismatch",gdim1);
3565        if(DRA[handle].dims[1] != gdim2)dai_error("dra_read: dim2 mismatch",gdim2);
3566 
3567 
3568        ilo = 1; ihi = DRA[handle].dims[0];
3569        jlo = 1; jhi = DRA[handle].dims[1];
3570        return(dra_read_section_(&transp, g_a, &ilo, &ihi, &jlo, &jhi,
3571        d_a, &ilo, &ihi, &jlo, &jhi, request));
3572        */
3573 }
3574 
3575 
3576 /**
3577  * INQUIRE PARAMETERS OF EXISTING N-DIMENSIONAL DISK ARRAY
3578  *
 * @param[in]  d_a       DRA handle
 * @param[out] type      data type of the DRA
 * @param[out] ndim      number of dimensions
 * @param[out] dims      extents of each dimension
 * @param[out] name      array name (caller-supplied buffer)
 * @param[out] filename  metafile name (caller-supplied buffer)
3585  */
Integer ndrai_inquire(Integer *d_a, Integer *type, Integer *ndim,
        Integer dims[], char *name, char *filename)
{
    /* Report type, rank, dimensions, array name and metafile name of an
     * existing DRA into caller-supplied locations.  The name/filename
     * buffers must be large enough to hold the stored strings. */
    Integer h = *d_a + DRA_OFFSET;
    Integer d;

    dai_check_handleM(*d_a,"dra_inquire");

    *type = (Integer)DRA[h].type;
    *ndim = DRA[h].ndim;
    for (d = 0; d < *ndim; d++) {
        dims[d] = DRA[h].dims[d];
    }
    strcpy(name, DRA[h].name);
    strcpy(filename, DRA[h].fname);

    return ELIO_OK;
}
3604 
3605 
3606 /**
3607  * PRINT OUT INTERNAL PARAMETERS OF DRA
3608  */
dra_print_internals_(Integer * d_a)3609 void FATR dra_print_internals_(Integer *d_a)
3610 {
3611     Integer i;
3612     Integer *dims, *chunks;
3613     Integer handle = *d_a + DRA_OFFSET;
3614     Integer ndim = DRA[handle].ndim;
3615     Integer me = pnga_nodeid();
3616     dims = DRA[handle].dims;
3617     chunks = DRA[handle].chunk;
3618     if (me == 0) {
3619         printf("Internal Data for DRA: %s\n",DRA[handle].name);
3620         printf("  DRA Metafile Name: %s\n",DRA[handle].fname);
3621         switch(DRA[handle].type){
3622             case C_DBL:
3623                 printf("  DRA data type is DOUBLE PRECISION\n");
3624                 break;
3625             case C_FLOAT:
3626                 printf("  DRA data type is SINGLE PRECISION\n");
3627                 break;
3628             case C_INT:
3629                 printf("  DRA data type is INTEGER\n");
3630                 break;
3631             case C_DCPL:
3632                 printf("  DRA data type is DOUBLE COMPLEX\n");
3633                 break;
3634             case C_SCPL:
3635                 printf("  DRA data type is SINGLE COMPLEX\n");
3636                 break;
3637             case C_LONG:
3638                 printf("  DRA data type is LONG INTEGER\n");
3639                 break;
3640             default:
3641                 printf("  DRA data type is UNKNOWN\n");
3642                 break;
3643         }
3644         switch(DRA[handle].mode) {
3645             case DRA_RW:
3646                 printf("  DRA access permisions are READ/WRITE\n");
3647                 break;
3648             case DRA_W:
3649                 printf("  DRA access permisions are WRITE ONLY\n");
3650                 break;
3651             case DRA_R:
3652                 printf("  DRA access permisions are READ ONLY\n");
3653                 break;
3654             default:
3655                 printf("  DRA access permisions are UNKNOWN\n");
3656                 break;
3657         }
3658         printf("  Dimension of DRA: %d\n",(int)ndim);
3659         printf("  Dimensions of DRA:\n");
3660         for (i=0; i<ndim; i++) {
3661             printf("    Dimension in direction [%d]: %d\n",(int)(i+1),
3662                     (int)dims[i]);
3663         }
3664         printf("  Chunk dimensions of DRA:\n");
3665         for (i=0; i<ndim; i++) {
3666             printf("    Chunk dimension in direction [%d]: %d\n",(int)(i+1),
3667                     (int)chunks[i]);
3668         }
3669         if (DRA[handle].actv) {
3670             printf("  DRA is currently active\n");
3671         } else {
3672             printf("  DRA is not currently active\n");
3673         }
3674         if (DRA[handle].indep) {
3675             printf("  DRA is using independent files\n");
3676         } else {
3677             printf("  DRA is using shared files\n");
3678         }
3679         printf("  Number files used for DRA: %d\n",(int)DRA[handle].numfiles);
3680         printf("  Number IO processors used for DRA: %d\n",
3681                 (int)DRA[handle].ioprocs);
3682     }
3683 }
3684 
3685 
3686 /**
3687  * SET DEFAULT CONFIGURATION FOR HANDLING DRAs STORED ON OPEN FILE SYSTEMS
3688  */
dra_set_default_config_(Integer * numfiles,Integer * numioprocs)3689 void FATR dra_set_default_config_(Integer *numfiles, Integer *numioprocs)
3690 {
3691     Integer number_of_files, io_procs;
3692     dai_set_config(*numfiles, *numioprocs, &number_of_files, &io_procs);
3693     _dra_number_of_files = number_of_files;
3694     _dra_io_procs = io_procs;
3695 }
3696 
3697 
3698 /**
3699  * SET DEBUG FLAG FOR DRA OPERATIONS TO TRUE OR FALSE
3700  */
dra_set_debug_(logical * flag)3701 void FATR dra_set_debug_(logical *flag)
3702 {
3703     if (*flag) {
3704         dra_debug_flag = TRUE;
3705     } else {
3706         dra_debug_flag = FALSE;
3707     }
3708 }
3709