1 /*
2 (C) 2001 by Argonne National Laboratory.
3 See COPYRIGHT in top-level directory.
4 */
5 #include "mpe_logging_conf.h"
6
7 #if defined( HAVE_SYS_TYPES_H)
8 #include <sys/types.h>
9 #endif
10 #if defined( STDC_HEADERS ) || defined( HAVE_STDIO_H )
11 #include <stdio.h>
12 #endif
13 #if defined( STDC_HEADERS ) || defined( HAVE_STDLIB_H )
14 #include <stdlib.h>
15 #endif
16
17 #if !defined( CLOG_NOMPI )
18 #include "mpi.h"
19 #else
20 #include "mpi_null.h"
21 #endif /* Endof if !defined( CLOG_NOMPI ) */
22
23 #include "clog.h"
24 #include "clog_mem.h"
25 #include "clog_util.h"
26 #include "clog_record.h"
27 #include "clog_preamble.h"
28 #include "clog_timer.h"
29 #include "clog_sync.h"
30 #include "clog_merger.h"
31 #include "clog_commset.h"
32
CLOG_Open(void)33 CLOG_Stream_t *CLOG_Open( void )
34 {
35 CLOG_Stream_t *stream;
36
37 stream = (CLOG_Stream_t *) MALLOC( sizeof( CLOG_Stream_t ) );
38 if ( stream == NULL ) {
39 fprintf( stderr, __FILE__":CLOG_Open() - MALLOC() fails.\n" );
40 fflush( stderr );
41 return NULL;
42 }
43
44 stream->buffer = CLOG_Buffer_create();
45 if ( stream->buffer == NULL ) {
46 fprintf( stderr, __FILE__":CLOG_Open() - \n"
47 "\t""CLOG_Buffer_create() returns NULL.\n" );
48 fflush( stderr );
49 return NULL;
50 }
51
52 stream->syncer = NULL;
53 stream->merger = NULL;
54
55 return stream;
56 }
57
CLOG_Close(CLOG_Stream_t ** stream_handle)58 void CLOG_Close( CLOG_Stream_t **stream_handle )
59 {
60 CLOG_Stream_t *stream;
61
62 stream = *stream_handle;
63 if ( stream != NULL ) {
64 CLOG_Buffer_localIO_finalize( stream->buffer );
65 CLOG_Buffer_free( &(stream->buffer) );
66 FREE( stream );
67 }
68 *stream_handle = NULL;
69 }
70
71 /*
72 CLOG_Local_init() has to be called before any of CLOG_Buffer_save_xxxx()
73 is invoked. It is usually called right after CLOG_Stream_t is created.
74 */
CLOG_Local_init(CLOG_Stream_t * stream,const char * local_tmpfile_name)75 void CLOG_Local_init( CLOG_Stream_t *stream, const char *local_tmpfile_name )
76 {
77 CLOG_Buffer_t *buffer;
78
79 stream->known_solo_eventID = CLOG_KNOWN_SOLO_EVENTID_START;
80 stream->known_eventID = CLOG_KNOWN_EVENTID_START;
81 stream->known_stateID = CLOG_KNOWN_STATEID_START;
82 stream->user_eventID = CLOG_USER_EVENTID_START;
83 stream->user_stateID = CLOG_USER_STATEID_START;
84 stream->user_solo_eventID = CLOG_USER_SOLO_EVENTID_START;
85
86 CLOG_Rec_sizes_init();
87 buffer = stream->buffer;
88 CLOG_Buffer_init4write( buffer, local_tmpfile_name );
89
90 /* Initialize the synchronizer */
91 stream->syncer = CLOG_Sync_create( buffer->world_size, buffer->world_rank );
92 CLOG_Sync_init( stream->syncer );
93
94 /* CLOG_Timer_start() HAS TO BE CALLED before any CLOG_Buffer_save_xxx() */
95 CLOG_Timer_start();
96 /*
97 Save a default CLOG_REC_TIMESHIFT as the very first record
98 of the local clog file, so that its value can be used to
99 adjust events happened afterward.
100 */
101 CLOG_Buffer_init_timeshift( buffer );
102 }
103
CLOG_Local_finalize(CLOG_Stream_t * stream)104 void CLOG_Local_finalize( CLOG_Stream_t *stream )
105 {
106 const CLOG_CommIDs_t *commIDs;
107 CLOG_Buffer_t *buffer;
108 CLOG_Sync_t *syncer;
109 CLOG_Time_t local_timediff;
110
111 syncer = stream->syncer;
112 if ( syncer->world_rank == 0 ) {
113 if ( syncer->is_ok_to_sync == CLOG_BOOL_TRUE ) {
114 fprintf( stderr, "Enabling the %s clock synchronization...\n",
115 CLOG_Sync_print_type( syncer ) );
116 }
117 else
118 fprintf( stderr, "Disabling the clock synchronization...\n" );
119 }
120
121 buffer = stream->buffer;
122
123 /* Adding the CLOG_Buffer_write2disk's state definition */
124 if ( buffer->world_rank == 0
125 && buffer->log_overhead == CLOG_BOOL_TRUE ) {
126 commIDs = CLOG_CommSet_get_IDs( buffer->commset, MPI_COMM_WORLD );
127 CLOG_Buffer_save_statedef( buffer, commIDs, 0,
128 CLOG_STATEID_BUFFERWRITE,
129 CLOG_EVT_BUFFERWRITE_START,
130 CLOG_EVT_BUFFERWRITE_FINAL,
131 "maroon", "CLOG_Buffer_write2disk",
132 NULL );
133 }
134
135 if ( stream->syncer->is_ok_to_sync == CLOG_BOOL_TRUE ) {
136 local_timediff = CLOG_Sync_run( stream->syncer );
137 CLOG_Buffer_set_timeshift( buffer, local_timediff, CLOG_BOOL_FALSE );
138 }
139 CLOG_Sync_free( &(stream->syncer) );
140
141 CLOG_Buffer_save_endlog( buffer );
142 CLOG_Buffer_localIO_flush( buffer );
143 }
144
CLOG_Get_known_solo_eventID(CLOG_Stream_t * stream)145 int CLOG_Get_known_solo_eventID( CLOG_Stream_t *stream )
146 {
147 if ( stream->known_solo_eventID < CLOG_KNOWN_EVENTID_START )
148 return (stream->known_solo_eventID)++;
149 else {
150 fprintf( stderr, __FILE__":CLOG_Get_known_solo_eventID() - \n"
151 "\t""CLOG internal KNOWN solo eventID are used up, "
152 "last known solo eventID is %d. Aborting...\n",
153 stream->known_solo_eventID );
154 fflush( stderr );
155 CLOG_Util_abort( 1 );
156 return stream->known_solo_eventID;
157 }
158 }
159
CLOG_Get_known_eventID(CLOG_Stream_t * stream)160 int CLOG_Get_known_eventID( CLOG_Stream_t *stream )
161 {
162 if ( stream->known_eventID < CLOG_USER_EVENTID_START )
163 return (stream->known_eventID)++;
164 else {
165 fprintf( stderr, __FILE__":CLOG_Get_known_eventID() - \n"
166 "\t""CLOG internal KNOWN eventID have been used up, "
167 "use CLOG user eventID %d.\n",
168 stream->user_eventID );
169 fflush( stderr );
170 return (stream->user_eventID)++;
171 }
172 }
173
CLOG_Get_user_eventID(CLOG_Stream_t * stream)174 int CLOG_Get_user_eventID( CLOG_Stream_t *stream )
175 {
176 if ( stream->user_eventID < CLOG_USER_SOLO_EVENTID_START )
177 return (stream->user_eventID)++;
178 else {
179 fprintf( stderr, __FILE__":CLOG_Get_user_eventID() - \n"
180 "\t""CLOG internal USER eventID have been used up, "
181 "use CLOG user SOLO eventID %d.\n",
182 stream->user_eventID );
183 fflush( stderr );
184 return (stream->user_eventID)++;
185 }
186 }
187
CLOG_Get_user_solo_eventID(CLOG_Stream_t * stream)188 int CLOG_Get_user_solo_eventID( CLOG_Stream_t *stream )
189 {
190 return (stream->user_solo_eventID)++;
191 }
192
CLOG_Get_known_stateID(CLOG_Stream_t * stream)193 int CLOG_Get_known_stateID( CLOG_Stream_t *stream )
194 {
195 if ( stream->known_stateID < CLOG_USER_STATEID_START )
196 return (stream->known_stateID)++;
197 else {
198 fprintf( stderr, __FILE__":CLOG_Get_known_stateID() - \n"
199 "\t""CLOG internal KNOWN stateID have been used up, "
200 "use CLOG user stateID %d.\n",
201 stream->user_stateID );
202 fflush( stderr );
203 return (stream->user_stateID)++;
204 }
205 }
206
CLOG_Get_user_stateID(CLOG_Stream_t * stream)207 int CLOG_Get_user_stateID( CLOG_Stream_t *stream )
208 {
209 return (stream->user_stateID)++;
210 }
211
CLOG_Check_known_stateID(CLOG_Stream_t * stream,int stateID)212 int CLOG_Check_known_stateID( CLOG_Stream_t *stream, int stateID )
213 {
214 if ( stateID >= CLOG_KNOWN_STATEID_START
215 && stateID < CLOG_USER_STATEID_START )
216 return CLOG_BOOL_TRUE;
217 else
218 return CLOG_BOOL_FALSE;
219 }
220
CLOG_Converge_init(CLOG_Stream_t * stream,const char * merged_file_prefix)221 void CLOG_Converge_init( CLOG_Stream_t *stream,
222 const char *merged_file_prefix )
223 {
224 /* stream->buffer->block_size == stream->buffer->preamble->block_size */
225 stream->merger = CLOG_Merger_create( stream->buffer->block_size );
226 /*
227 Update CLOG_Preamble_t with current values before its being passed to
228 CLOG_Merger_init() where it will be copied to the disk at root process.
229 */
230 stream->buffer->preamble->user_stateID_count
231 = stream->user_stateID - CLOG_USER_STATEID_START;
232 stream->buffer->preamble->known_solo_eventID_count
233 = stream->known_solo_eventID - CLOG_KNOWN_SOLO_EVENTID_START;
234 stream->buffer->preamble->user_solo_eventID_count
235 = stream->user_solo_eventID - CLOG_USER_SOLO_EVENTID_START;
236 CLOG_Merger_init( stream->merger, stream->buffer->preamble,
237 merged_file_prefix );
238 }
239
CLOG_Converge_finalize(CLOG_Stream_t * stream)240 void CLOG_Converge_finalize( CLOG_Stream_t *stream )
241 {
242 CLOG_Merger_finalize( stream->merger, stream->buffer );
243 CLOG_Merger_free( &(stream->merger) );
244 }
245
CLOG_Converge_sort(CLOG_Stream_t * stream)246 void CLOG_Converge_sort( CLOG_Stream_t *stream )
247 {
248 CLOG_Merger_sort( stream->merger, stream->buffer );
249 CLOG_Merger_last_flush( stream->merger );
250 }
251