1 /*
2    (C) 2001 by Argonne National Laboratory.
3        See COPYRIGHT in top-level directory.
4 */
5 #include "mpe_logging_conf.h"
6 
7 #if defined( STDC_HEADERS ) || defined( HAVE_STDIO_H )
8 #include <stdio.h>
9 #endif
10 #if defined( STDC_HEADERS ) || defined( HAVE_STDLIB_H )
11 #include <stdlib.h>
12 #endif
13 #if defined( STDC_HEADERS ) || defined( HAVE_STRING_H )
14 #include <string.h>
15 #endif
16 #if defined( HAVE_FCNTL_H )
17 #include <fcntl.h>
18 #endif
19 #ifdef HAVE_SYS_TYPES_H
20 #include <sys/types.h>
21 #endif
22 #if defined( HAVE_UNISTD_H )
23 #include <unistd.h>
24 #endif
25 #ifdef HAVE_IO_H
26 #include <io.h>
27 #endif
28 
29 #if !defined( CLOG_NOMPI )
30 #include "mpi.h"
31 #else
32 #include "mpi_null.h"
33 #endif /* Endof if !defined( CLOG_NOMPI ) */
34 
35 #include "clog_const.h"
36 #include "clog_mem.h"
37 #include "clog_merger.h"
38 #include "clog_util.h"
39 
40 #define CLOG_RANK_NULL   -1
41 
42 static int clog_merger_minblocksize;
43 
CLOG_Merger_create(unsigned int block_size)44 CLOG_Merger_t* CLOG_Merger_create( unsigned int block_size )
45 {
46     CLOG_Merger_t  *merger;
47 
48     merger = (CLOG_Merger_t *) MALLOC( sizeof(CLOG_Merger_t) );
49     if ( merger == NULL ) {
50         fprintf( stderr, __FILE__":CLOG_Merger_create() - \n"
51                          "\t""MALLOC() fails for CLOG_Merger_t!\n" );
52         fflush( stderr );
53         return NULL;
54     }
55 
56 
57     merger->left_blk  = (CLOG_BlockData_t *)
58                         CLOG_BlockData_create( block_size );
59     if ( merger->left_blk == NULL ) {
60         fprintf( stderr, __FILE__":CLOG_Merger_create() - \n"
61                          "\t""CLOG_BlockData_create(%d) fails for left_blk!",
62                          block_size );
63         fflush( stderr );
64         return NULL;
65     }
66 
67     merger->right_blk  = (CLOG_BlockData_t *)
68                          CLOG_BlockData_create( block_size );
69     if ( merger->right_blk == NULL ) {
70         fprintf( stderr, __FILE__":CLOG_Merger_create() - \n"
71                          "\t""CLOG_BlockData_create(%d) fails for right_blk!",
72                          block_size );
73         fflush( stderr );
74         return NULL;
75     }
76 
77     merger->sorted_blk  = (CLOG_BlockData_t *)
78                           CLOG_BlockData_create( block_size );
79     if ( merger->sorted_blk == NULL ) {
80         fprintf( stderr, __FILE__":CLOG_Merger_create() - \n"
81                          "\t""CLOG_BlockData_create(%d) fails for sorted_blk!",
82                          block_size );
83         fflush( stderr );
84         return NULL;
85     }
86 
87     merger->block_size        = block_size;
88     merger->num_active_blks   = 0;
89     merger->world_size        = 1;
90     merger->local_world_rank  = 0;
91     merger->left_world_rank   = 0;
92     merger->right_world_rank  = 0;
93     merger->parent_world_rank = 0;
94 
95     merger->is_big_endian    = CLOG_BOOL_NULL;
96     strncpy( merger->out_filename, "mpe_trace."CLOG_FILE_TYPE,
97              CLOG_PATH_STRLEN );
98     merger->out_fd           = -1;
99 
100     return merger;
101 }
102 
CLOG_Merger_free(CLOG_Merger_t ** merger_handle)103 void CLOG_Merger_free( CLOG_Merger_t **merger_handle )
104 {
105     CLOG_Merger_t  *merger;
106     merger = *merger_handle;
107     if ( merger != NULL ) {
108         CLOG_BlockData_free( &(merger->sorted_blk) );
109         CLOG_BlockData_free( &(merger->left_blk) );
110         CLOG_BlockData_free( &(merger->right_blk) );
111         FREE( merger );
112     }
113     *merger_handle = NULL;
114 }
115 
116 /*@
117     CLOG_Merger_set_neighbor_ranks - locally determine parent and children
118                                      in a binary tree
119 
120 Input parameters
121 
122 +  merger->local_world_rank  - calling process''s id
123 -  merger->world_size        - total number of processes in tree
124 
125 Output parameters
126 
127 +  merger->parent_world_rank - parent of local process
128                              (or CLOG_RANK_NULL if root)
129 .  merger->left_world_rank   - left child of local process in binary tree
130                              (or CLOG_RANK_NULL if none)
131 -  merger->right_world_rank  - right child of local process in binary tree
132                              (or CLOG_RANK_NULL if none)
133 
134 @*/
CLOG_Merger_set_neighbor_ranks(CLOG_Merger_t * merger)135 void CLOG_Merger_set_neighbor_ranks( CLOG_Merger_t *merger )
136 {
137     if ( merger->local_world_rank == 0 )
138         merger->parent_world_rank = CLOG_RANK_NULL;
139     else
140         merger->parent_world_rank = (merger->local_world_rank - 1) / 2;
141 
142     merger->left_world_rank = (2 * merger->local_world_rank) + 1;
143     if ( merger->left_world_rank > merger->world_size - 1 )
144         merger->left_world_rank = CLOG_RANK_NULL;
145 
146     merger->right_world_rank = (2 * merger->local_world_rank) + 2;
147     if ( merger->right_world_rank > merger->world_size - 1 )
148         merger->right_world_rank = CLOG_RANK_NULL;
149 }
150 
151 /* Initialize the CLOG_Merger_t */
CLOG_Merger_init(CLOG_Merger_t * merger,const CLOG_Preamble_t * preamble,const char * merged_file_prefix)152 void CLOG_Merger_init(       CLOG_Merger_t    *merger,
153                        const CLOG_Preamble_t  *preamble,
154                        const char             *merged_file_prefix )
155 {
156     /* Set up the binary tree MPI ranks ID locally for merging */
157     PMPI_Comm_size( MPI_COMM_WORLD, &(merger->world_size) );
158     PMPI_Comm_rank( MPI_COMM_WORLD, &(merger->local_world_rank) );
159     CLOG_Merger_set_neighbor_ranks( merger );
160 
161     merger->is_big_endian  = preamble->is_big_endian;
162     /* Open the merged output file at root process */
163     if ( merger->parent_world_rank == CLOG_RANK_NULL ) {
164         strncpy( merger->out_filename, merged_file_prefix, CLOG_PATH_STRLEN );
165         strcat( merger->out_filename, "."CLOG_FILE_TYPE );
166 
167         merger->out_fd = OPEN( merger->out_filename, O_CREAT|O_WRONLY|O_TRUNC,
168                                0664 );
169         if ( merger->out_fd == -1 ) {
170             fprintf( stderr, __FILE__":CLOG_Merger_init() - \n"
171                              "\t""Could not open file %s for merging!\n",
172                              merger->out_filename );
173             fflush( stderr );
174             CLOG_Util_abort( 1 );
175         }
176         CLOG_Preamble_write( preamble, CLOG_BOOL_TRUE, CLOG_BOOL_TRUE,
177                              merger->out_fd );
178     }
179 
180     /*
181        Initialize the CLOG_Merger's minimum block size: CLOG_REC_ENDBLOCK
182     */
183     clog_merger_minblocksize = CLOG_Rec_size( CLOG_REC_ENDBLOCK );
184 }
185 
186 /* Finalize the CLOG_Merger_t */
CLOG_Merger_finalize(CLOG_Merger_t * merger,CLOG_Buffer_t * buffer)187 void CLOG_Merger_finalize( CLOG_Merger_t *merger, CLOG_Buffer_t *buffer )
188 {
189     CLOG_BOOL_T  do_byte_swap;
190     int          ierr;
191 
192     if ( merger->out_fd != -1 ) {
193         /* Update CLOG_Preamble_t with commtable_fptr */
194         buffer->preamble->commtable_fptr
195         = (CLOG_int64_t) lseek( merger->out_fd, 0, SEEK_CUR );
196         /* Save CLOG_CommSet_t to file */
197         do_byte_swap = merger->is_big_endian != CLOG_BOOL_TRUE;
198         ierr = CLOG_CommSet_write( buffer->commset, merger->out_fd,
199                                    do_byte_swap );
200         if ( ierr < 0 ) {
201             fprintf( stderr, __FILE__":CLOG_Merger_finalize() - \n"
202                              "\t""CLOG_CommSet_write() fails!\n" );
203             fflush( stderr );
204             return;
205         }
206         /* Save the updated CLOG_Preamble_t */
207         lseek( merger->out_fd, 0, SEEK_SET );
208         CLOG_Preamble_write( buffer->preamble, CLOG_BOOL_TRUE, CLOG_BOOL_TRUE,
209                              merger->out_fd );
210         close( merger->out_fd );
211     }
212 }
213 
214 /*
215     Flush the sorted buffer to the output logfile.
216 */
CLOG_Merger_flush(CLOG_Merger_t * merger)217 void CLOG_Merger_flush( CLOG_Merger_t *merger )
218 {
219     int  ierr;
220 
221     if ( merger->is_big_endian != CLOG_BOOL_TRUE )
222         CLOG_BlockData_swap_bytes_last( merger->sorted_blk );
223 
224     ierr = write( merger->out_fd, merger->sorted_blk->head,
225                   merger->block_size );
226     if ( ierr != merger->block_size ) {
227         fprintf( stderr, __FILE__":CLOG_Merger_flush() - \n"
228                          "\t""Fail to write to the logfile %s, ierr = %d.\n",
229                          merger->out_filename, ierr );
230         fflush( stderr );
231         CLOG_Util_abort( 1 );
232     }
233 }
234 
235 /*
236     clog_merger_minblocksize is defined in CLOG_Merger_init()
237 */
CLOG_Merger_reserved_block_size(CLOG_int32_t rectype)238 static int CLOG_Merger_reserved_block_size( CLOG_int32_t rectype )
239 {
240     /* Have CLOG_Rec_size() do the error checking */
241     return CLOG_Rec_size( rectype ) + clog_merger_minblocksize;
242 }
243 
244 /*
245    Save the CLOG record marked by CLOG_Rec_Header_t to the output file.
246 */
CLOG_Merger_save_rec(CLOG_Merger_t * merger,const CLOG_Rec_Header_t * hdr)247 void CLOG_Merger_save_rec( CLOG_Merger_t *merger, const CLOG_Rec_Header_t *hdr )
248 {
249     CLOG_BlockData_t   *sorted_blk;
250     CLOG_Rec_Header_t  *sorted_hdr;
251     int                 reclen;
252 
253     sorted_blk = merger->sorted_blk;
254 
255 
256     /*
257         If the sorted_blk is full, send sorted_blk to the process's parent
258         if the parent exists.  If the parent process does not exist, it must
259         be the root.  Then write the sorted_blk to the merged logfile,
260         reinitialize the sorted_blk.
261     */
262     if (    sorted_blk->ptr + CLOG_Merger_reserved_block_size( hdr->rectype )
263          >= sorted_blk->tail ) {
264         sorted_hdr = (CLOG_Rec_Header_t *) sorted_blk->ptr;
265         sorted_hdr->time       = hdr->time;     /* use prev record's time */
266         sorted_hdr->icomm      = 0;
267         sorted_hdr->rank       = merger->local_world_rank;
268         sorted_hdr->thread     = 0;
269         sorted_hdr->rectype    = CLOG_REC_ENDBLOCK;
270 
271         if ( merger->parent_world_rank != CLOG_RANK_NULL ) {
272             PMPI_Ssend( sorted_blk->head, merger->block_size,
273                         CLOG_DATAUNIT_MPI_TYPE, merger->parent_world_rank,
274                         CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD );
275         }
276         else { /* if parent_world_rank does not exist, must be the root */
277             CLOG_Merger_flush( merger );
278         }
279         sorted_blk->ptr = sorted_blk->head;
280     }
281 
282     /* CLOG_Rec_print( hdr, stdout ); */
283     /* Save the CLOG record into the sorted buffer */
284     reclen = CLOG_Rec_size( hdr->rectype );
285     memcpy( sorted_blk->ptr, hdr, reclen );
286     sorted_blk->ptr += reclen;
287 }
288 
289 
290 /* Used internally by CLOG_Merger_sort() */
291 /*
292    Here blockdata can be either left_blk or right_blk defined in CLOG_Merger_t
293 */
CLOG_Merger_refill_sideblock(CLOG_BlockData_t * blockdata,int block_world_rank,int block_size)294 void CLOG_Merger_refill_sideblock( CLOG_BlockData_t  *blockdata,
295                                    int block_world_rank, int block_size )
296 {
297     MPI_Status         status;
298 
299     PMPI_Recv( blockdata->head, block_size, CLOG_DATAUNIT_MPI_TYPE,
300                block_world_rank, CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD,
301                &status );
302     CLOG_BlockData_reset( blockdata );
303 }
304 
305 /* Used internally by CLOG_Merger_sort() */
CLOG_Merger_refill_localblock(CLOG_BlockData_t * blockdata,CLOG_Buffer_t * buffer,CLOG_Time_t * timediff_handle)306 void CLOG_Merger_refill_localblock( CLOG_BlockData_t *blockdata,
307                                     CLOG_Buffer_t    *buffer,
308                                     CLOG_Time_t      *timediff_handle )
309 {
310     /* refill the whole CLOG_Buffer_t from disk */
311     if ( buffer->curr_block == NULL || buffer->num_used_blocks == 0 )
312         CLOG_Buffer_localIO_read( buffer );
313 
314     if ( buffer->num_used_blocks > 0 ) {
315         blockdata->head = buffer->curr_block->data->head;
316         CLOG_BlockData_patch_all( blockdata, timediff_handle,
317                                   buffer->commset->table );
318         CLOG_BlockData_reset( blockdata );
319         buffer->curr_block = buffer->curr_block->next;
320         buffer->num_used_blocks--;
321     }
322     else /* if ( buffer->num_used_blocks == 0 ) */
323         blockdata->ptr += CLOG_Rec_size( CLOG_REC_ENDBLOCK );
324 }
325 
326 /* Used internally by CLOG_Merger_sort() */
327 /*
328    Here blockdata can be either left_blk or right_blk defined in CLOG_Merger_t
329 */
330 CLOG_Rec_Header_t *
CLOG_Merger_next_sideblock_hdr(CLOG_BlockData_t * blockdata,CLOG_Rec_Header_t * hdr,CLOG_Merger_t * merger,int block_world_rank,int block_size)331 CLOG_Merger_next_sideblock_hdr( CLOG_BlockData_t   *blockdata,
332                                 CLOG_Rec_Header_t  *hdr,
333                                 CLOG_Merger_t      *merger,
334                                 int                 block_world_rank,
335                                 int                 block_size )
336 {
337     if ( hdr->rectype == CLOG_REC_ENDLOG ) {
338         hdr->time      = CLOG_MAXTIME;
339         (merger->num_active_blks)--;
340     }
341     else {
342         CLOG_Merger_save_rec( merger, hdr );
343         blockdata->ptr += CLOG_Rec_size( hdr->rectype );
344         hdr = (CLOG_Rec_Header_t *) blockdata->ptr;
345         if ( hdr->rectype == CLOG_REC_ENDBLOCK ) {
346             CLOG_Merger_refill_sideblock( blockdata, block_world_rank,
347                                           block_size );
348             hdr = (CLOG_Rec_Header_t *) blockdata->ptr;
349         }
350     }
351     return hdr;
352 }
353 
354 /* Used internally by CLOG_Merger_sort() */
355 CLOG_Rec_Header_t *
CLOG_Merger_next_localblock_hdr(CLOG_BlockData_t * blockdata,CLOG_Rec_Header_t * hdr,CLOG_Merger_t * merger,CLOG_Buffer_t * buffer,CLOG_Time_t * timediff_handle)356 CLOG_Merger_next_localblock_hdr( CLOG_BlockData_t   *blockdata,
357                                  CLOG_Rec_Header_t  *hdr,
358                                  CLOG_Merger_t      *merger,
359                                  CLOG_Buffer_t      *buffer,
360                                  CLOG_Time_t        *timediff_handle )
361 {
362     if ( hdr->rectype == CLOG_REC_ENDLOG ) {
363         hdr->time      = CLOG_MAXTIME;
364         (merger->num_active_blks)--;
365     }
366     else {
367         CLOG_Merger_save_rec( merger, hdr );
368         blockdata->ptr += CLOG_Rec_size( hdr->rectype );
369         hdr = (CLOG_Rec_Header_t *) blockdata->ptr;
370         if ( hdr->rectype == CLOG_REC_ENDBLOCK ) {
371             CLOG_Merger_refill_localblock( blockdata, buffer,
372                                            timediff_handle );
373             hdr = (CLOG_Rec_Header_t *) blockdata->ptr;
374         }
375     }
376     return hdr;
377 }
378 
379 
380 /*
381 */
CLOG_Merger_sort(CLOG_Merger_t * merger,CLOG_Buffer_t * buffer)382 void CLOG_Merger_sort( CLOG_Merger_t *merger, CLOG_Buffer_t *buffer )
383 {
384     CLOG_Rec_Header_t  *left_hdr, *right_hdr, *local_hdr;
385     CLOG_BlockData_t   *left_blk, *right_blk, *local_blk;
386     CLOG_BlockData_t    local_shadow_blockdata ;
387     CLOG_Time_t         local_timediff;
388     int                 left_world_rank, right_world_rank;
389     int                 block_size;
390 
391     /* May want to move this to CLOG_Merger_init() or CLOG_Converge_init() */
392     /* Merge/Synchronize all the CLOG_CommSet_t at all the processes */
393     CLOG_CommSet_merge( buffer->commset );
394     /*
395         reinitialization of CLOG_Buffer_t so that
396         buffer->curr_block == buffer->head->block
397     */
398     CLOG_Buffer_localIO_reinit4read( buffer );
399 
400     merger->num_active_blks = 0;
401     block_size              = merger->block_size;
402 
403     /* local_timediff's value is modifiable by CLOG_BlockData_patch_all() */
404     local_timediff          = 0.0;
405     left_world_rank         = merger->left_world_rank;
406     right_world_rank        = merger->right_world_rank;
407     left_blk                = merger->left_blk;
408     right_blk               = merger->right_blk;
409 
410     local_blk  = &local_shadow_blockdata;
411     /* Initialize the local_blk from CLOG_Buffer_t.curr_block */
412     if ( buffer->curr_block != NULL && buffer->num_used_blocks > 0 ) {
413         merger->num_active_blks++;
414         CLOG_Merger_refill_localblock( local_blk, buffer, &local_timediff );
415     }
416 
417     /* Initialize CLOG_Merger_t.left_blk from its left neighbor */
418     if ( left_world_rank != CLOG_RANK_NULL ) {
419         merger->num_active_blks++;
420         CLOG_Merger_refill_sideblock( left_blk, left_world_rank, block_size );
421     }
422     else {
423         left_hdr            = (CLOG_Rec_Header_t *) left_blk->head;
424         left_hdr->time      = CLOG_MAXTIME;
425     }
426 
427     /* Initialize CLOG_Merger_t.right_blk its right neighbor */
428     if ( right_world_rank != CLOG_RANK_NULL ) {
429         merger->num_active_blks++;
430         CLOG_Merger_refill_sideblock( right_blk, right_world_rank, block_size );
431     }
432     else {
433         right_hdr            = (CLOG_Rec_Header_t *) right_blk->head;
434         right_hdr->time      = CLOG_MAXTIME;
435     }
436 
437     left_hdr  = (CLOG_Rec_Header_t *) left_blk->ptr;
438     right_hdr = (CLOG_Rec_Header_t *) right_blk->ptr;
439     local_hdr = (CLOG_Rec_Header_t *) local_blk->ptr;
440     while ( merger->num_active_blks > 0 ) {
441         if ( left_hdr->time <= right_hdr->time ) {
442             if ( left_hdr->time <= local_hdr->time )
443                 left_hdr  = CLOG_Merger_next_sideblock_hdr( left_blk,
444                                                             left_hdr,
445                                                             merger,
446                                                             left_world_rank,
447                                                             block_size );
448             else
449                 local_hdr = CLOG_Merger_next_localblock_hdr( local_blk,
450                                                              local_hdr,
451                                                              merger,
452                                                              buffer,
453                                                              &local_timediff );
454         }
455         else {
456             if ( right_hdr->time <= local_hdr->time )
457                 right_hdr = CLOG_Merger_next_sideblock_hdr( right_blk,
458                                                             right_hdr,
459                                                             merger,
460                                                             right_world_rank,
461                                                             block_size );
462             else
463                 local_hdr = CLOG_Merger_next_localblock_hdr( local_blk,
464                                                              local_hdr,
465                                                              merger,
466                                                              buffer,
467                                                              &local_timediff );
468         }
469     }   /* Endof while ( merger->num_active_blks > 0 ) */
470 }
471 
CLOG_Merger_last_flush(CLOG_Merger_t * merger)472 void CLOG_Merger_last_flush( CLOG_Merger_t *merger )
473 {
474     CLOG_BlockData_t  *sorted_blk;
475     CLOG_Rec_Header_t *sorted_hdr;
476 
477     sorted_blk = merger->sorted_blk;
478 
479     /* Write the CLOG_REC_ENDLOG as the very last record in the sorted buffer */
480     sorted_hdr = (CLOG_Rec_Header_t *) sorted_blk->ptr;
481     sorted_hdr->time       = CLOG_MAXTIME;    /* use prev record's time */
482     sorted_hdr->icomm      = 0;
483     sorted_hdr->rank       = merger->local_world_rank;
484     sorted_hdr->thread     = 0;
485     sorted_hdr->rectype    = CLOG_REC_ENDLOG;
486 
487     if ( merger->parent_world_rank != CLOG_RANK_NULL ) {
488         PMPI_Ssend( sorted_blk->head, merger->block_size,
489                     CLOG_DATAUNIT_MPI_TYPE, merger->parent_world_rank,
490                     CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD );
491     }
492     else { /* if parent_world_rank does not exist, must be the root */
493         CLOG_Merger_flush( merger );
494     }
495 }
496