1 /*
2    (C) 2001 by Argonne National Laboratory.
3        See COPYRIGHT in top-level directory.
4 */
5 #include "mpe_logging_conf.h"
6 
7 #if defined( HAVE_SYS_TYPES_H)
8 #include <sys/types.h>
9 #endif
10 #if defined( STDC_HEADERS ) || defined( HAVE_STDIO_H )
11 #include <stdio.h>
12 #endif
13 #if defined( STDC_HEADERS ) || defined( HAVE_STDLIB_H )
14 #include <stdlib.h>
15 #endif
16 
17 #if !defined( CLOG_NOMPI )
18 #include "mpi.h"
19 #else
20 #include "mpi_null.h"
21 #endif /* Endof if !defined( CLOG_NOMPI ) */
22 
23 #include "clog.h"
24 #include "clog_mem.h"
25 #include "clog_util.h"
26 #include "clog_record.h"
27 #include "clog_preamble.h"
28 #include "clog_timer.h"
29 #include "clog_sync.h"
30 #include "clog_merger.h"
31 #include "clog_commset.h"
32 
CLOG_Open(void)33 CLOG_Stream_t *CLOG_Open( void )
34 {
35     CLOG_Stream_t  *stream;
36 
37     stream = (CLOG_Stream_t *) MALLOC( sizeof( CLOG_Stream_t ) );
38     if ( stream == NULL ) {
39         fprintf( stderr, __FILE__":CLOG_Open() - MALLOC() fails.\n" );
40         fflush( stderr );
41         return NULL;
42     }
43 
44     stream->buffer = CLOG_Buffer_create();
45     if ( stream->buffer == NULL ) {
46         fprintf( stderr, __FILE__":CLOG_Open() - \n"
47                          "\t""CLOG_Buffer_create() returns NULL.\n" );
48         fflush( stderr );
49         return NULL;
50     }
51 
52     stream->syncer = NULL;
53     stream->merger = NULL;
54 
55     return stream;
56 }
57 
CLOG_Close(CLOG_Stream_t ** stream_handle)58 void CLOG_Close( CLOG_Stream_t **stream_handle )
59 {
60     CLOG_Stream_t *stream;
61 
62     stream = *stream_handle;
63     if ( stream != NULL ) {
64         CLOG_Buffer_localIO_finalize( stream->buffer );
65         CLOG_Buffer_free( &(stream->buffer) );
66         FREE( stream );
67     }
68     *stream_handle = NULL;
69 }
70 
71 /*
72    CLOG_Local_init() has to be called before any of CLOG_Buffer_save_xxxx()
73    is invoked.  It is usually called right after CLOG_Stream_t is created.
74 */
CLOG_Local_init(CLOG_Stream_t * stream,const char * local_tmpfile_name)75 void CLOG_Local_init( CLOG_Stream_t *stream, const char *local_tmpfile_name )
76 {
77     CLOG_Buffer_t  *buffer;
78 
79     stream->known_solo_eventID = CLOG_KNOWN_SOLO_EVENTID_START;
80     stream->known_eventID      = CLOG_KNOWN_EVENTID_START;
81     stream->known_stateID      = CLOG_KNOWN_STATEID_START;
82     stream->user_eventID       = CLOG_USER_EVENTID_START;
83     stream->user_stateID       = CLOG_USER_STATEID_START;
84     stream->user_solo_eventID  = CLOG_USER_SOLO_EVENTID_START;
85 
86     CLOG_Rec_sizes_init();
87     buffer  = stream->buffer;
88     CLOG_Buffer_init4write( buffer, local_tmpfile_name );
89 
90     /* Initialize the synchronizer */
91     stream->syncer = CLOG_Sync_create( buffer->world_size, buffer->world_rank );
92     CLOG_Sync_init( stream->syncer );
93 
94     /* CLOG_Timer_start() HAS TO BE CALLED before any CLOG_Buffer_save_xxx() */
95     CLOG_Timer_start();
96     /*
97        Save a default CLOG_REC_TIMESHIFT as the very first record
98        of the local clog file, so that its value can be used to
99        adjust events happened afterward.
100     */
101     CLOG_Buffer_init_timeshift( buffer );
102 }
103 
CLOG_Local_finalize(CLOG_Stream_t * stream)104 void CLOG_Local_finalize( CLOG_Stream_t *stream )
105 {
106     const CLOG_CommIDs_t *commIDs;
107           CLOG_Buffer_t  *buffer;
108           CLOG_Sync_t    *syncer;
109           CLOG_Time_t     local_timediff;
110 
111     syncer = stream->syncer;
112     if ( syncer->world_rank == 0 ) {
113         if ( syncer->is_ok_to_sync == CLOG_BOOL_TRUE ) {
114             fprintf( stderr, "Enabling the %s clock synchronization...\n",
115                              CLOG_Sync_print_type( syncer ) );
116         }
117         else
118             fprintf( stderr, "Disabling the clock synchronization...\n" );
119     }
120 
121     buffer  = stream->buffer;
122 
123     /* Adding the CLOG_Buffer_write2disk's state definition */
124     if (    buffer->world_rank    == 0
125          && buffer->log_overhead  == CLOG_BOOL_TRUE ) {
126         commIDs  = CLOG_CommSet_get_IDs( buffer->commset, MPI_COMM_WORLD );
127         CLOG_Buffer_save_statedef( buffer, commIDs, 0,
128                                    CLOG_STATEID_BUFFERWRITE,
129                                    CLOG_EVT_BUFFERWRITE_START,
130                                    CLOG_EVT_BUFFERWRITE_FINAL,
131                                    "maroon", "CLOG_Buffer_write2disk",
132                                    NULL );
133     }
134 
135     if ( stream->syncer->is_ok_to_sync == CLOG_BOOL_TRUE ) {
136         local_timediff = CLOG_Sync_run( stream->syncer );
137         CLOG_Buffer_set_timeshift( buffer, local_timediff, CLOG_BOOL_FALSE );
138     }
139     CLOG_Sync_free( &(stream->syncer) );
140 
141     CLOG_Buffer_save_endlog( buffer );
142     CLOG_Buffer_localIO_flush( buffer );
143 }
144 
CLOG_Get_known_solo_eventID(CLOG_Stream_t * stream)145 int  CLOG_Get_known_solo_eventID( CLOG_Stream_t *stream )
146 {
147     if ( stream->known_solo_eventID < CLOG_KNOWN_EVENTID_START )
148         return (stream->known_solo_eventID)++;
149     else {
150         fprintf( stderr, __FILE__":CLOG_Get_known_solo_eventID() - \n"
151                          "\t""CLOG internal KNOWN solo eventID are used up, "
152                          "last known solo eventID is %d.  Aborting...\n",
153                          stream->known_solo_eventID );
154         fflush( stderr );
155         CLOG_Util_abort( 1 );
156         return stream->known_solo_eventID;
157     }
158 }
159 
CLOG_Get_known_eventID(CLOG_Stream_t * stream)160 int  CLOG_Get_known_eventID( CLOG_Stream_t *stream )
161 {
162     if ( stream->known_eventID < CLOG_USER_EVENTID_START )
163         return (stream->known_eventID)++;
164     else {
165         fprintf( stderr, __FILE__":CLOG_Get_known_eventID() - \n"
166                          "\t""CLOG internal KNOWN eventID have been used up, "
167                          "use CLOG user eventID %d.\n",
168                          stream->user_eventID );
169         fflush( stderr );
170         return (stream->user_eventID)++;
171     }
172 }
173 
CLOG_Get_user_eventID(CLOG_Stream_t * stream)174 int  CLOG_Get_user_eventID( CLOG_Stream_t *stream )
175 {
176     if ( stream->user_eventID < CLOG_USER_SOLO_EVENTID_START )
177         return (stream->user_eventID)++;
178     else {
179         fprintf( stderr, __FILE__":CLOG_Get_user_eventID() - \n"
180                          "\t""CLOG internal USER eventID have been used up, "
181                          "use CLOG user SOLO eventID %d.\n",
182                          stream->user_eventID );
183         fflush( stderr );
184         return (stream->user_eventID)++;
185     }
186 }
187 
CLOG_Get_user_solo_eventID(CLOG_Stream_t * stream)188 int  CLOG_Get_user_solo_eventID( CLOG_Stream_t *stream )
189 {
190     return (stream->user_solo_eventID)++;
191 }
192 
CLOG_Get_known_stateID(CLOG_Stream_t * stream)193 int  CLOG_Get_known_stateID( CLOG_Stream_t *stream )
194 {
195     if ( stream->known_stateID < CLOG_USER_STATEID_START )
196         return (stream->known_stateID)++;
197     else {
198         fprintf( stderr, __FILE__":CLOG_Get_known_stateID() - \n"
199                          "\t""CLOG internal KNOWN stateID have been used up, "
200                          "use CLOG user stateID %d.\n",
201                          stream->user_stateID );
202         fflush( stderr );
203         return (stream->user_stateID)++;
204     }
205 }
206 
CLOG_Get_user_stateID(CLOG_Stream_t * stream)207 int  CLOG_Get_user_stateID( CLOG_Stream_t *stream )
208 {
209     return (stream->user_stateID)++;
210 }
211 
CLOG_Check_known_stateID(CLOG_Stream_t * stream,int stateID)212 int  CLOG_Check_known_stateID( CLOG_Stream_t *stream, int stateID )
213 {
214     if (    stateID >= CLOG_KNOWN_STATEID_START
215          && stateID <  CLOG_USER_STATEID_START )
216         return CLOG_BOOL_TRUE;
217     else
218         return CLOG_BOOL_FALSE;
219 }
220 
CLOG_Converge_init(CLOG_Stream_t * stream,const char * merged_file_prefix)221 void CLOG_Converge_init(       CLOG_Stream_t *stream,
222                          const char          *merged_file_prefix )
223 {
224     /* stream->buffer->block_size == stream->buffer->preamble->block_size */
225     stream->merger = CLOG_Merger_create( stream->buffer->block_size );
226     /*
227        Update CLOG_Preamble_t with current values before its being passed to
228        CLOG_Merger_init() where it will be copied to the disk at root process.
229     */
230     stream->buffer->preamble->user_stateID_count
231     = stream->user_stateID - CLOG_USER_STATEID_START;
232     stream->buffer->preamble->known_solo_eventID_count
233     = stream->known_solo_eventID  - CLOG_KNOWN_SOLO_EVENTID_START;
234     stream->buffer->preamble->user_solo_eventID_count
235     = stream->user_solo_eventID  - CLOG_USER_SOLO_EVENTID_START;
236     CLOG_Merger_init( stream->merger, stream->buffer->preamble,
237                       merged_file_prefix );
238 }
239 
CLOG_Converge_finalize(CLOG_Stream_t * stream)240 void CLOG_Converge_finalize( CLOG_Stream_t *stream )
241 {
242     CLOG_Merger_finalize( stream->merger, stream->buffer );
243     CLOG_Merger_free( &(stream->merger) );
244 }
245 
CLOG_Converge_sort(CLOG_Stream_t * stream)246 void CLOG_Converge_sort( CLOG_Stream_t *stream )
247 {
248     CLOG_Merger_sort( stream->merger, stream->buffer );
249     CLOG_Merger_last_flush( stream->merger );
250 }
251