/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Job History Log Analyzer.
 *
 * <h3>Description.</h3>
 * This is a tool for parsing and analyzing history logs of map-reduce jobs.
 * History logs contain information about execution of jobs, tasks, and
 * attempts. This tool focuses on submission, launch, start, and finish times,
 * as well as the success or failure of jobs, tasks, and attempts.
 * <p>
 * The analyzer calculates <em>per hour slot utilization</em> for the cluster
 * as follows.
 * For each task attempt it divides the time segment from the start of the
 * attempt t<sub>S</sub> to the finish t<sub>F</sub> into whole hours
 * [t<sub>0</sub>, ..., t<sub>n</sub>], where t<sub>0</sub> <= t<sub>S</sub>
 * is the maximal whole hour preceding t<sub>S</sub>, and
 * t<sub>n</sub> >= t<sub>F</sub> is the minimal whole hour after t<sub>F</sub>.
 * Thus, [t<sub>0</sub>, ..., t<sub>n</sub>] covers the segment
 * [t<sub>S</sub>, t<sub>F</sub>], during which the attempt was executed.
 * Each interval [t<sub>i</sub>, t<sub>i+1</sub>] fully contained in
 * [t<sub>S</sub>, t<sub>F</sub>] corresponds to exactly one slot on
 * a map-reduce cluster (usually a MAP slot or a REDUCE slot).
 * If an interval [t<sub>i</sub>, t<sub>i+1</sub>] only partially intersects
 * [t<sub>S</sub>, t<sub>F</sub>], then the task attempt is said to have used
 * just a fraction of the slot during this hour, where the fraction equals
 * the relative size of the intersection.
 * Let slotTime(A, h) denote the number of slots calculated that way for a
 * specific attempt A during hour h.
 * The tool then sums these slots over all attempts for every hour.
 * The result is the slot hour utilization of the cluster:
 * <tt>slotTime(h) = SUM<sub>A</sub> slotTime(A,h)</tt>.
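 * <p>
 * As an illustration, a single attempt running from 10:30 to 12:15
 * contributes slotTime fractions of 0.5 to the 10:00 hour, 1.0 to the
 * 11:00 hour, and 0.25 to the 12:00 hour, for a total of 1.75 slot hours,
 * which equals its running time.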
 * <p>
 * The log analyzer calculates slot hours for <em>MAP</em> and <em>REDUCE</em>
 * attempts separately.
 * <p>
 * The log analyzer distinguishes between <em>successful</em> and
 * <em>failed</em> attempts. A task attempt is considered successful if its
 * own status is SUCCESS and the statuses of the task and the job it is a
 * part of are also SUCCESS. Otherwise the task attempt is considered failed.
 * <p>
 * Map-reduce clusters are usually configured to have a fixed number of MAP
 * and REDUCE slots per node. Thus the maximal possible number of slots on
 * the cluster is <tt>total_slots = total_nodes * slots_per_node</tt>.
 * The effective slot hours of successful attempts therefore cannot exceed
 * <tt>total_slots</tt> for any hour.
 * <p>
 * <em>Pending time</em> characterizes the wait time of attempts.
 * It is calculated similarly to the slot hours, except that the wait interval
 * starts when the job is submitted and ends when an attempt starts execution.
 * In addition, pending time also includes the intervals between attempts
 * of the same task if it was re-executed.
 * <p>
 * The history log analyzer calculates two pending time variations: the first
 * is based on the job submission time as described above; the second starts
 * the wait interval when the job is launched rather than submitted.
 *
 * <h3>Input.</h3>
 * The following input parameters can be specified in the argument string
 * to the job log analyzer:
 * <ul>
 * <li><tt>-historyDir inputDir</tt> specifies the location of the directory
 * where the analyzer will be looking for job history log files.</li>
 * <li><tt>-resFile resultFile</tt> the name of the result file.</li>
 * <li><tt>-usersIncluded | -usersExcluded userList</tt> slot utilization and
 * pending time can be calculated either for the specified users only, or
 * for all but the specified users.<br>
 * <tt>userList</tt> is a comma or semicolon separated list of users.</li>
 * <li><tt>-gzip</tt> is used if history log files are compressed.
 * Only {@link GzipCodec} is currently supported.</li>
 * <li><tt>-jobDelimiter pattern</tt> original log files can be concatenated
 * into larger file(s) with the specified delimiter, which lets the analyzer
 * recognize where the log of one job ends and the next one begins.<br>
 * <tt>pattern</tt> is a java regular expression
 * {@link java.util.regex.Pattern}, which should match only the log delimiters.
 * <br>
 * E.g. pattern <tt>".!!FILE=.*!!"</tt> matches delimiters, which contain
 * the original history log file names in the following form:<br>
 * <tt>"$!!FILE=my.job.tracker.com_myJobId_user_wordcount.log!!"</tt></li>
 * <li><tt>-clean</tt> cleans up default directories used by the analyzer.</li>
 * <li><tt>-test</tt> test one file locally and exit;
 * does not require map-reduce.</li>
 * <li><tt>-help</tt> print usage.</li>
 * </ul>
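 * <p>
 * For example, a hypothetical invocation (the directory, file, and user
 * names below are illustrative only) could look like:
 * <pre>
 * hadoop org.apache.hadoop.fs.JHLogAnalyzer -historyDir history \
 *     -resFile results.txt -usersExcluded smith,jones -gzip
 * </pre>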
 *
 * <h3>Output.</h3>
 * The output file is formatted as a tab separated table consisting of four
 * columns: <tt>SERIES, PERIOD, TYPE, SLOT_HOUR</tt>.
 * <ul>
 * <li><tt>SERIES</tt> one of the four statistical series;</li>
 * <li><tt>PERIOD</tt> the start of the time interval in the following format:
 * <tt>"yyyy-mm-dd hh:mm:ss"</tt>;</li>
 * <li><tt>TYPE</tt> the slot type, e.g. MAP or REDUCE;</li>
 * <li><tt>SLOT_HOUR</tt> the value of the slot usage during this
 * time interval.</li>
 * </ul>
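 * <p>
 * An illustrative output row (the values are hypothetical) would be:
 * <pre>
 * allSlotTime	2009-01-01 10:00:00	MAP	12.5
 * </pre>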
 */
@SuppressWarnings("deprecation")
public class JHLogAnalyzer {
  private static final Log LOG = LogFactory.getLog(JHLogAnalyzer.class);
  // Constants
  private static final String JHLA_ROOT_DIR =
                            System.getProperty("test.build.data", "stats/JHLA");
  private static final Path INPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_input");
  private static final String BASE_INPUT_FILE_NAME = "jhla_in_";
  private static final Path OUTPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_output");
  private static final Path RESULT_FILE =
                            new Path(JHLA_ROOT_DIR, "jhla_result.txt");
  private static final Path DEFAULT_HISTORY_DIR = new Path("history");

  private static final int DEFAULT_TIME_INTERVAL_MSEC = 1000*60*60; // 1 hour

  static {
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
  }

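  /**
   * The four statistical series produced by the analyzer.
   * Each series name carries the {@link AccumulatingReducer} long-value type
   * prefix, which directs the reducer to accumulate the values as long sums.
   */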
  static enum StatSeries {
    STAT_ALL_SLOT_TIME
          (AccumulatingReducer.VALUE_TYPE_LONG + "allSlotTime"),
    STAT_FAILED_SLOT_TIME
          (AccumulatingReducer.VALUE_TYPE_LONG + "failedSlotTime"),
    STAT_SUBMIT_PENDING_SLOT_TIME
          (AccumulatingReducer.VALUE_TYPE_LONG + "submitPendingSlotTime"),
    STAT_LAUNCHED_PENDING_SLOT_TIME
          (AccumulatingReducer.VALUE_TYPE_LONG + "launchedPendingSlotTime");

    private String statName = null;
    private StatSeries(String name) {this.statName = name;}
    public String toString() {return statName;}
  }

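  /**
   * Creates the control files for the map-reduce job in parallel threads.
   * Each control file is a single-record SequenceFile that maps the name of
   * one history log file to offset 0; every map task later receives one such
   * record as its input.
   */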
  private static class FileCreateDaemon extends Thread {
    private static final int NUM_CREATE_THREADS = 10;
    private static volatile int numFinishedThreads;
    private static volatile int numRunningThreads;
    private static FileStatus[] jhLogFiles;

    FileSystem fs;
    int start;
    int end;

    FileCreateDaemon(FileSystem fs, int start, int end) {
      this.fs = fs;
      this.start = start;
      this.end = end;
    }

    public void run() {
      try {
        for(int i=start; i < end; i++) {
          String name = getFileName(i);
          Path controlFile = new Path(INPUT_DIR, "in_file_" + name);
          SequenceFile.Writer writer = null;
          try {
            writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
                                               Text.class, LongWritable.class,
                                               CompressionType.NONE);
            String logFile = jhLogFiles[i].getPath().toString();
            writer.append(new Text(logFile), new LongWritable(0));
          } catch(Exception e) {
            throw new IOException(e);
          } finally {
            if (writer != null)
              writer.close();
            writer = null;
          }
        }
      } catch(IOException ex) {
        LOG.error("FileCreateDaemon failed.", ex);
      }
      numFinishedThreads++;
    }

    private static void createControlFile(FileSystem fs, Path jhLogDir
    ) throws IOException {
      fs.delete(INPUT_DIR, true);
      jhLogFiles = fs.listStatus(jhLogDir);

      numFinishedThreads = 0;
      try {
        int start = 0;
        int step = jhLogFiles.length / NUM_CREATE_THREADS
                 + ((jhLogFiles.length % NUM_CREATE_THREADS) > 0 ? 1 : 0);
        FileCreateDaemon[] daemons = new FileCreateDaemon[NUM_CREATE_THREADS];
        numRunningThreads = 0;
        for(int tIdx=0; tIdx < NUM_CREATE_THREADS && start < jhLogFiles.length; tIdx++) {
          int end = Math.min(start + step, jhLogFiles.length);
          daemons[tIdx] = new FileCreateDaemon(fs, start, end);
          start += step;
          numRunningThreads++;
        }
        for(int tIdx=0; tIdx < numRunningThreads; tIdx++) {
          daemons[tIdx].start();
        }
      } finally {
        int prevValue = 0;
        while(numFinishedThreads < numRunningThreads) {
          if(prevValue < numFinishedThreads) {
            LOG.info("Finished " + numFinishedThreads + " threads out of " + numRunningThreads);
            prevValue = numFinishedThreads;
          }
          try {Thread.sleep(500);} catch (InterruptedException e) {}
        }
      }
    }
  }

  private static void createControlFile(FileSystem fs, Path jhLogDir
  ) throws IOException {
    LOG.info("creating control file: JH log dir = " + jhLogDir);
    FileCreateDaemon.createControlFile(fs, jhLogDir);
    LOG.info("created control file: JH log dir = " + jhLogDir);
  }

  private static String getFileName(int fIdx) {
    return BASE_INPUT_FILE_NAME + Integer.toString(fIdx);
  }

  /**
   * If keyVal is of the form KEY="VALUE", then this will return [KEY, VALUE].
   */
  private static String[] getKeyValue(String t) throws IOException {
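    // Split on the =" separator (tolerating extra quotes) and on the
    // closing quote, so that KEY="VALUE" yields {KEY, VALUE}.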
    String[] keyVal = t.split("=\"*|\"");
    return keyVal;
  }

  /**
   * JobHistory log record.
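   * <p>
   * Each line of the history log starts with a record type (Job, Task,
   * MapAttempt, or ReduceAttempt) followed by KEY="VALUE" pairs; a
   * hypothetical (illustrative only) job line could look like
   * <pre>
   * Job JOBID="job_200901010000_0001" JOB_STATUS="SUCCESS" USER="smith" .
   * </pre>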
   */
  private static class JobHistoryLog {
    String JOBID;
    String JOB_STATUS;
    long SUBMIT_TIME;
    long LAUNCH_TIME;
    long FINISH_TIME;
    long TOTAL_MAPS;
    long TOTAL_REDUCES;
    long FINISHED_MAPS;
    long FINISHED_REDUCES;
    String USER;
    Map<String, TaskHistoryLog> tasks;

    boolean isSuccessful() {
      return (JOB_STATUS != null) && JOB_STATUS.equals("SUCCESS");
    }

    void parseLine(String line) throws IOException {
      StringTokenizer tokens = new StringTokenizer(line);
      if(!tokens.hasMoreTokens())
        return;
      String what = tokens.nextToken();
      // Line should start with one of the following:
      // Job, Task, MapAttempt, ReduceAttempt
      if(what.equals("Job"))
        updateJob(tokens);
      else if(what.equals("Task"))
        updateTask(tokens);
      else if(what.indexOf("Attempt") >= 0)
        updateTaskAttempt(tokens);
    }

    private void updateJob(StringTokenizer tokens) throws IOException {
      while(tokens.hasMoreTokens()) {
        String t = tokens.nextToken();
        String[] keyVal = getKeyValue(t);
        if(keyVal.length < 2) continue;

        if(keyVal[0].equals("JOBID")) {
          if(JOBID == null)
            JOBID = new String(keyVal[1]);
          else if(!JOBID.equals(keyVal[1])) {
            LOG.error("Incorrect JOBID: "
                + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100))
                + " expect " + JOBID);
            return;
          }
        }
        else if(keyVal[0].equals("JOB_STATUS"))
          JOB_STATUS = new String(keyVal[1]);
        else if(keyVal[0].equals("SUBMIT_TIME"))
          SUBMIT_TIME = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("LAUNCH_TIME"))
          LAUNCH_TIME = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("FINISH_TIME"))
          FINISH_TIME = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("TOTAL_MAPS"))
          TOTAL_MAPS = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("TOTAL_REDUCES"))
          TOTAL_REDUCES = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("FINISHED_MAPS"))
          FINISHED_MAPS = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("FINISHED_REDUCES"))
          FINISHED_REDUCES = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("USER"))
          USER = new String(keyVal[1]);
      }
    }

    private void updateTask(StringTokenizer tokens) throws IOException {
      // unpack
      TaskHistoryLog task = new TaskHistoryLog().parse(tokens);
      if(task.TASKID == null) {
        LOG.error("TASKID = NULL for job " + JOBID);
        return;
      }
      // update or insert
      if(tasks == null)
        tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES));
      TaskHistoryLog existing = tasks.get(task.TASKID);
      if(existing == null)
        tasks.put(task.TASKID, task);
      else
        existing.updateWith(task);
    }

    private void updateTaskAttempt(StringTokenizer tokens) throws IOException {
      // unpack
      TaskAttemptHistoryLog attempt = new TaskAttemptHistoryLog();
      String taskID = attempt.parse(tokens);
      if(taskID == null) return;
      if(tasks == null)
        tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES));
      TaskHistoryLog existing = tasks.get(taskID);
      if(existing == null) {
        existing = new TaskHistoryLog(taskID);
        tasks.put(taskID, existing);
      }
      existing.updateWith(attempt);
    }
  }

  /**
   * TaskHistory log record.
   */
  private static class TaskHistoryLog {
    String TASKID;
    String TASK_TYPE;   // MAP, REDUCE, SETUP, CLEANUP
    String TASK_STATUS;
    long START_TIME;
    long FINISH_TIME;
    Map<String, TaskAttemptHistoryLog> attempts;

    TaskHistoryLog() {}

    TaskHistoryLog(String taskID) {
      TASKID = taskID;
    }

    boolean isSuccessful() {
      return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS");
    }

    TaskHistoryLog parse(StringTokenizer tokens) throws IOException {
      while(tokens.hasMoreTokens()) {
        String t = tokens.nextToken();
        String[] keyVal = getKeyValue(t);
        if(keyVal.length < 2) continue;

        if(keyVal[0].equals("TASKID")) {
          if(TASKID == null)
            TASKID = new String(keyVal[1]);
          else if(!TASKID.equals(keyVal[1])) {
            LOG.error("Incorrect TASKID: "
                + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100))
                + " expect " + TASKID);
            continue;
          }
        }
        else if(keyVal[0].equals("TASK_TYPE"))
          TASK_TYPE = new String(keyVal[1]);
        else if(keyVal[0].equals("TASK_STATUS"))
          TASK_STATUS = new String(keyVal[1]);
        else if(keyVal[0].equals("START_TIME"))
          START_TIME = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("FINISH_TIME"))
          FINISH_TIME = Long.parseLong(keyVal[1]);
      }
      return this;
    }

    /**
     * Update with non-null fields of the same task log record.
     */
    void updateWith(TaskHistoryLog from) throws IOException {
      if(TASKID == null)
        TASKID = from.TASKID;
      else if(!TASKID.equals(from.TASKID)) {
        throw new IOException("Incorrect TASKID: " + from.TASKID
                            + " expect " + TASKID);
      }
      if(TASK_TYPE == null)
        TASK_TYPE = from.TASK_TYPE;
      else if(! TASK_TYPE.equals(from.TASK_TYPE)) {
        LOG.error(
            "Incorrect TASK_TYPE: " + from.TASK_TYPE + " expect " + TASK_TYPE
            + " for task " + TASKID);
        return;
      }
      if(from.TASK_STATUS != null)
        TASK_STATUS = from.TASK_STATUS;
      if(from.START_TIME > 0)
        START_TIME = from.START_TIME;
      if(from.FINISH_TIME > 0)
        FINISH_TIME = from.FINISH_TIME;
    }

    /**
     * Update with non-null fields of the task attempt log record.
     */
    void updateWith(TaskAttemptHistoryLog attempt) throws IOException {
      if(attempt.TASK_ATTEMPT_ID == null) {
        LOG.error("Unexpected TASK_ATTEMPT_ID = null for task " + TASKID);
        return;
      }
      if(attempts == null)
        attempts = new HashMap<String, TaskAttemptHistoryLog>();
      TaskAttemptHistoryLog existing = attempts.get(attempt.TASK_ATTEMPT_ID);
      if(existing == null)
        attempts.put(attempt.TASK_ATTEMPT_ID, attempt);
      else
        existing.updateWith(attempt);
      // update task start time
      if(attempt.START_TIME > 0 &&
          (this.START_TIME == 0 || this.START_TIME > attempt.START_TIME))
        START_TIME = attempt.START_TIME;
    }
  }

  /**
   * TaskAttemptHistory log record.
   */
  private static class TaskAttemptHistoryLog {
    String TASK_ATTEMPT_ID;
    String TASK_STATUS; // this task attempt status
    long START_TIME;
    long FINISH_TIME;
    long HDFS_BYTES_READ;
    long HDFS_BYTES_WRITTEN;
    long FILE_BYTES_READ;
    long FILE_BYTES_WRITTEN;

    /**
     * Returns whether this attempt's own status is "SUCCESS".
     * The attempt is considered successful overall iff, in addition, the
     * statuses of the task and the job it belongs to are also "SUCCESS";
     * that combination is checked in collectJobStats.
     */
    boolean isSuccessful() {
      return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS");
    }

    String parse(StringTokenizer tokens) throws IOException {
      String taskID = null;
      while(tokens.hasMoreTokens()) {
        String t = tokens.nextToken();
        String[] keyVal = getKeyValue(t);
        if(keyVal.length < 2) continue;

        if(keyVal[0].equals("TASKID")) {
          if(taskID == null)
            taskID = new String(keyVal[1]);
          else if(!taskID.equals(keyVal[1])) {
            LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID);
            continue;
          }
        }
        else if(keyVal[0].equals("TASK_ATTEMPT_ID")) {
          if(TASK_ATTEMPT_ID == null)
            TASK_ATTEMPT_ID = new String(keyVal[1]);
          else if(!TASK_ATTEMPT_ID.equals(keyVal[1])) {
            LOG.error("Incorrect TASK_ATTEMPT_ID: " + keyVal[1]
                + " expect " + TASK_ATTEMPT_ID);
            continue;
          }
        }
        else if(keyVal[0].equals("TASK_STATUS"))
          TASK_STATUS = new String(keyVal[1]);
        else if(keyVal[0].equals("START_TIME"))
          START_TIME = Long.parseLong(keyVal[1]);
        else if(keyVal[0].equals("FINISH_TIME"))
          FINISH_TIME = Long.parseLong(keyVal[1]);
      }
      return taskID;
    }

    /**
     * Update with non-null fields of the same task attempt log record.
     */
    void updateWith(TaskAttemptHistoryLog from) throws IOException {
      if(TASK_ATTEMPT_ID == null)
        TASK_ATTEMPT_ID = from.TASK_ATTEMPT_ID;
      else if(! TASK_ATTEMPT_ID.equals(from.TASK_ATTEMPT_ID)) {
        throw new IOException(
            "Incorrect TASK_ATTEMPT_ID: " + from.TASK_ATTEMPT_ID
            + " expect " + TASK_ATTEMPT_ID);
      }
      if(from.TASK_STATUS != null)
        TASK_STATUS = from.TASK_STATUS;
      if(from.START_TIME > 0)
        START_TIME = from.START_TIME;
      if(from.FINISH_TIME > 0)
        FINISH_TIME = from.FINISH_TIME;
      if(from.HDFS_BYTES_READ > 0)
        HDFS_BYTES_READ = from.HDFS_BYTES_READ;
      if(from.HDFS_BYTES_WRITTEN > 0)
        HDFS_BYTES_WRITTEN = from.HDFS_BYTES_WRITTEN;
      if(from.FILE_BYTES_READ > 0)
        FILE_BYTES_READ = from.FILE_BYTES_READ;
      if(from.FILE_BYTES_WRITTEN > 0)
        FILE_BYTES_WRITTEN = from.FILE_BYTES_WRITTEN;
    }
  }

  /**
   * Key = statName*date-time*taskType
   * Value = number of msec for the hour
   */
  private static class IntervalKey {
    static final String KEY_FIELD_DELIMITER = "*";
    String statName;
    String dateTime;
    String taskType;

    IntervalKey(String stat, long timeMSec, String taskType) {
      statName = stat;
      SimpleDateFormat dateF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      dateTime = dateF.format(new Date(timeMSec));
      this.taskType = taskType;
    }

    IntervalKey(String key) {
      StringTokenizer keyTokens = new StringTokenizer(key, KEY_FIELD_DELIMITER);
      if(!keyTokens.hasMoreTokens()) return;
      statName = keyTokens.nextToken();
      if(!keyTokens.hasMoreTokens()) return;
      dateTime = keyTokens.nextToken();
      if(!keyTokens.hasMoreTokens()) return;
      taskType = keyTokens.nextToken();
    }

    void setStatName(String stat) {
      statName = stat;
    }

    String getStringKey() {
      return statName + KEY_FIELD_DELIMITER +
             dateTime + KEY_FIELD_DELIMITER +
             taskType;
    }

    Text getTextKey() {
      return new Text(getStringKey());
    }

    public String toString() {
      return getStringKey();
    }
  }

  /**
   * Mapper class.
   */
  private static class JHLAMapper extends IOMapperBase<Object> {
    /**
     * A line pattern, which delimits history logs of different jobs,
     * if multiple job logs are written in the same file.
     * Null value means only one job log per file is expected.
     * The pattern should be a regular expression as in
     * {@link String#matches(String)}.
     */
    String jobDelimiterPattern;
    int maxJobDelimiterLineLength;
    /** Count only these users' jobs */
    Collection<String> usersIncluded;
    /** Exclude jobs of the following users */
    Collection<String> usersExcluded;
    /** Type of compression for compressed files: gzip */
    Class<? extends CompressionCodec> compressionClass;

    JHLAMapper() throws IOException {
    }

    JHLAMapper(Configuration conf) throws IOException {
      configure(new JobConf(conf));
    }

    public void configure(JobConf conf) {
      super.configure(conf);
      usersIncluded = getUserList(conf.get("jhla.users.included", null));
      usersExcluded = getUserList(conf.get("jhla.users.excluded", null));
      String zipClassName = conf.get("jhla.compression.class", null);
      try {
        compressionClass = (zipClassName == null) ? null :
          Class.forName(zipClassName).asSubclass(CompressionCodec.class);
      } catch(Exception e) {
        throw new RuntimeException("Compression codec not found: ", e);
      }
      jobDelimiterPattern = conf.get("jhla.job.delimiter.pattern", null);
      maxJobDelimiterLineLength = conf.getInt("jhla.job.delimiter.length", 512);
    }

    @Override
    public void map(Text key,
                    LongWritable value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
      String name = key.toString();
      long longValue = value.get();

      reporter.setStatus("starting " + name + " ::host = " + hostName);

      long tStart = System.currentTimeMillis();
      parseLogFile(fs, new Path(name), longValue, output, reporter);
      long tEnd = System.currentTimeMillis();
      long execTime = tEnd - tStart;

      reporter.setStatus("finished " + name + " ::host = " + hostName +
          " in " + execTime/1000 + " sec.");
    }

    public Object doIO(Reporter reporter,
                       String path, // full path of history log file
                       long offset  // starting offset within the file
                       ) throws IOException {
      return null;
    }

    void collectStats(OutputCollector<Text, Text> output,
        String name,
        long execTime,
        Object jobObjects) throws IOException {
    }

    private boolean isEndOfJobLog(String line) {
      if(jobDelimiterPattern == null)
        return false;
      return line.matches(jobDelimiterPattern);
    }

    /**
     * Collect information about one job.
     *
     * @param fs - file system
     * @param filePath - full path of a history log file
     * @param offset - starting offset in the history log file
     * @throws IOException
     */
    public void parseLogFile(FileSystem fs,
                             Path filePath,
                             long offset,
                             OutputCollector<Text, Text> output,
                             Reporter reporter
                             ) throws IOException {
      InputStream in = null;
      try {
        // open file & seek
        FSDataInputStream stm = fs.open(filePath);
        stm.seek(offset);
        in = stm;
        LOG.info("Opened " + filePath);
        reporter.setStatus("Opened " + filePath);
        // get a compression filter if specified
        if(compressionClass != null) {
          CompressionCodec codec = (CompressionCodec)
            ReflectionUtils.newInstance(compressionClass, new Configuration());
          in = codec.createInputStream(stm);
          LOG.info("Codec created " + filePath);
          reporter.setStatus("Codec created " + filePath);
        }
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        LOG.info("Reader created " + filePath);
        // skip to the next job log start
        long processed = 0L;
        if(jobDelimiterPattern != null) {
          for(String line = reader.readLine();
                line != null; line = reader.readLine()) {
            if((stm.getPos() - processed) > 100000) {
              processed = stm.getPos();
              reporter.setStatus("Processing " + filePath + " at " + processed);
            }
            if(isEndOfJobLog(line))
              break;
          }
        }
        // parse lines and update job history
        JobHistoryLog jh = new JobHistoryLog();
        int jobLineCount = 0;
        for(String line = readLine(reader);
              line != null; line = readLine(reader)) {
          jobLineCount++;
          if((stm.getPos() - processed) > 20000) {
            processed = stm.getPos();
            long numTasks = (jh.tasks == null ? 0 : jh.tasks.size());
            String txt = "Processing " + filePath + " at " + processed
                    + " # tasks = " + numTasks;
            reporter.setStatus(txt);
            LOG.info(txt);
          }
          if(isEndOfJobLog(line)) {
            if(jh.JOBID != null) {
              LOG.info("Finished parsing job: " + jh.JOBID
                     + " line count = " + jobLineCount);
              collectJobStats(jh, output, reporter);
              LOG.info("Collected stats for job: " + jh.JOBID);
            }
            jh = new JobHistoryLog();
            jobLineCount = 0;
          } else
            jh.parseLine(line);
        }
        if(jh.JOBID == null) {
          LOG.error("JOBID = NULL in " + filePath + " at " + processed);
          return;
        }
        collectJobStats(jh, output, reporter);
      } catch(Exception ie) {
        // parsing errors can happen if the file has been truncated
        LOG.error("JHLAMapper.parseLogFile", ie);
        reporter.setStatus("JHLAMapper.parseLogFile failed "
                          + StringUtils.stringifyException(ie));
        throw new IOException("Job failed.", ie);
      } finally {
        if(in != null) in.close();
      }
    }

    /**
     * Read lines until one ends with a " ." or "\" "
     */
    private StringBuffer resBuffer = new StringBuffer();
    private String readLine(BufferedReader reader) throws IOException {
      resBuffer.setLength(0);
      reader.mark(maxJobDelimiterLineLength);
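      // The mark lets us rewind if the next physical line turns out to be a
      // job delimiter, so the delimiter can be re-read as the start of the
      // next job's log.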
      for(String line = reader.readLine();
                line != null; line = reader.readLine()) {
        if(isEndOfJobLog(line)) {
          if(resBuffer.length() == 0)
            resBuffer.append(line);
          else
            reader.reset();
          break;
        }
        if(resBuffer.length() == 0)
          resBuffer.append(line);
        else if(resBuffer.length() < 32000)
          resBuffer.append(line);
        if(line.endsWith(" .") || line.endsWith("\" ")) {
          break;
        }
        reader.mark(maxJobDelimiterLineLength);
      }
      String result = resBuffer.length() == 0 ? null : resBuffer.toString();
      resBuffer.setLength(0);
      return result;
    }

    private void collectPerIntervalStats(OutputCollector<Text, Text> output,
        long start, long finish, String taskType,
        StatSeries ... stats) throws IOException {
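      // Walk the [start, finish] segment hour by hour, emitting for each
      // enclosing hour interval the number of milliseconds the segment
      // overlaps it -- the slotTime(A, h) fractions from the class javadoc.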
      long curInterval = (start / DEFAULT_TIME_INTERVAL_MSEC)
                                * DEFAULT_TIME_INTERVAL_MSEC;
      long curTime = start;
      long accumTime = 0;
      while(curTime < finish) {
        // how much of the task time belonged to current interval
        long nextInterval = curInterval + DEFAULT_TIME_INTERVAL_MSEC;
        long intervalTime = ((finish < nextInterval) ?
            finish : nextInterval) - curTime;
        IntervalKey key = new IntervalKey("", curInterval, taskType);
        Text val = new Text(String.valueOf(intervalTime));
        for(StatSeries statName : stats) {
          key.setStatName(statName.toString());
          output.collect(key.getTextKey(), val);
        }

        curTime = curInterval = nextInterval;
        accumTime += intervalTime;
      }
      // For the pending stat speculative attempts may intersect.
      // Only one of them is considered pending.
      assert accumTime == finish - start || finish < start;
    }

    private void collectJobStats(JobHistoryLog jh,
                                 OutputCollector<Text, Text> output,
                                 Reporter reporter
                                 ) throws IOException {
      if(jh == null)
        return;
      if(jh.tasks == null)
        return;
      if(jh.SUBMIT_TIME <= 0)
        throw new IOException("Job " + jh.JOBID
                            + " SUBMIT_TIME = " + jh.SUBMIT_TIME);
      if(usersIncluded != null && !usersIncluded.contains(jh.USER))
        return;
      if(usersExcluded != null && usersExcluded.contains(jh.USER))
        return;

      int numAttempts = 0;
      long totalTime = 0;
      boolean jobSuccess = jh.isSuccessful();
      long jobWaitTime = jh.LAUNCH_TIME - jh.SUBMIT_TIME;
      // attemptSubmitTime is the job's SUBMIT_TIME,
      // or the previous attempt FINISH_TIME for all subsequent attempts
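      // The launched-pending series measures waiting from attemptSubmitTime;
      // the submit-pending series shifts that start back by jobWaitTime, so
      // the first attempt's wait is counted from SUBMIT_TIME rather than
      // LAUNCH_TIME.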
      for(TaskHistoryLog th : jh.tasks.values()) {
        if(th.attempts == null)
          continue;
        // Task is successful iff both the task and the job are a "SUCCESS"
        long attemptSubmitTime = jh.LAUNCH_TIME;
        boolean taskSuccess = jobSuccess && th.isSuccessful();
        for(TaskAttemptHistoryLog tah : th.attempts.values()) {
          // Task attempt is considered successful iff all three statuses
          // of the attempt, the task, and the job equal "SUCCESS"
          boolean success = taskSuccess && tah.isSuccessful();
          if(tah.START_TIME == 0) {
            LOG.error("Start time 0 for task attempt " + tah.TASK_ATTEMPT_ID);
            continue;
          }
          if(tah.FINISH_TIME < tah.START_TIME) {
            LOG.error("Finish time " + tah.FINISH_TIME + " is less than "
                + "Start time " + tah.START_TIME + " for task attempt "
                + tah.TASK_ATTEMPT_ID);
            tah.FINISH_TIME = tah.START_TIME;
          }

          if(!"MAP".equals(th.TASK_TYPE) && !"REDUCE".equals(th.TASK_TYPE) &&
             !"CLEANUP".equals(th.TASK_TYPE) && !"SETUP".equals(th.TASK_TYPE)) {
            LOG.error("Unexpected TASK_TYPE = " + th.TASK_TYPE
                + " for attempt " + tah.TASK_ATTEMPT_ID);
          }

          collectPerIntervalStats(output,
                  attemptSubmitTime, tah.START_TIME, th.TASK_TYPE,
                  StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME);
          collectPerIntervalStats(output,
                  attemptSubmitTime - jobWaitTime, tah.START_TIME, th.TASK_TYPE,
                  StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME);
          if(success)
            collectPerIntervalStats(output,
                  tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE,
                  StatSeries.STAT_ALL_SLOT_TIME);
          else
            collectPerIntervalStats(output,
                  tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE,
                  StatSeries.STAT_ALL_SLOT_TIME,
                  StatSeries.STAT_FAILED_SLOT_TIME);
          totalTime += (tah.FINISH_TIME - tah.START_TIME);
          numAttempts++;
          if(numAttempts % 500 == 0) {
            reporter.setStatus("Processing " + jh.JOBID + " at " + numAttempts);
          }
          attemptSubmitTime = tah.FINISH_TIME;
        }
      }
      LOG.info("Total    Maps = " + jh.TOTAL_MAPS
          + "  Reduces = " + jh.TOTAL_REDUCES);
      LOG.info("Finished Maps = " + jh.FINISHED_MAPS
          + "  Reduces = " + jh.FINISHED_REDUCES);
      LOG.info("numAttempts = " + numAttempts);
      LOG.info("totalTime   = " + totalTime);
      LOG.info("averageAttemptTime = "
          + (numAttempts==0 ? 0 : totalTime/numAttempts));
      LOG.info("jobTotalTime = " + (jh.FINISH_TIME <= jh.SUBMIT_TIME ? 0 :
                                    jh.FINISH_TIME - jh.SUBMIT_TIME));
    }
  }

  public static class JHLAPartitioner implements Partitioner<Text, Text> {
    static final int NUM_REDUCERS = 9;
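    // One reducer per (series, MAP|REDUCE) pair: 4 series x 2 task types,
    // plus one catch-all partition for everything else (e.g. SETUP and
    // CLEANUP task types).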

    public void configure(JobConf conf) {}

    public int getPartition(Text key, Text value, int numPartitions) {
      IntervalKey intKey = new IntervalKey(key.toString());
      if(intKey.statName.equals(StatSeries.STAT_ALL_SLOT_TIME.toString())) {
        if(intKey.taskType.equals("MAP"))
          return 0;
        else if(intKey.taskType.equals("REDUCE"))
          return 1;
      } else if(intKey.statName.equals(
          StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME.toString())) {
        if(intKey.taskType.equals("MAP"))
          return 2;
        else if(intKey.taskType.equals("REDUCE"))
          return 3;
      } else if(intKey.statName.equals(
          StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME.toString())) {
        if(intKey.taskType.equals("MAP"))
          return 4;
        else if(intKey.taskType.equals("REDUCE"))
          return 5;
      } else if(intKey.statName.equals(
          StatSeries.STAT_FAILED_SLOT_TIME.toString())) {
        if(intKey.taskType.equals("MAP"))
          return 6;
        else if(intKey.taskType.equals("REDUCE"))
          return 7;
      }
      return 8;
    }
  }

  private static void runJHLA(
          Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
          Path outputDir,
          Configuration fsConfig) throws IOException {
    JobConf job = new JobConf(fsConfig, JHLogAnalyzer.class);

    job.setPartitionerClass(JHLAPartitioner.class);
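    // The custom partitioner routes each (series, task type) pair to its own
    // reducer, so each of the 9 output files holds one series sorted by key.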

    FileInputFormat.setInputPaths(job, INPUT_DIR);
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(mapperClass);
    job.setReducerClass(AccumulatingReducer.class);

    FileOutputFormat.setOutputPath(job, outputDir);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(JHLAPartitioner.NUM_REDUCERS);
    JobClient.runJob(job);
  }

  private static class LoggingCollector implements OutputCollector<Text, Text> {
    public void collect(Text key, Text value) throws IOException {
      LOG.info(key + " == " + value);
    }
  }

  /**
   * Run the job history log analyzer.
   */
  public static void main(String[] args) {
    Path resFileName = RESULT_FILE;
    Configuration conf = new Configuration();

    try {
      conf.setInt("test.io.file.buffer.size", 0);
      Path historyDir = DEFAULT_HISTORY_DIR;
      String testFile = null;
      boolean cleanup = false;

      boolean initControlFiles = true;
      for (int i = 0; i < args.length; i++) {       // parse command line
        if (args[i].equalsIgnoreCase("-historyDir")) {
          historyDir = new Path(args[++i]);
        } else if (args[i].equalsIgnoreCase("-resFile")) {
          resFileName = new Path(args[++i]);
        } else if (args[i].equalsIgnoreCase("-usersIncluded")) {
          conf.set("jhla.users.included", args[++i]);
        } else if (args[i].equalsIgnoreCase("-usersExcluded")) {
          conf.set("jhla.users.excluded", args[++i]);
        } else if (args[i].equalsIgnoreCase("-gzip")) {
          conf.set("jhla.compression.class", GzipCodec.class.getCanonicalName());
        } else if (args[i].equalsIgnoreCase("-jobDelimiter")) {
          conf.set("jhla.job.delimiter.pattern", args[++i]);
        } else if (args[i].equalsIgnoreCase("-jobDelimiterLength")) {
          conf.setInt("jhla.job.delimiter.length", Integer.parseInt(args[++i]));
        } else if(args[i].equalsIgnoreCase("-noInit")) {
          initControlFiles = false;
        } else if(args[i].equalsIgnoreCase("-test")) {
          testFile = args[++i];
        } else if(args[i].equalsIgnoreCase("-clean")) {
          cleanup = true;
        } else if(args[i].equalsIgnoreCase("-jobQueue")) {
          conf.set("mapred.job.queue.name", args[++i]);
        } else if(args[i].startsWith("-Xmx")) {
          conf.set("mapred.child.java.opts", args[i]);
        } else {
          printUsage();
        }
      }

      if(cleanup) {
        cleanup(conf);
        return;
      }
      if(testFile != null) {
        LOG.info("Start JHLA test ============ ");
        LocalFileSystem lfs = FileSystem.getLocal(conf);
        conf.set("fs.defaultFS", "file:///");
        JHLAMapper map = new JHLAMapper(conf);
        map.parseLogFile(lfs, new Path(testFile), 0L,
                         new LoggingCollector(), Reporter.NULL);
        return;
      }

      FileSystem fs = FileSystem.get(conf);
      if(initControlFiles)
        createControlFile(fs, historyDir);
      long tStart = System.currentTimeMillis();
      runJHLA(JHLAMapper.class, OUTPUT_DIR, conf);
      long execTime = System.currentTimeMillis() - tStart;

      analyzeResult(fs, 0, execTime, resFileName);
    } catch(IOException e) {
      System.err.print(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }

  private static void printUsage() {
    String className = JHLogAnalyzer.class.getSimpleName();
    System.err.println("Usage: " + className
      + "\n\t[-historyDir inputDir] | [-resFile resultFile] |"
      + "\n\t[-usersIncluded | -usersExcluded userList] |"
      + "\n\t[-gzip] | [-jobDelimiter pattern] |"
      + "\n\t[-help | -clean | -test testFile]");
    System.exit(-1);
  }

  private static Collection<String> getUserList(String users) {
    if(users == null)
      return null;
    StringTokenizer tokens = new StringTokenizer(users, ",;");
    Collection<String> userList = new ArrayList<String>(tokens.countTokens());
    while(tokens.hasMoreTokens())
      userList.add(tokens.nextToken());
    return userList;
  }

  /**
   * The result is combined from all reduce output files and is written to
   * RESULT_FILE as a tab separated table with the columns
   * SERIES, PERIOD, TYPE, and SLOT_HOUR, as described in the class javadoc.
   */
  private static void analyzeResult(FileSystem fs,
                                    int testType,
                                    long execTime,
                                    Path resFileName
                                    ) throws IOException {
    LOG.info("Analyzing results ...");
    DataOutputStream out = null;
    BufferedWriter writer = null;
    try {
      out = new DataOutputStream(fs.create(resFileName));
      writer = new BufferedWriter(new OutputStreamWriter(out));
      writer.write("SERIES\tPERIOD\tTYPE\tSLOT_HOUR\n");
      FileStatus[] reduceFiles = fs.listStatus(OUTPUT_DIR);
      assert reduceFiles.length == JHLAPartitioner.NUM_REDUCERS;
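      // Concatenate all reducer outputs. Each line consists of a
      // statName*dateTime*taskType key and an accumulated number of
      // milliseconds, which is converted to slot hours below.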
      for(int i = 0; i < JHLAPartitioner.NUM_REDUCERS; i++) {
        DataInputStream in = null;
        BufferedReader lines = null;
        try {
          in = fs.open(reduceFiles[i].getPath());
          lines = new BufferedReader(new InputStreamReader(in));

          String line;
          while((line = lines.readLine()) != null) {
            StringTokenizer tokens = new StringTokenizer(line, "\t*");
            String attr = tokens.nextToken();
            String dateTime = tokens.nextToken();
            String taskType = tokens.nextToken();
            double val = Long.parseLong(tokens.nextToken()) /
                                    (double)DEFAULT_TIME_INTERVAL_MSEC;
            writer.write(attr.substring(2));  // skip the stat type "l:"
            writer.write("\t");
            writer.write(dateTime);
            writer.write("\t");
            writer.write(taskType);
            writer.write("\t");
            writer.write(String.valueOf((float)val));
            writer.newLine();
          }
        } finally {
          if(lines != null) lines.close();
          if(in != null) in.close();
        }
      }
    } finally {
      if(writer != null) writer.close();
      if(out != null) out.close();
    }
    LOG.info("Analyzing results ... done.");
  }

  private static void cleanup(Configuration conf) throws IOException {
    LOG.info("Cleaning up test files");
    FileSystem fs = FileSystem.get(conf);
    fs.delete(new Path(JHLA_ROOT_DIR), true);
  }
}