1 /*
2 (C) 2001 by Argonne National Laboratory.
3 See COPYRIGHT in top-level directory.
4 */
5 #include "mpe_logging_conf.h"
6
7 #if defined( STDC_HEADERS ) || defined( HAVE_STDIO_H )
8 #include <stdio.h>
9 #endif
10 #if defined( STDC_HEADERS ) || defined( HAVE_STDLIB_H )
11 #include <stdlib.h>
12 #endif
13 #if defined( STDC_HEADERS ) || defined( HAVE_STRING_H )
14 #include <string.h>
15 #endif
16 #if defined( HAVE_FCNTL_H )
17 #include <fcntl.h>
18 #endif
19 #ifdef HAVE_SYS_TYPES_H
20 #include <sys/types.h>
21 #endif
22 #if defined( HAVE_UNISTD_H )
23 #include <unistd.h>
24 #endif
25 #ifdef HAVE_IO_H
26 #include <io.h>
27 #endif
28
29 #if !defined( CLOG_NOMPI )
30 #include "mpi.h"
31 #else
32 #include "mpi_null.h"
33 #endif /* Endof if !defined( CLOG_NOMPI ) */
34
35 #include "clog_const.h"
36 #include "clog_mem.h"
37 #include "clog_merger.h"
38 #include "clog_util.h"
39
40 #define CLOG_RANK_NULL -1
41
42 static int clog_merger_minblocksize;
43
CLOG_Merger_create(unsigned int block_size)44 CLOG_Merger_t* CLOG_Merger_create( unsigned int block_size )
45 {
46 CLOG_Merger_t *merger;
47
48 merger = (CLOG_Merger_t *) MALLOC( sizeof(CLOG_Merger_t) );
49 if ( merger == NULL ) {
50 fprintf( stderr, __FILE__":CLOG_Merger_create() - \n"
51 "\t""MALLOC() fails for CLOG_Merger_t!\n" );
52 fflush( stderr );
53 return NULL;
54 }
55
56
57 merger->left_blk = (CLOG_BlockData_t *)
58 CLOG_BlockData_create( block_size );
59 if ( merger->left_blk == NULL ) {
60 fprintf( stderr, __FILE__":CLOG_Merger_create() - \n"
61 "\t""CLOG_BlockData_create(%d) fails for left_blk!",
62 block_size );
63 fflush( stderr );
64 return NULL;
65 }
66
67 merger->right_blk = (CLOG_BlockData_t *)
68 CLOG_BlockData_create( block_size );
69 if ( merger->right_blk == NULL ) {
70 fprintf( stderr, __FILE__":CLOG_Merger_create() - \n"
71 "\t""CLOG_BlockData_create(%d) fails for right_blk!",
72 block_size );
73 fflush( stderr );
74 return NULL;
75 }
76
77 merger->sorted_blk = (CLOG_BlockData_t *)
78 CLOG_BlockData_create( block_size );
79 if ( merger->sorted_blk == NULL ) {
80 fprintf( stderr, __FILE__":CLOG_Merger_create() - \n"
81 "\t""CLOG_BlockData_create(%d) fails for sorted_blk!",
82 block_size );
83 fflush( stderr );
84 return NULL;
85 }
86
87 merger->block_size = block_size;
88 merger->num_active_blks = 0;
89 merger->world_size = 1;
90 merger->local_world_rank = 0;
91 merger->left_world_rank = 0;
92 merger->right_world_rank = 0;
93 merger->parent_world_rank = 0;
94
95 merger->is_big_endian = CLOG_BOOL_NULL;
96 strncpy( merger->out_filename, "mpe_trace."CLOG_FILE_TYPE,
97 CLOG_PATH_STRLEN );
98 merger->out_fd = -1;
99
100 return merger;
101 }
102
CLOG_Merger_free(CLOG_Merger_t ** merger_handle)103 void CLOG_Merger_free( CLOG_Merger_t **merger_handle )
104 {
105 CLOG_Merger_t *merger;
106 merger = *merger_handle;
107 if ( merger != NULL ) {
108 CLOG_BlockData_free( &(merger->sorted_blk) );
109 CLOG_BlockData_free( &(merger->left_blk) );
110 CLOG_BlockData_free( &(merger->right_blk) );
111 FREE( merger );
112 }
113 *merger_handle = NULL;
114 }
115
116 /*@
117 CLOG_Merger_set_neighbor_ranks - locally determine parent and children
118 in a binary tree
119
120 Input parameters
121
122 + merger->local_world_rank - calling process''s id
123 - merger->world_size - total number of processes in tree
124
125 Output parameters
126
127 + merger->parent_world_rank - parent of local process
128 (or CLOG_RANK_NULL if root)
129 . merger->left_world_rank - left child of local process in binary tree
130 (or CLOG_RANK_NULL if none)
131 - merger->right_world_rank - right child of local process in binary tree
132 (or CLOG_RANK_NULL if none)
133
134 @*/
CLOG_Merger_set_neighbor_ranks(CLOG_Merger_t * merger)135 void CLOG_Merger_set_neighbor_ranks( CLOG_Merger_t *merger )
136 {
137 if ( merger->local_world_rank == 0 )
138 merger->parent_world_rank = CLOG_RANK_NULL;
139 else
140 merger->parent_world_rank = (merger->local_world_rank - 1) / 2;
141
142 merger->left_world_rank = (2 * merger->local_world_rank) + 1;
143 if ( merger->left_world_rank > merger->world_size - 1 )
144 merger->left_world_rank = CLOG_RANK_NULL;
145
146 merger->right_world_rank = (2 * merger->local_world_rank) + 2;
147 if ( merger->right_world_rank > merger->world_size - 1 )
148 merger->right_world_rank = CLOG_RANK_NULL;
149 }
150
151 /* Initialize the CLOG_Merger_t */
CLOG_Merger_init(CLOG_Merger_t * merger,const CLOG_Preamble_t * preamble,const char * merged_file_prefix)152 void CLOG_Merger_init( CLOG_Merger_t *merger,
153 const CLOG_Preamble_t *preamble,
154 const char *merged_file_prefix )
155 {
156 /* Set up the binary tree MPI ranks ID locally for merging */
157 PMPI_Comm_size( MPI_COMM_WORLD, &(merger->world_size) );
158 PMPI_Comm_rank( MPI_COMM_WORLD, &(merger->local_world_rank) );
159 CLOG_Merger_set_neighbor_ranks( merger );
160
161 merger->is_big_endian = preamble->is_big_endian;
162 /* Open the merged output file at root process */
163 if ( merger->parent_world_rank == CLOG_RANK_NULL ) {
164 strncpy( merger->out_filename, merged_file_prefix, CLOG_PATH_STRLEN );
165 strcat( merger->out_filename, "."CLOG_FILE_TYPE );
166
167 merger->out_fd = OPEN( merger->out_filename, O_CREAT|O_WRONLY|O_TRUNC,
168 0664 );
169 if ( merger->out_fd == -1 ) {
170 fprintf( stderr, __FILE__":CLOG_Merger_init() - \n"
171 "\t""Could not open file %s for merging!\n",
172 merger->out_filename );
173 fflush( stderr );
174 CLOG_Util_abort( 1 );
175 }
176 CLOG_Preamble_write( preamble, CLOG_BOOL_TRUE, CLOG_BOOL_TRUE,
177 merger->out_fd );
178 }
179
180 /*
181 Initialize the CLOG_Merger's minimum block size: CLOG_REC_ENDBLOCK
182 */
183 clog_merger_minblocksize = CLOG_Rec_size( CLOG_REC_ENDBLOCK );
184 }
185
186 /* Finalize the CLOG_Merger_t */
CLOG_Merger_finalize(CLOG_Merger_t * merger,CLOG_Buffer_t * buffer)187 void CLOG_Merger_finalize( CLOG_Merger_t *merger, CLOG_Buffer_t *buffer )
188 {
189 CLOG_BOOL_T do_byte_swap;
190 int ierr;
191
192 if ( merger->out_fd != -1 ) {
193 /* Update CLOG_Preamble_t with commtable_fptr */
194 buffer->preamble->commtable_fptr
195 = (CLOG_int64_t) lseek( merger->out_fd, 0, SEEK_CUR );
196 /* Save CLOG_CommSet_t to file */
197 do_byte_swap = merger->is_big_endian != CLOG_BOOL_TRUE;
198 ierr = CLOG_CommSet_write( buffer->commset, merger->out_fd,
199 do_byte_swap );
200 if ( ierr < 0 ) {
201 fprintf( stderr, __FILE__":CLOG_Merger_finalize() - \n"
202 "\t""CLOG_CommSet_write() fails!\n" );
203 fflush( stderr );
204 return;
205 }
206 /* Save the updated CLOG_Preamble_t */
207 lseek( merger->out_fd, 0, SEEK_SET );
208 CLOG_Preamble_write( buffer->preamble, CLOG_BOOL_TRUE, CLOG_BOOL_TRUE,
209 merger->out_fd );
210 close( merger->out_fd );
211 }
212 }
213
214 /*
215 Flush the sorted buffer to the output logfile.
216 */
CLOG_Merger_flush(CLOG_Merger_t * merger)217 void CLOG_Merger_flush( CLOG_Merger_t *merger )
218 {
219 int ierr;
220
221 if ( merger->is_big_endian != CLOG_BOOL_TRUE )
222 CLOG_BlockData_swap_bytes_last( merger->sorted_blk );
223
224 ierr = write( merger->out_fd, merger->sorted_blk->head,
225 merger->block_size );
226 if ( ierr != merger->block_size ) {
227 fprintf( stderr, __FILE__":CLOG_Merger_flush() - \n"
228 "\t""Fail to write to the logfile %s, ierr = %d.\n",
229 merger->out_filename, ierr );
230 fflush( stderr );
231 CLOG_Util_abort( 1 );
232 }
233 }
234
235 /*
236 clog_merger_minblocksize is defined in CLOG_Merger_init()
237 */
CLOG_Merger_reserved_block_size(CLOG_int32_t rectype)238 static int CLOG_Merger_reserved_block_size( CLOG_int32_t rectype )
239 {
240 /* Have CLOG_Rec_size() do the error checking */
241 return CLOG_Rec_size( rectype ) + clog_merger_minblocksize;
242 }
243
244 /*
245 Save the CLOG record marked by CLOG_Rec_Header_t to the output file.
246 */
CLOG_Merger_save_rec(CLOG_Merger_t * merger,const CLOG_Rec_Header_t * hdr)247 void CLOG_Merger_save_rec( CLOG_Merger_t *merger, const CLOG_Rec_Header_t *hdr )
248 {
249 CLOG_BlockData_t *sorted_blk;
250 CLOG_Rec_Header_t *sorted_hdr;
251 int reclen;
252
253 sorted_blk = merger->sorted_blk;
254
255
256 /*
257 If the sorted_blk is full, send sorted_blk to the process's parent
258 if the parent exists. If the parent process does not exist, it must
259 be the root. Then write the sorted_blk to the merged logfile,
260 reinitialize the sorted_blk.
261 */
262 if ( sorted_blk->ptr + CLOG_Merger_reserved_block_size( hdr->rectype )
263 >= sorted_blk->tail ) {
264 sorted_hdr = (CLOG_Rec_Header_t *) sorted_blk->ptr;
265 sorted_hdr->time = hdr->time; /* use prev record's time */
266 sorted_hdr->icomm = 0;
267 sorted_hdr->rank = merger->local_world_rank;
268 sorted_hdr->thread = 0;
269 sorted_hdr->rectype = CLOG_REC_ENDBLOCK;
270
271 if ( merger->parent_world_rank != CLOG_RANK_NULL ) {
272 PMPI_Ssend( sorted_blk->head, merger->block_size,
273 CLOG_DATAUNIT_MPI_TYPE, merger->parent_world_rank,
274 CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD );
275 }
276 else { /* if parent_world_rank does not exist, must be the root */
277 CLOG_Merger_flush( merger );
278 }
279 sorted_blk->ptr = sorted_blk->head;
280 }
281
282 /* CLOG_Rec_print( hdr, stdout ); */
283 /* Save the CLOG record into the sorted buffer */
284 reclen = CLOG_Rec_size( hdr->rectype );
285 memcpy( sorted_blk->ptr, hdr, reclen );
286 sorted_blk->ptr += reclen;
287 }
288
289
290 /* Used internally by CLOG_Merger_sort() */
291 /*
292 Here blockdata can be either left_blk or right_blk defined in CLOG_Merger_t
293 */
CLOG_Merger_refill_sideblock(CLOG_BlockData_t * blockdata,int block_world_rank,int block_size)294 void CLOG_Merger_refill_sideblock( CLOG_BlockData_t *blockdata,
295 int block_world_rank, int block_size )
296 {
297 MPI_Status status;
298
299 PMPI_Recv( blockdata->head, block_size, CLOG_DATAUNIT_MPI_TYPE,
300 block_world_rank, CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD,
301 &status );
302 CLOG_BlockData_reset( blockdata );
303 }
304
305 /* Used internally by CLOG_Merger_sort() */
CLOG_Merger_refill_localblock(CLOG_BlockData_t * blockdata,CLOG_Buffer_t * buffer,CLOG_Time_t * timediff_handle)306 void CLOG_Merger_refill_localblock( CLOG_BlockData_t *blockdata,
307 CLOG_Buffer_t *buffer,
308 CLOG_Time_t *timediff_handle )
309 {
310 /* refill the whole CLOG_Buffer_t from disk */
311 if ( buffer->curr_block == NULL || buffer->num_used_blocks == 0 )
312 CLOG_Buffer_localIO_read( buffer );
313
314 if ( buffer->num_used_blocks > 0 ) {
315 blockdata->head = buffer->curr_block->data->head;
316 CLOG_BlockData_patch_all( blockdata, timediff_handle,
317 buffer->commset->table );
318 CLOG_BlockData_reset( blockdata );
319 buffer->curr_block = buffer->curr_block->next;
320 buffer->num_used_blocks--;
321 }
322 else /* if ( buffer->num_used_blocks == 0 ) */
323 blockdata->ptr += CLOG_Rec_size( CLOG_REC_ENDBLOCK );
324 }
325
326 /* Used internally by CLOG_Merger_sort() */
327 /*
328 Here blockdata can be either left_blk or right_blk defined in CLOG_Merger_t
329 */
330 CLOG_Rec_Header_t *
CLOG_Merger_next_sideblock_hdr(CLOG_BlockData_t * blockdata,CLOG_Rec_Header_t * hdr,CLOG_Merger_t * merger,int block_world_rank,int block_size)331 CLOG_Merger_next_sideblock_hdr( CLOG_BlockData_t *blockdata,
332 CLOG_Rec_Header_t *hdr,
333 CLOG_Merger_t *merger,
334 int block_world_rank,
335 int block_size )
336 {
337 if ( hdr->rectype == CLOG_REC_ENDLOG ) {
338 hdr->time = CLOG_MAXTIME;
339 (merger->num_active_blks)--;
340 }
341 else {
342 CLOG_Merger_save_rec( merger, hdr );
343 blockdata->ptr += CLOG_Rec_size( hdr->rectype );
344 hdr = (CLOG_Rec_Header_t *) blockdata->ptr;
345 if ( hdr->rectype == CLOG_REC_ENDBLOCK ) {
346 CLOG_Merger_refill_sideblock( blockdata, block_world_rank,
347 block_size );
348 hdr = (CLOG_Rec_Header_t *) blockdata->ptr;
349 }
350 }
351 return hdr;
352 }
353
354 /* Used internally by CLOG_Merger_sort() */
355 CLOG_Rec_Header_t *
CLOG_Merger_next_localblock_hdr(CLOG_BlockData_t * blockdata,CLOG_Rec_Header_t * hdr,CLOG_Merger_t * merger,CLOG_Buffer_t * buffer,CLOG_Time_t * timediff_handle)356 CLOG_Merger_next_localblock_hdr( CLOG_BlockData_t *blockdata,
357 CLOG_Rec_Header_t *hdr,
358 CLOG_Merger_t *merger,
359 CLOG_Buffer_t *buffer,
360 CLOG_Time_t *timediff_handle )
361 {
362 if ( hdr->rectype == CLOG_REC_ENDLOG ) {
363 hdr->time = CLOG_MAXTIME;
364 (merger->num_active_blks)--;
365 }
366 else {
367 CLOG_Merger_save_rec( merger, hdr );
368 blockdata->ptr += CLOG_Rec_size( hdr->rectype );
369 hdr = (CLOG_Rec_Header_t *) blockdata->ptr;
370 if ( hdr->rectype == CLOG_REC_ENDBLOCK ) {
371 CLOG_Merger_refill_localblock( blockdata, buffer,
372 timediff_handle );
373 hdr = (CLOG_Rec_Header_t *) blockdata->ptr;
374 }
375 }
376 return hdr;
377 }
378
379
380 /*
381 */
CLOG_Merger_sort(CLOG_Merger_t * merger,CLOG_Buffer_t * buffer)382 void CLOG_Merger_sort( CLOG_Merger_t *merger, CLOG_Buffer_t *buffer )
383 {
384 CLOG_Rec_Header_t *left_hdr, *right_hdr, *local_hdr;
385 CLOG_BlockData_t *left_blk, *right_blk, *local_blk;
386 CLOG_BlockData_t local_shadow_blockdata ;
387 CLOG_Time_t local_timediff;
388 int left_world_rank, right_world_rank;
389 int block_size;
390
391 /* May want to move this to CLOG_Merger_init() or CLOG_Converge_init() */
392 /* Merge/Synchronize all the CLOG_CommSet_t at all the processes */
393 CLOG_CommSet_merge( buffer->commset );
394 /*
395 reinitialization of CLOG_Buffer_t so that
396 buffer->curr_block == buffer->head->block
397 */
398 CLOG_Buffer_localIO_reinit4read( buffer );
399
400 merger->num_active_blks = 0;
401 block_size = merger->block_size;
402
403 /* local_timediff's value is modifiable by CLOG_BlockData_patch_all() */
404 local_timediff = 0.0;
405 left_world_rank = merger->left_world_rank;
406 right_world_rank = merger->right_world_rank;
407 left_blk = merger->left_blk;
408 right_blk = merger->right_blk;
409
410 local_blk = &local_shadow_blockdata;
411 /* Initialize the local_blk from CLOG_Buffer_t.curr_block */
412 if ( buffer->curr_block != NULL && buffer->num_used_blocks > 0 ) {
413 merger->num_active_blks++;
414 CLOG_Merger_refill_localblock( local_blk, buffer, &local_timediff );
415 }
416
417 /* Initialize CLOG_Merger_t.left_blk from its left neighbor */
418 if ( left_world_rank != CLOG_RANK_NULL ) {
419 merger->num_active_blks++;
420 CLOG_Merger_refill_sideblock( left_blk, left_world_rank, block_size );
421 }
422 else {
423 left_hdr = (CLOG_Rec_Header_t *) left_blk->head;
424 left_hdr->time = CLOG_MAXTIME;
425 }
426
427 /* Initialize CLOG_Merger_t.right_blk its right neighbor */
428 if ( right_world_rank != CLOG_RANK_NULL ) {
429 merger->num_active_blks++;
430 CLOG_Merger_refill_sideblock( right_blk, right_world_rank, block_size );
431 }
432 else {
433 right_hdr = (CLOG_Rec_Header_t *) right_blk->head;
434 right_hdr->time = CLOG_MAXTIME;
435 }
436
437 left_hdr = (CLOG_Rec_Header_t *) left_blk->ptr;
438 right_hdr = (CLOG_Rec_Header_t *) right_blk->ptr;
439 local_hdr = (CLOG_Rec_Header_t *) local_blk->ptr;
440 while ( merger->num_active_blks > 0 ) {
441 if ( left_hdr->time <= right_hdr->time ) {
442 if ( left_hdr->time <= local_hdr->time )
443 left_hdr = CLOG_Merger_next_sideblock_hdr( left_blk,
444 left_hdr,
445 merger,
446 left_world_rank,
447 block_size );
448 else
449 local_hdr = CLOG_Merger_next_localblock_hdr( local_blk,
450 local_hdr,
451 merger,
452 buffer,
453 &local_timediff );
454 }
455 else {
456 if ( right_hdr->time <= local_hdr->time )
457 right_hdr = CLOG_Merger_next_sideblock_hdr( right_blk,
458 right_hdr,
459 merger,
460 right_world_rank,
461 block_size );
462 else
463 local_hdr = CLOG_Merger_next_localblock_hdr( local_blk,
464 local_hdr,
465 merger,
466 buffer,
467 &local_timediff );
468 }
469 } /* Endof while ( merger->num_active_blks > 0 ) */
470 }
471
CLOG_Merger_last_flush(CLOG_Merger_t * merger)472 void CLOG_Merger_last_flush( CLOG_Merger_t *merger )
473 {
474 CLOG_BlockData_t *sorted_blk;
475 CLOG_Rec_Header_t *sorted_hdr;
476
477 sorted_blk = merger->sorted_blk;
478
479 /* Write the CLOG_REC_ENDLOG as the very last record in the sorted buffer */
480 sorted_hdr = (CLOG_Rec_Header_t *) sorted_blk->ptr;
481 sorted_hdr->time = CLOG_MAXTIME; /* use prev record's time */
482 sorted_hdr->icomm = 0;
483 sorted_hdr->rank = merger->local_world_rank;
484 sorted_hdr->thread = 0;
485 sorted_hdr->rectype = CLOG_REC_ENDLOG;
486
487 if ( merger->parent_world_rank != CLOG_RANK_NULL ) {
488 PMPI_Ssend( sorted_blk->head, merger->block_size,
489 CLOG_DATAUNIT_MPI_TYPE, merger->parent_world_rank,
490 CLOG_MERGE_LOGBUFTYPE, MPI_COMM_WORLD );
491 }
492 else { /* if parent_world_rank does not exist, must be the root */
493 CLOG_Merger_flush( merger );
494 }
495 }
496