1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import java.io.BufferedReader; 22 import java.io.File; 23 import java.io.FileOutputStream; 24 import java.io.IOException; 25 import java.io.InputStreamReader; 26 import java.io.PrintWriter; 27 import java.io.UnsupportedEncodingException; 28 import java.net.URLDecoder; 29 import java.net.URLEncoder; 30 import java.util.ArrayList; 31 import java.util.Calendar; 32 import java.util.Collections; 33 import java.util.HashMap; 34 import java.util.HashSet; 35 import java.util.Iterator; 36 import java.util.LinkedHashMap; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.Set; 40 import java.util.SortedMap; 41 import java.util.TreeMap; 42 import java.util.Map.Entry; 43 import java.util.concurrent.atomic.AtomicBoolean; 44 import java.util.concurrent.ConcurrentHashMap; 45 import java.util.concurrent.LinkedBlockingQueue; 46 import java.util.concurrent.ThreadPoolExecutor; 47 import java.util.concurrent.TimeUnit; 48 import java.util.regex.Matcher; 49 import java.util.regex.Pattern; 50 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.logging.LogFactory; 53 import 
org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;

/**
 * Provides methods for writing to and reading from job history.
 * Job History works in an append mode, JobHistory and its inner classes provide methods
 * to log job events.
 *
 * JobHistory is split into multiple files, format of each file is plain text where each line
 * is of the format [type (key=value)*], where type identifies the type of the record.
 * Type maps to UID of one of the inner classes of this class.
 *
 * Job history is maintained in a master index which contains start/stop times of all jobs with
 * a few other job level properties. Apart from this each job's history is maintained in a separate history
 * file. Name of job history files follows the format jobtrackerId_jobid
 *
 * For parsing the job history it supports a listener based interface where each line is parsed
 * and passed to listener. The listener can create an object model of history or look for specific
 * events and discard rest of the history.
 *
 * CHANGE LOG :
 * Version 0 : The history has the following format :
 *             TAG KEY1="VALUE1" KEY2="VALUE2" and so on.
 *             TAG can be Job, Task, MapAttempt or ReduceAttempt.
 *             Note that a '"' is the line delimiter.
 * Version 1 : Changes the line delimiter to '.'
 *             Values are now escaped for unambiguous parsing.
 *             Added the Meta tag to store version info.
 */
public class JobHistory {

  // Version of the history file format written by this code (see CHANGE LOG above).
  static final long VERSION = 1L;

  // Layout version of the DONE directory tree; bumped when the subdirectory
  // structure under the done-folder changes.
  static final int DONE_DIRECTORY_FORMAT_VERSION = 1;

  static final String DONE_DIRECTORY_FORMAT_DIRNAME
    = "version-" + DONE_DIRECTORY_FORMAT_VERSION;

  // URL-encoded form of '_', used when underscores must not be interpreted
  // as filename field separators.
  static final String UNDERSCORE_ESCAPE = "%5F";

  public static final Log LOG = LogFactory.getLog(JobHistory.class);
  private static final char DELIMITER = ' ';
  static final char LINE_DELIMITER_CHAR = '.';
  // Characters escaped inside logged values so the key="value" grammar stays
  // unambiguous.
  static final char[] charsToEscape = new char[] {'"', '=',
                                                  LINE_DELIMITER_CHAR};
  static final String DIGITS = "[0-9]+";

  static final String KEY = "(\\w+)";
  // value is any character other than quote, but escaped quotes can be there
  static final String VALUE = "[^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+";

  // Compiled once: matches a single KEY="VALUE" token on a history line.
  static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");

  // Cap on the id->date cache size (see idToDateString below).
  static final int MAXIMUM_DATESTRING_COUNT = 200000;

  public static final int JOB_NAME_TRIM_LENGTH = 50;
  private static String JOBTRACKER_UNIQUE_STRING = null;
  private static String LOG_DIR = null;
  private static final String SECONDARY_FILE_SUFFIX = ".recover";
  private static long jobHistoryBlockSize = 0;
  private static String jobtrackerHostname;
  private static JobHistoryFilesManager fileManager = null;
  final static FsPermission HISTORY_DIR_PERMISSION =
    FsPermission.createImmutable((short) 0755); // rwxr-xr-x
  final static FsPermission HISTORY_FILE_PERMISSION =
    FsPermission.createImmutable((short) 0744); // rwxr--r--
  private static FileSystem LOGDIR_FS; // log dir filesystem
  protected static FileSystem DONEDIR_FS; // Done dir filesystem
  private static JobConf jtConf;
  protected static Path DONE = null; // folder for completed jobs
  // Glob fragment covering the done-dir layers that precede the serial-number
  // leaf directories (jobtracker-id/YYYY/MM/DD).
  private static String DONE_BEFORE_SERIAL_TAIL = doneSubdirsBeforeSerialTail();
  private static String DONE_LEAF_FILES = DONE_BEFORE_SERIAL_TAIL + "/*";
  // Whether queue/job-level ACLs are enforced; set from configuration in init().
  private static boolean aclsEnabled = false;

  static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";

  // A serial-number directory name has DIRECTORY_DIGITS + LOW_DIGITS digits;
  // the low digits are dropped so many jobs share one subdirectory.
  private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
  private static int SERIAL_NUMBER_LOW_DIGITS;

  private static String SERIAL_NUMBER_FORMAT;

  // Done-subdirectories known to exist, so we can skip repeated mkdirs calls.
  private static final Set<Path> existingDoneSubdirs = new HashSet<Path>();

  // Cache of job serial number -> "YYYY/MM/DD" date component; bounded by
  // MAXIMUM_DATESTRING_COUNT.
  private static final SortedMap<Integer, String> idToDateString
    = new TreeMap<Integer, String>();

  /**
   * A filter for conf files
   */
  private static final PathFilter CONF_FILTER = new PathFilter() {
    public boolean accept(Path path) {
      return path.getName().endsWith(CONF_FILE_NAME_SUFFIX);
    }
  };

  // JobID -> location of the job's history file after it was moved to the
  // DONE directory; synchronized because reads/writes come from multiple threads.
  private static final Map<JobID, MovedFileInfo> jobHistoryFileMap =
    Collections.<JobID,MovedFileInfo>synchronizedMap(
        new LinkedHashMap<JobID, MovedFileInfo>());

  private static final SortedMap<Long, String> jobToDirectoryMap
    = new TreeMap<Long, String>();

  // JobHistory filename regex
  public static final Pattern JOBHISTORY_FILENAME_REGEX =
    Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+");
  // JobHistory conf-filename regex
  public static final Pattern CONF_FILENAME_REGEX =
    Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml");

  /**
   * Immutable record of where a job's history file ended up after being moved
   * to the DONE directory, and when the move happened.
   */
  private static class MovedFileInfo {
    private final String historyFile; // destination path as a string
    private final long timestamp;     // move time, epoch millis

    public MovedFileInfo(String historyFile, long timestamp) {
      this.historyFile = historyFile;
      this.timestamp = timestamp;
    }
  }

  /**
   * Given the job id, return the history file path from the cache
   * @param jobId the job to look up
   * @return the moved history file's path, or null if the job is not in the cache
   */
  public static String getHistoryFilePath(JobID jobId) {
    MovedFileInfo info = jobHistoryFileMap.get(jobId);
    if (info == null) {
      return null;
    }
    return info.historyFile;
  }

  /**
   * A class that manages all the files related to a job.
   * For now
   * - writers : list of open files
   * - job history filename
   * - job conf filename
   */
  private static class JobHistoryFilesManager {
    // a private (virtual) folder for all the files related to a running job
    private static class FilesHolder {
      ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
      Path historyFilename; // path of job history file
      Path confFilename; // path of job's conf
    }

    // Executor that performs the asynchronous move-to-done work; created in start().
    private ThreadPoolExecutor executor = null;
    private final Configuration conf;
    private final JobTracker jobTracker;

    // cache from job-key to files associated with it.
    private Map<JobID, FilesHolder> fileCache =
      new ConcurrentHashMap<JobID, FilesHolder>();

    JobHistoryFilesManager(Configuration conf, JobTracker jobTracker)
        throws IOException {
      this.conf = conf;
      this.jobTracker = jobTracker;
    }

    /** Starts the background executor used by moveToDone(). */
    void start() {
      executor = new ThreadPoolExecutor(5, 5, 1,
          TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
      // make core threads to terminate if there has been no work
      // for the keepalive period.
      executor.allowCoreThreadTimeOut(true);
    }

    // Get (or lazily create) the holder for a job's files.
    // NOTE(review): get-then-put is not atomic; concurrent callers could
    // create two holders for the same id — appears tolerated by callers.
    private FilesHolder getFileHolder(JobID id) {
      FilesHolder holder = fileCache.get(id);
      if (holder == null) {
        holder = new FilesHolder();
        fileCache.put(id, holder);
      }
      return holder;
    }

    /** Registers an open history writer for the job. */
    void addWriter(JobID id, PrintWriter writer) {
      FilesHolder holder = getFileHolder(id);
      holder.writers.add(writer);
    }

    /** Records the job's history file path. */
    void setHistoryFile(JobID id, Path file) {
      FilesHolder holder = getFileHolder(id);
      holder.historyFilename = file;
    }

    /** Records the job's conf file path. */
    void setConfFile(JobID id, Path file) {
      FilesHolder holder = getFileHolder(id);
      holder.confFilename = file;
    }

    /** @return the job's open writers, or null if the job is unknown. */
    ArrayList<PrintWriter> getWriters(JobID id) {
      FilesHolder holder = fileCache.get(id);
      return holder == null ? null : holder.writers;
    }

    /** @return the job's history file path, or null if unknown. */
    Path getHistoryFile(JobID id) {
      FilesHolder holder = fileCache.get(id);
      return holder == null ? null : holder.historyFilename;
    }

    /** @return the job's conf file path, or null if unknown. */
    Path getConfFileWriters(JobID id) {
      FilesHolder holder = fileCache.get(id);
      return holder == null ? null : holder.confFilename;
    }

    /** Drops the job from the cache. */
    void purgeJob(JobID id) {
      fileCache.remove(id);
    }

    /**
     * Asynchronously moves the job's history and conf files into the DONE
     * directory's canonical subfolder, then records the new location and
     * purges the job from the cache.
     */
    void moveToDone(final JobID id) {
      final List<Path> paths = new ArrayList<Path>();
      final Path historyFile = fileManager.getHistoryFile(id);
      if (historyFile == null) {
        LOG.info("No file for job-history with " + id + " found in cache!");
      } else {
        paths.add(historyFile);
      }

      final Path confPath = fileManager.getConfFileWriters(id);
      if (confPath == null) {
        LOG.info("No file for jobconf with " + id + " found in cache!");
      } else {
        paths.add(confPath);
      }

      executor.execute(new Runnable() {

        public void run() {
          long millisecondTime = System.currentTimeMillis();

          Path resultDir = canonicalHistoryLogPath(id, millisecondTime);

          //move the files to DONE canonical subfolder
          try {
            for (Path path : paths) {
              //check if path exists, in case of retries it may not exist
              if (LOGDIR_FS.exists(path)) {
                maybeMakeSubdirectory(id, millisecondTime);

                LOG.info("Moving " + path.toString() + " to " +
                    resultDir.toString());
                // NOTE(review): assumes LOGDIR_FS is the local filesystem —
                // moveFromLocalFile reads the source as a local path.
                DONEDIR_FS.moveFromLocalFile(path, resultDir);
                DONEDIR_FS.setPermission(new Path(resultDir, path.getName()),
                    new FsPermission(HISTORY_FILE_PERMISSION));
              }
            }
          } catch (Throwable e) {
            LOG.error("Unable to move history file to DONE canonical subfolder.", e);
          }
          String historyFileDonePath = null;
          if (historyFile != null) {
            historyFileDonePath = new Path(resultDir,
                historyFile.getName()).toString();
          }

          jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath,
              millisecondTime));
          jobTracker.historyFileCopied(id, historyFileDonePath);

          //purge the job from the cache
          fileManager.purgeJob(id);
        }

      });
    }

    /** Detaches a writer from the job (e.g. after a write failure). */
    void removeWriter(JobID jobId, PrintWriter writer) {
      fileManager.getWriters(jobId).remove(writer);
    }
  }

  // several methods for manipulating the subdirectories of the DONE
  // directory

  // The job's per-jobtracker serial number, taken directly from the id.
  private static int jobSerialNumber(JobID id) {
    return id.getId();
  }

  // High-order digits of the zero-padded serial number; many consecutive jobs
  // therefore share one directory.
  private static String serialNumberDirectoryComponent(JobID id) {
    return String.format(SERIAL_NUMBER_FORMAT,
                         Integer.valueOf(jobSerialNumber(id)))
           .substring(0, SERIAL_NUMBER_DIRECTORY_DIGITS);
  }

  // directory components may contain internal slashes, but do NOT
  // contain slashes at either end.

  /**
   * Returns the "YYYY/MM/DD" path component for the job, caching the result
   * per serial number in idToDateString (bounded cache, oldest entry evicted).
   */
  private static String timestampDirectoryComponent(JobID id, long millisecondTime) {
    int serialNumber = jobSerialNumber(id);
    Integer boxedSerialNumber = serialNumber;

    // don't want to do this inside the lock
    Calendar timestamp = Calendar.getInstance();
    timestamp.setTimeInMillis(millisecondTime);

    synchronized (idToDateString) {
      String dateString = idToDateString.get(boxedSerialNumber);

      if (dateString == null) {

        dateString = String.format
          ("%04d/%02d/%02d",
           timestamp.get(Calendar.YEAR),
           // months are 0-based in Calendar, but people will expect January
           // to be month #1.
           timestamp.get(Calendar.MONTH) + 1,
           timestamp.get(Calendar.DAY_OF_MONTH));

        dateString = dateString.intern();

        idToDateString.put(boxedSerialNumber, dateString);

        if (idToDateString.size() > MAXIMUM_DATESTRING_COUNT) {
          idToDateString.remove(idToDateString.firstKey());
        }
      }

      return dateString;
    }
  }

  // returns false iff the directory already existed
  private static boolean maybeMakeSubdirectory(JobID id, long millisecondTime)
      throws IOException {
    Path dir = canonicalHistoryLogPath(id, millisecondTime);

    synchronized (existingDoneSubdirs) {
      if (existingDoneSubdirs.contains(dir)) {
        // Cache says it exists; sanity-check only when debug logging is on.
        if (LOG.isDebugEnabled() && !DONEDIR_FS.exists(dir)) {
          LOG.error("JobHistory.maybeMakeSubdirectory -- We believed " + dir
              + " already existed, but it didn't.");
        }

        return true;
      }

      if (!DONEDIR_FS.exists(dir)) {
        LOG.info("Creating DONE subfolder at " + dir);

        if (!FileSystem.mkdirs(DONEDIR_FS, dir,
            new FsPermission(HISTORY_DIR_PERMISSION))) {
          throw new IOException("Mkdirs failed to create " + dir.toString());
        }

        existingDoneSubdirs.add(dir);

        return false;
      } else {
        // Directory existed but was not in our cache.
        if (LOG.isDebugEnabled()) {
          LOG.error("JobHistory.maybeMakeSubdirectory -- We believed " + dir
              + " didn't already exist, but it did.");
        }

        return false;
      }
    }
  }

  /** Full path of the done-subdirectory a job's files belong in. */
  private static Path canonicalHistoryLogPath(JobID id, long millisecondTime) {
    return new Path(DONE, historyLogSubdirectory(id, millisecondTime));
  }

  // Relative subdirectory: version/jobtracker-id/YYYY/MM/DD/serial/
  private static String historyLogSubdirectory(JobID id, long millisecondTime) {
    String result
      = (DONE_DIRECTORY_FORMAT_DIRNAME
         + "/" + jobtrackerDirectoryComponent(id));

    String serialNumberDirectory = serialNumberDirectoryComponent(id);

    result = (result
        + "/" + timestampDirectoryComponent(id, millisecondTime)
        + "/" + serialNumberDirectory
        + "/");

    return result;
  }

  // All jobs of this jobtracker instance share one directory component.
  private static String jobtrackerDirectoryComponent(JobID id) {
    return JOBTRACKER_UNIQUE_STRING;
  }

  // Glob pattern for the done-dir levels above the serial-number leaves.
  private static String doneSubdirsBeforeSerialTail() {
    // job tracker ID
    String result
      = ("/" + DONE_DIRECTORY_FORMAT_DIRNAME
         + "/*"); // job tracker instance ID

    // date
    result = result + "/*/*/*"; // YYYY/MM/DD ;

    return result;
  }

  /**
   * Record types are identifiers for each line of log in history files.
   * A record type appears as the first token in a single line of log.
   */
  public static enum RecordTypes {
    Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
  }

  /**
   * Job history files contain key="value" pairs, where keys belong to this enum.
   * It acts as a global namespace for all keys.
   */
  public static enum Keys {
    JOBTRACKERID,
    START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
    LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
    FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
    ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
    TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
    VIEW_JOB, MODIFY_JOB, JOB_QUEUE, FAIL_REASON, LOCALITY, AVATAAR,
    WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES,
    WORKFLOW_TAGS
  }

  /**
   * This enum contains some of the values commonly used by history log events.
   * since values in history can only be strings - Values.name() is used in
   * most places in history file.
   */
  public static enum Values {
    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
  }

  /**
   * Initialize JobHistory files.
   * @param jobTracker the owning JobTracker instance
   * @param conf Jobconf of the job tracker.
   * @param hostname jobtracker's hostname
   * @param jobTrackerStartTime jobtracker's start time
   * @throws IOException if the history log directory cannot be created or
   *         has bad permissions
   */
  public static void init(JobTracker jobTracker, JobConf conf,
      String hostname, long jobTrackerStartTime) throws IOException {
    initLogDir(conf);
    SERIAL_NUMBER_LOW_DIGITS = 3;
    SERIAL_NUMBER_FORMAT = ("%0"
        + (SERIAL_NUMBER_DIRECTORY_DIGITS + SERIAL_NUMBER_LOW_DIGITS)
        + "d");
    // Unique per jobtracker instance: host + start time.
    JOBTRACKER_UNIQUE_STRING = hostname + "_" +
        String.valueOf(jobTrackerStartTime) + "_";
    jobtrackerHostname = hostname;
    Path logDir = new Path(LOG_DIR);
    if (!LOGDIR_FS.exists(logDir)) {
      if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
        throw new IOException("Mkdirs failed to create " + logDir.toString());
      }
    } else { // directory exists
      checkDirectoryPermissions(LOGDIR_FS, logDir, "hadoop.job.history.location");
    }
    conf.set("hadoop.job.history.location", LOG_DIR);
    // set the job history block size (default is 3MB)
    jobHistoryBlockSize =
        conf.getLong("mapred.jobtracker.job.history.block.size",
            3 * 1024 * 1024);
    jtConf = conf;

    // queue and job level security is enabled on the mapreduce cluster or not
    aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);

    // initialize the file manager
    fileManager = new JobHistoryFilesManager(conf, jobTracker);
  }

  // Resolves the history log directory (defaulting under hadoop.log.dir) and
  // caches its FileSystem.
  private static void initLogDir(JobConf conf) throws IOException {
    LOG_DIR = conf.get("hadoop.job.history.location" ,
        "file:///" + new File(
            System.getProperty("hadoop.log.dir")).getAbsolutePath()
        + File.separator + "history");
    Path logDir = new Path(LOG_DIR);
    LOGDIR_FS = logDir.getFileSystem(conf);
  }

  /** Convenience overload of {@link #initDone(JobConf, FileSystem, boolean)} with setup=true. */
  static void initDone(JobConf conf, FileSystem fs) throws IOException {
    initDone(conf, fs, true);
  }

  /**
   * Initializes the DONE directory (and its version subdirectory), validating
   * permissions if it already exists. When setup is true, also starts the
   * files manager and configures the history cleaner.
   */
  static void initDone(JobConf conf, FileSystem fs,
      boolean setup)
      throws IOException {
    //if completed job history location is set, use that
    String doneLocation = conf.
        get("mapred.job.tracker.history.completed.location");
    if (doneLocation != null) {
      DONE = fs.makeQualified(new Path(doneLocation));
      DONEDIR_FS = fs;
    } else {
      if (!setup) {
        initLogDir(conf);
      }
      DONE = new Path(LOG_DIR, "done");
      DONEDIR_FS = LOGDIR_FS;
    }
    Path versionSubdir = new Path(DONE, DONE_DIRECTORY_FORMAT_DIRNAME);
    //If not already present create the done folder with appropriate
    //permission
    if (!DONEDIR_FS.exists(DONE)) {
      LOG.info("Creating DONE folder at " + DONE);
      if (!DONEDIR_FS.mkdirs(DONE,
          new FsPermission(HISTORY_DIR_PERMISSION))) {
        throw new IOException("Mkdirs failed to create " + DONE.toString());
      }

      if (!DONEDIR_FS.exists(versionSubdir)) {
        if (!DONEDIR_FS.mkdirs(versionSubdir,
            new FsPermission(HISTORY_DIR_PERMISSION))) {
          throw new IOException("Mkdirs failed to create " + versionSubdir);
        }
      }
    } else { // directory exists. Checks version subdirectory permissions as
             // well.
      checkDirectoryPermissions(DONEDIR_FS, DONE,
          "mapred.job.tracker.history.completed.location");
      if (DONEDIR_FS.exists(versionSubdir))
        checkDirectoryPermissions(DONEDIR_FS, versionSubdir,
            "mapred.job.tracker.history.completed.location-versionsubdir");
    }

    if (!setup) {
      return;
    }

    fileManager.start();

    HistoryCleaner.cleanupFrequency =
        conf.getLong("mapreduce.jobhistory.cleaner.interval-ms",
            HistoryCleaner.DEFAULT_CLEANUP_FREQUENCY);
    HistoryCleaner.maxAgeOfHistoryFiles =
        conf.getLong("mapreduce.jobhistory.max-age-ms",
            HistoryCleaner.DEFAULT_HISTORY_MAX_AGE);
    LOG.info(String.format("Job History MaxAge is %d ms (%.2f days), " +
        "Cleanup Frequency is %d ms (%.2f days)",
        HistoryCleaner.maxAgeOfHistoryFiles,
        ((float) HistoryCleaner.maxAgeOfHistoryFiles)/HistoryCleaner.ONE_DAY_IN_MS,
        HistoryCleaner.cleanupFrequency,
        ((float) HistoryCleaner.cleanupFrequency)/HistoryCleaner.ONE_DAY_IN_MS));
  }

  /**
   * Validates that the given path is a directory readable and writable by the
   * current user.
   * @param fs filesystem holding the path
   * @param path directory to check
   * @param configKey configuration key naming the directory (for error messages)
   * @throws IOException on filesystem errors
   * @throws DiskErrorException if the path is not a directory, not readable,
   *         or not writable
   */
  static void checkDirectoryPermissions(FileSystem fs, Path path,
      String configKey) throws IOException, DiskErrorException {
    FileStatus stat = fs.getFileStatus(path);
    FsPermission actual = stat.getPermission();
    if (!stat.isDir())
      throw new DiskErrorException(configKey + " - not a directory: "
          + path.toString());
    FsAction user = actual.getUserAction();
    if (!user.implies(FsAction.READ))
      throw new DiskErrorException("bad " + configKey
          + "- directory is not readable: " + path.toString());
    if (!user.implies(FsAction.WRITE))
      throw new DiskErrorException("bad " + configKey
          + "- directory is not writable " + path.toString());
  }

  /**
   * Manages job-history's meta information such as version etc.
   * Helps in logging version information to the job-history and recover
   * version information from the history.
   */
  static class MetaInfoManager implements Listener {
    private long version = 0L; // history format version; 0 until a Meta record is seen
    private KeyValuePair pairs = new KeyValuePair();

    // Extract the version of the history that was used to write the history
    public MetaInfoManager(String line) throws IOException {
      if (null != line) {
        // Parse the line
        parseLine(line, this, false);
      }
    }

    // Get the line delimiter
    char getLineDelim() {
      if (version == 0) {
        return '"';
      } else {
        return LINE_DELIMITER_CHAR;
      }
    }

    // Checks if the values are escaped or not
    boolean isValueEscaped() {
      // Note that the values are not escaped in version 0
      return version != 0;
    }

    // Listener callback: capture the VERSION value from a Meta record.
    public void handle(RecordTypes recType, Map<Keys, String> values)
        throws IOException {
      // Check if the record is of type META
      if (RecordTypes.Meta == recType) {
        pairs.handle(values);
        version = pairs.getLong(Keys.VERSION); // defaults to 0
      }
    }

    /**
     * Logs history meta-info to the history file. This needs to be called once
     * per history file.
     * @param writers open writers for the job's history file
     */
    static void logMetaInfo(ArrayList<PrintWriter> writers) {
      if (null != writers) {
        JobHistory.log(writers, RecordTypes.Meta,
            new Keys[] {Keys.VERSION},
            new String[] {String.valueOf(VERSION)});
      }
    }
  }

  /** Escapes the string especially for {@link JobHistory}
   */
  static String escapeString(String data) {
    return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
        charsToEscape);
  }

  /**
   * Parses history file and invokes Listener.handle() for
   * each line of history.
   * It can be used for looking through history
   * files for specific items without having to keep whole history in memory.
   * @param path path to history file
   * @param l Listener for history events
   * @param fs FileSystem where history file is present
   * @throws IOException
   */
  public static void parseHistoryFromFS(String path, Listener l, FileSystem fs)
      throws IOException {
    FSDataInputStream in = fs.open(new Path(path));
    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    try {
      String line = null;
      StringBuffer buf = new StringBuffer();

      // Read the meta-info line. Note that this might a jobinfo line for files
      // written with older format
      line = reader.readLine();

      // Check if the file is empty
      if (line == null) {
        return;
      }

      // Get the information required for further processing: which line
      // delimiter this file uses and whether its values are escaped.
      MetaInfoManager mgr = new MetaInfoManager(line);
      boolean isEscaped = mgr.isValueEscaped();
      String lineDelim = String.valueOf(mgr.getLineDelim());
      String escapedLineDelim =
          StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR,
              mgr.getLineDelim());

      do {
        buf.append(line);
        // A record may span physical lines: keep accumulating until the line
        // ends with an UNescaped delimiter.
        if (!line.trim().endsWith(lineDelim)
            || line.trim().endsWith(escapedLineDelim)) {
          buf.append("\n");
          continue;
        }
        parseLine(buf.toString(), l, isEscaped);
        buf = new StringBuffer();
      } while ((line = reader.readLine()) != null);
    } finally {
      try { reader.close(); } catch (IOException ex) {}
    }
  }

  /**
   * Parse a single line of history.
   * @param line complete history record (first token is the record type)
   * @param l listener invoked with the parsed key/value map
   * @param isEscaped whether values need unescaping (format version >= 1)
   * @throws IOException
   */
  private static void parseLine(String line, Listener l, boolean isEscaped)
      throws IOException {
    // extract the record type
    int idx = line.indexOf(' ');
    String recType = line.substring(0, idx);
    String data = line.substring(idx+1, line.length());

    Matcher matcher = pattern.matcher(data);
    Map<Keys,String> parseBuffer = new HashMap<Keys, String>();

    while (matcher.find()) {
      String tuple = matcher.group(0);
      // Split KEY="VALUE" on the first unescaped '='.
      String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
      // Strip the surrounding quotes from the value.
      String value = parts[1].substring(1, parts[1].length() - 1);
      if (isEscaped) {
        value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
            charsToEscape);
      }
      parseBuffer.put(Keys.valueOf(parts[0]), value);
    }

    l.handle(RecordTypes.valueOf(recType), parseBuffer);

    parseBuffer.clear();
  }


  /**
   * Log a raw record type with keys and values. This is method is generally not used directly.
   * @param out writer for the history file
   * @param recordType type of log event
   * @param key key
   * @param value value
   */
  static void log(PrintWriter out, RecordTypes recordType, Keys key,
      String value) {
    value = escapeString(value);
    out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\""
        + DELIMITER + LINE_DELIMITER_CHAR);
  }

  /**
   * Log a number of keys and values with record. the array length of keys and values
   * should be same.
796 * @param recordType type of log event 797 * @param keys type of log event 798 * @param values type of log event 799 */ 800 log(ArrayList<PrintWriter> writers, RecordTypes recordType, Keys[] keys, String[] values)801 static void log(ArrayList<PrintWriter> writers, RecordTypes recordType, 802 Keys[] keys, String[] values) { 803 log(writers, recordType, keys, values, null); 804 } 805 806 static class JobHistoryLogger { 807 static final Log LOG = LogFactory.getLog(JobHistoryLogger.class); 808 } 809 810 /** 811 * Log a number of keys and values with record. the array length of keys and values 812 * should be same. 813 * @param recordType type of log event 814 * @param keys type of log event 815 * @param values type of log event 816 * @param JobID jobid of the job 817 */ 818 log(ArrayList<PrintWriter> writers, RecordTypes recordType, Keys[] keys, String[] values, JobID id)819 static void log(ArrayList<PrintWriter> writers, RecordTypes recordType, 820 Keys[] keys, String[] values, JobID id) { 821 822 // First up calculate the length of buffer, so that we are performant 823 // enough. 824 int length = recordType.name().length() + keys.length * 4 + 2; 825 for (int i = 0; i < keys.length; i++) { 826 values[i] = escapeString(values[i]); 827 length += values[i].length() + keys[i].toString().length(); 828 } 829 830 // We have the length of the buffer, now construct it. 
831 StringBuilder builder = new StringBuilder(length); 832 builder.append(recordType.name()); 833 builder.append(DELIMITER); 834 for(int i =0; i< keys.length; i++){ 835 builder.append(keys[i]); 836 builder.append("=\""); 837 builder.append(values[i]); 838 builder.append("\""); 839 builder.append(DELIMITER); 840 } 841 builder.append(LINE_DELIMITER_CHAR); 842 843 String logLine = builder.toString(); 844 for (Iterator<PrintWriter> iter = writers.iterator(); iter.hasNext();) { 845 PrintWriter out = iter.next(); 846 out.println(logLine); 847 if (out.checkError() && id != null) { 848 LOG.info("Logging failed for job " + id + "removing PrintWriter from FileManager"); 849 iter.remove(); 850 } 851 } 852 if (recordType != RecordTypes.Meta) { 853 JobHistoryLogger.LOG.debug(logLine); 854 } 855 } 856 857 /** 858 * Get the history location 859 */ getJobHistoryLocation()860 static Path getJobHistoryLocation() { 861 return new Path(LOG_DIR); 862 } 863 864 /** 865 * Get the history location for completed jobs 866 */ getCompletedJobHistoryLocation()867 static Path getCompletedJobHistoryLocation() { 868 return DONE; 869 } 870 serialNumberDirectoryDigits()871 static int serialNumberDirectoryDigits() { 872 return SERIAL_NUMBER_DIRECTORY_DIGITS; 873 } 874 serialNumberTotalDigits()875 static int serialNumberTotalDigits() { 876 return serialNumberDirectoryDigits() + SERIAL_NUMBER_LOW_DIGITS; 877 } 878 879 /** 880 * Get the 881 */ 882 883 /** 884 * Base class contais utility stuff to manage types key value pairs with enums. 885 */ 886 static class KeyValuePair{ 887 private Map<Keys, String> values = new HashMap<Keys, String>(); 888 889 /** 890 * Get 'String' value for given key. Most of the places use Strings as 891 * values so the default get' method returns 'String'. This method never returns 892 * null to ease on GUIs. 
     * if no value is found it returns empty string ""
     * @param k
     * @return if null it returns empty string - ""
     */
    public String get(Keys k) {
      String s = values.get(k);
      return s == null ? "" : s;
    }

    /**
     * Convert value from history to int and return.
     * if no value is found it returns 0.
     * @param k key
     */
    public int getInt(Keys k) {
      String s = values.get(k);
      if (null != s) {
        return Integer.parseInt(s);
      }
      return 0;
    }

    /**
     * Convert value from history to long and return.
     * if no value is found it returns 0.
     * @param k
     */
    public long getLong(Keys k) {
      String s = values.get(k);
      if (null != s) {
        return Long.parseLong(s);
      }
      return 0;
    }

    /**
     * Set value for the key.
     * @param k
     * @param s
     */
    public void set(Keys k, String s) {
      values.put(k, s);
    }

    /**
     * Adds all values in the Map argument to its own values.
     * @param m
     */
    public void set(Map<Keys, String> m) {
      values.putAll(m);
    }

    /**
     * Reads values back from the history, input is same Map as passed to Listener by parseHistory().
     * @param values
     */
    public synchronized void handle(Map<Keys, String> values) {
      set(values);
    }

    /**
     * Returns Map containing all key-values.
     */
    public Map<Keys, String> getValues() {
      return values;
    }
  }

  // hasMismatches is just used to return a second value if you want
  // one. I would have used MutableBoxedBoolean if such had been provided.
filteredStat2Paths(FileStatus[] stats, boolean dirs, AtomicBoolean hasMismatches)956 static Path[] filteredStat2Paths 957 (FileStatus[] stats, boolean dirs, AtomicBoolean hasMismatches) { 958 int resultCount = 0; 959 960 if (hasMismatches == null) { 961 hasMismatches = new AtomicBoolean(false); 962 } 963 964 for (int i = 0; i < stats.length; ++i) { 965 if (stats[i].isDir() == dirs) { 966 stats[resultCount++] = stats[i]; 967 } else { 968 hasMismatches.set(true); 969 } 970 } 971 972 Path[] paddedResult = FileUtil.stat2Paths(stats); 973 974 Path[] result = new Path[resultCount]; 975 976 System.arraycopy(paddedResult, 0, result, 0, resultCount); 977 978 return result; 979 } 980 localGlobber(FileSystem fs, Path root, String tail)981 static FileStatus[] localGlobber 982 (FileSystem fs, Path root, String tail) 983 throws IOException { 984 return localGlobber(fs, root, tail, null); 985 } 986 localGlobber(FileSystem fs, Path root, String tail, PathFilter filter)987 static FileStatus[] localGlobber 988 (FileSystem fs, Path root, String tail, PathFilter filter) 989 throws IOException { 990 return localGlobber(fs, root, tail, filter, null); 991 } 992 nullToEmpty(FileStatus[] result)993 private static FileStatus[] nullToEmpty(FileStatus[] result) { 994 return result == null ? new FileStatus[0] : result; 995 } 996 listFilteredStatus(FileSystem fs, Path root, PathFilter filter)997 private static FileStatus[] listFilteredStatus 998 (FileSystem fs, Path root, PathFilter filter) 999 throws IOException { 1000 return filter == null ? fs.listStatus(root) : fs.listStatus(root, filter); 1001 } 1002 1003 // hasMismatches is just used to return a second value if you want 1004 // one. I would have used MutableBoxedBoolean if such had been provided. 
localGlobber(FileSystem fs, Path root, String tail, PathFilter filter, AtomicBoolean hasFlatFiles)1005 static FileStatus[] localGlobber 1006 (FileSystem fs, Path root, String tail, PathFilter filter, 1007 AtomicBoolean hasFlatFiles) 1008 throws IOException { 1009 if (tail.equals("")) { 1010 return nullToEmpty(listFilteredStatus(fs, root, filter)); 1011 } 1012 1013 if (tail.startsWith("/*")) { 1014 Path[] subdirs = filteredStat2Paths(nullToEmpty(fs.listStatus(root)), 1015 true, hasFlatFiles); 1016 1017 FileStatus[][] subsubdirs = new FileStatus[subdirs.length][]; 1018 1019 int subsubdirCount = 0; 1020 1021 if (subsubdirs.length == 0) { 1022 return new FileStatus[0]; 1023 } 1024 1025 String newTail = tail.substring(2); 1026 1027 for (int i = 0; i < subdirs.length; ++i) { 1028 subsubdirs[i] = localGlobber(fs, subdirs[i], newTail, filter, null); 1029 subsubdirCount += subsubdirs[i].length; 1030 } 1031 1032 FileStatus[] result = new FileStatus[subsubdirCount]; 1033 1034 int segmentStart = 0; 1035 1036 for (int i = 0; i < subsubdirs.length; ++i) { 1037 System.arraycopy(subsubdirs[i], 0, result, segmentStart, subsubdirs[i].length); 1038 segmentStart += subsubdirs[i].length; 1039 } 1040 1041 return result; 1042 } 1043 1044 if (tail.startsWith("/")) { 1045 int split = tail.indexOf('/', 1); 1046 1047 if (split < 0) { 1048 return nullToEmpty 1049 (listFilteredStatus(fs, new Path(root, tail.substring(1)), filter)); 1050 } else { 1051 String thisSegment = tail.substring(1, split); 1052 String newTail = tail.substring(split); 1053 return localGlobber 1054 (fs, new Path(root, thisSegment), newTail, filter, hasFlatFiles); 1055 } 1056 } 1057 1058 IOException e = new IOException("localGlobber: bad tail"); 1059 1060 throw e; 1061 } 1062 confPathFromLogFilePath(Path logFile)1063 static Path confPathFromLogFilePath(Path logFile) { 1064 String jobId = jobIdNameFromLogFileName(logFile.getName()); 1065 1066 Path logDir = logFile.getParent(); 1067 1068 return new Path(logDir, jobId + 
CONF_FILE_NAME_SUFFIX); 1069 } 1070 jobIdNameFromLogFileName(String logFileName)1071 static String jobIdNameFromLogFileName(String logFileName) { 1072 String[] jobDetails = logFileName.split("_"); 1073 return jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2]; 1074 } 1075 userNameFromLogFileName(String logFileName)1076 static String userNameFromLogFileName(String logFileName) { 1077 String[] jobDetails = logFileName.split("_"); 1078 return jobDetails[3]; 1079 } 1080 jobNameFromLogFileName(String logFileName)1081 static String jobNameFromLogFileName(String logFileName) { 1082 String[] jobDetails = logFileName.split("_"); 1083 return jobDetails[4]; 1084 } 1085 1086 1087 // This code will be inefficient if the subject contains dozens of underscores escapeUnderscores(String escapee)1088 static String escapeUnderscores(String escapee) { 1089 return replaceStringInstances(escapee, "_", UNDERSCORE_ESCAPE); 1090 } 1091 nonOccursString(String logFileName)1092 static String nonOccursString(String logFileName) { 1093 int adHocIndex = 0; 1094 1095 String unfoundString = "q" + adHocIndex; 1096 1097 while (logFileName.contains(unfoundString)) { 1098 unfoundString = "q" + ++adHocIndex; 1099 } 1100 1101 return unfoundString + "q"; 1102 } 1103 1104 // I tolerate this code because I expect a low number of 1105 // occurrences in a relatively short string replaceStringInstances(String logFileName, String old, String replacement)1106 static String replaceStringInstances 1107 (String logFileName, String old, String replacement) { 1108 int index = logFileName.indexOf(old); 1109 1110 while (index > 0) { 1111 logFileName = (logFileName.substring(0, index) 1112 + replacement 1113 + replaceStringInstances 1114 (logFileName.substring(index + old.length()), 1115 old, replacement)); 1116 1117 index = logFileName.indexOf(old); 1118 } 1119 1120 return logFileName; 1121 } 1122 1123 1124 /** 1125 * Helper class for logging or reading back events related to job start, finish or failure. 
1126 */ 1127 public static class JobInfo extends KeyValuePair{ 1128 1129 private Map<String, Task> allTasks = new TreeMap<String, Task>(); 1130 private Map<JobACL, AccessControlList> jobACLs = 1131 new HashMap<JobACL, AccessControlList>(); 1132 private String queueName = null;// queue to which this job was submitted to 1133 1134 /** Create new JobInfo */ JobInfo(String jobId)1135 public JobInfo(String jobId){ 1136 set(Keys.JOBID, jobId); 1137 } 1138 1139 /** 1140 * Returns all map and reduce tasks <taskid-Task>. 1141 */ getAllTasks()1142 public Map<String, Task> getAllTasks() { return allTasks; } 1143 1144 /** 1145 * Get the job acls. 1146 * 1147 * @return a {@link Map} from {@link JobACL} to {@link AccessControlList} 1148 */ getJobACLs()1149 public Map<JobACL, AccessControlList> getJobACLs() { 1150 return jobACLs; 1151 } 1152 1153 @Override handle(Map<Keys, String> values)1154 public synchronized void handle(Map<Keys, String> values) { 1155 if (values.containsKey(Keys.SUBMIT_TIME)) {// job submission 1156 // construct the job ACLs 1157 String viewJobACL = values.get(Keys.VIEW_JOB); 1158 String modifyJobACL = values.get(Keys.MODIFY_JOB); 1159 if (viewJobACL != null) { 1160 jobACLs.put(JobACL.VIEW_JOB, new AccessControlList(viewJobACL)); 1161 } 1162 if (modifyJobACL != null) { 1163 jobACLs.put(JobACL.MODIFY_JOB, new AccessControlList(modifyJobACL)); 1164 } 1165 // get the job queue name 1166 queueName = values.get(Keys.JOB_QUEUE); 1167 } 1168 super.handle(values); 1169 } 1170 getJobQueue()1171 String getJobQueue() { 1172 return queueName; 1173 } 1174 1175 /** 1176 * Get the path of the locally stored job file 1177 * @param jobId id of the job 1178 * @return the path of the job file on the local file system 1179 */ getLocalJobFilePath(JobID jobId)1180 public static String getLocalJobFilePath(JobID jobId){ 1181 return System.getProperty("hadoop.log.dir") + File.separator + 1182 jobId + CONF_FILE_NAME_SUFFIX; 1183 } 1184 1185 1186 /** 1187 * Helper function to encode 
the URL of the path of the job-history 1188 * log file. 1189 * 1190 * @param logFile path of the job-history file 1191 * @return URL encoded path 1192 * @throws IOException 1193 */ encodeJobHistoryFilePath(String logFile)1194 public static String encodeJobHistoryFilePath(String logFile) 1195 throws IOException { 1196 Path rawPath = new Path(logFile); 1197 String encodedFileName = null; 1198 try { 1199 encodedFileName = URLEncoder.encode(rawPath.getName(), "UTF-8"); 1200 } catch (UnsupportedEncodingException uee) { 1201 IOException ioe = new IOException(); 1202 ioe.initCause(uee); 1203 ioe.setStackTrace(uee.getStackTrace()); 1204 throw ioe; 1205 } 1206 1207 Path encodedPath = new Path(rawPath.getParent(), encodedFileName); 1208 return encodedPath.toString(); 1209 } 1210 1211 /** 1212 * Helper function to encode the URL of the filename of the job-history 1213 * log file. 1214 * 1215 * @param logFileName file name of the job-history file 1216 * @return URL encoded filename 1217 * @throws IOException 1218 */ encodeJobHistoryFileName(String logFileName)1219 public static String encodeJobHistoryFileName(String logFileName) 1220 throws IOException { 1221 String replacementUnderscoreEscape = null; 1222 1223 if (logFileName.contains(UNDERSCORE_ESCAPE)) { 1224 replacementUnderscoreEscape = nonOccursString(logFileName); 1225 1226 logFileName = replaceStringInstances 1227 (logFileName, UNDERSCORE_ESCAPE, replacementUnderscoreEscape); 1228 } 1229 1230 String encodedFileName = null; 1231 try { 1232 encodedFileName = URLEncoder.encode(logFileName, "UTF-8"); 1233 } catch (UnsupportedEncodingException uee) { 1234 IOException ioe = new IOException(); 1235 ioe.initCause(uee); 1236 ioe.setStackTrace(uee.getStackTrace()); 1237 throw ioe; 1238 } 1239 1240 if (replacementUnderscoreEscape != null) { 1241 encodedFileName = replaceStringInstances 1242 (encodedFileName, replacementUnderscoreEscape, UNDERSCORE_ESCAPE); 1243 } 1244 1245 return encodedFileName; 1246 } 1247 1248 /** 1249 * 
Helper function to decode the URL of the filename of the job-history 1250 * log file. 1251 * 1252 * @param logFileName file name of the job-history file 1253 * @return URL decoded filename 1254 * @throws IOException 1255 */ decodeJobHistoryFileName(String logFileName)1256 public static String decodeJobHistoryFileName(String logFileName) 1257 throws IOException { 1258 String decodedFileName = null; 1259 try { 1260 decodedFileName = URLDecoder.decode(logFileName, "UTF-8"); 1261 } catch (UnsupportedEncodingException uee) { 1262 IOException ioe = new IOException(); 1263 ioe.initCause(uee); 1264 ioe.setStackTrace(uee.getStackTrace()); 1265 throw ioe; 1266 } 1267 return decodedFileName; 1268 } 1269 1270 /** 1271 * Get the job name from the job conf 1272 */ getJobName(JobConf jobConf)1273 static String getJobName(JobConf jobConf) { 1274 String jobName = jobConf.getJobName(); 1275 if (jobName == null || jobName.length() == 0) { 1276 jobName = "NA"; 1277 } 1278 return jobName; 1279 } 1280 1281 /** 1282 * Get the user name from the job conf 1283 */ getUserName(JobConf jobConf)1284 public static String getUserName(JobConf jobConf) { 1285 String user = jobConf.getUser(); 1286 if (user == null || user.length() == 0) { 1287 user = "NA"; 1288 } 1289 return user; 1290 } 1291 1292 /** 1293 * Get the workflow adjacencies from the job conf 1294 * The string returned is of the form "key"="value" "key"="value" ... 
1295 */ getWorkflowAdjacencies(Configuration conf)1296 public static String getWorkflowAdjacencies(Configuration conf) { 1297 int prefixLen = JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING.length(); 1298 Map<String,String> adjacencies = 1299 conf.getValByRegex(JobConf.WORKFLOW_ADJACENCY_PREFIX_PATTERN); 1300 if (adjacencies.isEmpty()) 1301 return ""; 1302 int size = 0; 1303 for (Entry<String,String> entry : adjacencies.entrySet()) { 1304 int keyLen = entry.getKey().length(); 1305 size += keyLen - prefixLen; 1306 size += entry.getValue().length() + 6; 1307 } 1308 StringBuilder sb = new StringBuilder(size); 1309 for (Entry<String,String> entry : adjacencies.entrySet()) { 1310 int keyLen = entry.getKey().length(); 1311 sb.append("\""); 1312 sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen))); 1313 sb.append("\"=\""); 1314 sb.append(escapeString(entry.getValue())); 1315 sb.append("\" "); 1316 } 1317 return sb.toString(); 1318 } 1319 1320 /** 1321 * Get the job history file path given the history filename 1322 */ getJobHistoryLogLocation(String logFileName)1323 public static Path getJobHistoryLogLocation(String logFileName) 1324 { 1325 return LOG_DIR == null ? null : new Path(LOG_DIR, logFileName); 1326 } 1327 1328 /** 1329 * Get the user job history file path 1330 */ getJobHistoryLogLocationForUser(String logFileName, JobConf jobConf)1331 public static Path getJobHistoryLogLocationForUser(String logFileName, 1332 JobConf jobConf) { 1333 // find user log directory 1334 Path userLogFile = null; 1335 Path outputPath = FileOutputFormat.getOutputPath(jobConf); 1336 String userLogDir = jobConf.get("hadoop.job.history.user.location", 1337 outputPath == null 1338 ? 
null 1339 : outputPath.toString()); 1340 if ("none".equals(userLogDir)) { 1341 userLogDir = null; 1342 } 1343 if (userLogDir != null) { 1344 userLogDir = userLogDir + Path.SEPARATOR + "_logs" + Path.SEPARATOR 1345 + "history"; 1346 userLogFile = new Path(userLogDir, logFileName); 1347 } 1348 return userLogFile; 1349 } 1350 1351 /** 1352 * Generates the job history filename for a new job 1353 */ getNewJobHistoryFileName(JobConf jobConf, JobID id, long submitTime)1354 private static String getNewJobHistoryFileName(JobConf jobConf, JobID id, long submitTime) { 1355 return 1356 id.toString() + "_" 1357 + submitTime + "_" 1358 + escapeUnderscores(getUserName(jobConf)) + "_" 1359 + escapeUnderscores(trimJobName(getJobName(jobConf))); 1360 } 1361 1362 /** 1363 * Trims the job-name if required 1364 */ trimJobName(String jobName)1365 private static String trimJobName(String jobName) { 1366 if (jobName.length() > JOB_NAME_TRIM_LENGTH) { 1367 jobName = jobName.substring(0, JOB_NAME_TRIM_LENGTH); 1368 } 1369 return jobName; 1370 } 1371 escapeRegexChars( String string )1372 private static String escapeRegexChars( String string ) { 1373 return "\\Q"+string.replaceAll("\\\\E", "\\\\E\\\\\\\\E\\\\Q")+"\\E"; 1374 } 1375 1376 /** 1377 * Recover the job history filename from the history folder. 
1378 * Uses the following pattern 1379 * $jt-hostname_[0-9]*_$job-id_$user_$job-name* 1380 * @param jobConf the job conf 1381 * @param id job id 1382 */ getJobHistoryFileName(JobConf jobConf, JobID id)1383 public static synchronized String getJobHistoryFileName(JobConf jobConf, 1384 JobID id) 1385 throws IOException { 1386 return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS); 1387 } 1388 1389 // Returns that portion of the pathname that sits under the DONE directory getDoneJobHistoryFileName(JobConf jobConf, JobID id)1390 static synchronized String getDoneJobHistoryFileName(JobConf jobConf, 1391 JobID id) throws IOException { 1392 if (DONE == null) { 1393 return null; 1394 } 1395 return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS); 1396 } 1397 1398 /** 1399 * @param dir The directory where to search. 1400 */ getJobHistoryFileName(JobConf jobConf, JobID id, Path dir, FileSystem fs)1401 private static synchronized String getJobHistoryFileName(JobConf jobConf, 1402 JobID id, Path dir, FileSystem fs) 1403 throws IOException { 1404 String user = getUserName(jobConf); 1405 String jobName = trimJobName(getJobName(jobConf)); 1406 if (LOG_DIR == null) { 1407 return null; 1408 } 1409 1410 // Make the pattern matching the job's history file 1411 1412 final String regexp 1413 = id.toString() + "_" + DIGITS + "_" + user + "_" 1414 + escapeRegexChars(jobName) + "+"; 1415 1416 final Pattern historyFilePattern = Pattern.compile(regexp); 1417 1418 // a path filter that matches 4 parts of the filenames namely 1419 // - jt-hostname 1420 // - job-id 1421 // - username 1422 // - jobname 1423 PathFilter filter = new PathFilter() { 1424 public boolean accept(Path path) { 1425 String unescapedFileName = path.getName(); 1426 String fileName = null; 1427 try { 1428 fileName = decodeJobHistoryFileName(unescapedFileName); 1429 } catch (IOException ioe) { 1430 LOG.info("Error while decoding history file " + fileName + "." 
1431 + " Ignoring file.", ioe); 1432 return false; 1433 } 1434 1435 return historyFilePattern.matcher(fileName).find(); 1436 } 1437 }; 1438 1439 FileStatus[] statuses = null; 1440 1441 if (dir == DONE) { 1442 final String scanTail 1443 = (DONE_BEFORE_SERIAL_TAIL 1444 + "/" + serialNumberDirectoryComponent(id)); 1445 1446 if (LOG.isDebugEnabled()) { 1447 LOG.debug("JobHistory.getJobHistoryFileName DONE dir: scanning " 1448 + scanTail); 1449 if (LOG.isTraceEnabled()) { 1450 LOG.trace(Thread.currentThread().getStackTrace()); 1451 } 1452 } 1453 1454 statuses = localGlobber(fs, DONE, scanTail, filter); 1455 } else { 1456 statuses = fs.listStatus(dir, filter); 1457 } 1458 1459 String filename = null; 1460 if (statuses == null || statuses.length == 0) { 1461 LOG.info("Nothing to recover for job " + id); 1462 } else { 1463 // return filename considering that fact the name can be a 1464 // secondary filename like filename.recover 1465 filename = getPrimaryFilename(statuses[0].getPath().getName(), jobName); 1466 if (dir == DONE) { 1467 Path parent = statuses[0].getPath().getParent(); 1468 String parentPathName = parent.toString(); 1469 String donePathName = DONE.toString(); 1470 filename = (parentPathName.substring(donePathName.length() + Path.SEPARATOR.length()) 1471 + Path.SEPARATOR + filename); 1472 } 1473 1474 LOG.info("Recovered job history filename for job " + id + " is " 1475 + filename); 1476 } 1477 return filename; 1478 } 1479 1480 // removes all extra extensions from a filename and returns the core/primary 1481 // filename getPrimaryFilename(String filename, String jobName)1482 private static String getPrimaryFilename(String filename, String jobName) 1483 throws IOException{ 1484 filename = decodeJobHistoryFileName(filename); 1485 // Remove the '.recover' suffix if it exists 1486 if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) { 1487 int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length(); 1488 filename = filename.substring(0, newLength); 1489 } 
1490 return encodeJobHistoryFileName(filename); 1491 } 1492 1493 /** Since there was a restart, there should be a master file and 1494 * a recovery file. Once the recovery is complete, the master should be 1495 * deleted as an indication that the recovery file should be treated as the 1496 * master upon completion or next restart. 1497 * @param fileName the history filename that needs checkpointing 1498 * @param conf Job conf 1499 * @throws IOException 1500 */ checkpointRecovery(String fileName, JobConf conf)1501 static synchronized void checkpointRecovery(String fileName, JobConf conf) 1502 throws IOException { 1503 Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName); 1504 if (logPath != null) { 1505 LOG.info("Deleting job history file " + logPath.getName()); 1506 LOGDIR_FS.delete(logPath, false); 1507 } 1508 // do the same for the user file too 1509 logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName, 1510 conf); 1511 if (logPath != null) { 1512 FileSystem fs = logPath.getFileSystem(conf); 1513 fs.delete(logPath, false); 1514 } 1515 } 1516 getSecondaryJobHistoryFile(String filename)1517 static String getSecondaryJobHistoryFile(String filename) 1518 throws IOException { 1519 return encodeJobHistoryFileName( 1520 decodeJobHistoryFileName(filename) + SECONDARY_FILE_SUFFIX); 1521 } 1522 1523 /** Selects one of the two files generated as a part of recovery. 1524 * The thumb rule is that always select the oldest file. 1525 * This call makes sure that only one file is left in the end. 
1526 * @param conf job conf 1527 * @param logFilePath Path of the log file 1528 * @throws IOException 1529 */ recoverJobHistoryFile(JobConf conf, Path logFilePath)1530 public synchronized static Path recoverJobHistoryFile(JobConf conf, 1531 Path logFilePath) 1532 throws IOException { 1533 Path ret; 1534 String logFileName = logFilePath.getName(); 1535 String tmpFilename = getSecondaryJobHistoryFile(logFileName); 1536 Path logDir = logFilePath.getParent(); 1537 Path tmpFilePath = new Path(logDir, tmpFilename); 1538 if (LOGDIR_FS.exists(logFilePath)) { 1539 LOG.info(logFileName + " exists!"); 1540 if (LOGDIR_FS.exists(tmpFilePath)) { 1541 LOG.info("Deleting " + tmpFilename 1542 + " and using " + logFileName + " for recovery."); 1543 LOGDIR_FS.delete(tmpFilePath, false); 1544 } 1545 ret = tmpFilePath; 1546 } else { 1547 LOG.info(logFileName + " doesnt exist! Using " 1548 + tmpFilename + " for recovery."); 1549 if (LOGDIR_FS.exists(tmpFilePath)) { 1550 LOG.info("Renaming " + tmpFilename + " to " + logFileName); 1551 LOGDIR_FS.rename(tmpFilePath, logFilePath); 1552 ret = tmpFilePath; 1553 } else { 1554 ret = logFilePath; 1555 } 1556 } 1557 1558 // do the same for the user files too 1559 logFilePath = getJobHistoryLogLocationForUser(logFileName, conf); 1560 if (logFilePath != null) { 1561 FileSystem fs = logFilePath.getFileSystem(conf); 1562 logDir = logFilePath.getParent(); 1563 tmpFilePath = new Path(logDir, tmpFilename); 1564 if (fs.exists(logFilePath)) { 1565 LOG.info(logFileName + " exists!"); 1566 if (fs.exists(tmpFilePath)) { 1567 LOG.info("Deleting " + tmpFilename + " and making " + logFileName 1568 + " as the master history file for user."); 1569 fs.delete(tmpFilePath, false); 1570 } 1571 } else { 1572 LOG.info(logFileName + " doesnt exist! 
Using " 1573 + tmpFilename + " as the master history file for user."); 1574 if (fs.exists(tmpFilePath)) { 1575 LOG.info("Renaming " + tmpFilename + " to " + logFileName 1576 + " in user directory"); 1577 fs.rename(tmpFilePath, logFilePath); 1578 } 1579 } 1580 } 1581 1582 return ret; 1583 } 1584 1585 /** Finalize the recovery and make one file in the end. 1586 * This invloves renaming the recover file to the master file. 1587 * Note that this api should be invoked only if recovery is involved. 1588 * @param id Job id 1589 * @param conf the job conf 1590 * @throws IOException 1591 */ finalizeRecovery(JobID id, JobConf conf)1592 static synchronized void finalizeRecovery(JobID id, JobConf conf) 1593 throws IOException { 1594 Path tmpLogPath = fileManager.getHistoryFile(id); 1595 if (tmpLogPath == null) { 1596 if (LOG.isDebugEnabled()) { 1597 LOG.debug("No file for job with " + id + " found in cache!"); 1598 } 1599 return; 1600 } 1601 String tmpLogFileName = tmpLogPath.getName(); 1602 1603 // get the primary filename from the cached filename 1604 String masterLogFileName = 1605 getPrimaryFilename(tmpLogFileName, getJobName(conf)); 1606 Path masterLogPath = new Path(tmpLogPath.getParent(), masterLogFileName); 1607 1608 // rename the tmp file to the master file. Note that this should be 1609 // done only when the file is closed and handles are released. 
1610 LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName); 1611 LOGDIR_FS.rename(tmpLogPath, masterLogPath); 1612 // update the cache 1613 fileManager.setHistoryFile(id, masterLogPath); 1614 1615 // do the same for the user file too 1616 masterLogPath = 1617 JobHistory.JobInfo.getJobHistoryLogLocationForUser(masterLogFileName, 1618 conf); 1619 tmpLogPath = 1620 JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName, 1621 conf); 1622 if (masterLogPath != null) { 1623 FileSystem fs = masterLogPath.getFileSystem(conf); 1624 if (fs.exists(tmpLogPath)) { 1625 LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName 1626 + " in user directory"); 1627 fs.rename(tmpLogPath, masterLogPath); 1628 } 1629 } 1630 } 1631 1632 /** 1633 * Deletes job data from the local disk. 1634 * For now just deletes the localized copy of job conf 1635 */ cleanupJob(JobID id)1636 static void cleanupJob(JobID id) { 1637 String localJobFilePath = JobInfo.getLocalJobFilePath(id); 1638 File f = new File (localJobFilePath); 1639 LOG.info("Deleting localized job conf at " + f); 1640 if (!f.delete()) { 1641 if (LOG.isDebugEnabled()) { 1642 LOG.debug("Failed to delete file " + f); 1643 } 1644 } 1645 } 1646 1647 /** 1648 * Delete job conf from the history folder. 1649 */ deleteConfFiles()1650 static void deleteConfFiles() throws IOException { 1651 LOG.info("Cleaning up config files from the job history folder"); 1652 FileSystem fs = new Path(LOG_DIR).getFileSystem(jtConf); 1653 FileStatus[] status = fs.listStatus(new Path(LOG_DIR), CONF_FILTER); 1654 for (FileStatus s : status) { 1655 LOG.info("Deleting conf file " + s.getPath()); 1656 fs.delete(s.getPath(), false); 1657 } 1658 } 1659 1660 /** 1661 * Move the completed job into the completed folder. 1662 * This assumes that the jobhistory file is closed and all operations on the 1663 * jobhistory file is complete. 1664 * This *should* be the last call to jobhistory for a given job. 
1665 */ markCompleted(JobID id)1666 static void markCompleted(JobID id) throws IOException { 1667 fileManager.moveToDone(id); 1668 } 1669 1670 /** 1671 * Log job submitted event to history. Creates a new file in history 1672 * for the job. if history file creation fails, it disables history 1673 * for all other events. 1674 * @param jobId job id assigned by job tracker. 1675 * @param jobConf job conf of the job 1676 * @param jobConfPath path to job conf xml file in HDFS. 1677 * @param submitTime time when job tracker received the job 1678 * @throws IOException 1679 * @deprecated Use 1680 * {@link #logSubmitted(JobID, JobConf, String, long, boolean)} instead. 1681 */ 1682 @Deprecated logSubmitted(JobID jobId, JobConf jobConf, String jobConfPath, long submitTime)1683 public static void logSubmitted(JobID jobId, JobConf jobConf, 1684 String jobConfPath, long submitTime) 1685 throws IOException { 1686 logSubmitted(jobId, jobConf, jobConfPath, submitTime, true); 1687 } 1688 logSubmitted(JobID jobId, JobConf jobConf, String jobConfPath, long submitTime, boolean restarted)1689 public static void logSubmitted(JobID jobId, JobConf jobConf, 1690 String jobConfPath, long submitTime, 1691 boolean restarted) 1692 throws IOException { 1693 FileSystem fs = null; 1694 String userLogDir = null; 1695 String jobUniqueString = jobId.toString(); 1696 1697 // Get the username and job name to be used in the actual log filename; 1698 // sanity check them too 1699 String jobName = getJobName(jobConf); 1700 String user = getUserName(jobConf); 1701 1702 // get the history filename 1703 String logFileName = null; 1704 if (restarted) { 1705 logFileName = getJobHistoryFileName(jobConf, jobId); 1706 if (logFileName == null) { 1707 logFileName = 1708 encodeJobHistoryFileName(getNewJobHistoryFileName 1709 (jobConf, jobId, submitTime)); 1710 } else { 1711 String parts[] = logFileName.split("_"); 1712 //TODO this is a hack :( 1713 // jobtracker-hostname_jobtracker-identifier_ 1714 String 
jtUniqueString = parts[0] + "_" + parts[1] + "_"; 1715 jobUniqueString = jobId.toString(); 1716 } 1717 } else { 1718 logFileName = 1719 encodeJobHistoryFileName(getNewJobHistoryFileName 1720 (jobConf, jobId, submitTime)); 1721 } 1722 1723 // setup the history log file for this job 1724 Path logFile = getJobHistoryLogLocation(logFileName); 1725 1726 // find user log directory 1727 Path userLogFile = 1728 getJobHistoryLogLocationForUser(logFileName, jobConf); 1729 PrintWriter writer = null; 1730 try{ 1731 FSDataOutputStream out = null; 1732 if (LOG_DIR != null) { 1733 // create output stream for logging in hadoop.job.history.location 1734 if (restarted) { 1735 logFile = recoverJobHistoryFile(jobConf, logFile); 1736 logFileName = logFile.getName(); 1737 } 1738 1739 int defaultBufferSize = 1740 LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096); 1741 out = LOGDIR_FS.create(logFile, 1742 new FsPermission(HISTORY_FILE_PERMISSION), 1743 true, 1744 defaultBufferSize, 1745 LOGDIR_FS.getDefaultReplication(), 1746 jobHistoryBlockSize, null); 1747 writer = new PrintWriter(out); 1748 fileManager.addWriter(jobId, writer); 1749 1750 // cache it ... 
1751 fileManager.setHistoryFile(jobId, logFile); 1752 } 1753 if (userLogFile != null) { 1754 // Get the actual filename as recoverJobHistoryFile() might return 1755 // a different filename 1756 userLogDir = userLogFile.getParent().toString(); 1757 userLogFile = new Path(userLogDir, logFileName); 1758 1759 // create output stream for logging 1760 // in hadoop.job.history.user.location 1761 fs = userLogFile.getFileSystem(jobConf); 1762 1763 out = fs.create(userLogFile, true, 4096); 1764 writer = new PrintWriter(out); 1765 fileManager.addWriter(jobId, writer); 1766 } 1767 1768 ArrayList<PrintWriter> writers = fileManager.getWriters(jobId); 1769 // Log the history meta info 1770 JobHistory.MetaInfoManager.logMetaInfo(writers); 1771 1772 String viewJobACL = "*"; 1773 String modifyJobACL = "*"; 1774 if (aclsEnabled) { 1775 viewJobACL = jobConf.get(JobACL.VIEW_JOB.getAclName(), " "); 1776 modifyJobACL = jobConf.get(JobACL.MODIFY_JOB.getAclName(), " "); 1777 } 1778 //add to writer as well 1779 JobHistory.log(writers, RecordTypes.Job, 1780 new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, 1781 Keys.SUBMIT_TIME, Keys.JOBCONF, 1782 Keys.VIEW_JOB, Keys.MODIFY_JOB, 1783 Keys.JOB_QUEUE, Keys.WORKFLOW_ID, 1784 Keys.WORKFLOW_NAME, Keys.WORKFLOW_NODE_NAME, 1785 Keys.WORKFLOW_ADJACENCIES, 1786 Keys.WORKFLOW_TAGS}, 1787 new String[]{jobId.toString(), jobName, user, 1788 String.valueOf(submitTime) , jobConfPath, 1789 viewJobACL, modifyJobACL, 1790 jobConf.getQueueName(), 1791 jobConf.get(JobConf.WORKFLOW_ID, ""), 1792 jobConf.get(JobConf.WORKFLOW_NAME, ""), 1793 jobConf.get(JobConf.WORKFLOW_NODE_NAME, ""), 1794 getWorkflowAdjacencies(jobConf), 1795 jobConf.get(JobConf.WORKFLOW_TAGS, ""), 1796 }, 1797 jobId 1798 ); 1799 1800 }catch(IOException e){ 1801 LOG.error("Failed creating job history log file for job " + jobId, e); 1802 if (writer != null) { 1803 fileManager.removeWriter(jobId, writer); 1804 } 1805 } 1806 // Always store job conf on local file system 1807 String localJobFilePath = 
JobInfo.getLocalJobFilePath(jobId); 1808 File localJobFile = new File(localJobFilePath); 1809 FileOutputStream jobOut = null; 1810 try { 1811 jobOut = new FileOutputStream(localJobFile); 1812 jobConf.writeXml(jobOut); 1813 if (LOG.isDebugEnabled()) { 1814 LOG.debug("Job conf for " + jobId + " stored at " 1815 + localJobFile.getAbsolutePath()); 1816 } 1817 } catch (IOException ioe) { 1818 LOG.error("Failed to store job conf on the local filesystem ", ioe); 1819 } finally { 1820 if (jobOut != null) { 1821 try { 1822 jobOut.close(); 1823 } catch (IOException ie) { 1824 LOG.info("Failed to close the job configuration file " 1825 + StringUtils.stringifyException(ie)); 1826 } 1827 } 1828 } 1829 1830 /* Storing the job conf on the log dir */ 1831 Path jobFilePath = null; 1832 if (LOG_DIR != null) { 1833 jobFilePath = new Path(LOG_DIR + File.separator + 1834 jobUniqueString + CONF_FILE_NAME_SUFFIX); 1835 fileManager.setConfFile(jobId, jobFilePath); 1836 } 1837 Path userJobFilePath = null; 1838 if (userLogDir != null) { 1839 userJobFilePath = new Path(userLogDir + File.separator + 1840 jobUniqueString + CONF_FILE_NAME_SUFFIX); 1841 } 1842 FSDataOutputStream jobFileOut = null; 1843 try { 1844 if (LOG_DIR != null) { 1845 int defaultBufferSize = 1846 LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096); 1847 if (!LOGDIR_FS.exists(jobFilePath)) { 1848 jobFileOut = LOGDIR_FS.create(jobFilePath, 1849 new FsPermission(HISTORY_FILE_PERMISSION), 1850 true, 1851 defaultBufferSize, 1852 LOGDIR_FS.getDefaultReplication(), 1853 LOGDIR_FS.getDefaultBlockSize(), null); 1854 jobConf.writeXml(jobFileOut); 1855 jobFileOut.close(); 1856 } 1857 } 1858 if (userLogDir != null) { 1859 fs = new Path(userLogDir).getFileSystem(jobConf); 1860 jobFileOut = fs.create(userJobFilePath); 1861 jobConf.writeXml(jobFileOut); 1862 } 1863 if (LOG.isDebugEnabled()) { 1864 LOG.debug("Job conf for " + jobId + " stored at " 1865 + jobFilePath + "and" + userJobFilePath ); 1866 } 1867 } catch (IOException ioe) { 
1868 LOG.error("Failed to store job conf in the log dir", ioe); 1869 } finally { 1870 if (jobFileOut != null) { 1871 try { 1872 jobFileOut.close(); 1873 } catch (IOException ie) { 1874 LOG.info("Failed to close the job configuration file " 1875 + StringUtils.stringifyException(ie)); 1876 } 1877 } 1878 } 1879 } 1880 /** 1881 * Logs launch time of job. 1882 * 1883 * @param jobId job id, assigned by jobtracker. 1884 * @param startTime start time of job. 1885 * @param totalMaps total maps assigned by jobtracker. 1886 * @param totalReduces total reduces. 1887 */ logInited(JobID jobId, long startTime, int totalMaps, int totalReduces)1888 public static void logInited(JobID jobId, long startTime, 1889 int totalMaps, int totalReduces) { 1890 ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 1891 1892 if (null != writer){ 1893 JobHistory.log(writer, RecordTypes.Job, 1894 new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, 1895 Keys.TOTAL_REDUCES, Keys.JOB_STATUS}, 1896 new String[] {jobId.toString(), String.valueOf(startTime), 1897 String.valueOf(totalMaps), 1898 String.valueOf(totalReduces), 1899 Values.PREP.name()}, jobId); 1900 } 1901 } 1902 1903 /** 1904 * Logs the job as RUNNING. 1905 * 1906 * @param jobId job id, assigned by jobtracker. 1907 * @param startTime start time of job. 1908 * @param totalMaps total maps assigned by jobtracker. 1909 * @param totalReduces total reduces. 1910 * @deprecated Use {@link #logInited(JobID, long, int, int)} and 1911 * {@link #logStarted(JobID)} 1912 */ 1913 @Deprecated logStarted(JobID jobId, long startTime, int totalMaps, int totalReduces)1914 public static void logStarted(JobID jobId, long startTime, 1915 int totalMaps, int totalReduces) { 1916 logStarted(jobId); 1917 } 1918 1919 /** 1920 * Logs job as running 1921 * @param jobId job id, assigned by jobtracker. 
1922 */ logStarted(JobID jobId)1923 public static void logStarted(JobID jobId){ 1924 ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 1925 1926 if (null != writer){ 1927 JobHistory.log(writer, RecordTypes.Job, 1928 new Keys[] {Keys.JOBID, Keys.JOB_STATUS}, 1929 new String[] {jobId.toString(), 1930 Values.RUNNING.name()}, jobId); 1931 } 1932 } 1933 1934 /** 1935 * Log job finished. closes the job file in history. 1936 * @param jobId job id, assigned by jobtracker. 1937 * @param finishTime finish time of job in ms. 1938 * @param finishedMaps no of maps successfully finished. 1939 * @param finishedReduces no of reduces finished sucessfully. 1940 * @param failedMaps no of failed map tasks. 1941 * @param failedReduces no of failed reduce tasks. 1942 * @param counters the counters from the job 1943 */ logFinished(JobID jobId, long finishTime, int finishedMaps, int finishedReduces, int failedMaps, int failedReduces, Counters mapCounters, Counters reduceCounters, Counters counters)1944 public static void logFinished(JobID jobId, long finishTime, 1945 int finishedMaps, int finishedReduces, 1946 int failedMaps, int failedReduces, 1947 Counters mapCounters, 1948 Counters reduceCounters, 1949 Counters counters) { 1950 // close job file for this job 1951 ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 1952 1953 if (null != writer){ 1954 JobHistory.log(writer, RecordTypes.Job, 1955 new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 1956 Keys.JOB_STATUS, Keys.FINISHED_MAPS, 1957 Keys.FINISHED_REDUCES, 1958 Keys.FAILED_MAPS, Keys.FAILED_REDUCES, 1959 Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS, 1960 Keys.COUNTERS}, 1961 new String[] {jobId.toString(), Long.toString(finishTime), 1962 Values.SUCCESS.name(), 1963 String.valueOf(finishedMaps), 1964 String.valueOf(finishedReduces), 1965 String.valueOf(failedMaps), 1966 String.valueOf(failedReduces), 1967 mapCounters.makeEscapedCompactString(), 1968 reduceCounters.makeEscapedCompactString(), 1969 
counters.makeEscapedCompactString()}, jobId); 1970 for (PrintWriter out : writer) { 1971 out.close(); 1972 } 1973 } 1974 Thread historyCleaner = new Thread(new HistoryCleaner()); 1975 historyCleaner.start(); 1976 } 1977 /** 1978 * Logs job failed event. Closes the job history log file. 1979 * @param jobid job id 1980 * @param timestamp time when job failure was detected in ms. 1981 * @param finishedMaps no finished map tasks. 1982 * @param finishedReduces no of finished reduce tasks. 1983 */ logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces, String failReason)1984 public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces, String failReason){ 1985 ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 1986 1987 if (null != writer){ 1988 JobHistory.log(writer, RecordTypes.Job, 1989 new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES, Keys.FAIL_REASON }, 1990 new String[] {jobid.toString(), String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 1991 String.valueOf(finishedReduces), failReason}, jobid); 1992 for (PrintWriter out : writer) { 1993 out.close(); 1994 } 1995 } 1996 } 1997 /** 1998 * Logs job killed event. Closes the job history log file. 1999 * 2000 * @param jobid 2001 * job id 2002 * @param timestamp 2003 * time when job killed was issued in ms. 2004 * @param finishedMaps 2005 * no finished map tasks. 2006 * @param finishedReduces 2007 * no of finished reduce tasks. 
2008 */ logKilled(JobID jobid, long timestamp, int finishedMaps, int finishedReduces)2009 public static void logKilled(JobID jobid, long timestamp, int finishedMaps, 2010 int finishedReduces) { 2011 ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 2012 2013 if (null != writer) { 2014 JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID, 2015 Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, 2016 Keys.FINISHED_REDUCES }, new String[] { jobid.toString(), 2017 String.valueOf(timestamp), Values.KILLED.name(), 2018 String.valueOf(finishedMaps), String.valueOf(finishedReduces) }, jobid); 2019 for (PrintWriter out : writer) { 2020 out.close(); 2021 } 2022 } 2023 } 2024 /** 2025 * Log job's priority. 2026 * @param jobid job id 2027 * @param priority Jobs priority 2028 */ logJobPriority(JobID jobid, JobPriority priority)2029 public static void logJobPriority(JobID jobid, JobPriority priority){ 2030 ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 2031 2032 if (null != writer){ 2033 JobHistory.log(writer, RecordTypes.Job, 2034 new Keys[] {Keys.JOBID, Keys.JOB_PRIORITY}, 2035 new String[] {jobid.toString(), priority.toString()}, jobid); 2036 } 2037 } 2038 /** 2039 * Log job's submit-time/launch-time 2040 * @param jobid job id 2041 * @param submitTime job's submit time 2042 * @param launchTime job's launch time 2043 * @param restartCount number of times the job got restarted 2044 * @deprecated Use {@link #logJobInfo(JobID, long, long)} instead. 
     */
    @Deprecated
    public static void logJobInfo(JobID jobid, long submitTime, long launchTime,
                                  int restartCount){
      logJobInfo(jobid, submitTime, launchTime);
    }

    /**
     * Log job's submit-time and launch-time.
     * @param jobid job id
     * @param submitTime job's submit time
     * @param launchTime job's launch time
     */
    public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
    {
      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.Job,
                       new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME,
                                   Keys.LAUNCH_TIME},
                       new String[] {jobid.toString(),
                                     String.valueOf(submitTime),
                                     String.valueOf(launchTime)}, jobid);
      }
    }
  }

  /**
   * Helper class for logging or reading back events related to Task's start, finish or failure.
   * All events logged by this class are logged in a separate file per job in
   * job tracker history. These events map to TIPs in jobtracker.
   */
  public static class Task extends KeyValuePair{
    // task attempt id -> TaskAttempt, sorted by attempt id (TreeMap)
    private Map <String, TaskAttempt> taskAttempts = new TreeMap<String, TaskAttempt>();

    /**
     * Log start time of task (TIP).
     * @param taskId task id
     * @param taskType MAP or REDUCE
     * @param startTime startTime of tip.
     * @param splitLocations split locations of the task
     */
    public static void logStarted(TaskID taskId, String taskType,
                                  long startTime, String splitLocations) {
      JobID id = taskId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.Task,
                       new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
                                  Keys.START_TIME, Keys.SPLITS},
                       new String[]{taskId.toString(), taskType,
                                    String.valueOf(startTime),
                                    splitLocations}, id);
      }
    }

    /**
     * Log finish time of task.
     * @param taskId task id
     * @param taskType MAP or REDUCE
     * @param finishTime finish time of task in ms
     * @param counters the counters of the task
     */
    public static void logFinished(TaskID taskId, String taskType,
                                   long finishTime, Counters counters){
      JobID id = taskId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.Task,
                       new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
                                  Keys.TASK_STATUS, Keys.FINISH_TIME,
                                  Keys.COUNTERS},
                       new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(),
                                     String.valueOf(finishTime),
                                     counters.makeEscapedCompactString()}, id);
      }
    }

    /**
     * Update the finish time of task.
     * @param taskId task id
     * @param finishTime finish time of task in ms
     */
    public static void logUpdates(TaskID taskId, long finishTime){
      JobID id = taskId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.Task,
                       new Keys[]{Keys.TASKID, Keys.FINISH_TIME},
                       new String[]{ taskId.toString(),
                                     String.valueOf(finishTime)}, id);
      }
    }

    /**
     * Log task failed event.
     * @param taskId task id
     * @param taskType MAP or REDUCE.
     * @param time timestamp when the failure was detected.
     * @param error error message for failure.
     */
    public static void logFailed(TaskID taskId, String taskType, long time, String error){
      logFailed(taskId, taskType, time, error, null);
    }

    /**
     * Log task failed event, recording the attempt that caused the failure.
     * @param taskId task id
     * @param taskType MAP or REDUCE.
     * @param time timestamp when the failure was detected.
     * @param error error message for failure.
     * @param failedDueToAttempt The attempt that caused the failure, if any
     */
    public static void logFailed(TaskID taskId, String taskType, long time,
                                 String error,
                                 TaskAttemptID failedDueToAttempt){
      JobID id = taskId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        // empty string, not null, when no attempt is to blame
        String failedAttempt = failedDueToAttempt == null
                               ? ""
                               : failedDueToAttempt.toString();
        JobHistory.log(writer, RecordTypes.Task,
                       new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
                                  Keys.TASK_STATUS, Keys.FINISH_TIME,
                                  Keys.ERROR, Keys.TASK_ATTEMPT_ID},
                       new String[]{ taskId.toString(), taskType,
                                     Values.FAILED.name(),
                                     String.valueOf(time) , error,
                                     failedAttempt}, id);
      }
    }

    /**
     * Returns all task attempts for this task. <task attempt id - TaskAttempt>
     */
    public Map<String, TaskAttempt> getTaskAttempts(){
      return this.taskAttempts;
    }
  }

  /**
   * Base class for Map and Reduce TaskAttempts.
   */
  public static class TaskAttempt extends Task{}

  /**
   * Helper class for logging or reading back events related to start, finish or failure of
   * a Map Attempt on a node.
   */
  public static class MapAttempt extends TaskAttempt{
    /**
     * Log start time of this map task attempt.
     * @param taskAttemptId task attempt id
     * @param startTime start time of task attempt as reported by task tracker.
     * @param hostName host name of the task attempt.
     * @deprecated Use
     *             {@link #logStarted(TaskAttemptID, long, String, int, String)}
     */
    @Deprecated
    public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
      logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
    }

    /**
     * @deprecated Use the overload that also takes {@code Locality} and
     *             {@code Avataar}; this one assumes OFF_SWITCH / VIRGIN.
     */
    @Deprecated
    public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                  String trackerName, int httpPort, String taskType) {
      logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType,
                 Locality.OFF_SWITCH, Avataar.VIRGIN);
    }

    /**
     * Log start time of this map task attempt.
     *
     * @param taskAttemptId task attempt id
     * @param startTime start time of task attempt as reported by task tracker.
     * @param trackerName name of the tracker executing the task attempt.
     * @param httpPort http port of the task tracker executing the task attempt
     * @param taskType Whether the attempt is cleanup or setup or map
     * @param locality the data locality of the task attempt
     * @param avataar the avataar of the task attempt
     */
    public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                  String trackerName, int httpPort,
                                  String taskType,
                                  Locality locality, Avataar avataar) {
      JobID id = taskAttemptId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.MapAttempt,
                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
                                   Keys.TRACKER_NAME, Keys.HTTP_PORT,
                                   Keys.LOCALITY, Keys.AVATAAR},
                       new String[]{taskType,
                                    taskAttemptId.getTaskID().toString(),
                                    taskAttemptId.toString(),
                                    String.valueOf(startTime), trackerName,
                                    // -1 means "port unknown"; logged as empty
                                    httpPort == -1 ? "" : String.valueOf(httpPort),
                                    locality.toString(), avataar.toString()},
                       id
                       );
      }
    }

    /**
     * Log finish time of map task attempt.
     * @param taskAttemptId task attempt id
     * @param finishTime finish time
     * @param hostName host name
     * @deprecated Use
     * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
     */
    @Deprecated
    public static void logFinished(TaskAttemptID taskAttemptId, long finishTime,
                                   String hostName){
      logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "",
                  new Counters());
    }

    /**
     * Log finish time of map task attempt.
     *
     * @param taskAttemptId task attempt id
     * @param finishTime finish time
     * @param hostName host name
     * @param taskType Whether the attempt is cleanup or setup or map
     * @param stateString state string of the task attempt
     * @param counter counters of the task attempt
     */
    public static void logFinished(TaskAttemptID taskAttemptId,
                                   long finishTime,
                                   String hostName,
                                   String taskType,
                                   String stateString,
                                   Counters counter) {
      JobID id = taskAttemptId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.MapAttempt,
                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
                                   Keys.FINISH_TIME, Keys.HOSTNAME,
                                   Keys.STATE_STRING, Keys.COUNTERS},
                       new String[]{taskType,
                                    taskAttemptId.getTaskID().toString(),
                                    taskAttemptId.toString(),
                                    Values.SUCCESS.name(),
                                    String.valueOf(finishTime), hostName,
                                    stateString,
                                    counter.makeEscapedCompactString()}, id);
      }
    }

    /**
     * Log task attempt failed event.
     * @param taskAttemptId task attempt id
     * @param timestamp timestamp
     * @param hostName hostname of this task attempt.
     * @param error error message if any for this task attempt.
     * @deprecated Use
     * {@link #logFailed(TaskAttemptID, long, String, String, String)}
     */
    @Deprecated
    public static void logFailed(TaskAttemptID taskAttemptId,
                                 long timestamp, String hostName,
                                 String error) {
      logFailed(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
    }

    /**
     * Log task attempt failed event.
     *
     * @param taskAttemptId task attempt id
     * @param timestamp timestamp
     * @param hostName hostname of this task attempt.
     * @param error error message if any for this task attempt.
     * @param taskType Whether the attempt is cleanup or setup or map
     */
    public static void logFailed(TaskAttemptID taskAttemptId,
                                 long timestamp, String hostName,
                                 String error, String taskType) {
      JobID id = taskAttemptId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.MapAttempt,
                       new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
                                  Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
                                  Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
                       new String[]{ taskType,
                                     taskAttemptId.getTaskID().toString(),
                                     taskAttemptId.toString(),
                                     Values.FAILED.name(),
                                     String.valueOf(timestamp),
                                     hostName, error}, id);
      }
    }

    /**
     * Log task attempt killed event.
     * @param taskAttemptId task attempt id
     * @param timestamp timestamp
     * @param hostName hostname of this task attempt.
     * @param error error message if any for this task attempt.
     * @deprecated Use
     * {@link #logKilled(TaskAttemptID, long, String, String, String)}
     */
    @Deprecated
    public static void logKilled(TaskAttemptID taskAttemptId,
                                 long timestamp, String hostName, String error){
      logKilled(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
    }

    /**
     * Log task attempt killed event.
     *
     * @param taskAttemptId task attempt id
     * @param timestamp timestamp
     * @param hostName hostname of this task attempt.
     * @param error error message if any for this task attempt.
     * @param taskType Whether the attempt is cleanup or setup or map
     */
    public static void logKilled(TaskAttemptID taskAttemptId,
                                 long timestamp, String hostName,
                                 String error, String taskType) {
      JobID id = taskAttemptId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.MapAttempt,
                       new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
                                  Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
                                  Keys.FINISH_TIME, Keys.HOSTNAME,
                                  Keys.ERROR},
                       new String[]{ taskType,
                                     taskAttemptId.getTaskID().toString(),
                                     taskAttemptId.toString(),
                                     Values.KILLED.name(),
                                     String.valueOf(timestamp),
                                     hostName, error}, id);
      }
    }
  }

  /**
   * Helper class for logging or reading back events related to start, finish or failure of
   * a Reduce Attempt on a node.
   */
  public static class ReduceAttempt extends TaskAttempt{
    /**
     * Log start time of Reduce task attempt.
     * @param taskAttemptId task attempt id
     * @param startTime start time
     * @param hostName host name
     * @deprecated Use
     *             {@link #logStarted(TaskAttemptID, long, String, int, String)}
     */
    @Deprecated
    public static void logStarted(TaskAttemptID taskAttemptId,
                                  long startTime, String hostName){
      logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
    }

    /**
     * @deprecated Use the overload that also takes {@code Locality} and
     *             {@code Avataar}; this one assumes OFF_SWITCH / VIRGIN.
     */
    @Deprecated
    public static void logStarted(TaskAttemptID taskAttemptId,
                                  long startTime, String trackerName,
                                  int httpPort,
                                  String taskType) {
      logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType,
                 Locality.OFF_SWITCH, Avataar.VIRGIN);
    }

    /**
     * Log start time of Reduce task attempt.
     *
     * @param taskAttemptId task attempt id
     * @param startTime start time
     * @param trackerName tracker name
     * @param httpPort the http port of the tracker executing the task attempt
     * @param taskType Whether the attempt is cleanup or setup or reduce
     * @param locality the data locality of the task attempt
     * @param avataar the avataar of the task attempt
     */
    public static void logStarted(TaskAttemptID taskAttemptId,
                                  long startTime, String trackerName,
                                  int httpPort,
                                  String taskType,
                                  Locality locality, Avataar avataar) {
      JobID id = taskAttemptId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.ReduceAttempt,
                       new Keys[] {Keys.TASK_TYPE, Keys.TASKID,
                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
                                   Keys.TRACKER_NAME, Keys.HTTP_PORT,
                                   Keys.LOCALITY, Keys.AVATAAR},
                       new String[]{taskType,
                                    taskAttemptId.getTaskID().toString(),
                                    taskAttemptId.toString(),
                                    String.valueOf(startTime), trackerName,
                                    // -1 means "port unknown"; logged as empty
                                    httpPort == -1 ? "" :
                                      String.valueOf(httpPort),
                                    locality.toString(), avataar.toString()},
                       id);
      }
    }

    /**
     * Log finished event of this task.
     * @param taskAttemptId task attempt id
     * @param shuffleFinished shuffle finish time
     * @param sortFinished sort finish time
     * @param finishTime finish time of task
     * @param hostName host name where task attempt executed
     * @deprecated Use
     * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
     */
    @Deprecated
    public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished,
                                   long sortFinished, long finishTime,
                                   String hostName){
      logFinished(taskAttemptId, shuffleFinished, sortFinished,
                  finishTime, hostName, Values.REDUCE.name(),
                  "", new Counters());
    }

    /**
     * Log finished event of this task.
     *
     * @param taskAttemptId task attempt id
     * @param shuffleFinished shuffle finish time
     * @param sortFinished sort finish time
     * @param finishTime finish time of task
     * @param hostName host name where task attempt executed
     * @param taskType Whether the attempt is cleanup or setup or reduce
     * @param stateString the state string of the attempt
     * @param counter counters of the attempt
     */
    public static void logFinished(TaskAttemptID taskAttemptId,
                                   long shuffleFinished,
                                   long sortFinished, long finishTime,
                                   String hostName, String taskType,
                                   String stateString, Counters counter) {
      JobID id = taskAttemptId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.ReduceAttempt,
                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
                                   Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
                                   Keys.FINISH_TIME, Keys.HOSTNAME,
                                   Keys.STATE_STRING, Keys.COUNTERS},
                       new String[]{taskType,
                                    taskAttemptId.getTaskID().toString(),
                                    taskAttemptId.toString(),
                                    Values.SUCCESS.name(),
                                    String.valueOf(shuffleFinished),
                                    String.valueOf(sortFinished),
                                    String.valueOf(finishTime), hostName,
                                    stateString,
                                    counter.makeEscapedCompactString()}, id);
      }
    }

    /**
     * Log failed reduce task attempt.
     * @param taskAttemptId task attempt id
     * @param timestamp time stamp when task failed
     * @param hostName host name of the task attempt.
     * @param error error message of the task.
     * @deprecated Use
     * {@link #logFailed(TaskAttemptID, long, String, String, String)}
     */
    @Deprecated
    public static void logFailed(TaskAttemptID taskAttemptId, long timestamp,
                                 String hostName, String error){
      logFailed(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
    }

    /**
     * Log failed reduce task attempt.
     *
     * @param taskAttemptId task attempt id
     * @param timestamp time stamp when task failed
     * @param hostName host name of the task attempt.
     * @param error error message of the task.
     * @param taskType Whether the attempt is cleanup or setup or reduce
     */
    public static void logFailed(TaskAttemptID taskAttemptId, long timestamp,
                                 String hostName, String error,
                                 String taskType) {
      JobID id = taskAttemptId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.ReduceAttempt,
                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
                                   Keys.FINISH_TIME, Keys.HOSTNAME,
                                   Keys.ERROR },
                       new String[]{ taskType,
                                     taskAttemptId.getTaskID().toString(),
                                     taskAttemptId.toString(),
                                     Values.FAILED.name(),
                                     String.valueOf(timestamp), hostName, error }, id);
      }
    }

    /**
     * Log killed reduce task attempt.
     * @param taskAttemptId task attempt id
     * @param timestamp time stamp when task failed
     * @param hostName host name of the task attempt.
     * @param error error message of the task.
     * @deprecated Use
     * {@link #logKilled(TaskAttemptID, long, String, String, String)}
     */
    @Deprecated
    public static void logKilled(TaskAttemptID taskAttemptId, long timestamp,
                                 String hostName, String error) {
      logKilled(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
    }

    /**
     * Log killed reduce task attempt.
     *
     * @param taskAttemptId task attempt id
     * @param timestamp time stamp when task failed
     * @param hostName host name of the task attempt.
     * @param error error message of the task.
     * @param taskType Whether the attempt is cleanup or setup or reduce
     */
    public static void logKilled(TaskAttemptID taskAttemptId, long timestamp,
                                 String hostName, String error,
                                 String taskType) {
      JobID id = taskAttemptId.getJobID();
      ArrayList<PrintWriter> writer = fileManager.getWriters(id);

      if (null != writer){
        JobHistory.log(writer, RecordTypes.ReduceAttempt,
                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
                                   Keys.FINISH_TIME, Keys.HOSTNAME,
                                   Keys.ERROR },
                       new String[]{ taskType,
                                     taskAttemptId.getTaskID().toString(),
                                     taskAttemptId.toString(),
                                     Values.KILLED.name(),
                                     String.valueOf(timestamp),
                                     hostName, error }, id);
      }
    }
  }

  /**
   * Callback interface for reading back log events from JobHistory. This interface
   * should be implemented and passed to JobHistory.parseHistory()
   *
   */
  public static interface Listener{
    /**
     * Callback method for history parser.
     * @param recType type of record, which is the first entry in the line.
     * @param values a map of key-value pairs as they appear in history.
     * @throws IOException
     */
    public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException;
  }

  /**
   * Returns the time in milliseconds, truncated to the day.
   */
  static long directoryTime(String year, String month, String day) {
    Calendar result = Calendar.getInstance();
    result.clear();

    result.set(Calendar.YEAR, Integer.parseInt(year));

    // months are 0-based in Calendar, but people will expect January
    // to be month #1 . Therefore the number is bumped before we make the
    // directory name and must be debumped to seek the time.
    result.set(Calendar.MONTH, Integer.parseInt(month) - 1);

    result.set(Calendar.DAY_OF_MONTH, Integer.parseInt(day));

    // truncate to day granularity
    long timeInMillis = result.getTimeInMillis();
    return timeInMillis;
  }

  /**
   * Delete history files older than one month. Update master index and remove all
   * jobs older than one month. Also if a job tracker has no jobs in last one month
   * remove reference to the job tracker.
   *
   */
  public static class HistoryCleaner implements Runnable {
    static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
    static final long DEFAULT_HISTORY_MAX_AGE = 30 * ONE_DAY_IN_MS;
    static final long DEFAULT_CLEANUP_FREQUENCY = ONE_DAY_IN_MS;
    static long cleanupFrequency = DEFAULT_CLEANUP_FREQUENCY;
    static long maxAgeOfHistoryFiles = DEFAULT_HISTORY_MAX_AGE;
    private long now;
    // guards against concurrent cleaner runs across threads
    private static final AtomicBoolean isRunning = new AtomicBoolean(false);
    private static long lastRan = 0;

    // matches .../<year>/<month>/<day>/<serial>[/] under the done directory
    private static Pattern parseDirectory
      = Pattern.compile(".+/([0-9]+)/([0-9]+)/([0-9]+)/[0-9]+/?");

    /**
     * Cleans up history data.
     */
    public void run() {
      // if another cleaner is already running, bail out immediately
      if (isRunning.getAndSet(true)) {
        return;
      }
      now = System.currentTimeMillis();
      // clean history only once per cleanupFrequency at most
      if (lastRan != 0 && (now - lastRan) < cleanupFrequency) {
        isRunning.set(false);
        return;
      }
      lastRan = now;
      clean(now);
    }

    /**
     * Deletes done-directory history older than {@code maxAgeOfHistoryFiles}
     * and purges matching entries from {@code jobHistoryFileMap}.
     * Always clears {@code isRunning} on exit.
     * @param now the current wall-clock time in ms
     */
    public void clean(long now) {
      Set<String> deletedPathnames = new HashSet<String>();

      // XXXXX debug code: log at most one deletee / one moved file per run
      boolean printedOneDeletee = false;
      boolean printedOneMovedFile = false;

      try {
        Path[] datedDirectories
          = FileUtil.stat2Paths(localGlobber(DONEDIR_FS, DONE,
                                             DONE_BEFORE_SERIAL_TAIL, null));

        // any file with a timestamp earlier than cutoff should be deleted
        long cutoff = now - maxAgeOfHistoryFiles;
        Calendar cutoffDay = Calendar.getInstance();
        cutoffDay.setTimeInMillis(cutoff);
        cutoffDay.set(Calendar.HOUR_OF_DAY, 0);
        cutoffDay.set(Calendar.MINUTE, 0);
        cutoffDay.set(Calendar.SECOND, 0);
        cutoffDay.set(Calendar.MILLISECOND, 0);

        // find directories older than the maximum age
        for (int i = 0; i < datedDirectories.length; ++i) {
          String thisDir = datedDirectories[i].toString();
          Matcher pathMatcher = parseDirectory.matcher(thisDir);

          if (pathMatcher.matches()) {
            long dirDay = directoryTime(pathMatcher.group(1),
                                        pathMatcher.group(2),
                                        pathMatcher.group(3));

            if (LOG.isDebugEnabled()) {
              LOG.debug("HistoryCleaner.run just parsed " + thisDir
                        + " as year/month/day = " + pathMatcher.group(1) + "/"
                        + pathMatcher.group(2) + "/" + pathMatcher.group(3));
            }

            if (dirDay <= cutoffDay.getTimeInMillis()) {
              if (LOG.isDebugEnabled()) {
                Calendar nnow = Calendar.getInstance();
                nnow.setTimeInMillis(now);
                Calendar then = Calendar.getInstance();
                then.setTimeInMillis(dirDay);

                LOG.debug("HistoryCleaner.run directory: " + thisDir
                          + " because its time is " + then + " but it's now " + nnow);
              }
            }

            // if dirDay is cutoffDay, some files may be old enough and others not
            if (dirDay == cutoffDay.getTimeInMillis()) {
              // remove old enough files in the directory
              FileStatus[] possibleDeletees = DONEDIR_FS.listStatus(datedDirectories[i]);

              for (int j = 0; j < possibleDeletees.length; ++j) {
                if (possibleDeletees[j].getModificationTime() < now -
                    maxAgeOfHistoryFiles) {
                  Path deletee = possibleDeletees[j].getPath();
                  if (LOG.isDebugEnabled() && !printedOneDeletee) {
                    LOG.debug("HistoryCleaner.run deletee: "
                              + deletee.toString());
                    printedOneDeletee = true;
                  }

                  // NOTE(review): deprecated single-arg delete; consider
                  // delete(deletee, false) — confirm against FileSystem API
                  DONEDIR_FS.delete(deletee);
                  deletedPathnames.add(deletee.toString());
                }
              }
            }

            // if the directory is older than cutoffDay, we can flat out
            // delete it because all the files in it are old enough
            if (dirDay < cutoffDay.getTimeInMillis()) {
              synchronized (existingDoneSubdirs) {
                if (!existingDoneSubdirs.contains(datedDirectories[i])) {
                  LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
                           + datedDirectories[i] + ", but should.");
                }
                DONEDIR_FS.delete(datedDirectories[i], true);
                existingDoneSubdirs.remove(datedDirectories[i]);
              }
            }
          }
        }

        //walking over the map to purge entries from jobHistoryFileMap
        synchronized (jobHistoryFileMap) {
          Iterator<Entry<JobID, MovedFileInfo>> it =
            jobHistoryFileMap.entrySet().iterator();
          while (it.hasNext()) {
            MovedFileInfo info = it.next().getValue();

            if (LOG.isDebugEnabled() && !printedOneMovedFile) {
              LOG.debug("HistoryCleaner.run a moved file: " + info.historyFile);
              printedOneMovedFile = true;
            }

            // drop map entries whose backing file was deleted above
            if (deletedPathnames.contains(info.historyFile)) {
              it.remove();
            }
          }
        }
      } catch (IOException ie) {
        LOG.info("Error cleaning up history directory" +
                 StringUtils.stringifyException(ie));
      } finally {
        isRunning.set(false);
      }
    }

    static long getLastRan() {
      return lastRan;
    }
  }

  /**
   * Return the TaskLogsUrl of a particular TaskAttempt
   *
   * @param attempt
   * @return the taskLogsUrl. null if http-port or tracker-name or
   *         task-attempt-id are unavailable.
   */
  public static String getTaskLogsUrl(JobHistory.TaskAttempt attempt) {
    if (attempt.get(Keys.HTTP_PORT).equals("")
        || attempt.get(Keys.TRACKER_NAME).equals("")
        || attempt.get(Keys.TASK_ATTEMPT_ID).equals("")) {
      return null;
    }

    String taskTrackerName =
      JobInProgress.convertTrackerNameToHostName(
          attempt.get(Keys.TRACKER_NAME));
    return TaskLogServlet.getTaskLogUrl(taskTrackerName, attempt
        .get(Keys.HTTP_PORT), attempt.get(Keys.TASK_ATTEMPT_ID));
  }
}