1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.fs; 20 21 import java.io.BufferedReader; 22 import java.io.BufferedWriter; 23 import java.io.DataInputStream; 24 import java.io.DataOutputStream; 25 import java.io.IOException; 26 import java.io.InputStream; 27 import java.io.InputStreamReader; 28 import java.io.OutputStreamWriter; 29 import java.text.SimpleDateFormat; 30 import java.util.ArrayList; 31 import java.util.Collection; 32 import java.util.Date; 33 import java.util.Map; 34 import java.util.StringTokenizer; 35 import java.util.HashMap; 36 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 import org.apache.hadoop.conf.Configuration; 40 import org.apache.hadoop.io.LongWritable; 41 import org.apache.hadoop.io.SequenceFile; 42 import org.apache.hadoop.io.Text; 43 import org.apache.hadoop.io.SequenceFile.CompressionType; 44 import org.apache.hadoop.io.compress.CompressionCodec; 45 import org.apache.hadoop.io.compress.GzipCodec; 46 import org.apache.hadoop.mapred.*; 47 import org.apache.hadoop.util.ReflectionUtils; 48 import org.apache.hadoop.util.StringUtils; 49 50 /** 51 * Job History Log Analyzer. 
 *
 * <h3>Description.</h3>
 * This is a tool for parsing and analyzing history logs of map-reduce jobs.
 * History logs contain information about execution of jobs, tasks, and
 * attempts. This tool focuses on submission, launch, start, and finish times,
 * as well as the success or failure of jobs, tasks, and attempts.
 * <p>
 * The analyzer calculates <em>per hour slot utilization</em> for the cluster
 * as follows.
 * For each task attempt it divides the time segment from the start of the
 * attempt t<sub>S</sub> to the finish t<sub>F</sub> into whole hours
 * [t<sub>0</sub>, ..., t<sub>n</sub>], where t<sub>0</sub> &lt;= t<sub>S</sub>
 * is the maximal whole hour preceding t<sub>S</sub>, and
 * t<sub>n</sub> &gt;= t<sub>F</sub> is the minimal whole hour after t<sub>F</sub>.
 * Thus, [t<sub>0</sub>, ..., t<sub>n</sub>] covers the segment
 * [t<sub>S</sub>, t<sub>F</sub>], during which the attempt was executed.
 * Each interval [t<sub>i</sub>, t<sub>i+1</sub>] fully contained in
 * [t<sub>S</sub>, t<sub>F</sub>] corresponds to exactly one slot on
 * a map-reduce cluster (usually MAP-slot or REDUCE-slot).
 * If interval [t<sub>i</sub>, t<sub>i+1</sub>] only intersects with
 * [t<sub>S</sub>, t<sub>F</sub>] then we say that the task
 * attempt used just a fraction of the slot during this hour.
 * The fraction equals the size of the intersection.
 * Let slotTime(A, h) denote the number of slots calculated that way for a
 * specific attempt A during hour h.
 * The tool then sums all slots for all attempts for every hour.
 * The result is the slot hour utilization of the cluster:
 * <tt>slotTime(h) = SUM<sub>A</sub> slotTime(A,h)</tt>.
 * <p>
 * Log analyzer calculates slot hours for <em>MAP</em> and <em>REDUCE</em>
 * attempts separately.
 * <p>
 * Log analyzer distinguishes between <em>successful</em> and <em>failed</em>
 * attempts. Task attempt is considered successful if its own status is SUCCESS
 * and the statuses of the task and the job it is a part of are also SUCCESS.
 * Otherwise the task attempt is considered failed.
 * <p>
 * Map-reduce clusters are usually configured to have a fixed number of MAP
 * and REDUCE slots per node. Thus the maximal possible number of slots on
 * the cluster is <tt>total_slots = total_nodes * slots_per_node</tt>.
 * Effective slot hour cannot exceed <tt>total_slots</tt> for successful
 * attempts.
 * <p>
 * <em>Pending time</em> characterizes the wait time of attempts.
 * It is calculated similarly to the slot hour except that the wait interval
 * starts when the job is submitted and ends when an attempt starts execution.
 * In addition to that pending time also includes intervals between attempts
 * of the same task if it was re-executed.
 * <p>
 * History log analyzer calculates two pending time variations. The first is
 * based on job submission time as described above; the second starts the wait
 * interval when the job is launched rather than submitted.
 *
 * <h3>Input.</h3>
 * The following input parameters can be specified in the argument string
 * to the job log analyzer:
 * <ul>
 * <li><tt>-historyDir inputDir</tt> specifies the location of the directory
 * where analyzer will be looking for job history log files.</li>
 * <li><tt>-resFile resultFile</tt> the name of the result file.</li>
 * <li><tt>-usersIncluded | -usersExcluded userList</tt> slot utilization and
 * pending time can be calculated for all or for all but the specified users.
 * <br>
 * <tt>userList</tt> is a comma or semicolon separated list of users.</li>
 * <li><tt>-gzip</tt> is used if history log files are compressed.
 * Only {@link GzipCodec} is currently supported.</li>
 * <li><tt>-jobDelimiter pattern</tt> one can concatenate original log files into
 * larger file(s) with the specified delimiter to recognize the end of the log
 * for one job from the next one.<br>
 * <tt>pattern</tt> is a java regular expression
 * {@link java.util.regex.Pattern}, which should match only the log delimiters.
 * <br>
 * E.g. pattern <tt>".!!FILE=.*!!"</tt> matches delimiters, which contain
 * the original history log file names in the following form:<br>
 * <tt>"$!!FILE=my.job.tracker.com_myJobId_user_wordcount.log!!"</tt></li>
 * <li><tt>-clean</tt> cleans up default directories used by the analyzer.</li>
 * <li><tt>-test</tt> test one file locally and exit;
 * does not require map-reduce.</li>
 * <li><tt>-help</tt> print usage.</li>
 * </ul>
 *
 * <h3>Output.</h3>
 * The output file is formatted as a tab separated table consisting of four
 * columns: <tt>SERIES, PERIOD, TYPE, SLOT_HOUR</tt>.
 * <ul>
 * <li><tt>SERIES</tt> one of the four statistical series;</li>
 * <li><tt>PERIOD</tt> the start of the time interval in the following format:
 * <tt>"yyyy-mm-dd hh:mm:ss"</tt>;</li>
 * <li><tt>TYPE</tt> the slot type, e.g.
MAP or REDUCE;</li> 141 * <li><tt>SLOT_HOUR</tt> the value of the slot usage during this 142 * time interval.</li> 143 * </ul> 144 */ 145 @SuppressWarnings("deprecation") 146 public class JHLogAnalyzer { 147 private static final Log LOG = LogFactory.getLog(JHLogAnalyzer.class); 148 // Constants 149 private static final String JHLA_ROOT_DIR = 150 System.getProperty("test.build.data", "stats/JHLA"); 151 private static final Path INPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_input"); 152 private static final String BASE_INPUT_FILE_NAME = "jhla_in_"; 153 private static final Path OUTPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_output"); 154 private static final Path RESULT_FILE = 155 new Path(JHLA_ROOT_DIR, "jhla_result.txt"); 156 private static final Path DEFAULT_HISTORY_DIR = new Path("history"); 157 158 private static final int DEFAULT_TIME_INTERVAL_MSEC = 1000*60*60; // 1 hour 159 160 static{ 161 Configuration.addDefaultResource("hdfs-default.xml"); 162 Configuration.addDefaultResource("hdfs-site.xml"); 163 } 164 165 static enum StatSeries { 166 STAT_ALL_SLOT_TIME 167 (AccumulatingReducer.VALUE_TYPE_LONG + "allSlotTime"), 168 STAT_FAILED_SLOT_TIME 169 (AccumulatingReducer.VALUE_TYPE_LONG + "failedSlotTime"), 170 STAT_SUBMIT_PENDING_SLOT_TIME 171 (AccumulatingReducer.VALUE_TYPE_LONG + "submitPendingSlotTime"), 172 STAT_LAUNCHED_PENDING_SLOT_TIME 173 (AccumulatingReducer.VALUE_TYPE_LONG + "launchedPendingSlotTime"); 174 175 private String statName = null; StatSeries(String name)176 private StatSeries(String name) {this.statName = name;} toString()177 public String toString() {return statName;} 178 } 179 180 private static class FileCreateDaemon extends Thread { 181 private static final int NUM_CREATE_THREADS = 10; 182 private static volatile int numFinishedThreads; 183 private static volatile int numRunningThreads; 184 private static FileStatus[] jhLogFiles; 185 186 FileSystem fs; 187 int start; 188 int end; 189 FileCreateDaemon(FileSystem fs, int start, int end)190 
FileCreateDaemon(FileSystem fs, int start, int end) { 191 this.fs = fs; 192 this.start = start; 193 this.end = end; 194 } 195 run()196 public void run() { 197 try { 198 for(int i=start; i < end; i++) { 199 String name = getFileName(i); 200 Path controlFile = new Path(INPUT_DIR, "in_file_" + name); 201 SequenceFile.Writer writer = null; 202 try { 203 writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile, 204 Text.class, LongWritable.class, 205 CompressionType.NONE); 206 String logFile = jhLogFiles[i].getPath().toString(); 207 writer.append(new Text(logFile), new LongWritable(0)); 208 } catch(Exception e) { 209 throw new IOException(e); 210 } finally { 211 if (writer != null) 212 writer.close(); 213 writer = null; 214 } 215 } 216 } catch(IOException ex) { 217 LOG.error("FileCreateDaemon failed.", ex); 218 } 219 numFinishedThreads++; 220 } 221 createControlFile(FileSystem fs, Path jhLogDir )222 private static void createControlFile(FileSystem fs, Path jhLogDir 223 ) throws IOException { 224 fs.delete(INPUT_DIR, true); 225 jhLogFiles = fs.listStatus(jhLogDir); 226 227 numFinishedThreads = 0; 228 try { 229 int start = 0; 230 int step = jhLogFiles.length / NUM_CREATE_THREADS 231 + ((jhLogFiles.length % NUM_CREATE_THREADS) > 0 ? 
1 : 0); 232 FileCreateDaemon[] daemons = new FileCreateDaemon[NUM_CREATE_THREADS]; 233 numRunningThreads = 0; 234 for(int tIdx=0; tIdx < NUM_CREATE_THREADS && start < jhLogFiles.length; tIdx++) { 235 int end = Math.min(start + step, jhLogFiles.length); 236 daemons[tIdx] = new FileCreateDaemon(fs, start, end); 237 start += step; 238 numRunningThreads++; 239 } 240 for(int tIdx=0; tIdx < numRunningThreads; tIdx++) { 241 daemons[tIdx].start(); 242 } 243 } finally { 244 int prevValue = 0; 245 while(numFinishedThreads < numRunningThreads) { 246 if(prevValue < numFinishedThreads) { 247 LOG.info("Finished " + numFinishedThreads + " threads out of " + numRunningThreads); 248 prevValue = numFinishedThreads; 249 } 250 try {Thread.sleep(500);} catch (InterruptedException e) {} 251 } 252 } 253 } 254 } 255 createControlFile(FileSystem fs, Path jhLogDir )256 private static void createControlFile(FileSystem fs, Path jhLogDir 257 ) throws IOException { 258 LOG.info("creating control file: JH log dir = " + jhLogDir); 259 FileCreateDaemon.createControlFile(fs, jhLogDir); 260 LOG.info("created control file: JH log dir = " + jhLogDir); 261 } 262 getFileName(int fIdx)263 private static String getFileName(int fIdx) { 264 return BASE_INPUT_FILE_NAME + Integer.toString(fIdx); 265 } 266 267 /** 268 * If keyVal is of the form KEY="VALUE", then this will return [KEY, VALUE] 269 */ getKeyValue(String t)270 private static String [] getKeyValue(String t) throws IOException { 271 String[] keyVal = t.split("=\"*|\""); 272 return keyVal; 273 } 274 275 /** 276 * JobHistory log record. 
277 */ 278 private static class JobHistoryLog { 279 String JOBID; 280 String JOB_STATUS; 281 long SUBMIT_TIME; 282 long LAUNCH_TIME; 283 long FINISH_TIME; 284 long TOTAL_MAPS; 285 long TOTAL_REDUCES; 286 long FINISHED_MAPS; 287 long FINISHED_REDUCES; 288 String USER; 289 Map<String, TaskHistoryLog> tasks; 290 isSuccessful()291 boolean isSuccessful() { 292 return (JOB_STATUS != null) && JOB_STATUS.equals("SUCCESS"); 293 } 294 parseLine(String line)295 void parseLine(String line) throws IOException { 296 StringTokenizer tokens = new StringTokenizer(line); 297 if(!tokens.hasMoreTokens()) 298 return; 299 String what = tokens.nextToken(); 300 // Line should start with one of the following: 301 // Job, Task, MapAttempt, ReduceAttempt 302 if(what.equals("Job")) 303 updateJob(tokens); 304 else if(what.equals("Task")) 305 updateTask(tokens); 306 else if(what.indexOf("Attempt") >= 0) 307 updateTaskAttempt(tokens); 308 } 309 updateJob(StringTokenizer tokens)310 private void updateJob(StringTokenizer tokens) throws IOException { 311 while(tokens.hasMoreTokens()) { 312 String t = tokens.nextToken(); 313 String[] keyVal = getKeyValue(t); 314 if(keyVal.length < 2) continue; 315 316 if(keyVal[0].equals("JOBID")) { 317 if(JOBID == null) 318 JOBID = new String(keyVal[1]); 319 else if(!JOBID.equals(keyVal[1])) { 320 LOG.error("Incorrect JOBID: " 321 + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100)) 322 + " expect " + JOBID); 323 return; 324 } 325 } 326 else if(keyVal[0].equals("JOB_STATUS")) 327 JOB_STATUS = new String(keyVal[1]); 328 else if(keyVal[0].equals("SUBMIT_TIME")) 329 SUBMIT_TIME = Long.parseLong(keyVal[1]); 330 else if(keyVal[0].equals("LAUNCH_TIME")) 331 LAUNCH_TIME = Long.parseLong(keyVal[1]); 332 else if(keyVal[0].equals("FINISH_TIME")) 333 FINISH_TIME = Long.parseLong(keyVal[1]); 334 else if(keyVal[0].equals("TOTAL_MAPS")) 335 TOTAL_MAPS = Long.parseLong(keyVal[1]); 336 else if(keyVal[0].equals("TOTAL_REDUCES")) 337 TOTAL_REDUCES = Long.parseLong(keyVal[1]); 
338 else if(keyVal[0].equals("FINISHED_MAPS")) 339 FINISHED_MAPS = Long.parseLong(keyVal[1]); 340 else if(keyVal[0].equals("FINISHED_REDUCES")) 341 FINISHED_REDUCES = Long.parseLong(keyVal[1]); 342 else if(keyVal[0].equals("USER")) 343 USER = new String(keyVal[1]); 344 } 345 } 346 updateTask(StringTokenizer tokens)347 private void updateTask(StringTokenizer tokens) throws IOException { 348 // unpack 349 TaskHistoryLog task = new TaskHistoryLog().parse(tokens); 350 if(task.TASKID == null) { 351 LOG.error("TASKID = NULL for job " + JOBID); 352 return; 353 } 354 // update or insert 355 if(tasks == null) 356 tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES)); 357 TaskHistoryLog existing = tasks.get(task.TASKID); 358 if(existing == null) 359 tasks.put(task.TASKID, task); 360 else 361 existing.updateWith(task); 362 } 363 updateTaskAttempt(StringTokenizer tokens)364 private void updateTaskAttempt(StringTokenizer tokens) throws IOException { 365 // unpack 366 TaskAttemptHistoryLog attempt = new TaskAttemptHistoryLog(); 367 String taskID = attempt.parse(tokens); 368 if(taskID == null) return; 369 if(tasks == null) 370 tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES)); 371 TaskHistoryLog existing = tasks.get(taskID); 372 if(existing == null) { 373 existing = new TaskHistoryLog(taskID); 374 tasks.put(taskID, existing); 375 } 376 existing.updateWith(attempt); 377 } 378 } 379 380 /** 381 * TaskHistory log record. 
382 */ 383 private static class TaskHistoryLog { 384 String TASKID; 385 String TASK_TYPE; // MAP, REDUCE, SETUP, CLEANUP 386 String TASK_STATUS; 387 long START_TIME; 388 long FINISH_TIME; 389 Map<String, TaskAttemptHistoryLog> attempts; 390 TaskHistoryLog()391 TaskHistoryLog() {} 392 TaskHistoryLog(String taskID)393 TaskHistoryLog(String taskID) { 394 TASKID = taskID; 395 } 396 isSuccessful()397 boolean isSuccessful() { 398 return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS"); 399 } 400 parse(StringTokenizer tokens)401 TaskHistoryLog parse(StringTokenizer tokens) throws IOException { 402 while(tokens.hasMoreTokens()) { 403 String t = tokens.nextToken(); 404 String[] keyVal = getKeyValue(t); 405 if(keyVal.length < 2) continue; 406 407 if(keyVal[0].equals("TASKID")) { 408 if(TASKID == null) 409 TASKID = new String(keyVal[1]); 410 else if(!TASKID.equals(keyVal[1])) { 411 LOG.error("Incorrect TASKID: " 412 + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100)) 413 + " expect " + TASKID); 414 continue; 415 } 416 } 417 else if(keyVal[0].equals("TASK_TYPE")) 418 TASK_TYPE = new String(keyVal[1]); 419 else if(keyVal[0].equals("TASK_STATUS")) 420 TASK_STATUS = new String(keyVal[1]); 421 else if(keyVal[0].equals("START_TIME")) 422 START_TIME = Long.parseLong(keyVal[1]); 423 else if(keyVal[0].equals("FINISH_TIME")) 424 FINISH_TIME = Long.parseLong(keyVal[1]); 425 } 426 return this; 427 } 428 429 /** 430 * Update with non-null fields of the same task log record. 431 */ updateWith(TaskHistoryLog from)432 void updateWith(TaskHistoryLog from) throws IOException { 433 if(TASKID == null) 434 TASKID = from.TASKID; 435 else if(!TASKID.equals(from.TASKID)) { 436 throw new IOException("Incorrect TASKID: " + from.TASKID 437 + " expect " + TASKID); 438 } 439 if(TASK_TYPE == null) 440 TASK_TYPE = from.TASK_TYPE; 441 else if(! 
TASK_TYPE.equals(from.TASK_TYPE)) { 442 LOG.error( 443 "Incorrect TASK_TYPE: " + from.TASK_TYPE + " expect " + TASK_TYPE 444 + " for task " + TASKID); 445 return; 446 } 447 if(from.TASK_STATUS != null) 448 TASK_STATUS = from.TASK_STATUS; 449 if(from.START_TIME > 0) 450 START_TIME = from.START_TIME; 451 if(from.FINISH_TIME > 0) 452 FINISH_TIME = from.FINISH_TIME; 453 } 454 455 /** 456 * Update with non-null fields of the task attempt log record. 457 */ updateWith(TaskAttemptHistoryLog attempt)458 void updateWith(TaskAttemptHistoryLog attempt) throws IOException { 459 if(attempt.TASK_ATTEMPT_ID == null) { 460 LOG.error("Unexpected TASK_ATTEMPT_ID = null for task " + TASKID); 461 return; 462 } 463 if(attempts == null) 464 attempts = new HashMap<String, TaskAttemptHistoryLog>(); 465 TaskAttemptHistoryLog existing = attempts.get(attempt.TASK_ATTEMPT_ID); 466 if(existing == null) 467 attempts.put(attempt.TASK_ATTEMPT_ID, attempt); 468 else 469 existing.updateWith(attempt); 470 // update task start time 471 if(attempt.START_TIME > 0 && 472 (this.START_TIME == 0 || this.START_TIME > attempt.START_TIME)) 473 START_TIME = attempt.START_TIME; 474 } 475 } 476 477 /** 478 * TaskAttemptHistory log record. 479 */ 480 private static class TaskAttemptHistoryLog { 481 String TASK_ATTEMPT_ID; 482 String TASK_STATUS; // this task attempt status 483 long START_TIME; 484 long FINISH_TIME; 485 long HDFS_BYTES_READ; 486 long HDFS_BYTES_WRITTEN; 487 long FILE_BYTES_READ; 488 long FILE_BYTES_WRITTEN; 489 490 /** 491 * Task attempt is considered successful iff all three statuses 492 * of the attempt, the task, and the job equal "SUCCESS". 
493 */ isSuccessful()494 boolean isSuccessful() { 495 return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS"); 496 } 497 parse(StringTokenizer tokens)498 String parse(StringTokenizer tokens) throws IOException { 499 String taskID = null; 500 while(tokens.hasMoreTokens()) { 501 String t = tokens.nextToken(); 502 String[] keyVal = getKeyValue(t); 503 if(keyVal.length < 2) continue; 504 505 if(keyVal[0].equals("TASKID")) { 506 if(taskID == null) 507 taskID = new String(keyVal[1]); 508 else if(!taskID.equals(keyVal[1])) { 509 LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID); 510 continue; 511 } 512 } 513 else if(keyVal[0].equals("TASK_ATTEMPT_ID")) { 514 if(TASK_ATTEMPT_ID == null) 515 TASK_ATTEMPT_ID = new String(keyVal[1]); 516 else if(!TASK_ATTEMPT_ID.equals(keyVal[1])) { 517 LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID); 518 continue; 519 } 520 } 521 else if(keyVal[0].equals("TASK_STATUS")) 522 TASK_STATUS = new String(keyVal[1]); 523 else if(keyVal[0].equals("START_TIME")) 524 START_TIME = Long.parseLong(keyVal[1]); 525 else if(keyVal[0].equals("FINISH_TIME")) 526 FINISH_TIME = Long.parseLong(keyVal[1]); 527 } 528 return taskID; 529 } 530 531 /** 532 * Update with non-null fields of the same task attempt log record. 533 */ updateWith(TaskAttemptHistoryLog from)534 void updateWith(TaskAttemptHistoryLog from) throws IOException { 535 if(TASK_ATTEMPT_ID == null) 536 TASK_ATTEMPT_ID = from.TASK_ATTEMPT_ID; 537 else if(! 
TASK_ATTEMPT_ID.equals(from.TASK_ATTEMPT_ID)) { 538 throw new IOException( 539 "Incorrect TASK_ATTEMPT_ID: " + from.TASK_ATTEMPT_ID 540 + " expect " + TASK_ATTEMPT_ID); 541 } 542 if(from.TASK_STATUS != null) 543 TASK_STATUS = from.TASK_STATUS; 544 if(from.START_TIME > 0) 545 START_TIME = from.START_TIME; 546 if(from.FINISH_TIME > 0) 547 FINISH_TIME = from.FINISH_TIME; 548 if(from.HDFS_BYTES_READ > 0) 549 HDFS_BYTES_READ = from.HDFS_BYTES_READ; 550 if(from.HDFS_BYTES_WRITTEN > 0) 551 HDFS_BYTES_WRITTEN = from.HDFS_BYTES_WRITTEN; 552 if(from.FILE_BYTES_READ > 0) 553 FILE_BYTES_READ = from.FILE_BYTES_READ; 554 if(from.FILE_BYTES_WRITTEN > 0) 555 FILE_BYTES_WRITTEN = from.FILE_BYTES_WRITTEN; 556 } 557 } 558 559 /** 560 * Key = statName*date-time*taskType 561 * Value = number of msec for the our 562 */ 563 private static class IntervalKey { 564 static final String KEY_FIELD_DELIMITER = "*"; 565 String statName; 566 String dateTime; 567 String taskType; 568 IntervalKey(String stat, long timeMSec, String taskType)569 IntervalKey(String stat, long timeMSec, String taskType) { 570 statName = stat; 571 SimpleDateFormat dateF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 572 dateTime = dateF.format(new Date(timeMSec)); 573 this.taskType = taskType; 574 } 575 IntervalKey(String key)576 IntervalKey(String key) { 577 StringTokenizer keyTokens = new StringTokenizer(key, KEY_FIELD_DELIMITER); 578 if(!keyTokens.hasMoreTokens()) return; 579 statName = keyTokens.nextToken(); 580 if(!keyTokens.hasMoreTokens()) return; 581 dateTime = keyTokens.nextToken(); 582 if(!keyTokens.hasMoreTokens()) return; 583 taskType = keyTokens.nextToken(); 584 } 585 setStatName(String stat)586 void setStatName(String stat) { 587 statName = stat; 588 } 589 getStringKey()590 String getStringKey() { 591 return statName + KEY_FIELD_DELIMITER + 592 dateTime + KEY_FIELD_DELIMITER + 593 taskType; 594 } 595 getTextKey()596 Text getTextKey() { 597 return new Text(getStringKey()); 598 } 599 toString()600 public 
String toString() { 601 return getStringKey(); 602 } 603 } 604 605 /** 606 * Mapper class. 607 */ 608 private static class JHLAMapper extends IOMapperBase<Object> { 609 /** 610 * A line pattern, which delimits history logs of different jobs, 611 * if multiple job logs are written in the same file. 612 * Null value means only one job log per file is expected. 613 * The pattern should be a regular expression as in 614 * {@link String#matches(String)}. 615 */ 616 String jobDelimiterPattern; 617 int maxJobDelimiterLineLength; 618 /** Count only these users jobs */ 619 Collection<String> usersIncluded; 620 /** Exclude jobs of the following users */ 621 Collection<String> usersExcluded; 622 /** Type of compression for compressed files: gzip */ 623 Class<? extends CompressionCodec> compressionClass; 624 JHLAMapper()625 JHLAMapper() throws IOException { 626 } 627 JHLAMapper(Configuration conf)628 JHLAMapper(Configuration conf) throws IOException { 629 configure(new JobConf(conf)); 630 } 631 configure(JobConf conf)632 public void configure(JobConf conf) { 633 super.configure(conf ); 634 usersIncluded = getUserList(conf.get("jhla.users.included", null)); 635 usersExcluded = getUserList(conf.get("jhla.users.excluded", null)); 636 String zipClassName = conf.get("jhla.compression.class", null); 637 try { 638 compressionClass = (zipClassName == null) ? 
null : 639 Class.forName(zipClassName).asSubclass(CompressionCodec.class); 640 } catch(Exception e) { 641 throw new RuntimeException("Compression codec not found: ", e); 642 } 643 jobDelimiterPattern = conf.get("jhla.job.delimiter.pattern", null); 644 maxJobDelimiterLineLength = conf.getInt("jhla.job.delimiter.length", 512); 645 } 646 647 @Override map(Text key, LongWritable value, OutputCollector<Text, Text> output, Reporter reporter)648 public void map(Text key, 649 LongWritable value, 650 OutputCollector<Text, Text> output, 651 Reporter reporter) throws IOException { 652 String name = key.toString(); 653 long longValue = value.get(); 654 655 reporter.setStatus("starting " + name + " ::host = " + hostName); 656 657 long tStart = System.currentTimeMillis(); 658 parseLogFile(fs, new Path(name), longValue, output, reporter); 659 long tEnd = System.currentTimeMillis(); 660 long execTime = tEnd - tStart; 661 662 reporter.setStatus("finished " + name + " ::host = " + hostName + 663 " in " + execTime/1000 + " sec."); 664 } 665 doIO(Reporter reporter, String path, long offset )666 public Object doIO(Reporter reporter, 667 String path, // full path of history log file 668 long offset // starting offset within the file 669 ) throws IOException { 670 return null; 671 } 672 collectStats(OutputCollector<Text, Text> output, String name, long execTime, Object jobObjects)673 void collectStats(OutputCollector<Text, Text> output, 674 String name, 675 long execTime, 676 Object jobObjects) throws IOException { 677 } 678 isEndOfJobLog(String line)679 private boolean isEndOfJobLog(String line) { 680 if(jobDelimiterPattern == null) 681 return false; 682 return line.matches(jobDelimiterPattern); 683 } 684 685 /** 686 * Collect information about one job. 
687 * 688 * @param fs - file system 689 * @param filePath - full path of a history log file 690 * @param offset - starting offset in the history log file 691 * @throws IOException 692 */ parseLogFile(FileSystem fs, Path filePath, long offset, OutputCollector<Text, Text> output, Reporter reporter )693 public void parseLogFile(FileSystem fs, 694 Path filePath, 695 long offset, 696 OutputCollector<Text, Text> output, 697 Reporter reporter 698 ) throws IOException { 699 InputStream in = null; 700 try { 701 // open file & seek 702 FSDataInputStream stm = fs.open(filePath); 703 stm.seek(offset); 704 in = stm; 705 LOG.info("Opened " + filePath); 706 reporter.setStatus("Opened " + filePath); 707 // get a compression filter if specified 708 if(compressionClass != null) { 709 CompressionCodec codec = (CompressionCodec) 710 ReflectionUtils.newInstance(compressionClass, new Configuration()); 711 in = codec.createInputStream(stm); 712 LOG.info("Codec created " + filePath); 713 reporter.setStatus("Codec created " + filePath); 714 } 715 BufferedReader reader = new BufferedReader(new InputStreamReader(in)); 716 LOG.info("Reader created " + filePath); 717 // skip to the next job log start 718 long processed = 0L; 719 if(jobDelimiterPattern != null) { 720 for(String line = reader.readLine(); 721 line != null; line = reader.readLine()) { 722 if((stm.getPos() - processed) > 100000) { 723 processed = stm.getPos(); 724 reporter.setStatus("Processing " + filePath + " at " + processed); 725 } 726 if(isEndOfJobLog(line)) 727 break; 728 } 729 } 730 // parse lines and update job history 731 JobHistoryLog jh = new JobHistoryLog(); 732 int jobLineCount = 0; 733 for(String line = readLine(reader); 734 line != null; line = readLine(reader)) { 735 jobLineCount++; 736 if((stm.getPos() - processed) > 20000) { 737 processed = stm.getPos(); 738 long numTasks = (jh.tasks == null ? 
0 : jh.tasks.size()); 739 String txt = "Processing " + filePath + " at " + processed 740 + " # tasks = " + numTasks; 741 reporter.setStatus(txt); 742 LOG.info(txt); 743 } 744 if(isEndOfJobLog(line)) { 745 if(jh.JOBID != null) { 746 LOG.info("Finished parsing job: " + jh.JOBID 747 + " line count = " + jobLineCount); 748 collectJobStats(jh, output, reporter); 749 LOG.info("Collected stats for job: " + jh.JOBID); 750 } 751 jh = new JobHistoryLog(); 752 jobLineCount = 0; 753 } else 754 jh.parseLine(line); 755 } 756 if(jh.JOBID == null) { 757 LOG.error("JOBID = NULL in " + filePath + " at " + processed); 758 return; 759 } 760 collectJobStats(jh, output, reporter); 761 } catch(Exception ie) { 762 // parsing errors can happen if the file has been truncated 763 LOG.error("JHLAMapper.parseLogFile", ie); 764 reporter.setStatus("JHLAMapper.parseLogFile failed " 765 + StringUtils.stringifyException(ie)); 766 throw new IOException("Job failed.", ie); 767 } finally { 768 if(in != null) in.close(); 769 } 770 } 771 772 /** 773 * Read lines until one ends with a " ." or "\" " 774 */ 775 private StringBuffer resBuffer = new StringBuffer(); readLine(BufferedReader reader)776 private String readLine(BufferedReader reader) throws IOException { 777 resBuffer.setLength(0); 778 reader.mark(maxJobDelimiterLineLength); 779 for(String line = reader.readLine(); 780 line != null; line = reader.readLine()) { 781 if(isEndOfJobLog(line)) { 782 if(resBuffer.length() == 0) 783 resBuffer.append(line); 784 else 785 reader.reset(); 786 break; 787 } 788 if(resBuffer.length() == 0) 789 resBuffer.append(line); 790 else if(resBuffer.length() < 32000) 791 resBuffer.append(line); 792 if(line.endsWith(" .") || line.endsWith("\" ")) { 793 break; 794 } 795 reader.mark(maxJobDelimiterLineLength); 796 } 797 String result = resBuffer.length() == 0 ? 
null : resBuffer.toString(); 798 resBuffer.setLength(0); 799 return result; 800 } 801 collectPerIntervalStats(OutputCollector<Text, Text> output, long start, long finish, String taskType, StatSeries ... stats)802 private void collectPerIntervalStats(OutputCollector<Text, Text> output, 803 long start, long finish, String taskType, 804 StatSeries ... stats) throws IOException { 805 long curInterval = (start / DEFAULT_TIME_INTERVAL_MSEC) 806 * DEFAULT_TIME_INTERVAL_MSEC; 807 long curTime = start; 808 long accumTime = 0; 809 while(curTime < finish) { 810 // how much of the task time belonged to current interval 811 long nextInterval = curInterval + DEFAULT_TIME_INTERVAL_MSEC; 812 long intervalTime = ((finish < nextInterval) ? 813 finish : nextInterval) - curTime; 814 IntervalKey key = new IntervalKey("", curInterval, taskType); 815 Text val = new Text(String.valueOf(intervalTime)); 816 for(StatSeries statName : stats) { 817 key.setStatName(statName.toString()); 818 output.collect(key.getTextKey(), val); 819 } 820 821 curTime = curInterval = nextInterval; 822 accumTime += intervalTime; 823 } 824 // For the pending stat speculative attempts may intersect. 825 // Only one of them is considered pending. 
826 assert accumTime == finish - start || finish < start; 827 } 828 829 private void collectJobStats(JobHistoryLog jh, 830 OutputCollector<Text, Text> output, 831 Reporter reporter 832 ) throws IOException { 833 if(jh == null) 834 return; 835 if(jh.tasks == null) 836 return; 837 if(jh.SUBMIT_TIME <= 0) 838 throw new IOException("Job " + jh.JOBID 839 + " SUBMIT_TIME = " + jh.SUBMIT_TIME); 840 if(usersIncluded != null && !usersIncluded.contains(jh.USER)) 841 return; 842 if(usersExcluded != null && usersExcluded.contains(jh.USER)) 843 return; 844 845 int numAttempts = 0; 846 long totalTime = 0; 847 boolean jobSuccess = jh.isSuccessful(); 848 long jobWaitTime = jh.LAUNCH_TIME - jh.SUBMIT_TIME; 849 // attemptSubmitTime is the job's SUBMIT_TIME, 850 // or the previous attempt FINISH_TIME for all subsequent attempts 851 for(TaskHistoryLog th : jh.tasks.values()) { 852 if(th.attempts == null) 853 continue; 854 // Task is successful iff both the task and the job are a "SUCCESS" 855 long attemptSubmitTime = jh.LAUNCH_TIME; 856 boolean taskSuccess = jobSuccess && th.isSuccessful(); 857 for(TaskAttemptHistoryLog tah : th.attempts.values()) { 858 // Task attempt is considered successful iff all three statuses 859 // of the attempt, the task, and the job equal "SUCCESS" 860 boolean success = taskSuccess && tah.isSuccessful(); 861 if(tah.START_TIME == 0) { 862 LOG.error("Start time 0 for task attempt " + tah.TASK_ATTEMPT_ID); 863 continue; 864 } 865 if(tah.FINISH_TIME < tah.START_TIME) { 866 LOG.error("Finish time " + tah.FINISH_TIME + " is less than " + 867 "Start time " + tah.START_TIME + " for task attempt " + 868 tah.TASK_ATTEMPT_ID); 869 tah.FINISH_TIME = tah.START_TIME; 870 } 871 872 if(!"MAP".equals(th.TASK_TYPE) && !"REDUCE".equals(th.TASK_TYPE) && 873 !"CLEANUP".equals(th.TASK_TYPE) && !"SETUP".equals(th.TASK_TYPE)) { 874 LOG.error("Unexpected TASK_TYPE = " + th.TASK_TYPE 875 + " for attempt " + tah.TASK_ATTEMPT_ID); 876 } 877 878 collectPerIntervalStats(output, 879 
attemptSubmitTime, tah.START_TIME, th.TASK_TYPE, 880 StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME); 881 collectPerIntervalStats(output, 882 attemptSubmitTime - jobWaitTime, tah.START_TIME, th.TASK_TYPE, 883 StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME); 884 if(success) 885 collectPerIntervalStats(output, 886 tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE, 887 StatSeries.STAT_ALL_SLOT_TIME); 888 else 889 collectPerIntervalStats(output, 890 tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE, 891 StatSeries.STAT_ALL_SLOT_TIME, 892 StatSeries.STAT_FAILED_SLOT_TIME); 893 totalTime += (tah.FINISH_TIME - tah.START_TIME); 894 numAttempts++; 895 if(numAttempts % 500 == 0) { 896 reporter.setStatus("Processing " + jh.JOBID + " at " + numAttempts); 897 } 898 attemptSubmitTime = tah.FINISH_TIME; 899 } 900 } 901 LOG.info("Total Maps = " + jh.TOTAL_MAPS 902 + " Reduces = " + jh.TOTAL_REDUCES); 903 LOG.info("Finished Maps = " + jh.FINISHED_MAPS 904 + " Reduces = " + jh.FINISHED_REDUCES); 905 LOG.info("numAttempts = " + numAttempts); 906 LOG.info("totalTime = " + totalTime); 907 LOG.info("averageAttemptTime = " 908 + (numAttempts==0 ? 0 : totalTime/numAttempts)); 909 LOG.info("jobTotalTime = " + (jh.FINISH_TIME <= jh.SUBMIT_TIME? 
0 : 910 jh.FINISH_TIME - jh.SUBMIT_TIME)); 911 } 912 } 913 914 public static class JHLAPartitioner implements Partitioner<Text, Text> { 915 static final int NUM_REDUCERS = 9; 916 917 public void configure(JobConf conf) {} 918 919 public int getPartition(Text key, Text value, int numPartitions) { 920 IntervalKey intKey = new IntervalKey(key.toString()); 921 if(intKey.statName.equals(StatSeries.STAT_ALL_SLOT_TIME.toString())) { 922 if(intKey.taskType.equals("MAP")) 923 return 0; 924 else if(intKey.taskType.equals("REDUCE")) 925 return 1; 926 } else if(intKey.statName.equals( 927 StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME.toString())) { 928 if(intKey.taskType.equals("MAP")) 929 return 2; 930 else if(intKey.taskType.equals("REDUCE")) 931 return 3; 932 } else if(intKey.statName.equals( 933 StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME.toString())) { 934 if(intKey.taskType.equals("MAP")) 935 return 4; 936 else if(intKey.taskType.equals("REDUCE")) 937 return 5; 938 } else if(intKey.statName.equals( 939 StatSeries.STAT_FAILED_SLOT_TIME.toString())) { 940 if(intKey.taskType.equals("MAP")) 941 return 6; 942 else if(intKey.taskType.equals("REDUCE")) 943 return 7; 944 } 945 return 8; 946 } 947 } 948 949 private static void runJHLA( 950 Class<? 
extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 951 Path outputDir, 952 Configuration fsConfig) throws IOException { 953 JobConf job = new JobConf(fsConfig, JHLogAnalyzer.class); 954 955 job.setPartitionerClass(JHLAPartitioner.class); 956 957 FileInputFormat.setInputPaths(job, INPUT_DIR); 958 job.setInputFormat(SequenceFileInputFormat.class); 959 960 job.setMapperClass(mapperClass); 961 job.setReducerClass(AccumulatingReducer.class); 962 963 FileOutputFormat.setOutputPath(job, outputDir); 964 job.setOutputKeyClass(Text.class); 965 job.setOutputValueClass(Text.class); 966 job.setNumReduceTasks(JHLAPartitioner.NUM_REDUCERS); 967 JobClient.runJob(job); 968 } 969 970 private static class LoggingCollector implements OutputCollector<Text, Text> { 971 public void collect(Text key, Text value) throws IOException { 972 LOG.info(key + " == " + value); 973 } 974 } 975 976 /** 977 * Run job history log analyser. 978 */ 979 public static void main(String[] args) { 980 Path resFileName = RESULT_FILE; 981 Configuration conf = new Configuration(); 982 983 try { 984 conf.setInt("test.io.file.buffer.size", 0); 985 Path historyDir = DEFAULT_HISTORY_DIR; 986 String testFile = null; 987 boolean cleanup = false; 988 989 boolean initControlFiles = true; 990 for (int i = 0; i < args.length; i++) { // parse command line 991 if (args[i].equalsIgnoreCase("-historyDir")) { 992 historyDir = new Path(args[++i]); 993 } else if (args[i].equalsIgnoreCase("-resFile")) { 994 resFileName = new Path(args[++i]); 995 } else if (args[i].equalsIgnoreCase("-usersIncluded")) { 996 conf.set("jhla.users.included", args[++i]); 997 } else if (args[i].equalsIgnoreCase("-usersExcluded")) { 998 conf.set("jhla.users.excluded", args[++i]); 999 } else if (args[i].equalsIgnoreCase("-gzip")) { 1000 conf.set("jhla.compression.class", GzipCodec.class.getCanonicalName()); 1001 } else if (args[i].equalsIgnoreCase("-jobDelimiter")) { 1002 conf.set("jhla.job.delimiter.pattern", args[++i]); 1003 } else if 
(args[i].equalsIgnoreCase("-jobDelimiterLength")) { 1004 conf.setInt("jhla.job.delimiter.length", Integer.parseInt(args[++i])); 1005 } else if(args[i].equalsIgnoreCase("-noInit")) { 1006 initControlFiles = false; 1007 } else if(args[i].equalsIgnoreCase("-test")) { 1008 testFile = args[++i]; 1009 } else if(args[i].equalsIgnoreCase("-clean")) { 1010 cleanup = true; 1011 } else if(args[i].equalsIgnoreCase("-jobQueue")) { 1012 conf.set("mapred.job.queue.name", args[++i]); 1013 } else if(args[i].startsWith("-Xmx")) { 1014 conf.set("mapred.child.java.opts", args[i]); 1015 } else { 1016 printUsage(); 1017 } 1018 } 1019 1020 if(cleanup) { 1021 cleanup(conf); 1022 return; 1023 } 1024 if(testFile != null) { 1025 LOG.info("Start JHLA test ============ "); 1026 LocalFileSystem lfs = FileSystem.getLocal(conf); 1027 conf.set("fs.defaultFS", "file:///"); 1028 JHLAMapper map = new JHLAMapper(conf); 1029 map.parseLogFile(lfs, new Path(testFile), 0L, 1030 new LoggingCollector(), Reporter.NULL); 1031 return; 1032 } 1033 1034 FileSystem fs = FileSystem.get(conf); 1035 if(initControlFiles) 1036 createControlFile(fs, historyDir); 1037 long tStart = System.currentTimeMillis(); 1038 runJHLA(JHLAMapper.class, OUTPUT_DIR, conf); 1039 long execTime = System.currentTimeMillis() - tStart; 1040 1041 analyzeResult(fs, 0, execTime, resFileName); 1042 } catch(IOException e) { 1043 System.err.print(StringUtils.stringifyException(e)); 1044 System.exit(-1); 1045 } 1046 } 1047 1048 1049 private static void printUsage() { 1050 String className = JHLogAnalyzer.class.getSimpleName(); 1051 System.err.println("Usage: " + className 1052 + "\n\t[-historyDir inputDir] | [-resFile resultFile] |" 1053 + "\n\t[-usersIncluded | -usersExcluded userList] |" 1054 + "\n\t[-gzip] | [-jobDelimiter pattern] |" 1055 + "\n\t[-help | -clean | -test testFile]"); 1056 System.exit(-1); 1057 } 1058 1059 private static Collection<String> getUserList(String users) { 1060 if(users == null) 1061 return null; 1062 StringTokenizer 
tokens = new StringTokenizer(users, ",;"); 1063 Collection<String> userList = new ArrayList<String>(tokens.countTokens()); 1064 while(tokens.hasMoreTokens()) 1065 userList.add(tokens.nextToken()); 1066 return userList; 1067 } 1068 1069 /** 1070 * Result is combined from all reduce output files and is written to 1071 * RESULT_FILE in the format 1072 * column 1: 1073 */ 1074 private static void analyzeResult( FileSystem fs, 1075 int testType, 1076 long execTime, 1077 Path resFileName 1078 ) throws IOException { 1079 LOG.info("Analyzing results ..."); 1080 DataOutputStream out = null; 1081 BufferedWriter writer = null; 1082 try { 1083 out = new DataOutputStream(fs.create(resFileName)); 1084 writer = new BufferedWriter(new OutputStreamWriter(out)); 1085 writer.write("SERIES\tPERIOD\tTYPE\tSLOT_HOUR\n"); 1086 FileStatus[] reduceFiles = fs.listStatus(OUTPUT_DIR); 1087 assert reduceFiles.length == JHLAPartitioner.NUM_REDUCERS; 1088 for(int i = 0; i < JHLAPartitioner.NUM_REDUCERS; i++) { 1089 DataInputStream in = null; 1090 BufferedReader lines = null; 1091 try { 1092 in = fs.open(reduceFiles[i].getPath()); 1093 lines = new BufferedReader(new InputStreamReader(in)); 1094 1095 String line; 1096 while((line = lines.readLine()) != null) { 1097 StringTokenizer tokens = new StringTokenizer(line, "\t*"); 1098 String attr = tokens.nextToken(); 1099 String dateTime = tokens.nextToken(); 1100 String taskType = tokens.nextToken(); 1101 double val = Long.parseLong(tokens.nextToken()) / 1102 (double)DEFAULT_TIME_INTERVAL_MSEC; 1103 writer.write(attr.substring(2)); // skip the stat type "l:" 1104 writer.write("\t"); 1105 writer.write(dateTime); 1106 writer.write("\t"); 1107 writer.write(taskType); 1108 writer.write("\t"); 1109 writer.write(String.valueOf((float)val)); 1110 writer.newLine(); 1111 } 1112 } finally { 1113 if(lines != null) lines.close(); 1114 if(in != null) in.close(); 1115 } 1116 } 1117 } finally { 1118 if(writer != null) writer.close(); 1119 if(out != null) out.close(); 
1120 } 1121 LOG.info("Analyzing results ... done."); 1122 } 1123 1124 private static void cleanup(Configuration conf) throws IOException { 1125 LOG.info("Cleaning up test files"); 1126 FileSystem fs = FileSystem.get(conf); 1127 fs.delete(new Path(JHLA_ROOT_DIR), true); 1128 } 1129 } 1130