1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #include "mpiimpl.h"
7 #include "dataloop_internal.h"
8 
9 #include <stdio.h>
10 #include <stdlib.h>
11 
12 /*
13 === BEGIN_MPI_T_CVAR_INFO_BLOCK ===
14 
15 categories:
16     - name        : DATALOOP
17       description : Dataloop-related CVARs
18 
19 cvars:
20     - name        : MPIR_CVAR_DATALOOP_FAST_SEEK
21       category    : DATALOOP
22       type        : int
23       default     : 1
24       class       : none
25       verbosity   : MPI_T_VERBOSITY_USER_BASIC
26       scope       : MPI_T_SCOPE_ALL_EQ
27       description : >-
28         use a datatype-specialized algorithm to shortcut seeking to
29         the correct location in a noncontiguous buffer
30 
31 === END_MPI_T_CVAR_INFO_BLOCK ===
32 */
33 
34 #undef MPII_DATALOOP_DEBUG_MANIPULATE
35 
36 /* Notes on functions:
37  *
38  * There are a few different sets of functions here:
39  * - MPII_Segment_manipulate() - uses a "piece" function to perform operations
40  *   using segments (piece functions defined elsewhere)
41  * - MPIDU functions - these define the externally visible interface
42  *   to segment functionality
43  */
44 
45 /* MPII_Segment_manipulate - do something to a segment
46  *
47  * If you think of all the data to be manipulated (packed, unpacked, whatever),
48  * as a stream of bytes, it's easier to understand how first and last fit in.
49  *
50  * This function does all the work, calling the piecefn passed in when it
51  * encounters a datatype element which falls into the range of first..(last-1).
52  *
53  * piecefn can be NULL, in which case this function doesn't do anything when it
54  * hits a region.  This is used internally for repositioning within this stream.
55  *
56  * last is a byte offset to the byte just past the last byte in the stream
57  * to operate on.  this makes the calculations all over MUCH cleaner.
58  *
59  * stream_off, stream_el_size, first, and last are all working in terms of the
60  * types and sizes for the stream, which might be different from the local sizes
61  * (in the heterogeneous case).
62  *
63  * This is a horribly long function.  Too bad; it's complicated :)! -- Rob
64  *
65  * NOTE: THIS IMPLEMENTATION CANNOT HANDLE STRUCT DATALOOPS.
66  */
67 
68 #define SEGMENT_SAVE_LOCAL_VALUES         \
69     {                                           \
70         segp->cur_sp     = cur_sp;              \
71         segp->valid_sp   = valid_sp;            \
72         segp->stream_off = stream_off;          \
73         *lastp           = stream_off;          \
74     }
75 
76 #define SEGMENT_LOAD_LOCAL_VALUES         \
77     {                                           \
78         last       = *lastp;                    \
79         cur_sp     = segp->cur_sp;              \
80         valid_sp   = segp->valid_sp;            \
81         stream_off = segp->stream_off;          \
82         cur_elmp   = &(segp->stackelm[cur_sp]); \
83     }
84 
85 #define SEGMENT_RESET_VALUES                                      \
86     {                                                                   \
87         segp->stream_off     = 0;                                       \
88         segp->cur_sp         = 0;                                       \
89         cur_elmp             = &(segp->stackelm[0]);                    \
90         cur_elmp->curcount   = cur_elmp->orig_count;                    \
91         cur_elmp->orig_block = MPII_Dataloop_stackelm_blocksize(cur_elmp);      \
92         cur_elmp->curblock   = cur_elmp->orig_block;                    \
93         cur_elmp->curoffset  = cur_elmp->orig_offset +                  \
94             MPII_Dataloop_stackelm_offset(cur_elmp);                            \
95     }
96 
97 #define SEGMENT_POP_AND_MAYBE_EXIT                        \
98     {                                                           \
99         cur_sp--;                                               \
100         if (cur_sp >= 0) cur_elmp = &segp->stackelm[cur_sp];    \
101         else {                                                  \
102             SEGMENT_SAVE_LOCAL_VALUES;                    \
103             return;                                             \
104         }                                                       \
105     }
106 
107 #define SEGMENT_PUSH                      \
108     {                                           \
109         cur_sp++;                               \
110         cur_elmp = &segp->stackelm[cur_sp];     \
111     }
112 
113 #define STACKELM_BLOCKINDEXED_OFFSET(elmp_, curcount_)    \
114     (elmp_)->loop_p->loop_params.bi_t.offset_array[(curcount_)]
115 
116 #define STACKELM_INDEXED_OFFSET(elmp_, curcount_)         \
117     (elmp_)->loop_p->loop_params.i_t.offset_array[(curcount_)]
118 
119 #define STACKELM_INDEXED_BLOCKSIZE(elmp_, curcount_)              \
120     (elmp_)->loop_p->loop_params.i_t.blocksize_array[(curcount_)]
121 
122 #define STACKELM_STRUCT_OFFSET(elmp_, curcount_)          \
123     (elmp_)->loop_p->loop_params.s_t.offset_array[(curcount_)]
124 
125 #define STACKELM_STRUCT_BLOCKSIZE(elmp_, curcount_)               \
126     (elmp_)->loop_p->loop_params.s_t.blocksize_array[(curcount_)]
127 
128 #define STACKELM_STRUCT_EL_EXTENT(elmp_, curcount_)               \
129     (elmp_)->loop_p->loop_params.s_t.el_extent_array[(curcount_)]
130 
131 #define STACKELM_STRUCT_DATALOOP(elmp_, curcount_)                \
132     (elmp_)->loop_p->loop_params.s_t.dataloop_array[(curcount_)]
133 
segment_seek(struct MPIR_Segment * segp,MPI_Aint position,MPI_Aint (* sizefn)(MPI_Datatype el_type))134 static void segment_seek(struct MPIR_Segment *segp, MPI_Aint position,
135                          MPI_Aint(*sizefn) (MPI_Datatype el_type))
136 {
137     struct MPII_Dataloop_stackelm *cur_elmp;
138     struct MPII_Dataloop_stackelm *next_elmp;
139     int cur_sp;
140 
141     MPIR_Assert(segp->stream_off < position);
142 
143     if (segp->stream_off || sizefn || !MPIR_CVAR_DATALOOP_FAST_SEEK) {
144         goto fallback_path;
145     }
146 
147     SEGMENT_RESET_VALUES;
148     cur_sp = segp->cur_sp;
149 
150     /* in the common case where this is a new segment and user wants
151      * to pack or unpack from a non-zero offset, try to skip through
152      * large blocks and setup the segment cursor at the correct
153      * position */
154     /* in the below code, at the leaf-level, the curblocks is setup to
155      * point to the remaining blocks.  But at the upper levels, the
156      * curblocks are setup to be one lesser than the remaining blocks
157      * (even if the lower-level block is completely unused). */
158     cur_elmp->orig_offset = 0;
159     cur_elmp->curoffset = 0;
160     while (1) {
161         switch ((cur_elmp->loop_p->kind & MPII_DATALOOP_KIND_MASK)) {
162 
163             case MPII_DATALOOP_KIND_CONTIG:
164                 {
165                     MPI_Aint blocksize = MPII_Dataloop_stackelm_blocksize(cur_elmp);
166 
167                     MPI_Aint num_elems = (position - segp->stream_off) / cur_elmp->loop_p->el_size;
168                     if (num_elems > blocksize)
169                         num_elems = blocksize;
170                     segp->stream_off += num_elems * cur_elmp->loop_p->el_size;
171 
172                     /* contig should have exactly one block */
173                     MPIR_Assert(cur_elmp->orig_count == 1);
174 
175                     /* current (remaining) block count */
176                     cur_elmp->curcount = (num_elems == blocksize ? 0 : 1);
177 
178                     /* current (remaining) block size */
179                     cur_elmp->curblock = blocksize - num_elems;
180 
181                     /* current offset */
182                     cur_elmp->curoffset = cur_elmp->orig_offset +
183                         num_elems * cur_elmp->loop_p->el_extent;
184 
185                     /* if there is a child element, setup its
186                      * parameters */
187                     if ((cur_elmp->loop_p->kind & MPII_DATALOOP_FINAL_MASK) == 0) {
188                         next_elmp = &(segp->stackelm[cur_sp + 1]);
189                         next_elmp->orig_offset = cur_elmp->curoffset;
190                         cur_elmp->curoffset = cur_elmp->orig_offset;
191 
192                         cur_elmp->curblock--;
193                         segp->cur_sp++;
194 
195                         /* we can't skip any large blocks at this
196                          * level anymore; move one level lower and
197                          * repeat the same process */
198                         SEGMENT_PUSH;
199 
200                         continue;
201                     } else {
202                         goto fn_exit;
203                     }
204 
205                     break;
206                 }
207 
208             case MPII_DATALOOP_KIND_VECTOR:
209                 {
210                     MPI_Aint blocksize = MPII_Dataloop_stackelm_blocksize(cur_elmp);
211 
212                     MPI_Aint num_blocks =
213                         (position - segp->stream_off) / (cur_elmp->loop_p->el_size * blocksize);
214                     if (num_blocks > cur_elmp->orig_count)
215                         num_blocks = cur_elmp->orig_count;
216                     segp->stream_off += num_blocks * cur_elmp->loop_p->el_size * blocksize;
217 
218                     MPI_Aint num_elems = (position - segp->stream_off) / cur_elmp->loop_p->el_size;
219                     MPIR_Assert(num_elems < blocksize);
220                     segp->stream_off += num_elems * cur_elmp->loop_p->el_size;
221 
222                     /* current (remaining) block count */
223                     cur_elmp->curcount = cur_elmp->orig_count - num_blocks;
224 
225                     /* current (remaining) block size */
226                     cur_elmp->curblock = blocksize - num_elems;
227 
228                     /* current offset */
229                     cur_elmp->curoffset = cur_elmp->orig_offset +
230                         num_blocks * cur_elmp->loop_p->loop_params.v_t.stride +
231                         num_elems * cur_elmp->loop_p->el_extent;
232 
233                     /* if there is a child element, setup its
234                      * parameters */
235                     if ((cur_elmp->loop_p->kind & MPII_DATALOOP_FINAL_MASK) == 0) {
236                         next_elmp = &(segp->stackelm[cur_sp + 1]);
237                         next_elmp->orig_offset = cur_elmp->curoffset;
238                         cur_elmp->curoffset = cur_elmp->orig_offset;
239 
240                         cur_elmp->curblock--;
241                         segp->cur_sp++;
242 
243                         /* we can't skip any large blocks at this
244                          * level anymore; move one level lower and
245                          * repeat the same process */
246                         SEGMENT_PUSH;
247 
248                         continue;
249                     } else {
250                         goto fn_exit;
251                     }
252 
253                     break;
254                 }
255 
256             case MPII_DATALOOP_KIND_BLOCKINDEXED:
257                 {
258                     MPI_Aint blocksize = MPII_Dataloop_stackelm_blocksize(cur_elmp);
259 
260                     MPI_Aint num_blocks =
261                         (position - segp->stream_off) / (cur_elmp->loop_p->el_size * blocksize);
262                     if (num_blocks > cur_elmp->orig_count)
263                         num_blocks = cur_elmp->orig_count;
264                     segp->stream_off += num_blocks * cur_elmp->loop_p->el_size * blocksize;
265 
266                     MPI_Aint num_elems = (position - segp->stream_off) / cur_elmp->loop_p->el_size;
267                     MPIR_Assert(num_elems < blocksize);
268                     segp->stream_off += num_elems * cur_elmp->loop_p->el_size;
269 
270                     /* current (remaining) block count */
271                     cur_elmp->curcount = cur_elmp->orig_count - num_blocks;
272 
273                     /* current (remaining) block size */
274                     cur_elmp->curblock = blocksize - num_elems;
275 
276                     /* current offset */
277                     cur_elmp->curoffset = cur_elmp->orig_offset +
278                         num_elems * cur_elmp->loop_p->el_extent +
279                         STACKELM_BLOCKINDEXED_OFFSET(cur_elmp, num_blocks);
280 
281                     /* if there is a child element, setup its
282                      * parameters */
283                     if ((cur_elmp->loop_p->kind & MPII_DATALOOP_FINAL_MASK) == 0) {
284                         next_elmp = &(segp->stackelm[cur_sp + 1]);
285                         next_elmp->orig_offset = cur_elmp->curoffset;
286                         cur_elmp->curoffset = cur_elmp->orig_offset;
287 
288                         cur_elmp->curblock--;
289                         segp->cur_sp++;
290 
291                         /* we can't skip any large blocks at this
292                          * level anymore; move one level lower and
293                          * repeat the same process */
294                         SEGMENT_PUSH;
295 
296                         continue;
297                     } else {
298                         goto fn_exit;
299                     }
300 
301                     break;
302                 }
303 
304             case MPII_DATALOOP_KIND_INDEXED:
305                 {
306                     MPI_Aint blocksize;
307                     MPI_Aint num_blocks;
308 
309                     for (num_blocks = 0; num_blocks < cur_elmp->orig_count; num_blocks++) {
310                         blocksize = STACKELM_INDEXED_BLOCKSIZE(cur_elmp, num_blocks);
311 
312                         if (position - segp->stream_off < cur_elmp->loop_p->el_size * blocksize) {
313                             cur_elmp->orig_block = blocksize;
314                             break;
315                         }
316 
317                         segp->stream_off += cur_elmp->loop_p->el_size * blocksize;
318                     }
319 
320                     blocksize = STACKELM_INDEXED_BLOCKSIZE(cur_elmp, num_blocks);
321 
322                     MPI_Aint num_elems = (position - segp->stream_off) / cur_elmp->loop_p->el_size;
323                     MPIR_Assert(num_elems < blocksize);
324                     segp->stream_off += num_elems * cur_elmp->loop_p->el_size;
325 
326                     /* current (remaining) block count */
327                     cur_elmp->curcount = cur_elmp->orig_count - num_blocks;
328 
329                     /* current (remaining) block size */
330                     cur_elmp->curblock = blocksize - num_elems;
331 
332                     /* current offset */
333                     cur_elmp->curoffset = cur_elmp->orig_offset +
334                         num_elems * cur_elmp->loop_p->el_extent +
335                         STACKELM_INDEXED_OFFSET(cur_elmp, num_blocks);
336 
337                     /* if there is a child element, setup its
338                      * parameters */
339                     if ((cur_elmp->loop_p->kind & MPII_DATALOOP_FINAL_MASK) == 0) {
340                         next_elmp = &(segp->stackelm[cur_sp + 1]);
341                         next_elmp->orig_offset = cur_elmp->curoffset;
342                         cur_elmp->curoffset = cur_elmp->orig_offset;
343 
344                         cur_elmp->curblock--;
345                         segp->cur_sp++;
346 
347                         /* we can't skip any large blocks at this
348                          * level anymore; move one level lower and
349                          * repeat the same process */
350                         SEGMENT_PUSH;
351 
352                         continue;
353                     } else {
354                         goto fn_exit;
355                     }
356 
357                     break;
358                 }
359 
360             case MPII_DATALOOP_KIND_STRUCT:
361                 {
362                     MPI_Aint blocksize;
363                     MPI_Aint num_blocks;
364                     MPII_Dataloop *dloop;
365 
366                     for (num_blocks = 0; num_blocks < cur_elmp->orig_count; num_blocks++) {
367                         blocksize = STACKELM_INDEXED_BLOCKSIZE(cur_elmp, num_blocks);
368                         dloop = STACKELM_STRUCT_DATALOOP(cur_elmp, num_blocks);
369 
370                         if (position - segp->stream_off < dloop->el_size * blocksize) {
371                             cur_elmp->orig_block = blocksize;
372                             break;
373                         }
374 
375                         segp->stream_off += cur_elmp->loop_p->el_size * blocksize;
376                     }
377 
378                     blocksize = STACKELM_INDEXED_BLOCKSIZE(cur_elmp, num_blocks);
379                     dloop = STACKELM_STRUCT_DATALOOP(cur_elmp, num_blocks);
380 
381                     MPI_Aint num_elems = (position - segp->stream_off) / dloop->el_size;
382                     MPIR_Assert(num_elems < blocksize);
383                     segp->stream_off += num_elems * dloop->el_size;
384 
385                     /* current (remaining) block count */
386                     cur_elmp->curcount = cur_elmp->orig_count - num_blocks;
387 
388                     /* current (remaining) block size */
389                     cur_elmp->curblock = blocksize - num_elems;
390 
391                     /* current offset */
392                     cur_elmp->curoffset = cur_elmp->orig_offset +
393                         num_elems * STACKELM_STRUCT_EL_EXTENT(cur_elmp, num_blocks) +
394                         STACKELM_STRUCT_OFFSET(cur_elmp, num_blocks);
395 
396                     /* structs cannot be leaves */
397                     MPIR_Assert((cur_elmp->loop_p->kind & MPII_DATALOOP_FINAL_MASK) == 0);
398 
399                     /* if there is a child element, setup its
400                      * parameters */
401                     if ((cur_elmp->loop_p->kind & MPII_DATALOOP_FINAL_MASK) == 0) {
402                         next_elmp = &(segp->stackelm[cur_sp + 1]);
403                         next_elmp->orig_offset = cur_elmp->curoffset;
404                         cur_elmp->curoffset = cur_elmp->orig_offset;
405 
406                         cur_elmp->curblock--;
407                         segp->cur_sp++;
408 
409                         /* we can't skip any large blocks at this
410                          * level anymore; move one level lower and
411                          * repeat the same process */
412                         SEGMENT_PUSH;
413 
414                         continue;
415                     } else {
416                         goto fn_exit;
417                     }
418 
419                     break;
420                 }
421 
422             default:
423                 goto fallback_path;
424         }
425 
426         MPIR_Assert(segp->stream_off == position);
427         break;
428     }
429 
430     goto fn_exit;
431 
432   fallback_path:
433     {
434         MPI_Aint tmp_last = position;
435 
436         /* use manipulate function with a NULL piecefn to advance
437          * stream offset */
438         MPII_Segment_manipulate(segp, segp->stream_off, &tmp_last, NULL,        /* contig fn */
439                                 NULL,   /* vector fn */
440                                 NULL,   /* blkidx fn */
441                                 NULL,   /* index fn */
442                                 sizefn, NULL);
443 
444         /* --BEGIN ERROR HANDLING-- */
445         /* verify that we're in the right location */
446         MPIR_Assert(tmp_last == position);
447         /* --END ERROR HANDLING-- */
448     }
449 
450   fn_exit:
451     return;
452 }
453 
MPII_Segment_manipulate(struct MPIR_Segment * segp,MPI_Aint first,MPI_Aint * lastp,int (* contigfn)(MPI_Aint * blocks_p,MPI_Datatype el_type,MPI_Aint rel_off,void * bufp,void * v_paramp),int (* vectorfn)(MPI_Aint * blocks_p,MPI_Aint count,MPI_Aint blklen,MPI_Aint stride,MPI_Datatype el_type,MPI_Aint rel_off,void * bufp,void * v_paramp),int (* blkidxfn)(MPI_Aint * blocks_p,MPI_Aint count,MPI_Aint blklen,MPI_Aint * offsetarray,MPI_Datatype el_type,MPI_Aint rel_off,void * bufp,void * v_paramp),int (* indexfn)(MPI_Aint * blocks_p,MPI_Aint count,MPI_Aint * blockarray,MPI_Aint * offsetarray,MPI_Datatype el_type,MPI_Aint rel_off,void * bufp,void * v_paramp),MPI_Aint (* sizefn)(MPI_Datatype el_type),void * pieceparams)454 void MPII_Segment_manipulate(struct MPIR_Segment *segp,
455                              MPI_Aint first,
456                              MPI_Aint * lastp,
457                              int (*contigfn) (MPI_Aint * blocks_p,
458                                               MPI_Datatype el_type,
459                                               MPI_Aint rel_off,
460                                               void *bufp,
461                                               void *v_paramp),
462                              int (*vectorfn) (MPI_Aint * blocks_p,
463                                               MPI_Aint count,
464                                               MPI_Aint blklen,
465                                               MPI_Aint stride,
466                                               MPI_Datatype el_type,
467                                               MPI_Aint rel_off,
468                                               void *bufp,
469                                               void *v_paramp),
470                              int (*blkidxfn) (MPI_Aint * blocks_p,
471                                               MPI_Aint count,
472                                               MPI_Aint blklen,
473                                               MPI_Aint * offsetarray,
474                                               MPI_Datatype el_type,
475                                               MPI_Aint rel_off,
476                                               void *bufp,
477                                               void *v_paramp),
478                              int (*indexfn) (MPI_Aint * blocks_p,
479                                              MPI_Aint count,
480                                              MPI_Aint * blockarray,
481                                              MPI_Aint * offsetarray,
482                                              MPI_Datatype el_type,
483                                              MPI_Aint rel_off,
484                                              void *bufp,
485                                              void *v_paramp),
486                              MPI_Aint(*sizefn) (MPI_Datatype el_type), void *pieceparams)
487 {
488     /* these four are the "local values": cur_sp, valid_sp, last, stream_off */
489     int cur_sp, valid_sp;
490     MPI_Aint last, stream_off;
491 
492     struct MPII_Dataloop_stackelm *cur_elmp;
493     enum { PF_NULL, PF_CONTIG, PF_VECTOR, PF_BLOCKINDEXED, PF_INDEXED } piecefn_type = PF_NULL;
494 
495     SEGMENT_LOAD_LOCAL_VALUES;
496 
497     if (first == *lastp) {
498         /* nothing to do */
499         MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
500                         (MPL_DBG_FDEST,
501                          "dloop_segment_manipulate: warning: first == last ("
502                          MPI_AINT_FMT_DEC_SPEC ")\n", first));
503         return;
504     }
505 
506     /* first we ensure that stream_off and first are in the same spot */
507     if (first != stream_off) {
508 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
509         MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
510                         (MPL_DBG_FDEST,
511                          "first=" MPI_AINT_FMT_DEC_SPEC "; stream_off="
512                          MPI_AINT_FMT_DEC_SPEC "; resetting.\n", first, stream_off));
513 #endif
514 
515         if (first < stream_off) {
516             SEGMENT_RESET_VALUES;
517             stream_off = 0;
518         }
519 
520         if (first != stream_off) {
521             segment_seek(segp, first, sizefn);
522         }
523 
524         SEGMENT_LOAD_LOCAL_VALUES;
525 
526 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
527         MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
528                         (MPL_DBG_FDEST,
529                          "done repositioning stream_off; first=" MPI_AINT_FMT_DEC_SPEC
530                          ", stream_off=" MPI_AINT_FMT_DEC_SPEC ", last="
531                          MPI_AINT_FMT_DEC_SPEC "\n", first, stream_off, last));
532 #endif
533     }
534 
535     for (;;) {
536 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
537 #if 0
538         MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
539                         (MPL_DBG_FDEST, "looptop; cur_sp=%d, cur_elmp=%x\n", cur_sp,
540                          (unsigned) cur_elmp));
541 #endif
542 #endif
543 
544         if (cur_elmp->loop_p->kind & MPII_DATALOOP_FINAL_MASK) {
545             int piecefn_indicated_exit = -1;
546             MPI_Aint myblocks, local_el_size, stream_el_size;
547             MPI_Datatype el_type;
548 
549             /* structs are never finals (leaves) */
550             MPIR_Assert((cur_elmp->loop_p->kind & MPII_DATALOOP_KIND_MASK) !=
551                         MPII_DATALOOP_KIND_STRUCT);
552 
553             /* pop immediately on zero count */
554             if (cur_elmp->curcount == 0)
555                 SEGMENT_POP_AND_MAYBE_EXIT;
556 
557             /* size on this system of the int, double, etc. that is
558              * the elementary type.
559              */
560             local_el_size = cur_elmp->loop_p->el_size;
561             el_type = cur_elmp->loop_p->el_type;
562             stream_el_size = (sizefn) ? sizefn(el_type) : local_el_size;
563 
564             /* calculate number of elem. types to work on and function to use.
565              * default is to use the contig piecefn (if there is one).
566              */
567             myblocks = cur_elmp->curblock;
568             piecefn_type = (contigfn ? PF_CONTIG : PF_NULL);
569 
570             /* check for opportunities to use other piecefns */
571             switch (cur_elmp->loop_p->kind & MPII_DATALOOP_KIND_MASK) {
572                 case MPII_DATALOOP_KIND_CONTIG:
573                     break;
574                 case MPII_DATALOOP_KIND_BLOCKINDEXED:
575                     /* only use blkidx piecefn if at start of blkidx type */
576                     if (blkidxfn &&
577                         cur_elmp->orig_block == cur_elmp->curblock &&
578                         cur_elmp->orig_count == cur_elmp->curcount) {
579                         /* TODO: RELAX CONSTRAINTS */
580                         myblocks = cur_elmp->curblock * cur_elmp->curcount;
581                         piecefn_type = PF_BLOCKINDEXED;
582                     }
583                     break;
584                 case MPII_DATALOOP_KIND_INDEXED:
585                     /* only use index piecefn if at start of the index type.
586                      *   count test checks that we're on first block.
587                      *   block test checks that we haven't made progress on first block.
588                      */
589                     if (indexfn &&
590                         cur_elmp->orig_count == cur_elmp->curcount &&
591                         cur_elmp->curblock == STACKELM_INDEXED_BLOCKSIZE(cur_elmp, 0)) {
592                         /* TODO: RELAX CONSTRAINT ON COUNT? */
593                         myblocks = cur_elmp->loop_p->loop_params.i_t.total_blocks;
594                         piecefn_type = PF_INDEXED;
595                     }
596                     break;
597                 case MPII_DATALOOP_KIND_VECTOR:
598                     /* only use the vector piecefn if at the start of a
599                      * contiguous block.
600                      */
601                     if (vectorfn && cur_elmp->orig_block == cur_elmp->curblock) {
602                         myblocks = cur_elmp->curblock * cur_elmp->curcount;
603                         piecefn_type = PF_VECTOR;
604                     }
605                     break;
606                 default:
607                     /* --BEGIN ERROR HANDLING-- */
608                     MPIR_Assert(0);
609                     break;
610                     /* --END ERROR HANDLING-- */
611             }
612 
613 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
614             MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
615                             (MPL_DBG_FDEST,
616                              "\thit leaf; cur_sp=%d, elmp=%x, piece_sz=" MPI_AINT_FMT_DEC_SPEC
617                              "\n", cur_sp, (unsigned) cur_elmp, myblocks * local_el_size));
618 #endif
619 
620             /* enforce the last parameter if necessary by reducing myblocks */
621             if (last != MPIR_SEGMENT_IGNORE_LAST &&
622                 (stream_off + (myblocks * stream_el_size) > last)) {
623                 myblocks = ((last - stream_off) / stream_el_size);
624 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
625                 MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
626                                 (MPL_DBG_FDEST,
627                                  "\tpartial block count=" MPI_AINT_FMT_DEC_SPEC " ("
628                                  MPI_AINT_FMT_DEC_SPEC " bytes)\n", myblocks,
629                                  myblocks * stream_el_size));
630 #endif
631                 if (myblocks == 0) {
632                     SEGMENT_SAVE_LOCAL_VALUES;
633                     return;
634                 }
635             }
636 
637             /* call piecefn to perform data manipulation */
638             switch (piecefn_type) {
639                 case PF_NULL:
640                     piecefn_indicated_exit = 0;
641 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
642                     MPL_DBG_MSG("\tNULL piecefn for this piece\n");
643 #endif
644                     break;
645                 case PF_CONTIG:
646                     MPIR_Assert(myblocks <= cur_elmp->curblock);
647                     piecefn_indicated_exit = contigfn(&myblocks, el_type, cur_elmp->curoffset,  /* relative to segp->ptr */
648                                                       segp->ptr,        /* start of buffer (from segment) */
649                                                       pieceparams);
650                     break;
651                 case PF_VECTOR:
652                     piecefn_indicated_exit =
653                         vectorfn(&myblocks,
654                                  cur_elmp->curcount,
655                                  cur_elmp->orig_block,
656                                  cur_elmp->loop_p->loop_params.v_t.stride,
657                                  el_type, cur_elmp->curoffset, segp->ptr, pieceparams);
658                     break;
659                 case PF_BLOCKINDEXED:
660                     piecefn_indicated_exit = blkidxfn(&myblocks, cur_elmp->curcount, cur_elmp->orig_block, cur_elmp->loop_p->loop_params.bi_t.offset_array, el_type, cur_elmp->orig_offset,     /* blkidxfn adds offset */
661                                                       segp->ptr, pieceparams);
662                     break;
663                 case PF_INDEXED:
664                     piecefn_indicated_exit = indexfn(&myblocks, cur_elmp->curcount, cur_elmp->loop_p->loop_params.i_t.blocksize_array, cur_elmp->loop_p->loop_params.i_t.offset_array, el_type, cur_elmp->orig_offset,  /* indexfn adds offset value */
665                                                      segp->ptr, pieceparams);
666                     break;
667             }
668 
669             /* update local values based on piecefn returns (myblocks and
670              * piecefn_indicated_exit)
671              */
672             MPIR_Assert(piecefn_indicated_exit >= 0);
673             MPIR_Assert(myblocks >= 0);
674             stream_off += myblocks * stream_el_size;
675 
676             /* myblocks of 0 or less than cur_elmp->curblock indicates
677              * that we should stop processing and return.
678              */
679             if (myblocks == 0) {
680                 SEGMENT_SAVE_LOCAL_VALUES;
681                 return;
682             } else if (myblocks < (MPI_Aint) (cur_elmp->curblock)) {
683                 cur_elmp->curoffset += myblocks * local_el_size;
684                 cur_elmp->curblock -= myblocks;
685 
686                 SEGMENT_SAVE_LOCAL_VALUES;
687                 return;
688             } else {    /* myblocks >= cur_elmp->curblock */
689 
690                 MPI_Aint count_index = 0;
691 
692                 /* this assumes we're either *just* processing the last parts
693                  * of the current block, or we're processing as many blocks as
694                  * we like starting at the beginning of one.
695                  */
696 
697                 switch (cur_elmp->loop_p->kind & MPII_DATALOOP_KIND_MASK) {
698                     case MPII_DATALOOP_KIND_INDEXED:
699                         while (myblocks > 0 && myblocks >= (MPI_Aint) (cur_elmp->curblock)) {
700                             myblocks -= (MPI_Aint) (cur_elmp->curblock);
701                             cur_elmp->curcount--;
702                             MPIR_Assert(cur_elmp->curcount >= 0);
703 
704                             count_index = cur_elmp->orig_count - cur_elmp->curcount;
705                             cur_elmp->curblock = STACKELM_INDEXED_BLOCKSIZE(cur_elmp, count_index);
706                         }
707 
708                         if (cur_elmp->curcount == 0) {
709                             /* don't bother to fill in values; we're popping anyway */
710                             MPIR_Assert(myblocks == 0);
711                             SEGMENT_POP_AND_MAYBE_EXIT;
712                         } else {
713                             cur_elmp->orig_block = cur_elmp->curblock;
714                             cur_elmp->curoffset = cur_elmp->orig_offset +
715                                 STACKELM_INDEXED_OFFSET(cur_elmp, count_index);
716 
717                             cur_elmp->curblock -= myblocks;
718                             cur_elmp->curoffset += myblocks * local_el_size;
719                         }
720                         break;
721                     case MPII_DATALOOP_KIND_VECTOR:
722                         /* this math relies on assertions at top of code block */
723                         cur_elmp->curcount -= myblocks / (MPI_Aint) (cur_elmp->curblock);
724                         if (cur_elmp->curcount == 0) {
725                             MPIR_Assert(myblocks % ((MPI_Aint) (cur_elmp->curblock)) == 0);
726                             SEGMENT_POP_AND_MAYBE_EXIT;
727                         } else {
728                             /* this math relies on assertions at top of code
729                              * block
730                              */
731                             cur_elmp->curblock = cur_elmp->orig_block -
732                                 (myblocks % (MPI_Aint) (cur_elmp->curblock));
733                             /* new offset = original offset +
734                              *              stride * whole blocks +
735                              *              leftover bytes
736                              */
737                             cur_elmp->curoffset = cur_elmp->orig_offset +
738                                 (((MPI_Aint) (cur_elmp->orig_count - cur_elmp->curcount)) *
739                                  cur_elmp->loop_p->loop_params.v_t.stride) +
740                                 (((MPI_Aint) (cur_elmp->orig_block - cur_elmp->curblock)) *
741                                  local_el_size);
742                         }
743                         break;
744                     case MPII_DATALOOP_KIND_CONTIG:
745                         /* contigs that reach this point have always been
746                          * completely processed
747                          */
748                         MPIR_Assert(myblocks == (MPI_Aint) (cur_elmp->curblock) &&
749                                     cur_elmp->curcount == 1);
750                         SEGMENT_POP_AND_MAYBE_EXIT;
751                         break;
752                     case MPII_DATALOOP_KIND_BLOCKINDEXED:
753                         while (myblocks > 0 && myblocks >= (MPI_Aint) (cur_elmp->curblock)) {
754                             myblocks -= (MPI_Aint) (cur_elmp->curblock);
755                             cur_elmp->curcount--;
756                             MPIR_Assert(cur_elmp->curcount >= 0);
757 
758                             count_index = cur_elmp->orig_count - cur_elmp->curcount;
759                             cur_elmp->curblock = cur_elmp->orig_block;
760                         }
761                         if (cur_elmp->curcount == 0) {
762                             /* popping */
763                             MPIR_Assert(myblocks == 0);
764                             SEGMENT_POP_AND_MAYBE_EXIT;
765                         } else {
766                             /* cur_elmp->orig_block = cur_elmp->curblock; */
767                             cur_elmp->curoffset = cur_elmp->orig_offset +
768                                 STACKELM_BLOCKINDEXED_OFFSET(cur_elmp, count_index);
769                             cur_elmp->curblock -= myblocks;
770                             cur_elmp->curoffset += myblocks * local_el_size;
771                         }
772                         break;
773                 }
774             }
775 
776             if (piecefn_indicated_exit) {
777                 /* piece function indicated that we should quit processing */
778                 SEGMENT_SAVE_LOCAL_VALUES;
779                 return;
780             }
781         } /* end of if leaf */
782         else if (cur_elmp->curblock == 0) {
783 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
784             MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
785                             (MPL_DBG_FDEST, "\thit end of block; elmp=%x [%d]\n",
786                              (unsigned) cur_elmp, cur_sp));
787 #endif
788             cur_elmp->curcount--;
789 
790             /* new block.  for indexed and struct reset orig_block.
791              * reset curblock for all types
792              */
793             switch (cur_elmp->loop_p->kind & MPII_DATALOOP_KIND_MASK) {
794                 case MPII_DATALOOP_KIND_CONTIG:
795                 case MPII_DATALOOP_KIND_VECTOR:
796                 case MPII_DATALOOP_KIND_BLOCKINDEXED:
797                     break;
798                 case MPII_DATALOOP_KIND_INDEXED:
799                     cur_elmp->orig_block =
800                         STACKELM_INDEXED_BLOCKSIZE(cur_elmp,
801                                                    cur_elmp->curcount ? cur_elmp->orig_count -
802                                                    cur_elmp->curcount : 0);
803                     break;
804                 case MPII_DATALOOP_KIND_STRUCT:
805                     cur_elmp->orig_block =
806                         STACKELM_STRUCT_BLOCKSIZE(cur_elmp,
807                                                   cur_elmp->curcount ? cur_elmp->orig_count -
808                                                   cur_elmp->curcount : 0);
809                     break;
810                 default:
811                     /* --BEGIN ERROR HANDLING-- */
812                     MPIR_Assert(0);
813                     break;
814                     /* --END ERROR HANDLING-- */
815             }
816             cur_elmp->curblock = cur_elmp->orig_block;
817 
818             if (cur_elmp->curcount == 0) {
819 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
820                 MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
821                                 (MPL_DBG_FDEST, "\talso hit end of count; elmp=%x [%d]\n",
822                                  (unsigned) cur_elmp, cur_sp));
823 #endif
824                 SEGMENT_POP_AND_MAYBE_EXIT;
825             }
826         } else {        /* push the stackelm */
827 
828             MPII_Dataloop_stackelm *next_elmp;
829             MPI_Aint count_index, block_index;
830 
831             count_index = cur_elmp->orig_count - cur_elmp->curcount;
832             block_index = cur_elmp->orig_block - cur_elmp->curblock;
833 
834             /* reload the next stackelm if necessary */
835             next_elmp = &(segp->stackelm[cur_sp + 1]);
836             if (cur_elmp->may_require_reloading) {
837                 MPII_Dataloop *load_dlp = NULL;
838                 switch (cur_elmp->loop_p->kind & MPII_DATALOOP_KIND_MASK) {
839                     case MPII_DATALOOP_KIND_CONTIG:
840                     case MPII_DATALOOP_KIND_VECTOR:
841                     case MPII_DATALOOP_KIND_BLOCKINDEXED:
842                     case MPII_DATALOOP_KIND_INDEXED:
843                         load_dlp = cur_elmp->loop_p->loop_params.cm_t.dataloop;
844                         break;
845                     case MPII_DATALOOP_KIND_STRUCT:
846                         load_dlp = STACKELM_STRUCT_DATALOOP(cur_elmp, count_index);
847                         break;
848                     default:
849                         /* --BEGIN ERROR HANDLING-- */
850                         MPIR_Assert(0);
851                         break;
852                         /* --END ERROR HANDLING-- */
853                 }
854 
855 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
856                 MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
857                                 (MPL_DBG_FDEST, "\tloading dlp=%x, elmp=%x [%d]\n",
858                                  (unsigned) load_dlp, (unsigned) next_elmp, cur_sp + 1));
859 #endif
860 
861                 MPII_Dataloop_stackelm_load(next_elmp, load_dlp, 1);
862             }
863 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
864             MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
865                             (MPL_DBG_FDEST, "\tpushing type, elmp=%x [%d], count=%d, block=%d\n",
866                              (unsigned) cur_elmp, cur_sp, count_index, block_index));
867 #endif
868             /* set orig_offset and all cur values for new stackelm.
869              * this is done in two steps: first set orig_offset based on
870              * current stackelm, then set cur values based on new stackelm.
871              */
872             switch (cur_elmp->loop_p->kind & MPII_DATALOOP_KIND_MASK) {
873                 case MPII_DATALOOP_KIND_CONTIG:
874                     next_elmp->orig_offset = cur_elmp->curoffset +
875                         (MPI_Aint) block_index *cur_elmp->loop_p->el_extent;
876                     break;
877                 case MPII_DATALOOP_KIND_VECTOR:
878                     /* note: stride is in bytes */
879                     next_elmp->orig_offset = cur_elmp->orig_offset +
880                         (MPI_Aint) count_index *cur_elmp->loop_p->loop_params.v_t.stride +
881                         (MPI_Aint) block_index *cur_elmp->loop_p->el_extent;
882                     break;
883                 case MPII_DATALOOP_KIND_BLOCKINDEXED:
884                     next_elmp->orig_offset = cur_elmp->orig_offset +
885                         (MPI_Aint) block_index *cur_elmp->loop_p->el_extent +
886                         STACKELM_BLOCKINDEXED_OFFSET(cur_elmp, count_index);
887                     break;
888                 case MPII_DATALOOP_KIND_INDEXED:
889                     next_elmp->orig_offset = cur_elmp->orig_offset +
890                         (MPI_Aint) block_index *cur_elmp->loop_p->el_extent +
891                         STACKELM_INDEXED_OFFSET(cur_elmp, count_index);
892                     break;
893                 case MPII_DATALOOP_KIND_STRUCT:
894                     next_elmp->orig_offset = cur_elmp->orig_offset +
895                         (MPI_Aint) block_index *STACKELM_STRUCT_EL_EXTENT(cur_elmp,
896                                                                           count_index) +
897                         STACKELM_STRUCT_OFFSET(cur_elmp, count_index);
898                     break;
899                 default:
900                     /* --BEGIN ERROR HANDLING-- */
901                     MPIR_Assert(0);
902                     break;
903                     /* --END ERROR HANDLING-- */
904             }
905 
906 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
907             MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
908                             (MPL_DBG_FDEST,
909                              "\tstep 1: next orig_offset = " MPI_AINT_FMT_DEC_SPEC " (0x"
910                              MPI_AINT_FMT_HEX_SPEC ")\n", next_elmp->orig_offset,
911                              next_elmp->orig_offset));
912 #endif
913 
914             switch (next_elmp->loop_p->kind & MPII_DATALOOP_KIND_MASK) {
915                 case MPII_DATALOOP_KIND_CONTIG:
916                 case MPII_DATALOOP_KIND_VECTOR:
917                     next_elmp->curcount = next_elmp->orig_count;
918                     next_elmp->curblock = next_elmp->orig_block;
919                     next_elmp->curoffset = next_elmp->orig_offset;
920                     break;
921                 case MPII_DATALOOP_KIND_BLOCKINDEXED:
922                     next_elmp->curcount = next_elmp->orig_count;
923                     next_elmp->curblock = next_elmp->orig_block;
924                     next_elmp->curoffset = next_elmp->orig_offset +
925                         STACKELM_BLOCKINDEXED_OFFSET(next_elmp, 0);
926                     break;
927                 case MPII_DATALOOP_KIND_INDEXED:
928                     next_elmp->curcount = next_elmp->orig_count;
929                     next_elmp->curblock = STACKELM_INDEXED_BLOCKSIZE(next_elmp, 0);
930                     next_elmp->curoffset = next_elmp->orig_offset +
931                         STACKELM_INDEXED_OFFSET(next_elmp, 0);
932                     break;
933                 case MPII_DATALOOP_KIND_STRUCT:
934                     next_elmp->curcount = next_elmp->orig_count;
935                     next_elmp->curblock = STACKELM_STRUCT_BLOCKSIZE(next_elmp, 0);
936                     next_elmp->curoffset = next_elmp->orig_offset +
937                         STACKELM_STRUCT_OFFSET(next_elmp, 0);
938                     break;
939                 default:
940                     /* --BEGIN ERROR HANDLING-- */
941                     MPIR_Assert(0);
942                     break;
943                     /* --END ERROR HANDLING-- */
944             }
945 
946 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
947             MPL_DBG_MSG_FMT(MPIR_DBG_DATATYPE, VERBOSE,
948                             (MPL_DBG_FDEST,
949                              "\tstep 2: next curoffset = " MPI_AINT_FMT_DEC_SPEC " (0x"
950                              MPI_AINT_FMT_HEX_SPEC ")\n", next_elmp->curoffset,
951                              next_elmp->curoffset));
952 #endif
953 
954             cur_elmp->curblock--;
955             SEGMENT_PUSH;
956         }       /* end of else push the stackelm */
957     }   /* end of for (;;) */
958 
959 #ifdef MPII_DATALOOP_DEBUG_MANIPULATE
960     MPL_DBG_MSG("hit end of datatype\n");
961 #endif
962 
963     SEGMENT_SAVE_LOCAL_VALUES;
964     return;
965 }
966 
967 /* MPII_Dataloop_stackelm_blocksize - returns block size for stackelm based on current
968  * count in stackelm.
969  *
970  * NOTE: loop_p, orig_count, and curcount members of stackelm MUST be correct
971  * before this is called!
972  *
973  */
MPII_Dataloop_stackelm_blocksize(struct MPII_Dataloop_stackelm * elmp)974 MPI_Aint MPII_Dataloop_stackelm_blocksize(struct MPII_Dataloop_stackelm * elmp)
975 {
976     MPII_Dataloop *dlp = elmp->loop_p;
977 
978     switch (dlp->kind & MPII_DATALOOP_KIND_MASK) {
979         case MPII_DATALOOP_KIND_CONTIG:
980             /* NOTE: we're dropping the count into the
981              * blksize field for contigs, as described
982              * in the init call.
983              */
984             return dlp->loop_params.c_t.count;
985             break;
986         case MPII_DATALOOP_KIND_VECTOR:
987             return dlp->loop_params.v_t.blocksize;
988             break;
989         case MPII_DATALOOP_KIND_BLOCKINDEXED:
990             return dlp->loop_params.bi_t.blocksize;
991             break;
992         case MPII_DATALOOP_KIND_INDEXED:
993             return dlp->loop_params.i_t.blocksize_array[elmp->orig_count - elmp->curcount];
994             break;
995         case MPII_DATALOOP_KIND_STRUCT:
996             return dlp->loop_params.s_t.blocksize_array[elmp->orig_count - elmp->curcount];
997             break;
998         default:
999             /* --BEGIN ERROR HANDLING-- */
1000             MPIR_Assert(0);
1001             break;
1002             /* --END ERROR HANDLING-- */
1003     }
1004     return -1;
1005 }
1006 
1007 /* MPII_Dataloop_stackelm_offset - returns starting offset (displacement) for stackelm
1008  * based on current count in stackelm.
1009  *
1010  * NOTE: loop_p, orig_count, and curcount members of stackelm MUST be correct
1011  * before this is called!
1012  *
1013  * also, this really is only good at init time for vectors and contigs
1014  * (all the time for indexed) at the moment.
1015  *
1016  */
MPII_Dataloop_stackelm_offset(struct MPII_Dataloop_stackelm * elmp)1017 MPI_Aint MPII_Dataloop_stackelm_offset(struct MPII_Dataloop_stackelm * elmp)
1018 {
1019     MPII_Dataloop *dlp = elmp->loop_p;
1020 
1021     switch (dlp->kind & MPII_DATALOOP_KIND_MASK) {
1022         case MPII_DATALOOP_KIND_VECTOR:
1023         case MPII_DATALOOP_KIND_CONTIG:
1024             return 0;
1025             break;
1026         case MPII_DATALOOP_KIND_BLOCKINDEXED:
1027             return dlp->loop_params.bi_t.offset_array[elmp->orig_count - elmp->curcount];
1028             break;
1029         case MPII_DATALOOP_KIND_INDEXED:
1030             return dlp->loop_params.i_t.offset_array[elmp->orig_count - elmp->curcount];
1031             break;
1032         case MPII_DATALOOP_KIND_STRUCT:
1033             return dlp->loop_params.s_t.offset_array[elmp->orig_count - elmp->curcount];
1034             break;
1035         default:
1036             /* --BEGIN ERROR HANDLING-- */
1037             MPIR_Assert(0);
1038             break;
1039             /* --END ERROR HANDLING-- */
1040     }
1041     return -1;
1042 }
1043 
1044 /* MPII_Dataloop_stackelm_load
1045  * loop_p, orig_count, orig_block, and curcount are all filled by us now.
1046  * the rest are filled in at processing time.
1047  */
MPII_Dataloop_stackelm_load(struct MPII_Dataloop_stackelm * elmp,MPII_Dataloop * dlp,int branch_flag)1048 void MPII_Dataloop_stackelm_load(struct MPII_Dataloop_stackelm *elmp,
1049                                  MPII_Dataloop * dlp, int branch_flag)
1050 {
1051     elmp->loop_p = dlp;
1052 
1053     if ((dlp->kind & MPII_DATALOOP_KIND_MASK) == MPII_DATALOOP_KIND_CONTIG) {
1054         elmp->orig_count = 1;   /* put in blocksize instead */
1055     } else {
1056         elmp->orig_count = dlp->loop_params.count;
1057     }
1058 
1059     if (branch_flag || (dlp->kind & MPII_DATALOOP_KIND_MASK) == MPII_DATALOOP_KIND_STRUCT) {
1060         elmp->may_require_reloading = 1;
1061     } else {
1062         elmp->may_require_reloading = 0;
1063     }
1064 
1065     /* required by MPII_Dataloop_stackelm_blocksize */
1066     elmp->curcount = elmp->orig_count;
1067 
1068     elmp->orig_block = MPII_Dataloop_stackelm_blocksize(elmp);
1069     /* TODO: GO AHEAD AND FILL IN CURBLOCK? */
1070 }
1071 
1072 /*
1073  * Local variables:
1074  * c-indent-tabs-mode: nil
1075  * End:
1076  */
1077