1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred;
20 
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.PrintWriter;
27 import java.io.UnsupportedEncodingException;
28 import java.net.URLDecoder;
29 import java.net.URLEncoder;
30 import java.util.ArrayList;
31 import java.util.Calendar;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.LinkedHashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.SortedMap;
41 import java.util.TreeMap;
42 import java.util.Map.Entry;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.LinkedBlockingQueue;
46 import java.util.concurrent.ThreadPoolExecutor;
47 import java.util.concurrent.TimeUnit;
48 import java.util.regex.Matcher;
49 import java.util.regex.Pattern;
50 
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.fs.FSDataInputStream;
55 import org.apache.hadoop.fs.FSDataOutputStream;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.FileUtil;
59 import org.apache.hadoop.fs.Path;
60 import org.apache.hadoop.fs.PathFilter;
61 import org.apache.hadoop.fs.permission.FsAction;
62 import org.apache.hadoop.fs.permission.FsPermission;
63 import org.apache.hadoop.mapreduce.JobACL;
64 import org.apache.hadoop.security.authorize.AccessControlList;
65 import org.apache.hadoop.util.StringUtils;
66 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
67 
68 /**
69  * Provides methods for writing to and reading from job history.
70  * Job History works in an append mode, JobHistory and its inner classes provide methods
71  * to log job events.
72  *
73  * JobHistory is split into multiple files, format of each file is plain text where each line
74  * is of the format [type (key=value)*], where type identifies the type of the record.
75  * Type maps to UID of one of the inner classes of this class.
76  *
 * Job history is maintained in a master index which contains start/stop times of all jobs with
 * a few other job level properties. Apart from this, each job's history is maintained in a separate history
 * file. The name of each job history file follows the format jobtrackerId_jobid
80  *
81  * For parsing the job history it supports a listener based interface where each line is parsed
82  * and passed to listener. The listener can create an object model of history or look for specific
83  * events and discard rest of the history.
84  *
85  * CHANGE LOG :
86  * Version 0 : The history has the following format :
87  *             TAG KEY1="VALUE1" KEY2="VALUE2" and so on.
88                TAG can be Job, Task, MapAttempt or ReduceAttempt.
89                Note that a '"' is the line delimiter.
90  * Version 1 : Changes the line delimiter to '.'
91                Values are now escaped for unambiguous parsing.
92                Added the Meta tag to store version info.
93  */
94 public class JobHistory {
95 
  // Version of the job history file format written by this class; it is
  // stored under Keys.VERSION in the Meta record (see the class change log).
  static final long VERSION = 1L;

  // Version of the DONE directory layout; it names the subdirectory that is
  // created directly under DONE.
  static final int DONE_DIRECTORY_FORMAT_VERSION = 1;

  // Name of the versioned subdirectory under DONE.
  static final String DONE_DIRECTORY_FORMAT_DIRNAME
    = "version-" + DONE_DIRECTORY_FORMAT_VERSION;

  // NOTE(review): looks like the URL-encoded form of '_'; its users are not
  // visible in this part of the file -- confirm before relying on that.
  static final String UNDERSCORE_ESCAPE = "%5F";

  public static final Log LOG = LogFactory.getLog(JobHistory.class);
  // separates the record type and the key="value" pairs within one record
  private static final char DELIMITER = ' ';
  // terminates every record written in the version 1 format
  static final char LINE_DELIMITER_CHAR = '.';
  // characters that escapeString() escapes inside values
  static final char[] charsToEscape = new char[] {'"', '=',
                                                LINE_DELIMITER_CHAR};
  static final String DIGITS = "[0-9]+";

  // a key is one or more word characters, captured as group 1
  static final String KEY = "(\\w+)";
  // value is any character other than quote, but escaped quotes can be there
  static final String VALUE = "[^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+";

  // matches a single KEY="VALUE" tuple of a history record
  static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");

  // upper bound on the number of entries kept in idToDateString
  static final int MAXIMUM_DATESTRING_COUNT = 200000;

  public static final int JOB_NAME_TRIM_LENGTH = 50;
  // set by init() to "<hostname>_<jobtracker start time>_"
  private static String JOBTRACKER_UNIQUE_STRING = null;
  // job history location; set by initLogDir()
  private static String LOG_DIR = null;
  private static final String SECONDARY_FILE_SUFFIX = ".recover";
  // set by init() from "mapred.jobtracker.job.history.block.size"
  private static long jobHistoryBlockSize = 0;
  private static String jobtrackerHostname;
  // created by init() and started by initDone()
  private static JobHistoryFilesManager fileManager = null;
  final static FsPermission HISTORY_DIR_PERMISSION =
    FsPermission.createImmutable((short) 0755); // rwxr-xr-x
  final static FsPermission HISTORY_FILE_PERMISSION =
    FsPermission.createImmutable((short) 0744); // rwxr--r--
  private static FileSystem LOGDIR_FS; // log dir filesystem
  protected static FileSystem DONEDIR_FS; // Done dir filesystem
  // job tracker configuration handed to init()
  private static JobConf jtConf;
  protected static Path DONE = null; // folder for completed jobs
  // glob, relative to DONE, covering version/jobtracker/YYYY/MM/DD
  private static String DONE_BEFORE_SERIAL_TAIL = doneSubdirsBeforeSerialTail();
  // the same glob extended by one more path level
  private static String DONE_LEAF_FILES = DONE_BEFORE_SERIAL_TAIL + "/*";
  // set by init() from JobConf.MR_ACLS_ENABLED
  private static boolean aclsEnabled = false;

  static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";

  // number of leading characters of the formatted serial number that make up
  // the serial-number directory name
  private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
  // set to 3 by init()
  private static int SERIAL_NUMBER_LOW_DIGITS;

  // zero-padding format for job serial numbers; built by init()
  private static String SERIAL_NUMBER_FORMAT;

  // DONE subdirectories recorded as existing; accessed while synchronized on
  // the set itself (see maybeMakeSubdirectory)
  private static final Set<Path> existingDoneSubdirs = new HashSet<Path>();

  // cache from job serial number to its "YYYY/MM/DD" directory component;
  // accessed while synchronized on the map itself
  private static final SortedMap<Integer, String> idToDateString
    = new TreeMap<Integer, String>();
150 
151   /**
152    * A filter for conf files
153    */
154   private static final PathFilter CONF_FILTER = new PathFilter() {
155     public boolean accept(Path path) {
156       return path.getName().endsWith(CONF_FILE_NAME_SUFFIX);
157     }
158   };
159 
  // Records, per job id, where the history file ended up after it was moved
  // to DONE. Backed by a LinkedHashMap (insertion-ordered) behind a
  // synchronized wrapper.
  private static final Map<JobID, MovedFileInfo> jobHistoryFileMap =
    Collections.<JobID,MovedFileInfo>synchronizedMap(
        new LinkedHashMap<JobID, MovedFileInfo>());

  // NOTE(review): not referenced in the part of the file visible here.
  private static final SortedMap<Long, String>jobToDirectoryMap
    = new TreeMap<Long, String>();

  // JobHistory filename regex
  public static final Pattern JOBHISTORY_FILENAME_REGEX =
    Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+");
  // JobHistory conf-filename regex
  // NOTE(review): the '.' in "_conf.xml" is not escaped, so it matches any
  // single character there -- confirm whether that is intended.
  public static final Pattern CONF_FILENAME_REGEX =
    Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml");
173 
  /**
   * Immutable record describing a history file after it has been moved:
   * where it now lives and the timestamp associated with the move.
   */
  private static class MovedFileInfo {
    // path of the moved history file; may be null when no history file was
    // known for the job (see JobHistoryFilesManager.moveToDone)
    private final String historyFile;
    // millisecond timestamp supplied by the creator of this record
    private final long timestamp;
    public MovedFileInfo(String historyFile, long timestamp) {
      this.historyFile = historyFile;
      this.timestamp = timestamp;
    }
  }
182 
183   /**
184    * Given the job id, return the history file path from the cache
185    */
getHistoryFilePath(JobID jobId)186   public static String getHistoryFilePath(JobID jobId) {
187     MovedFileInfo info = jobHistoryFileMap.get(jobId);
188     if (info == null) {
189       return null;
190     }
191     return info.historyFile;
192   }
193 
194   /**
195    * A class that manages all the files related to a job. For now
196    *   - writers : list of open files
197    *   - job history filename
198    *   - job conf filename
199    */
200   private static class JobHistoryFilesManager {
201     // a private (virtual) folder for all the files related to a running job
202     private static class FilesHolder {
203       ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
204       Path historyFilename; // path of job history file
205       Path confFilename; // path of job's conf
206     }
207 
208     private ThreadPoolExecutor executor = null;
209     private final Configuration conf;
210     private final JobTracker jobTracker;
211 
212    // cache from job-key to files associated with it.
213     private Map<JobID, FilesHolder> fileCache =
214       new ConcurrentHashMap<JobID, FilesHolder>();
215 
JobHistoryFilesManager(Configuration conf, JobTracker jobTracker)216     JobHistoryFilesManager(Configuration conf, JobTracker jobTracker)
217         throws IOException {
218       this.conf = conf;
219       this.jobTracker = jobTracker;
220     }
221 
222 
start()223     void start() {
224       executor = new ThreadPoolExecutor(5, 5, 1,
225           TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
226       // make core threads to terminate if there has been no work
227       // for the keppalive period.
228       executor.allowCoreThreadTimeOut(true);
229     }
230 
getFileHolder(JobID id)231     private FilesHolder getFileHolder(JobID id) {
232       FilesHolder holder = fileCache.get(id);
233      if (holder == null) {
234          holder = new FilesHolder();
235          fileCache.put(id, holder);
236       }
237       return holder;
238     }
239 
addWriter(JobID id, PrintWriter writer)240     void addWriter(JobID id, PrintWriter writer) {
241       FilesHolder holder = getFileHolder(id);
242       holder.writers.add(writer);
243     }
244 
setHistoryFile(JobID id, Path file)245     void setHistoryFile(JobID id, Path file) {
246       FilesHolder holder = getFileHolder(id);
247       holder.historyFilename = file;
248     }
249 
setConfFile(JobID id, Path file)250     void setConfFile(JobID id, Path file) {
251       FilesHolder holder = getFileHolder(id);
252       holder.confFilename = file;
253     }
254 
getWriters(JobID id)255     ArrayList<PrintWriter> getWriters(JobID id) {
256       FilesHolder holder = fileCache.get(id);
257       return holder == null ? null : holder.writers;
258     }
259 
getHistoryFile(JobID id)260     Path getHistoryFile(JobID id) {
261       FilesHolder holder = fileCache.get(id);
262       return holder == null ? null : holder.historyFilename;
263     }
264 
getConfFileWriters(JobID id)265     Path getConfFileWriters(JobID id) {
266       FilesHolder holder = fileCache.get(id);
267       return holder == null ? null : holder.confFilename;
268     }
269 
purgeJob(JobID id)270     void purgeJob(JobID id) {
271       fileCache.remove(id);
272     }
273 
moveToDone(final JobID id)274     void moveToDone(final JobID id) {
275       final List<Path> paths = new ArrayList<Path>();
276       final Path historyFile = fileManager.getHistoryFile(id);
277       if (historyFile == null) {
278         LOG.info("No file for job-history with " + id + " found in cache!");
279       } else {
280         paths.add(historyFile);
281       }
282 
283       final Path confPath = fileManager.getConfFileWriters(id);
284       if (confPath == null) {
285         LOG.info("No file for jobconf with " + id + " found in cache!");
286       } else {
287         paths.add(confPath);
288       }
289 
290       executor.execute(new Runnable() {
291 
292         public void run() {
293           long millisecondTime = System.currentTimeMillis();
294 
295           Path resultDir = canonicalHistoryLogPath(id, millisecondTime);
296 
297           //move the files to DONE canonical subfolder
298           try {
299             for (Path path : paths) {
300               //check if path exists, in case of retries it may not exist
301               if (LOGDIR_FS.exists(path)) {
302                 maybeMakeSubdirectory(id, millisecondTime);
303 
304                 LOG.info("Moving " + path.toString() + " to " +
305                     resultDir.toString());
306                 DONEDIR_FS.moveFromLocalFile(path, resultDir);
307                 DONEDIR_FS.setPermission(new Path(resultDir, path.getName()),
308                     new FsPermission(HISTORY_FILE_PERMISSION));
309               }
310             }
311           } catch (Throwable e) {
312             LOG.error("Unable to move history file to DONE canonical subfolder.", e);
313           }
314           String historyFileDonePath = null;
315           if (historyFile != null) {
316             historyFileDonePath = new Path(resultDir,
317                 historyFile.getName()).toString();
318           }
319 
320           jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath,
321                                                       millisecondTime));
322           jobTracker.historyFileCopied(id, historyFileDonePath);
323 
324           //purge the job from the cache
325           fileManager.purgeJob(id);
326         }
327 
328       });
329     }
330 
removeWriter(JobID jobId, PrintWriter writer)331     void removeWriter(JobID jobId, PrintWriter writer) {
332       fileManager.getWriters(jobId).remove(writer);
333     }
334   }
335 
336   // several methods for manipulating the subdirectories of the DONE
337   // directory
338 
  /**
   * Returns the serial number of a job, which is the value of
   * <code>id.getId()</code>.
   */
  private static int jobSerialNumber(JobID id) {
    return id.getId();
  }
342 
serialNumberDirectoryComponent(JobID id)343   private static String serialNumberDirectoryComponent(JobID id) {
344     return String.format(SERIAL_NUMBER_FORMAT,
345                          Integer.valueOf(jobSerialNumber(id)))
346               .substring(0, SERIAL_NUMBER_DIRECTORY_DIGITS);
347   }
348 
349   // directory components may contain internal slashes, but do NOT
350   // contain slashes at either end.
351 
timestampDirectoryComponent(JobID id, long millisecondTime)352   private static String timestampDirectoryComponent(JobID id, long millisecondTime) {
353     int serialNumber = jobSerialNumber(id);
354     Integer boxedSerialNumber = serialNumber;
355 
356     // don't want to do this inside the lock
357     Calendar timestamp = Calendar.getInstance();
358     timestamp.setTimeInMillis(millisecondTime);
359 
360     synchronized (idToDateString) {
361       String dateString = idToDateString.get(boxedSerialNumber);
362 
363       if (dateString == null) {
364 
365         dateString = String.format
366           ("%04d/%02d/%02d",
367            timestamp.get(Calendar.YEAR),
368            // months are 0-based in Calendar, but people will expect January
369            // to be month #1.
370             timestamp.get(Calendar.MONTH) + 1,
371             timestamp.get(Calendar.DAY_OF_MONTH));
372 
373         dateString = dateString.intern();
374 
375         idToDateString.put(boxedSerialNumber, dateString);
376 
377         if (idToDateString.size() > MAXIMUM_DATESTRING_COUNT) {
378           idToDateString.remove(idToDateString.firstKey());
379         }
380       }
381 
382       return dateString;
383     }
384   }
385 
386   // returns false iff the directory already existed
maybeMakeSubdirectory(JobID id, long millisecondTime)387   private static boolean maybeMakeSubdirectory(JobID id, long millisecondTime)
388           throws IOException {
389     Path dir = canonicalHistoryLogPath(id, millisecondTime);
390 
391     synchronized (existingDoneSubdirs) {
392       if (existingDoneSubdirs.contains(dir)) {
393         if (LOG.isDebugEnabled() && !DONEDIR_FS.exists(dir)) {
394           LOG.error("JobHistory.maybeMakeSubdirectory -- We believed " + dir
395               + " already existed, but it didn't.");
396         }
397 
398         return true;
399       }
400 
401       if (!DONEDIR_FS.exists(dir)) {
402         LOG.info("Creating DONE subfolder at "+ dir);
403 
404         if (!FileSystem.mkdirs(DONEDIR_FS, dir,
405                                new FsPermission(HISTORY_DIR_PERMISSION))) {
406           throw new IOException("Mkdirs failed to create " + dir.toString());
407         }
408 
409         existingDoneSubdirs.add(dir);
410 
411         return false;
412       } else {
413         if (LOG.isDebugEnabled()) {
414           LOG.error("JobHistory.maybeMakeSubdirectory -- We believed " + dir
415               + " didn't already exist, but it did.");
416         }
417 
418         return false;
419       }
420     }
421   }
422 
canonicalHistoryLogPath(JobID id, long millisecondTime)423   private static Path canonicalHistoryLogPath(JobID id, long millisecondTime) {
424     return new Path(DONE, historyLogSubdirectory(id, millisecondTime));
425   }
426 
historyLogSubdirectory(JobID id, long millisecondTime)427   private static String historyLogSubdirectory(JobID id, long millisecondTime) {
428     String result
429       = (DONE_DIRECTORY_FORMAT_DIRNAME
430          + "/" + jobtrackerDirectoryComponent(id));
431 
432     String serialNumberDirectory = serialNumberDirectoryComponent(id);
433 
434     result = (result
435               + "/" + timestampDirectoryComponent(id, millisecondTime)
436               + "/" + serialNumberDirectory
437               + "/");
438 
439     return result;
440   }
441 
  /**
   * Returns the job tracker directory component, which is
   * JOBTRACKER_UNIQUE_STRING. The <code>id</code> argument is not used.
   */
  private static String jobtrackerDirectoryComponent(JobID id) {
    return JOBTRACKER_UNIQUE_STRING;
  }
445 
doneSubdirsBeforeSerialTail()446   private static String doneSubdirsBeforeSerialTail() {
447     // job tracker ID
448     String result
449       = ("/" + DONE_DIRECTORY_FORMAT_DIRNAME
450          + "/*");   // job tracker instance ID
451 
452     // date
453     result = result + "/*/*/*";  // YYYY/MM/DD ;
454 
455     return result;
456   }
457 
458   /**
459    * Record types are identifiers for each line of log in history files.
460    * A record type appears as the first token in a single line of log.
461    */
462   public static enum RecordTypes {
463     Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
464   }
465 
  /**
   * Job history files contain key="value" pairs, where keys belong to this enum.
   * It acts as a global namespace for all keys.
   *
   * The constant names appear verbatim in history files: records are written
   * with the key's string form and parsed back with
   * <code>Keys.valueOf()</code>.
   */
  public static enum Keys {
    JOBTRACKERID,
    START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
    LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
    FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
    ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
    TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
    VIEW_JOB, MODIFY_JOB, JOB_QUEUE, FAIL_REASON, LOCALITY, AVATAAR,
    WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES,
    WORKFLOW_TAGS
  }
482 
483   /**
484    * This enum contains some of the values commonly used by history log events.
485    * since values in history can only be strings - Values.name() is used in
486    * most places in history file.
487    */
488   public static enum Values {
489     SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
490   }
491 
492   /**
493    * Initialize JobHistory files.
494    * @param conf Jobconf of the job tracker.
495    * @param hostname jobtracker's hostname
496    * @param jobTrackerStartTime jobtracker's start time
497    */
init(JobTracker jobTracker, JobConf conf, String hostname, long jobTrackerStartTime)498   public static void init(JobTracker jobTracker, JobConf conf,
499              String hostname, long jobTrackerStartTime) throws IOException {
500     initLogDir(conf);
501     SERIAL_NUMBER_LOW_DIGITS = 3;
502     SERIAL_NUMBER_FORMAT = ("%0"
503        + (SERIAL_NUMBER_DIRECTORY_DIGITS + SERIAL_NUMBER_LOW_DIGITS)
504        + "d");
505     JOBTRACKER_UNIQUE_STRING = hostname + "_" +
506                                   String.valueOf(jobTrackerStartTime) + "_";
507     jobtrackerHostname = hostname;
508     Path logDir = new Path(LOG_DIR);
509     if (!LOGDIR_FS.exists(logDir)){
510       if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
511         throw new IOException("Mkdirs failed to create " + logDir.toString());
512       }
513     } else { // directory exists
514       checkDirectoryPermissions(LOGDIR_FS, logDir, "hadoop.job.history.location");
515     }
516     conf.set("hadoop.job.history.location", LOG_DIR);
517     // set the job history block size (default is 3MB)
518     jobHistoryBlockSize =
519       conf.getLong("mapred.jobtracker.job.history.block.size",
520                    3 * 1024 * 1024);
521     jtConf = conf;
522 
523     // queue and job level security is enabled on the mapreduce cluster or not
524     aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
525 
526     // initialize the file manager
527     fileManager = new JobHistoryFilesManager(conf, jobTracker);
528   }
529 
initLogDir(JobConf conf)530   private static void initLogDir(JobConf conf) throws IOException {
531     LOG_DIR = conf.get("hadoop.job.history.location" ,
532       "file:///" + new File(
533       System.getProperty("hadoop.log.dir")).getAbsolutePath()
534       + File.separator + "history");
535     Path logDir = new Path(LOG_DIR);
536     LOGDIR_FS = logDir.getFileSystem(conf);
537   }
538 
  /**
   * Initializes the DONE folder with full setup; equivalent to
   * <code>initDone(conf, fs, true)</code>.
   */
  static void initDone(JobConf conf, FileSystem fs) throws IOException {
    initDone(conf, fs, true);
  }
542 
initDone(JobConf conf, FileSystem fs, boolean setup)543   static void initDone(JobConf conf, FileSystem fs,
544                                      boolean setup)
545       throws IOException {
546     //if completed job history location is set, use that
547     String doneLocation = conf.
548                      get("mapred.job.tracker.history.completed.location");
549     if (doneLocation != null) {
550       DONE = fs.makeQualified(new Path(doneLocation));
551       DONEDIR_FS = fs;
552     } else {
553       if (!setup) {
554         initLogDir(conf);
555       }
556       DONE = new Path(LOG_DIR, "done");
557       DONEDIR_FS = LOGDIR_FS;
558     }
559     Path versionSubdir = new Path(DONE, DONE_DIRECTORY_FORMAT_DIRNAME);
560     //If not already present create the done folder with appropriate
561     //permission
562     if (!DONEDIR_FS.exists(DONE)) {
563       LOG.info("Creating DONE folder at "+ DONE);
564       if (!DONEDIR_FS.mkdirs(DONE,
565           new FsPermission(HISTORY_DIR_PERMISSION))) {
566         throw new IOException("Mkdirs failed to create " + DONE.toString());
567       }
568 
569       if (!DONEDIR_FS.exists(versionSubdir)) {
570         if (!DONEDIR_FS.mkdirs(versionSubdir,
571                                new FsPermission(HISTORY_DIR_PERMISSION))) {
572           throw new IOException("Mkdirs failed to create " + versionSubdir);
573         }
574       }
575     } else { // directory exists. Checks version subdirectory permissions as
576       // well.
577       checkDirectoryPermissions(DONEDIR_FS, DONE,
578           "mapred.job.tracker.history.completed.location");
579       if (DONEDIR_FS.exists(versionSubdir))
580         checkDirectoryPermissions(DONEDIR_FS, versionSubdir,
581             "mapred.job.tracker.history.completed.location-versionsubdir");
582     }
583 
584     if (!setup) {
585       return;
586     }
587 
588     fileManager.start();
589 
590     HistoryCleaner.cleanupFrequency =
591     	      conf.getLong("mapreduce.jobhistory.cleaner.interval-ms",
592     	      HistoryCleaner.DEFAULT_CLEANUP_FREQUENCY);
593     HistoryCleaner.maxAgeOfHistoryFiles =
594     	      conf.getLong("mapreduce.jobhistory.max-age-ms",
595     	      HistoryCleaner.DEFAULT_HISTORY_MAX_AGE);
596     LOG.info(String.format("Job History MaxAge is %d ms (%.2f days), " +
597     	      "Cleanup Frequency is %d ms (%.2f days)",
598     	      HistoryCleaner.maxAgeOfHistoryFiles,
599     	      ((float) HistoryCleaner.maxAgeOfHistoryFiles)/HistoryCleaner.ONE_DAY_IN_MS,
600     	      HistoryCleaner.cleanupFrequency,
601     	      ((float) HistoryCleaner.cleanupFrequency)/HistoryCleaner.ONE_DAY_IN_MS));
602   }
603 
604   /**
605    * @param FileSystem
606    * @param Path
607    * @param configKey
608    * @throws IOException
609    * @throws DiskErrorException
610    */
checkDirectoryPermissions(FileSystem fs, Path path, String configKey)611   static void checkDirectoryPermissions(FileSystem fs, Path path,
612       String configKey) throws IOException, DiskErrorException {
613     FileStatus stat = fs.getFileStatus(path);
614     FsPermission actual = stat.getPermission();
615     if (!stat.isDir())
616       throw new DiskErrorException(configKey + " - not a directory: "
617           + path.toString());
618     FsAction user = actual.getUserAction();
619     if (!user.implies(FsAction.READ))
620       throw new DiskErrorException("bad " + configKey
621           + "- directory is not readable: " + path.toString());
622     if (!user.implies(FsAction.WRITE))
623       throw new DiskErrorException("bad " + configKey
624           + "- directory is not writable " + path.toString());
625   }
626 
627   /**
628    * Manages job-history's meta information such as version etc.
629    * Helps in logging version information to the job-history and recover
630    * version information from the history.
631    */
632   static class MetaInfoManager implements Listener {
633     private long version = 0L;
634     private KeyValuePair pairs = new KeyValuePair();
635 
636     // Extract the version of the history that was used to write the history
MetaInfoManager(String line)637     public MetaInfoManager(String line) throws IOException {
638       if (null != line) {
639         // Parse the line
640         parseLine(line, this, false);
641       }
642     }
643 
644     // Get the line delimiter
getLineDelim()645     char getLineDelim() {
646       if (version == 0) {
647         return '"';
648       } else {
649         return LINE_DELIMITER_CHAR;
650       }
651     }
652 
653     // Checks if the values are escaped or not
isValueEscaped()654     boolean isValueEscaped() {
655       // Note that the values are not escaped in version 0
656       return version != 0;
657     }
658 
handle(RecordTypes recType, Map<Keys, String> values)659     public void handle(RecordTypes recType, Map<Keys, String> values)
660     throws IOException {
661       // Check if the record is of type META
662       if (RecordTypes.Meta == recType) {
663         pairs.handle(values);
664         version = pairs.getLong(Keys.VERSION); // defaults to 0
665       }
666     }
667 
668     /**
669      * Logs history meta-info to the history file. This needs to be called once
670      * per history file.
671      * @param jobId job id, assigned by jobtracker.
672      */
logMetaInfo(ArrayList<PrintWriter> writers)673     static void logMetaInfo(ArrayList<PrintWriter> writers){
674       if (null != writers){
675          JobHistory.log(writers, RecordTypes.Meta,
676              new Keys[] {Keys.VERSION},
677              new String[] {String.valueOf(VERSION)});
678       }
679     }
680   }
681 
  /**
   * Escapes the string especially for {@link JobHistory}: the characters in
   * {@link #charsToEscape} are escaped with
   * <code>StringUtils.ESCAPE_CHAR</code>.
   * @param data string to escape
   * @return the escaped string
   */
  static String escapeString(String data) {
    return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
                                    charsToEscape);
  }
688 
689   /**
690    * Parses history file and invokes Listener.handle() for
691    * each line of history. It can be used for looking through history
692    * files for specific items without having to keep whole history in memory.
693    * @param path path to history file
694    * @param l Listener for history events
695    * @param fs FileSystem where history file is present
696    * @throws IOException
697    */
parseHistoryFromFS(String path, Listener l, FileSystem fs)698   public static void parseHistoryFromFS(String path, Listener l, FileSystem fs)
699   throws IOException{
700     FSDataInputStream in = fs.open(new Path(path));
701     BufferedReader reader = new BufferedReader(new InputStreamReader (in));
702     try {
703       String line = null;
704       StringBuffer buf = new StringBuffer();
705 
706       // Read the meta-info line. Note that this might a jobinfo line for files
707       // written with older format
708       line = reader.readLine();
709 
710       // Check if the file is empty
711       if (line == null) {
712         return;
713       }
714 
715       // Get the information required for further processing
716       MetaInfoManager mgr = new MetaInfoManager(line);
717       boolean isEscaped = mgr.isValueEscaped();
718       String lineDelim = String.valueOf(mgr.getLineDelim());
719       String escapedLineDelim =
720         StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR,
721                                  mgr.getLineDelim());
722 
723       do {
724         buf.append(line);
725         if (!line.trim().endsWith(lineDelim)
726             || line.trim().endsWith(escapedLineDelim)) {
727           buf.append("\n");
728           continue;
729         }
730         parseLine(buf.toString(), l, isEscaped);
731         buf = new StringBuffer();
732       } while ((line = reader.readLine())!= null);
733     } finally {
734       try { reader.close(); } catch (IOException ex) {}
735     }
736   }
737 
738   /**
739    * Parse a single line of history.
740    * @param line
741    * @param l
742    * @throws IOException
743    */
parseLine(String line, Listener l, boolean isEscaped)744   private static void parseLine(String line, Listener l, boolean isEscaped)
745   throws IOException{
746     // extract the record type
747     int idx = line.indexOf(' ');
748     String recType = line.substring(0, idx);
749     String data = line.substring(idx+1, line.length());
750 
751     Matcher matcher = pattern.matcher(data);
752     Map<Keys,String> parseBuffer = new HashMap<Keys, String>();
753 
754     while(matcher.find()){
755       String tuple = matcher.group(0);
756       String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
757       String value = parts[1].substring(1, parts[1].length() -1);
758       if (isEscaped) {
759         value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
760                                            charsToEscape);
761       }
762       parseBuffer.put(Keys.valueOf(parts[0]), value);
763     }
764 
765     l.handle(RecordTypes.valueOf(recType), parseBuffer);
766 
767     parseBuffer.clear();
768   }
769 
770 
771   /**
772    * Log a raw record type with keys and values. This is method is generally not used directly.
773    * @param recordType type of log event
774    * @param key key
775    * @param value value
776    */
777 
log(PrintWriter out, RecordTypes recordType, Keys key, String value)778   static void log(PrintWriter out, RecordTypes recordType, Keys key,
779                   String value){
780     value = escapeString(value);
781     out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\""
782                 + DELIMITER + LINE_DELIMITER_CHAR);
783   }
784 
  /**
   * Log a number of keys and values with record. The array length of keys and
   * values should be the same. This delegates to the overload that also takes
   * a job id, passing <code>null</code> for it.
   * @param writers writers the record is logged to
   * @param recordType type of log event
   * @param keys keys of the record
   * @param values values of the record, one per key
   */

  static void log(ArrayList<PrintWriter> writers, RecordTypes recordType,
                  Keys[] keys, String[] values) {
    log(writers, recordType, keys, values, null);
  }
805 
  /**
   * Holder for a Log obtained for this nested class, separate from the
   * enclosing class's LOG. The log() overload that takes a job id writes
   * every non-Meta record line to it at debug level.
   */
  static class JobHistoryLogger {
    static final Log LOG = LogFactory.getLog(JobHistoryLogger.class);
  }
809 
810   /**
811    * Log a number of keys and values with record. the array length of keys and values
812    * should be same.
813    * @param recordType type of log event
814    * @param keys type of log event
815    * @param values type of log event
816    * @param JobID jobid of the job
817    */
818 
log(ArrayList<PrintWriter> writers, RecordTypes recordType, Keys[] keys, String[] values, JobID id)819   static void log(ArrayList<PrintWriter> writers, RecordTypes recordType,
820                   Keys[] keys, String[] values, JobID id) {
821 
822     // First up calculate the length of buffer, so that we are performant
823     // enough.
824     int length = recordType.name().length() + keys.length * 4 + 2;
825     for (int i = 0; i < keys.length; i++) {
826       values[i] = escapeString(values[i]);
827       length += values[i].length() + keys[i].toString().length();
828     }
829 
830     // We have the length of the buffer, now construct it.
831     StringBuilder builder = new StringBuilder(length);
832     builder.append(recordType.name());
833     builder.append(DELIMITER);
834     for(int i =0; i< keys.length; i++){
835       builder.append(keys[i]);
836       builder.append("=\"");
837       builder.append(values[i]);
838       builder.append("\"");
839       builder.append(DELIMITER);
840     }
841     builder.append(LINE_DELIMITER_CHAR);
842 
843     String logLine = builder.toString();
844     for (Iterator<PrintWriter> iter = writers.iterator(); iter.hasNext();) {
845       PrintWriter out = iter.next();
846       out.println(logLine);
847       if (out.checkError() && id != null) {
848         LOG.info("Logging failed for job " + id + "removing PrintWriter from FileManager");
849         iter.remove();
850       }
851     }
852     if (recordType != RecordTypes.Meta) {
853       JobHistoryLogger.LOG.debug(logLine);
854     }
855   }
856 
857   /**
858    * Get the history location
859    */
getJobHistoryLocation()860   static Path getJobHistoryLocation() {
861     return new Path(LOG_DIR);
862   }
863 
  /**
   * Get the history location for completed jobs.
   * @return the current value of DONE; other code in this file null-checks
   *         DONE before using it, so callers should expect that it may be
   *         null
   */
  static Path getCompletedJobHistoryLocation() {
    return DONE;
  }
870 
  /**
   * Returns SERIAL_NUMBER_DIRECTORY_DIGITS.
   * NOTE(review): presumably the number of serial-number digits used for
   * directory names under the DONE directory -- confirm against the constant.
   */
  static int serialNumberDirectoryDigits() {
    return SERIAL_NUMBER_DIRECTORY_DIGITS;
  }
874 
serialNumberTotalDigits()875   static int serialNumberTotalDigits() {
876     return serialNumberDirectoryDigits() + SERIAL_NUMBER_LOW_DIGITS;
877   }
878 
879   /**
880    * Get the
881    */
882 
883   /**
884    * Base class contais utility stuff to manage types key value pairs with enums.
885    */
886   static class KeyValuePair{
887     private Map<Keys, String> values = new HashMap<Keys, String>();
888 
889     /**
890      * Get 'String' value for given key. Most of the places use Strings as
891      * values so the default get' method returns 'String'.  This method never returns
892      * null to ease on GUIs. if no value is found it returns empty string ""
893      * @param k
894      * @return if null it returns empty string - ""
895      */
get(Keys k)896     public String get(Keys k){
897       String s = values.get(k);
898       return s == null ? "" : s;
899     }
900     /**
901      * Convert value from history to int and return.
902      * if no value is found it returns 0.
903      * @param k key
904      */
getInt(Keys k)905     public int getInt(Keys k){
906       String s = values.get(k);
907       if (null != s){
908         return Integer.parseInt(s);
909       }
910       return 0;
911     }
912     /**
913      * Convert value from history to int and return.
914      * if no value is found it returns 0.
915      * @param k
916      */
getLong(Keys k)917     public long getLong(Keys k){
918       String s = values.get(k);
919       if (null != s){
920         return Long.parseLong(s);
921       }
922       return 0;
923     }
924     /**
925      * Set value for the key.
926      * @param k
927      * @param s
928      */
set(Keys k, String s)929     public void set(Keys k, String s){
930       values.put(k, s);
931     }
932     /**
933      * Adds all values in the Map argument to its own values.
934      * @param m
935      */
set(Map<Keys, String> m)936     public void set(Map<Keys, String> m){
937       values.putAll(m);
938     }
939     /**
940      * Reads values back from the history, input is same Map as passed to Listener by parseHistory().
941      * @param values
942      */
handle(Map<Keys, String> values)943     public synchronized void handle(Map<Keys, String> values){
944       set(values);
945     }
946     /**
947      * Returns Map containing all key-values.
948      */
getValues()949     public Map<Keys, String> getValues(){
950       return values;
951     }
952   }
953 
954   // hasMismatches is just used to return a second value if you want
955   // one.  I would have used MutableBoxedBoolean if such had been provided.
filteredStat2Paths(FileStatus[] stats, boolean dirs, AtomicBoolean hasMismatches)956   static Path[] filteredStat2Paths
957           (FileStatus[] stats, boolean dirs, AtomicBoolean hasMismatches) {
958     int resultCount = 0;
959 
960     if (hasMismatches == null) {
961       hasMismatches = new AtomicBoolean(false);
962     }
963 
964     for (int i = 0; i < stats.length; ++i) {
965       if (stats[i].isDir() == dirs) {
966         stats[resultCount++] = stats[i];
967       } else {
968         hasMismatches.set(true);
969       }
970     }
971 
972     Path[] paddedResult = FileUtil.stat2Paths(stats);
973 
974     Path[] result = new Path[resultCount];
975 
976     System.arraycopy(paddedResult, 0, result, 0, resultCount);
977 
978     return result;
979   }
980 
localGlobber(FileSystem fs, Path root, String tail)981   static FileStatus[] localGlobber
982         (FileSystem fs, Path root, String tail)
983       throws IOException {
984     return localGlobber(fs, root, tail, null);
985   }
986 
localGlobber(FileSystem fs, Path root, String tail, PathFilter filter)987   static FileStatus[] localGlobber
988         (FileSystem fs, Path root, String tail, PathFilter filter)
989       throws IOException {
990     return localGlobber(fs, root, tail, filter, null);
991   }
992 
nullToEmpty(FileStatus[] result)993   private static FileStatus[] nullToEmpty(FileStatus[] result) {
994     return result == null ? new FileStatus[0] : result;
995   }
996 
listFilteredStatus(FileSystem fs, Path root, PathFilter filter)997   private static FileStatus[] listFilteredStatus
998         (FileSystem fs, Path root, PathFilter filter)
999      throws IOException {
1000     return filter == null ? fs.listStatus(root) : fs.listStatus(root, filter);
1001   }
1002 
1003   // hasMismatches is just used to return a second value if you want
1004   // one.  I would have used MutableBoxedBoolean if such had been provided.
localGlobber(FileSystem fs, Path root, String tail, PathFilter filter, AtomicBoolean hasFlatFiles)1005   static FileStatus[] localGlobber
1006     (FileSystem fs, Path root, String tail, PathFilter filter,
1007      AtomicBoolean hasFlatFiles)
1008       throws IOException {
1009     if (tail.equals("")) {
1010       return nullToEmpty(listFilteredStatus(fs, root, filter));
1011     }
1012 
1013     if (tail.startsWith("/*")) {
1014       Path[] subdirs = filteredStat2Paths(nullToEmpty(fs.listStatus(root)),
1015                                           true, hasFlatFiles);
1016 
1017       FileStatus[][] subsubdirs = new FileStatus[subdirs.length][];
1018 
1019       int subsubdirCount = 0;
1020 
1021       if (subsubdirs.length == 0) {
1022         return new FileStatus[0];
1023       }
1024 
1025       String newTail = tail.substring(2);
1026 
1027       for (int i = 0; i < subdirs.length; ++i) {
1028         subsubdirs[i] = localGlobber(fs, subdirs[i], newTail, filter, null);
1029         subsubdirCount += subsubdirs[i].length;
1030       }
1031 
1032       FileStatus[] result = new FileStatus[subsubdirCount];
1033 
1034       int segmentStart = 0;
1035 
1036       for (int i = 0; i < subsubdirs.length; ++i) {
1037         System.arraycopy(subsubdirs[i], 0, result, segmentStart, subsubdirs[i].length);
1038         segmentStart += subsubdirs[i].length;
1039       }
1040 
1041       return result;
1042     }
1043 
1044     if (tail.startsWith("/")) {
1045       int split = tail.indexOf('/', 1);
1046 
1047       if (split < 0) {
1048         return nullToEmpty
1049           (listFilteredStatus(fs, new Path(root, tail.substring(1)), filter));
1050       } else {
1051         String thisSegment = tail.substring(1, split);
1052         String newTail = tail.substring(split);
1053         return localGlobber
1054           (fs, new Path(root, thisSegment), newTail, filter, hasFlatFiles);
1055       }
1056     }
1057 
1058     IOException e = new IOException("localGlobber: bad tail");
1059 
1060     throw e;
1061   }
1062 
confPathFromLogFilePath(Path logFile)1063   static Path confPathFromLogFilePath(Path logFile) {
1064     String jobId = jobIdNameFromLogFileName(logFile.getName());
1065 
1066     Path logDir = logFile.getParent();
1067 
1068     return new Path(logDir, jobId + CONF_FILE_NAME_SUFFIX);
1069   }
1070 
jobIdNameFromLogFileName(String logFileName)1071   static String jobIdNameFromLogFileName(String logFileName) {
1072     String[] jobDetails = logFileName.split("_");
1073     return jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2];
1074   }
1075 
userNameFromLogFileName(String logFileName)1076   static String userNameFromLogFileName(String logFileName) {
1077     String[] jobDetails = logFileName.split("_");
1078     return jobDetails[3];
1079   }
1080 
jobNameFromLogFileName(String logFileName)1081   static String jobNameFromLogFileName(String logFileName) {
1082     String[] jobDetails = logFileName.split("_");
1083     return jobDetails[4];
1084   }
1085 
1086 
1087   // This code will be inefficient if the subject contains dozens of underscores
escapeUnderscores(String escapee)1088   static String escapeUnderscores(String escapee) {
1089     return replaceStringInstances(escapee, "_", UNDERSCORE_ESCAPE);
1090   }
1091 
nonOccursString(String logFileName)1092   static String nonOccursString(String logFileName) {
1093     int adHocIndex = 0;
1094 
1095     String unfoundString = "q" + adHocIndex;
1096 
1097     while (logFileName.contains(unfoundString)) {
1098       unfoundString = "q" + ++adHocIndex;
1099     }
1100 
1101     return unfoundString + "q";
1102   }
1103 
1104   // I tolerate this code because I expect a low number of
1105   // occurrences in a relatively short string
replaceStringInstances(String logFileName, String old, String replacement)1106   static String replaceStringInstances
1107       (String logFileName, String old, String replacement) {
1108     int index = logFileName.indexOf(old);
1109 
1110     while (index > 0) {
1111       logFileName = (logFileName.substring(0, index)
1112                      + replacement
1113                      + replaceStringInstances
1114                          (logFileName.substring(index + old.length()),
1115                           old, replacement));
1116 
1117       index = logFileName.indexOf(old);
1118     }
1119 
1120     return logFileName;
1121   }
1122 
1123 
1124   /**
1125    * Helper class for logging or reading back events related to job start, finish or failure.
1126    */
1127   public static class JobInfo extends KeyValuePair{
1128 
    // Tasks of this job keyed by task id; a TreeMap, so iteration follows
    // the natural ordering of the id strings.
    private Map<String, Task> allTasks = new TreeMap<String, Task>();
    // ACLs of this job; filled in by handle() from the VIEW_JOB and
    // MODIFY_JOB values of the record that carries SUBMIT_TIME.
    private Map<JobACL, AccessControlList> jobACLs =
        new HashMap<JobACL, AccessControlList>();
    private String queueName = null;// queue to which this job was submitted
1133 
    /**
     * Create new JobInfo.
     * @param jobId id of the job; stored as the value of Keys.JOBID
     */
    public JobInfo(String jobId){
      set(Keys.JOBID, jobId);
    }
1138 
1139     /**
1140      * Returns all map and reduce tasks <taskid-Task>.
1141      */
getAllTasks()1142     public Map<String, Task> getAllTasks() { return allTasks; }
1143 
    /**
     * Get the job acls. The map is filled in by handle() when a record that
     * carries SUBMIT_TIME is processed; the internal map itself is returned,
     * not a copy.
     *
     * @return a {@link Map} from {@link JobACL} to {@link AccessControlList}
     */
    public Map<JobACL, AccessControlList> getJobACLs() {
      return jobACLs;
    }
1152 
1153     @Override
handle(Map<Keys, String> values)1154     public synchronized void handle(Map<Keys, String> values) {
1155       if (values.containsKey(Keys.SUBMIT_TIME)) {// job submission
1156         // construct the job ACLs
1157         String viewJobACL = values.get(Keys.VIEW_JOB);
1158         String modifyJobACL = values.get(Keys.MODIFY_JOB);
1159         if (viewJobACL != null) {
1160           jobACLs.put(JobACL.VIEW_JOB, new AccessControlList(viewJobACL));
1161         }
1162         if (modifyJobACL != null) {
1163           jobACLs.put(JobACL.MODIFY_JOB, new AccessControlList(modifyJobACL));
1164         }
1165         // get the job queue name
1166         queueName = values.get(Keys.JOB_QUEUE);
1167       }
1168       super.handle(values);
1169     }
1170 
    /**
     * Returns the queue name taken from the JOB_QUEUE value of the submission
     * record, or null if no such record has been handled or it carried no
     * queue.
     */
    String getJobQueue() {
      return queueName;
    }
1174 
1175     /**
1176      * Get the path of the locally stored job file
1177      * @param jobId id of the job
1178      * @return the path of the job file on the local file system
1179      */
getLocalJobFilePath(JobID jobId)1180     public static String getLocalJobFilePath(JobID jobId){
1181       return System.getProperty("hadoop.log.dir") + File.separator +
1182                jobId + CONF_FILE_NAME_SUFFIX;
1183     }
1184 
1185 
1186     /**
1187      * Helper function to encode the URL of the path of the job-history
1188      * log file.
1189      *
1190      * @param logFile path of the job-history file
1191      * @return URL encoded path
1192      * @throws IOException
1193      */
encodeJobHistoryFilePath(String logFile)1194     public static String encodeJobHistoryFilePath(String logFile)
1195     throws IOException {
1196       Path rawPath = new Path(logFile);
1197       String encodedFileName = null;
1198       try {
1199         encodedFileName = URLEncoder.encode(rawPath.getName(), "UTF-8");
1200       } catch (UnsupportedEncodingException uee) {
1201         IOException ioe = new IOException();
1202         ioe.initCause(uee);
1203         ioe.setStackTrace(uee.getStackTrace());
1204         throw ioe;
1205       }
1206 
1207       Path encodedPath = new Path(rawPath.getParent(), encodedFileName);
1208       return encodedPath.toString();
1209     }
1210 
1211     /**
1212      * Helper function to encode the URL of the filename of the job-history
1213      * log file.
1214      *
1215      * @param logFileName file name of the job-history file
1216      * @return URL encoded filename
1217      * @throws IOException
1218      */
encodeJobHistoryFileName(String logFileName)1219     public static String encodeJobHistoryFileName(String logFileName)
1220     throws IOException {
1221       String replacementUnderscoreEscape = null;
1222 
1223       if (logFileName.contains(UNDERSCORE_ESCAPE)) {
1224         replacementUnderscoreEscape = nonOccursString(logFileName);
1225 
1226         logFileName = replaceStringInstances
1227           (logFileName, UNDERSCORE_ESCAPE, replacementUnderscoreEscape);
1228       }
1229 
1230       String encodedFileName = null;
1231       try {
1232         encodedFileName = URLEncoder.encode(logFileName, "UTF-8");
1233       } catch (UnsupportedEncodingException uee) {
1234         IOException ioe = new IOException();
1235         ioe.initCause(uee);
1236         ioe.setStackTrace(uee.getStackTrace());
1237         throw ioe;
1238       }
1239 
1240       if (replacementUnderscoreEscape != null) {
1241         encodedFileName = replaceStringInstances
1242           (encodedFileName, replacementUnderscoreEscape, UNDERSCORE_ESCAPE);
1243       }
1244 
1245       return encodedFileName;
1246     }
1247 
1248     /**
1249      * Helper function to decode the URL of the filename of the job-history
1250      * log file.
1251      *
1252      * @param logFileName file name of the job-history file
1253      * @return URL decoded filename
1254      * @throws IOException
1255      */
decodeJobHistoryFileName(String logFileName)1256     public static String decodeJobHistoryFileName(String logFileName)
1257     throws IOException {
1258       String decodedFileName = null;
1259       try {
1260         decodedFileName = URLDecoder.decode(logFileName, "UTF-8");
1261       } catch (UnsupportedEncodingException uee) {
1262         IOException ioe = new IOException();
1263         ioe.initCause(uee);
1264         ioe.setStackTrace(uee.getStackTrace());
1265         throw ioe;
1266       }
1267       return decodedFileName;
1268     }
1269 
1270     /**
1271      * Get the job name from the job conf
1272      */
getJobName(JobConf jobConf)1273     static String getJobName(JobConf jobConf) {
1274       String jobName = jobConf.getJobName();
1275       if (jobName == null || jobName.length() == 0) {
1276         jobName = "NA";
1277       }
1278       return jobName;
1279     }
1280 
1281     /**
1282      * Get the user name from the job conf
1283      */
getUserName(JobConf jobConf)1284     public static String getUserName(JobConf jobConf) {
1285       String user = jobConf.getUser();
1286       if (user == null || user.length() == 0) {
1287         user = "NA";
1288       }
1289       return user;
1290     }
1291 
1292     /**
1293      * Get the workflow adjacencies from the job conf
1294      * The string returned is of the form "key"="value" "key"="value" ...
1295      */
getWorkflowAdjacencies(Configuration conf)1296     public static String getWorkflowAdjacencies(Configuration conf) {
1297       int prefixLen = JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
1298       Map<String,String> adjacencies =
1299           conf.getValByRegex(JobConf.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
1300       if (adjacencies.isEmpty())
1301         return "";
1302       int size = 0;
1303       for (Entry<String,String> entry : adjacencies.entrySet()) {
1304         int keyLen = entry.getKey().length();
1305         size += keyLen - prefixLen;
1306         size += entry.getValue().length() + 6;
1307       }
1308       StringBuilder sb = new StringBuilder(size);
1309       for (Entry<String,String> entry : adjacencies.entrySet()) {
1310         int keyLen = entry.getKey().length();
1311         sb.append("\"");
1312         sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
1313         sb.append("\"=\"");
1314         sb.append(escapeString(entry.getValue()));
1315         sb.append("\" ");
1316       }
1317       return sb.toString();
1318     }
1319 
1320     /**
1321      * Get the job history file path given the history filename
1322      */
getJobHistoryLogLocation(String logFileName)1323     public static Path getJobHistoryLogLocation(String logFileName)
1324     {
1325       return LOG_DIR == null ? null : new Path(LOG_DIR, logFileName);
1326     }
1327 
1328     /**
1329      * Get the user job history file path
1330      */
getJobHistoryLogLocationForUser(String logFileName, JobConf jobConf)1331     public static Path getJobHistoryLogLocationForUser(String logFileName,
1332                                                        JobConf jobConf) {
1333       // find user log directory
1334       Path userLogFile = null;
1335       Path outputPath = FileOutputFormat.getOutputPath(jobConf);
1336       String userLogDir = jobConf.get("hadoop.job.history.user.location",
1337                                       outputPath == null
1338                                       ? null
1339                                       : outputPath.toString());
1340       if ("none".equals(userLogDir)) {
1341         userLogDir = null;
1342       }
1343       if (userLogDir != null) {
1344         userLogDir = userLogDir + Path.SEPARATOR + "_logs" + Path.SEPARATOR
1345                      + "history";
1346         userLogFile = new Path(userLogDir, logFileName);
1347       }
1348       return userLogFile;
1349     }
1350 
1351     /**
1352      * Generates the job history filename for a new job
1353      */
getNewJobHistoryFileName(JobConf jobConf, JobID id, long submitTime)1354     private static String getNewJobHistoryFileName(JobConf jobConf, JobID id, long submitTime) {
1355       return
1356         id.toString() + "_"
1357         + submitTime + "_"
1358         + escapeUnderscores(getUserName(jobConf)) + "_"
1359         + escapeUnderscores(trimJobName(getJobName(jobConf)));
1360     }
1361 
1362     /**
1363      * Trims the job-name if required
1364      */
trimJobName(String jobName)1365     private static String trimJobName(String jobName) {
1366       if (jobName.length() > JOB_NAME_TRIM_LENGTH) {
1367         jobName = jobName.substring(0, JOB_NAME_TRIM_LENGTH);
1368       }
1369       return jobName;
1370     }
1371 
escapeRegexChars( String string )1372     private static String escapeRegexChars( String string ) {
1373       return "\\Q"+string.replaceAll("\\\\E", "\\\\E\\\\\\\\E\\\\Q")+"\\E";
1374     }
1375 
1376     /**
1377      * Recover the job history filename from the history folder.
1378      * Uses the following pattern
1379      *    $jt-hostname_[0-9]*_$job-id_$user_$job-name*
1380      * @param jobConf the job conf
1381      * @param id job id
1382      */
getJobHistoryFileName(JobConf jobConf, JobID id)1383     public static synchronized String getJobHistoryFileName(JobConf jobConf,
1384                                                             JobID id)
1385     throws IOException {
1386       return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS);
1387     }
1388 
1389     // Returns that portion of the pathname that sits under the DONE directory
getDoneJobHistoryFileName(JobConf jobConf, JobID id)1390     static synchronized String getDoneJobHistoryFileName(JobConf jobConf,
1391         JobID id) throws IOException {
1392       if (DONE == null) {
1393         return null;
1394       }
1395       return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS);
1396     }
1397 
1398     /**
1399      * @param dir The directory where to search.
1400      */
getJobHistoryFileName(JobConf jobConf, JobID id, Path dir, FileSystem fs)1401     private static synchronized String getJobHistoryFileName(JobConf jobConf,
1402                                           JobID id, Path dir, FileSystem fs)
1403     throws IOException {
1404       String user =  getUserName(jobConf);
1405       String jobName = trimJobName(getJobName(jobConf));
1406       if (LOG_DIR == null) {
1407         return null;
1408       }
1409 
1410       // Make the pattern matching the job's history file
1411 
1412       final String regexp
1413         = id.toString() + "_" + DIGITS + "_" + user + "_"
1414              + escapeRegexChars(jobName) + "+";
1415 
1416       final Pattern historyFilePattern = Pattern.compile(regexp);
1417 
1418       // a path filter that matches 4 parts of the filenames namely
1419       //  - jt-hostname
1420       //  - job-id
1421       //  - username
1422       //  - jobname
1423       PathFilter filter = new PathFilter() {
1424         public boolean accept(Path path) {
1425           String unescapedFileName = path.getName();
1426           String fileName = null;
1427           try {
1428             fileName = decodeJobHistoryFileName(unescapedFileName);
1429           } catch (IOException ioe) {
1430             LOG.info("Error while decoding history file " + fileName + "."
1431                      + " Ignoring file.", ioe);
1432             return false;
1433           }
1434 
1435           return historyFilePattern.matcher(fileName).find();
1436         }
1437       };
1438 
1439       FileStatus[] statuses = null;
1440 
1441       if (dir == DONE) {
1442         final String scanTail
1443           = (DONE_BEFORE_SERIAL_TAIL
1444              + "/" + serialNumberDirectoryComponent(id));
1445 
1446         if (LOG.isDebugEnabled()) {
1447           LOG.debug("JobHistory.getJobHistoryFileName DONE dir: scanning "
1448               + scanTail);
1449           if (LOG.isTraceEnabled()) {
1450             LOG.trace(Thread.currentThread().getStackTrace());
1451           }
1452         }
1453 
1454         statuses = localGlobber(fs, DONE, scanTail, filter);
1455       } else {
1456         statuses = fs.listStatus(dir, filter);
1457       }
1458 
1459       String filename = null;
1460       if (statuses == null || statuses.length == 0) {
1461         LOG.info("Nothing to recover for job " + id);
1462       } else {
1463         // return filename considering that fact the name can be a
1464         // secondary filename like filename.recover
1465         filename = getPrimaryFilename(statuses[0].getPath().getName(), jobName);
1466         if (dir == DONE) {
1467           Path parent = statuses[0].getPath().getParent();
1468           String parentPathName = parent.toString();
1469           String donePathName = DONE.toString();
1470           filename = (parentPathName.substring(donePathName.length() + Path.SEPARATOR.length())
1471                       + Path.SEPARATOR + filename);
1472         }
1473 
1474         LOG.info("Recovered job history filename for job " + id + " is "
1475                  + filename);
1476       }
1477       return filename;
1478     }
1479 
1480     // removes all extra extensions from a filename and returns the core/primary
1481     // filename
getPrimaryFilename(String filename, String jobName)1482     private static String getPrimaryFilename(String filename, String jobName)
1483     throws IOException{
1484       filename = decodeJobHistoryFileName(filename);
1485       // Remove the '.recover' suffix if it exists
1486       if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
1487         int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
1488         filename = filename.substring(0, newLength);
1489       }
1490       return encodeJobHistoryFileName(filename);
1491     }
1492 
1493     /** Since there was a restart, there should be a master file and
1494      * a recovery file. Once the recovery is complete, the master should be
1495      * deleted as an indication that the recovery file should be treated as the
1496      * master upon completion or next restart.
1497      * @param fileName the history filename that needs checkpointing
1498      * @param conf Job conf
1499      * @throws IOException
1500      */
checkpointRecovery(String fileName, JobConf conf)1501     static synchronized void checkpointRecovery(String fileName, JobConf conf)
1502     throws IOException {
1503       Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
1504       if (logPath != null) {
1505         LOG.info("Deleting job history file " + logPath.getName());
1506         LOGDIR_FS.delete(logPath, false);
1507       }
1508       // do the same for the user file too
1509       logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName,
1510                                                                    conf);
1511       if (logPath != null) {
1512         FileSystem fs = logPath.getFileSystem(conf);
1513         fs.delete(logPath, false);
1514       }
1515     }
1516 
getSecondaryJobHistoryFile(String filename)1517     static String getSecondaryJobHistoryFile(String filename)
1518     throws IOException {
1519       return encodeJobHistoryFileName(
1520           decodeJobHistoryFileName(filename) + SECONDARY_FILE_SUFFIX);
1521     }
1522 
    /** Selects one of the two files generated as a part of recovery.
     * This call makes sure that only one file is left in the end: if the
     * master file exists the secondary file is deleted, otherwise an existing
     * secondary file is renamed to the master name. The same is then done in
     * the user's history location, if there is one.
     * @param conf job conf
     * @param logFilePath Path of the log file
     * @return the secondary file path when the master exists or the secondary
     *         was renamed to it; the master path when neither file existed
     * @throws IOException
     */
    public synchronized static Path recoverJobHistoryFile(JobConf conf,
                                                          Path logFilePath)
    throws IOException {
      Path ret;
      String logFileName = logFilePath.getName();
      String tmpFilename = getSecondaryJobHistoryFile(logFileName);
      Path logDir = logFilePath.getParent();
      Path tmpFilePath = new Path(logDir, tmpFilename);
      if (LOGDIR_FS.exists(logFilePath)) {
        // master present: a leftover secondary file is discarded
        LOG.info(logFileName + " exists!");
        if (LOGDIR_FS.exists(tmpFilePath)) {
          LOG.info("Deleting " + tmpFilename
                   + "  and using " + logFileName + " for recovery.");
          LOGDIR_FS.delete(tmpFilePath, false);
        }
        ret = tmpFilePath;
      } else {
        // master missing: promote the secondary file, if there is one
        LOG.info(logFileName + " doesnt exist! Using "
                 + tmpFilename + " for recovery.");
        if (LOGDIR_FS.exists(tmpFilePath)) {
          LOG.info("Renaming " + tmpFilename + " to " + logFileName);
          LOGDIR_FS.rename(tmpFilePath, logFilePath);
          ret = tmpFilePath;
        } else {
          ret = logFilePath;
        }
      }

      // do the same for the user files too
      // (logFilePath, logDir and tmpFilePath are re-pointed at the user
      // location from here on; ret is not touched again)
      logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
      if (logFilePath != null) {
        FileSystem fs = logFilePath.getFileSystem(conf);
        logDir = logFilePath.getParent();
        tmpFilePath = new Path(logDir, tmpFilename);
        if (fs.exists(logFilePath)) {
          LOG.info(logFileName + " exists!");
          if (fs.exists(tmpFilePath)) {
            LOG.info("Deleting " + tmpFilename + "  and making " + logFileName
                     + " as the master history file for user.");
            fs.delete(tmpFilePath, false);
          }
        } else {
          LOG.info(logFileName + " doesnt exist! Using "
                   + tmpFilename + " as the master history file for user.");
          if (fs.exists(tmpFilePath)) {
            LOG.info("Renaming " + tmpFilename + " to " + logFileName
                     + " in user directory");
            fs.rename(tmpFilePath, logFilePath);
          }
        }
      }

      return ret;
    }
1584 
1585     /** Finalize the recovery and make one file in the end.
1586      * This invloves renaming the recover file to the master file.
1587      * Note that this api should be invoked only if recovery is involved.
1588      * @param id Job id
1589      * @param conf the job conf
1590      * @throws IOException
1591      */
finalizeRecovery(JobID id, JobConf conf)1592     static synchronized void finalizeRecovery(JobID id, JobConf conf)
1593     throws IOException {
1594        Path tmpLogPath = fileManager.getHistoryFile(id);
1595        if (tmpLogPath == null) {
1596          if (LOG.isDebugEnabled()) {
1597            LOG.debug("No file for job with " + id + " found in cache!");
1598          }
1599          return;
1600        }
1601        String tmpLogFileName = tmpLogPath.getName();
1602 
1603        // get the primary filename from the cached filename
1604        String masterLogFileName =
1605          getPrimaryFilename(tmpLogFileName, getJobName(conf));
1606        Path masterLogPath = new Path(tmpLogPath.getParent(), masterLogFileName);
1607 
1608        // rename the tmp file to the master file. Note that this should be
1609        // done only when the file is closed and handles are released.
1610        LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
1611        LOGDIR_FS.rename(tmpLogPath, masterLogPath);
1612        // update the cache
1613        fileManager.setHistoryFile(id, masterLogPath);
1614 
1615       // do the same for the user file too
1616       masterLogPath =
1617         JobHistory.JobInfo.getJobHistoryLogLocationForUser(masterLogFileName,
1618                                                            conf);
1619       tmpLogPath =
1620         JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName,
1621                                                            conf);
1622       if (masterLogPath != null) {
1623         FileSystem fs = masterLogPath.getFileSystem(conf);
1624         if (fs.exists(tmpLogPath)) {
1625           LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
1626                    + " in user directory");
1627           fs.rename(tmpLogPath, masterLogPath);
1628         }
1629       }
1630     }
1631 
1632     /**
1633      * Deletes job data from the local disk.
1634      * For now just deletes the localized copy of job conf
1635      */
cleanupJob(JobID id)1636     static void cleanupJob(JobID id) {
1637       String localJobFilePath =  JobInfo.getLocalJobFilePath(id);
1638       File f = new File (localJobFilePath);
1639       LOG.info("Deleting localized job conf at " + f);
1640       if (!f.delete()) {
1641         if (LOG.isDebugEnabled()) {
1642           LOG.debug("Failed to delete file " + f);
1643         }
1644       }
1645     }
1646 
1647     /**
1648      * Delete job conf from the history folder.
1649      */
deleteConfFiles()1650     static void deleteConfFiles() throws IOException {
1651       LOG.info("Cleaning up config files from the job history folder");
1652       FileSystem fs = new Path(LOG_DIR).getFileSystem(jtConf);
1653       FileStatus[] status = fs.listStatus(new Path(LOG_DIR), CONF_FILTER);
1654       for (FileStatus s : status) {
1655         LOG.info("Deleting conf file " + s.getPath());
1656         fs.delete(s.getPath(), false);
1657       }
1658     }
1659 
1660     /**
1661      * Move the completed job into the completed folder.
1662      * This assumes that the jobhistory file is closed and all operations on the
1663      * jobhistory file is complete.
1664      * This *should* be the last call to jobhistory for a given job.
1665      */
markCompleted(JobID id)1666     static void markCompleted(JobID id) throws IOException {
1667       fileManager.moveToDone(id);
1668     }
1669 
1670      /**
1671      * Log job submitted event to history. Creates a new file in history
1672      * for the job. if history file creation fails, it disables history
1673      * for all other events.
1674      * @param jobId job id assigned by job tracker.
1675      * @param jobConf job conf of the job
1676      * @param jobConfPath path to job conf xml file in HDFS.
1677      * @param submitTime time when job tracker received the job
1678      * @throws IOException
1679      * @deprecated Use
1680      *     {@link #logSubmitted(JobID, JobConf, String, long, boolean)} instead.
1681      */
1682     @Deprecated
logSubmitted(JobID jobId, JobConf jobConf, String jobConfPath, long submitTime)1683     public static void logSubmitted(JobID jobId, JobConf jobConf,
1684                                     String jobConfPath, long submitTime)
1685     throws IOException {
1686       logSubmitted(jobId, jobConf, jobConfPath, submitTime, true);
1687     }
1688 
logSubmitted(JobID jobId, JobConf jobConf, String jobConfPath, long submitTime, boolean restarted)1689     public static void logSubmitted(JobID jobId, JobConf jobConf,
1690                                     String jobConfPath, long submitTime,
1691                                     boolean restarted)
1692     throws IOException {
1693       FileSystem fs = null;
1694       String userLogDir = null;
1695       String jobUniqueString = jobId.toString();
1696 
1697       // Get the username and job name to be used in the actual log filename;
1698       // sanity check them too
1699       String jobName = getJobName(jobConf);
1700       String user = getUserName(jobConf);
1701 
1702       // get the history filename
1703       String logFileName = null;
1704       if (restarted) {
1705         logFileName = getJobHistoryFileName(jobConf, jobId);
1706         if (logFileName == null) {
1707           logFileName =
1708             encodeJobHistoryFileName(getNewJobHistoryFileName
1709                                      (jobConf, jobId, submitTime));
1710         } else {
1711           String parts[] = logFileName.split("_");
1712           //TODO this is a hack :(
1713           // jobtracker-hostname_jobtracker-identifier_
1714           String jtUniqueString = parts[0] + "_" + parts[1] + "_";
1715           jobUniqueString = jobId.toString();
1716         }
1717       } else {
1718         logFileName =
1719           encodeJobHistoryFileName(getNewJobHistoryFileName
1720                                    (jobConf, jobId, submitTime));
1721       }
1722 
1723       // setup the history log file for this job
1724       Path logFile = getJobHistoryLogLocation(logFileName);
1725 
1726       // find user log directory
1727       Path userLogFile =
1728         getJobHistoryLogLocationForUser(logFileName, jobConf);
1729       PrintWriter writer = null;
1730       try{
1731         FSDataOutputStream out = null;
1732         if (LOG_DIR != null) {
1733           // create output stream for logging in hadoop.job.history.location
1734           if (restarted) {
1735             logFile = recoverJobHistoryFile(jobConf, logFile);
1736             logFileName = logFile.getName();
1737           }
1738 
1739           int defaultBufferSize =
1740             LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
1741           out = LOGDIR_FS.create(logFile,
1742                           new FsPermission(HISTORY_FILE_PERMISSION),
1743                           true,
1744                           defaultBufferSize,
1745                           LOGDIR_FS.getDefaultReplication(),
1746                           jobHistoryBlockSize, null);
1747           writer = new PrintWriter(out);
1748           fileManager.addWriter(jobId, writer);
1749 
1750           // cache it ...
1751           fileManager.setHistoryFile(jobId, logFile);
1752         }
1753         if (userLogFile != null) {
1754           // Get the actual filename as recoverJobHistoryFile() might return
1755           // a different filename
1756           userLogDir = userLogFile.getParent().toString();
1757           userLogFile = new Path(userLogDir, logFileName);
1758 
1759           // create output stream for logging
1760           // in hadoop.job.history.user.location
1761           fs = userLogFile.getFileSystem(jobConf);
1762 
1763           out = fs.create(userLogFile, true, 4096);
1764           writer = new PrintWriter(out);
1765           fileManager.addWriter(jobId, writer);
1766         }
1767 
1768         ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
1769         // Log the history meta info
1770         JobHistory.MetaInfoManager.logMetaInfo(writers);
1771 
1772         String viewJobACL = "*";
1773         String modifyJobACL = "*";
1774         if (aclsEnabled) {
1775           viewJobACL = jobConf.get(JobACL.VIEW_JOB.getAclName(), " ");
1776           modifyJobACL = jobConf.get(JobACL.MODIFY_JOB.getAclName(), " ");
1777         }
1778         //add to writer as well
1779         JobHistory.log(writers, RecordTypes.Job,
1780                        new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
1781                                   Keys.SUBMIT_TIME, Keys.JOBCONF,
1782                                   Keys.VIEW_JOB, Keys.MODIFY_JOB,
1783                                   Keys.JOB_QUEUE, Keys.WORKFLOW_ID,
1784                                   Keys.WORKFLOW_NAME, Keys.WORKFLOW_NODE_NAME,
1785                                   Keys.WORKFLOW_ADJACENCIES,
1786                                   Keys.WORKFLOW_TAGS},
1787                        new String[]{jobId.toString(), jobName, user,
1788                                     String.valueOf(submitTime) , jobConfPath,
1789                                     viewJobACL, modifyJobACL,
1790                                     jobConf.getQueueName(),
1791                                     jobConf.get(JobConf.WORKFLOW_ID, ""),
1792                                     jobConf.get(JobConf.WORKFLOW_NAME, ""),
1793                                     jobConf.get(JobConf.WORKFLOW_NODE_NAME, ""),
1794                                     getWorkflowAdjacencies(jobConf),
1795                                     jobConf.get(JobConf.WORKFLOW_TAGS, ""),
1796                                     },
1797                                     jobId
1798                       );
1799 
1800       }catch(IOException e){
1801         LOG.error("Failed creating job history log file for job " + jobId, e);
1802         if (writer != null) {
1803           fileManager.removeWriter(jobId, writer);
1804         }
1805       }
1806       // Always store job conf on local file system
1807       String localJobFilePath =  JobInfo.getLocalJobFilePath(jobId);
1808       File localJobFile = new File(localJobFilePath);
1809       FileOutputStream jobOut = null;
1810       try {
1811         jobOut = new FileOutputStream(localJobFile);
1812         jobConf.writeXml(jobOut);
1813         if (LOG.isDebugEnabled()) {
1814           LOG.debug("Job conf for " + jobId + " stored at "
1815                     + localJobFile.getAbsolutePath());
1816         }
1817       } catch (IOException ioe) {
1818         LOG.error("Failed to store job conf on the local filesystem ", ioe);
1819       } finally {
1820         if (jobOut != null) {
1821           try {
1822             jobOut.close();
1823           } catch (IOException ie) {
1824             LOG.info("Failed to close the job configuration file "
1825                        + StringUtils.stringifyException(ie));
1826           }
1827         }
1828       }
1829 
1830       /* Storing the job conf on the log dir */
1831       Path jobFilePath = null;
1832       if (LOG_DIR != null) {
1833         jobFilePath = new Path(LOG_DIR + File.separator +
1834                                jobUniqueString + CONF_FILE_NAME_SUFFIX);
1835         fileManager.setConfFile(jobId, jobFilePath);
1836       }
1837       Path userJobFilePath = null;
1838       if (userLogDir != null) {
1839         userJobFilePath = new Path(userLogDir + File.separator +
1840                                    jobUniqueString + CONF_FILE_NAME_SUFFIX);
1841       }
1842       FSDataOutputStream jobFileOut = null;
1843       try {
1844         if (LOG_DIR != null) {
1845           int defaultBufferSize =
1846               LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
1847           if (!LOGDIR_FS.exists(jobFilePath)) {
1848             jobFileOut = LOGDIR_FS.create(jobFilePath,
1849                                    new FsPermission(HISTORY_FILE_PERMISSION),
1850                                    true,
1851                                    defaultBufferSize,
1852                                    LOGDIR_FS.getDefaultReplication(),
1853                                    LOGDIR_FS.getDefaultBlockSize(), null);
1854             jobConf.writeXml(jobFileOut);
1855             jobFileOut.close();
1856           }
1857         }
1858         if (userLogDir != null) {
1859           fs = new Path(userLogDir).getFileSystem(jobConf);
1860           jobFileOut = fs.create(userJobFilePath);
1861           jobConf.writeXml(jobFileOut);
1862         }
1863         if (LOG.isDebugEnabled()) {
1864           LOG.debug("Job conf for " + jobId + " stored at "
1865                     + jobFilePath + "and" + userJobFilePath );
1866         }
1867       } catch (IOException ioe) {
1868         LOG.error("Failed to store job conf in the log dir", ioe);
1869       } finally {
1870         if (jobFileOut != null) {
1871           try {
1872             jobFileOut.close();
1873           } catch (IOException ie) {
1874             LOG.info("Failed to close the job configuration file "
1875                      + StringUtils.stringifyException(ie));
1876           }
1877         }
1878       }
1879     }
1880     /**
1881      * Logs launch time of job.
1882      *
1883      * @param jobId job id, assigned by jobtracker.
1884      * @param startTime start time of job.
1885      * @param totalMaps total maps assigned by jobtracker.
1886      * @param totalReduces total reduces.
1887      */
logInited(JobID jobId, long startTime, int totalMaps, int totalReduces)1888     public static void logInited(JobID jobId, long startTime,
1889                                  int totalMaps, int totalReduces) {
1890       ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
1891 
1892       if (null != writer){
1893         JobHistory.log(writer, RecordTypes.Job,
1894             new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS,
1895                         Keys.TOTAL_REDUCES, Keys.JOB_STATUS},
1896             new String[] {jobId.toString(), String.valueOf(startTime),
1897                           String.valueOf(totalMaps),
1898                           String.valueOf(totalReduces),
1899                           Values.PREP.name()}, jobId);
1900       }
1901     }
1902 
1903    /**
1904      * Logs the job as RUNNING.
1905      *
1906      * @param jobId job id, assigned by jobtracker.
1907      * @param startTime start time of job.
1908      * @param totalMaps total maps assigned by jobtracker.
1909      * @param totalReduces total reduces.
1910      * @deprecated Use {@link #logInited(JobID, long, int, int)} and
1911      * {@link #logStarted(JobID)}
1912      */
1913     @Deprecated
logStarted(JobID jobId, long startTime, int totalMaps, int totalReduces)1914     public static void logStarted(JobID jobId, long startTime,
1915                                   int totalMaps, int totalReduces) {
1916       logStarted(jobId);
1917     }
1918 
1919     /**
1920      * Logs job as running
1921      * @param jobId job id, assigned by jobtracker.
1922      */
logStarted(JobID jobId)1923     public static void logStarted(JobID jobId){
1924       ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
1925 
1926       if (null != writer){
1927         JobHistory.log(writer, RecordTypes.Job,
1928             new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
1929             new String[] {jobId.toString(),
1930                           Values.RUNNING.name()}, jobId);
1931       }
1932     }
1933 
1934     /**
1935      * Log job finished. closes the job file in history.
1936      * @param jobId job id, assigned by jobtracker.
1937      * @param finishTime finish time of job in ms.
1938      * @param finishedMaps no of maps successfully finished.
1939      * @param finishedReduces no of reduces finished sucessfully.
1940      * @param failedMaps no of failed map tasks.
1941      * @param failedReduces no of failed reduce tasks.
1942      * @param counters the counters from the job
1943      */
logFinished(JobID jobId, long finishTime, int finishedMaps, int finishedReduces, int failedMaps, int failedReduces, Counters mapCounters, Counters reduceCounters, Counters counters)1944     public static void logFinished(JobID jobId, long finishTime,
1945                                    int finishedMaps, int finishedReduces,
1946                                    int failedMaps, int failedReduces,
1947                                    Counters mapCounters,
1948                                    Counters reduceCounters,
1949                                    Counters counters) {
1950         // close job file for this job
1951       ArrayList<PrintWriter> writer = fileManager.getWriters(jobId);
1952 
1953       if (null != writer){
1954         JobHistory.log(writer, RecordTypes.Job,
1955                        new Keys[] {Keys.JOBID, Keys.FINISH_TIME,
1956                                    Keys.JOB_STATUS, Keys.FINISHED_MAPS,
1957                                    Keys.FINISHED_REDUCES,
1958                                    Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
1959                                    Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS,
1960                                    Keys.COUNTERS},
1961                        new String[] {jobId.toString(),  Long.toString(finishTime),
1962                                      Values.SUCCESS.name(),
1963                                      String.valueOf(finishedMaps),
1964                                      String.valueOf(finishedReduces),
1965                                      String.valueOf(failedMaps),
1966                                      String.valueOf(failedReduces),
1967                                      mapCounters.makeEscapedCompactString(),
1968                                      reduceCounters.makeEscapedCompactString(),
1969                                      counters.makeEscapedCompactString()}, jobId);
1970         for (PrintWriter out : writer) {
1971           out.close();
1972         }
1973       }
1974       Thread historyCleaner  = new Thread(new HistoryCleaner());
1975       historyCleaner.start();
1976     }
1977     /**
1978      * Logs job failed event. Closes the job history log file.
1979      * @param jobid job id
1980      * @param timestamp time when job failure was detected in ms.
1981      * @param finishedMaps no finished map tasks.
1982      * @param finishedReduces no of finished reduce tasks.
1983      */
logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces, String failReason)1984     public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces, String failReason){
1985       ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
1986 
1987       if (null != writer){
1988         JobHistory.log(writer, RecordTypes.Job,
1989                        new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES, Keys.FAIL_REASON },
1990                        new String[] {jobid.toString(),  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
1991                                      String.valueOf(finishedReduces), failReason}, jobid);
1992         for (PrintWriter out : writer) {
1993           out.close();
1994         }
1995       }
1996     }
1997     /**
1998      * Logs job killed event. Closes the job history log file.
1999      *
2000      * @param jobid
2001      *          job id
2002      * @param timestamp
2003      *          time when job killed was issued in ms.
2004      * @param finishedMaps
2005      *          no finished map tasks.
2006      * @param finishedReduces
2007      *          no of finished reduce tasks.
2008      */
logKilled(JobID jobid, long timestamp, int finishedMaps, int finishedReduces)2009     public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
2010         int finishedReduces) {
2011       ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
2012 
2013       if (null != writer) {
2014         JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
2015             Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
2016             Keys.FINISHED_REDUCES }, new String[] { jobid.toString(),
2017             String.valueOf(timestamp), Values.KILLED.name(),
2018             String.valueOf(finishedMaps), String.valueOf(finishedReduces) }, jobid);
2019         for (PrintWriter out : writer) {
2020           out.close();
2021         }
2022       }
2023     }
2024     /**
2025      * Log job's priority.
2026      * @param jobid job id
2027      * @param priority Jobs priority
2028      */
logJobPriority(JobID jobid, JobPriority priority)2029     public static void logJobPriority(JobID jobid, JobPriority priority){
2030       ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
2031 
2032       if (null != writer){
2033         JobHistory.log(writer, RecordTypes.Job,
2034                        new Keys[] {Keys.JOBID, Keys.JOB_PRIORITY},
2035                        new String[] {jobid.toString(), priority.toString()}, jobid);
2036       }
2037     }
2038     /**
2039      * Log job's submit-time/launch-time
2040      * @param jobid job id
2041      * @param submitTime job's submit time
2042      * @param launchTime job's launch time
2043      * @param restartCount number of times the job got restarted
2044      * @deprecated Use {@link #logJobInfo(JobID, long, long)} instead.
2045      */
2046     @Deprecated
logJobInfo(JobID jobid, long submitTime, long launchTime, int restartCount)2047     public static void logJobInfo(JobID jobid, long submitTime, long launchTime,
2048                                   int restartCount){
2049       logJobInfo(jobid, submitTime, launchTime);
2050     }
2051 
logJobInfo(JobID jobid, long submitTime, long launchTime)2052     public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
2053     {
2054       ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
2055 
2056       if (null != writer){
2057         JobHistory.log(writer, RecordTypes.Job,
2058                        new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME,
2059                                    Keys.LAUNCH_TIME},
2060                        new String[] {jobid.toString(),
2061                                      String.valueOf(submitTime),
2062                                      String.valueOf(launchTime)}, jobid);
2063       }
2064     }
2065   }
2066 
2067   /**
2068    * Helper class for logging or reading back events related to Task's start, finish or failure.
2069    * All events logged by this class are logged in a separate file per job in
2070    * job tracker history. These events map to TIPs in jobtracker.
2071    */
2072   public static class Task extends KeyValuePair{
2073     private Map <String, TaskAttempt> taskAttempts = new TreeMap<String, TaskAttempt>();
2074 
2075     /**
2076      * Log start time of task (TIP).
2077      * @param taskId task id
2078      * @param taskType MAP or REDUCE
2079      * @param startTime startTime of tip.
2080      */
logStarted(TaskID taskId, String taskType, long startTime, String splitLocations)2081     public static void logStarted(TaskID taskId, String taskType,
2082                                   long startTime, String splitLocations) {
2083       JobID id = taskId.getJobID();
2084       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2085 
2086       if (null != writer){
2087         JobHistory.log(writer, RecordTypes.Task,
2088                        new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
2089                                   Keys.START_TIME, Keys.SPLITS},
2090                        new String[]{taskId.toString(), taskType,
2091                                     String.valueOf(startTime),
2092                                     splitLocations}, id);
2093       }
2094     }
2095     /**
2096      * Log finish time of task.
2097      * @param taskId task id
2098      * @param taskType MAP or REDUCE
2099      * @param finishTime finish timeof task in ms
2100      */
logFinished(TaskID taskId, String taskType, long finishTime, Counters counters)2101     public static void logFinished(TaskID taskId, String taskType,
2102                                    long finishTime, Counters counters){
2103       JobID id = taskId.getJobID();
2104       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2105 
2106       if (null != writer){
2107          JobHistory.log(writer, RecordTypes.Task,
2108                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
2109                                    Keys.TASK_STATUS, Keys.FINISH_TIME,
2110                                    Keys.COUNTERS},
2111                         new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(),
2112                                       String.valueOf(finishTime),
2113                                       counters.makeEscapedCompactString()}, id);
2114       }
2115     }
2116 
2117     /**
2118      * Update the finish time of task.
2119      * @param taskId task id
2120      * @param finishTime finish time of task in ms
2121      */
logUpdates(TaskID taskId, long finishTime)2122     public static void logUpdates(TaskID taskId, long finishTime){
2123       JobID id = taskId.getJobID();
2124       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2125 
2126       if (null != writer){
2127         JobHistory.log(writer, RecordTypes.Task,
2128                        new Keys[]{Keys.TASKID, Keys.FINISH_TIME},
2129                        new String[]{ taskId.toString(),
2130                                      String.valueOf(finishTime)}, id);
2131       }
2132     }
2133 
2134     /**
2135      * Log job failed event.
2136      * @param taskId task id
2137      * @param taskType MAP or REDUCE.
2138      * @param time timestamp when job failed detected.
2139      * @param error error message for failure.
2140      */
logFailed(TaskID taskId, String taskType, long time, String error)2141     public static void logFailed(TaskID taskId, String taskType, long time, String error){
2142       logFailed(taskId, taskType, time, error, null);
2143     }
2144 
2145     /**
2146      * @param failedDueToAttempt The attempt that caused the failure, if any
2147      */
logFailed(TaskID taskId, String taskType, long time, String error, TaskAttemptID failedDueToAttempt)2148     public static void logFailed(TaskID taskId, String taskType, long time,
2149                                  String error,
2150                                  TaskAttemptID failedDueToAttempt){
2151       JobID id = taskId.getJobID();
2152       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2153 
2154       if (null != writer){
2155         String failedAttempt = failedDueToAttempt == null
2156                                ? ""
2157                                : failedDueToAttempt.toString();
2158         JobHistory.log(writer, RecordTypes.Task,
2159                        new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
2160                                   Keys.TASK_STATUS, Keys.FINISH_TIME,
2161                                   Keys.ERROR, Keys.TASK_ATTEMPT_ID},
2162                        new String[]{ taskId.toString(),  taskType,
2163                                     Values.FAILED.name(),
2164                                     String.valueOf(time) , error,
2165                                     failedAttempt}, id);
2166       }
2167     }
2168     /**
2169      * Returns all task attempts for this task. <task attempt id - TaskAttempt>
2170      */
getTaskAttempts()2171     public Map<String, TaskAttempt> getTaskAttempts(){
2172       return this.taskAttempts;
2173     }
2174   }
2175 
2176   /**
2177    * Base class for Map and Reduce TaskAttempts.
2178    */
2179   public static class TaskAttempt extends Task{}
2180 
2181   /**
2182    * Helper class for logging or reading back events related to start, finish or failure of
2183    * a Map Attempt on a node.
2184    */
2185   public static class MapAttempt extends TaskAttempt{
2186     /**
2187      * Log start time of this map task attempt.
2188      * @param taskAttemptId task attempt id
2189      * @param startTime start time of task attempt as reported by task tracker.
2190      * @param hostName host name of the task attempt.
2191      * @deprecated Use
2192      *             {@link #logStarted(TaskAttemptID, long, String, int, String)}
2193      */
2194     @Deprecated
logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName)2195     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
2196       logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
2197     }
2198 
    /**
     * Log start time of this map task attempt.
     * Delegates to the 7-argument overload with a locality of
     * {@code Locality.OFF_SWITCH} and an avataar of {@code Avataar.VIRGIN}.
     *
     * @param taskAttemptId task attempt id
     * @param startTime start time of task attempt as reported by task tracker.
     * @param trackerName name of the tracker executing the task attempt.
     * @param httpPort http port of the task tracker executing the task attempt
     * @param taskType Whether the attempt is cleanup or setup or map
     * @deprecated Use
     *             {@link #logStarted(TaskAttemptID, long, String, int, String, Locality, Avataar)}
     */
    @Deprecated
    public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
        String trackerName, int httpPort, String taskType) {
      logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType,
          Locality.OFF_SWITCH, Avataar.VIRGIN);
    }
2205 
2206     /**
2207      * Log start time of this map task attempt.
2208      *
2209      * @param taskAttemptId task attempt id
2210      * @param startTime start time of task attempt as reported by task tracker.
2211      * @param trackerName name of the tracker executing the task attempt.
2212      * @param httpPort http port of the task tracker executing the task attempt
2213      * @param taskType Whether the attempt is cleanup or setup or map
2214      * @param locality the data locality of the task attempt
2215      * @param Avataar the avataar of the task attempt
2216      */
logStarted(TaskAttemptID taskAttemptId, long startTime, String trackerName, int httpPort, String taskType, Locality locality, Avataar avataar)2217     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
2218                                   String trackerName, int httpPort,
2219                                   String taskType,
2220                                   Locality locality, Avataar avataar) {
2221       JobID id = taskAttemptId.getJobID();
2222       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2223 
2224       if (null != writer){
2225         JobHistory.log(writer, RecordTypes.MapAttempt,
2226                        new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
2227                                    Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
2228                                    Keys.TRACKER_NAME, Keys.HTTP_PORT,
2229                                    Keys.LOCALITY, Keys.AVATAAR},
2230                        new String[]{taskType,
2231                                     taskAttemptId.getTaskID().toString(),
2232                                     taskAttemptId.toString(),
2233                                     String.valueOf(startTime), trackerName,
2234                                     httpPort == -1 ? "" : String.valueOf(httpPort),
2235                                     locality.toString(), avataar.toString()},
2236                        id
2237                        );
2238       }
2239     }
2240 
2241     /**
2242      * Log finish time of map task attempt.
2243      * @param taskAttemptId task attempt id
2244      * @param finishTime finish time
2245      * @param hostName host name
2246      * @deprecated Use
2247      * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
2248      */
2249     @Deprecated
logFinished(TaskAttemptID taskAttemptId, long finishTime, String hostName)2250     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime,
2251                                    String hostName){
2252       logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "",
2253                   new Counters());
2254     }
2255 
2256     /**
2257      * Log finish time of map task attempt.
2258      *
2259      * @param taskAttemptId task attempt id
2260      * @param finishTime finish time
2261      * @param hostName host name
2262      * @param taskType Whether the attempt is cleanup or setup or map
2263      * @param stateString state string of the task attempt
2264      * @param counter counters of the task attempt
2265      */
logFinished(TaskAttemptID taskAttemptId, long finishTime, String hostName, String taskType, String stateString, Counters counter)2266     public static void logFinished(TaskAttemptID taskAttemptId,
2267                                    long finishTime,
2268                                    String hostName,
2269                                    String taskType,
2270                                    String stateString,
2271                                    Counters counter) {
2272       JobID id = taskAttemptId.getJobID();
2273       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2274 
2275       if (null != writer){
2276         JobHistory.log(writer, RecordTypes.MapAttempt,
2277                        new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
2278                                    Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
2279                                    Keys.FINISH_TIME, Keys.HOSTNAME,
2280                                    Keys.STATE_STRING, Keys.COUNTERS},
2281                        new String[]{taskType,
2282                                     taskAttemptId.getTaskID().toString(),
2283                                     taskAttemptId.toString(),
2284                                     Values.SUCCESS.name(),
2285                                     String.valueOf(finishTime), hostName,
2286                                     stateString,
2287                                     counter.makeEscapedCompactString()}, id);
2288       }
2289     }
2290 
2291     /**
2292      * Log task attempt failed event.
2293      * @param taskAttemptId task attempt id
2294      * @param timestamp timestamp
2295      * @param hostName hostname of this task attempt.
2296      * @param error error message if any for this task attempt.
2297      * @deprecated Use
2298      * {@link #logFailed(TaskAttemptID, long, String, String, String)}
2299      */
2300     @Deprecated
logFailed(TaskAttemptID taskAttemptId, long timestamp, String hostName, String error)2301     public static void logFailed(TaskAttemptID taskAttemptId,
2302                                  long timestamp, String hostName,
2303                                  String error) {
2304       logFailed(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
2305     }
2306 
2307     /**
2308      * Log task attempt failed event.
2309      *
2310      * @param taskAttemptId task attempt id
2311      * @param timestamp timestamp
2312      * @param hostName hostname of this task attempt.
2313      * @param error error message if any for this task attempt.
2314      * @param taskType Whether the attempt is cleanup or setup or map
2315      */
logFailed(TaskAttemptID taskAttemptId, long timestamp, String hostName, String error, String taskType)2316     public static void logFailed(TaskAttemptID taskAttemptId,
2317                                  long timestamp, String hostName,
2318                                  String error, String taskType) {
2319       JobID id = taskAttemptId.getJobID();
2320       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2321 
2322       if (null != writer){
2323         JobHistory.log(writer, RecordTypes.MapAttempt,
2324                        new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
2325                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
2326                                   Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
2327                        new String[]{ taskType,
2328                                      taskAttemptId.getTaskID().toString(),
2329                                      taskAttemptId.toString(),
2330                                      Values.FAILED.name(),
2331                                      String.valueOf(timestamp),
2332                                      hostName, error}, id);
2333       }
2334     }
2335 
2336     /**
2337      * Log task attempt killed event.
2338      * @param taskAttemptId task attempt id
2339      * @param timestamp timestamp
2340      * @param hostName hostname of this task attempt.
2341      * @param error error message if any for this task attempt.
2342      * @deprecated Use
2343      * {@link #logKilled(TaskAttemptID, long, String, String, String)}
2344      */
2345     @Deprecated
logKilled(TaskAttemptID taskAttemptId, long timestamp, String hostName, String error)2346     public static void logKilled(TaskAttemptID taskAttemptId,
2347                                  long timestamp, String hostName, String error){
2348       logKilled(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
2349     }
2350 
2351     /**
2352      * Log task attempt killed event.
2353      *
2354      * @param taskAttemptId task attempt id
2355      * @param timestamp timestamp
2356      * @param hostName hostname of this task attempt.
2357      * @param error error message if any for this task attempt.
2358      * @param taskType Whether the attempt is cleanup or setup or map
2359      */
logKilled(TaskAttemptID taskAttemptId, long timestamp, String hostName, String error, String taskType)2360     public static void logKilled(TaskAttemptID taskAttemptId,
2361                                  long timestamp, String hostName,
2362                                  String error, String taskType) {
2363       JobID id = taskAttemptId.getJobID();
2364       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2365 
2366       if (null != writer){
2367         JobHistory.log(writer, RecordTypes.MapAttempt,
2368                        new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
2369                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
2370                                   Keys.FINISH_TIME, Keys.HOSTNAME,
2371                                   Keys.ERROR},
2372                        new String[]{ taskType,
2373                                      taskAttemptId.getTaskID().toString(),
2374                                      taskAttemptId.toString(),
2375                                      Values.KILLED.name(),
2376                                      String.valueOf(timestamp),
2377                                      hostName, error}, id);
2378       }
2379     }
2380   }
2381   /**
   * Helper class for logging or reading back events related to start, finish or failure of
   * a Reduce Attempt on a node.
2384    */
2385   public static class ReduceAttempt extends TaskAttempt{
2386     /**
2387      * Log start time of  Reduce task attempt.
2388      * @param taskAttemptId task attempt id
2389      * @param startTime start time
2390      * @param hostName host name
2391      * @deprecated Use
2392      * {@link #logStarted(TaskAttemptID, long, String, int, String)}
2393      */
2394     @Deprecated
logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName)2395     public static void logStarted(TaskAttemptID taskAttemptId,
2396                                   long startTime, String hostName){
2397       logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
2398     }
2399 
2400     @Deprecated
logStarted(TaskAttemptID taskAttemptId, long startTime, String trackerName, int httpPort, String taskType)2401     public static void logStarted(TaskAttemptID taskAttemptId,
2402         long startTime, String trackerName,
2403         int httpPort,
2404         String taskType) {
2405       logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType,
2406           Locality.OFF_SWITCH, Avataar.VIRGIN);
2407     }
2408     /**
2409      * Log start time of  Reduce task attempt.
2410      *
2411      * @param taskAttemptId task attempt id
2412      * @param startTime start time
2413      * @param trackerName tracker name
2414      * @param httpPort the http port of the tracker executing the task attempt
2415      * @param taskType Whether the attempt is cleanup or setup or reduce
2416      * @param locality the data locality of the task attempt
2417      * @param Avataar the avataar of the task attempt
2418      */
logStarted(TaskAttemptID taskAttemptId, long startTime, String trackerName, int httpPort, String taskType, Locality locality, Avataar avataar)2419     public static void logStarted(TaskAttemptID taskAttemptId,
2420                                   long startTime, String trackerName,
2421                                   int httpPort,
2422                                   String taskType,
2423                                   Locality locality, Avataar avataar) {
2424       JobID id = taskAttemptId.getJobID();
2425 	    ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2426 
2427       if (null != writer){
2428         JobHistory.log(writer, RecordTypes.ReduceAttempt,
2429             new Keys[] {Keys.TASK_TYPE, Keys.TASKID,
2430                 Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
2431                 Keys.TRACKER_NAME, Keys.HTTP_PORT,
2432                 Keys.LOCALITY, Keys.AVATAAR},
2433             new String[]{taskType,
2434                 taskAttemptId.getTaskID().toString(),
2435                 taskAttemptId.toString(),
2436                 String.valueOf(startTime), trackerName,
2437                 httpPort == -1 ? "" :
2438                 String.valueOf(httpPort),
2439                 locality.toString(), avataar.toString()},
2440             id);
2441         }
2442 	    }
2443 
2444 	    /**
2445 	     * Log finished event of this task.
2446 	     * @param taskAttemptId task attempt id
2447 	     * @param shuffleFinished shuffle finish time
2448 	     * @param sortFinished sort finish time
2449 	     * @param finishTime finish time of task
2450 	     * @param hostName host name where task attempt executed
2451 	     * @deprecated Use
2452 	     * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
2453 	     */
2454 	    @Deprecated
logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, long sortFinished, long finishTime, String hostName)2455 	    public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished,
2456 					   long sortFinished, long finishTime,
2457 					   String hostName){
2458 	      logFinished(taskAttemptId, shuffleFinished, sortFinished,
2459 			  finishTime, hostName, Values.REDUCE.name(),
2460 			  "", new Counters());
2461 	    }
2462 
2463 	    /**
2464 	     * Log finished event of this task.
2465 	     *
2466 	     * @param taskAttemptId task attempt id
2467 	     * @param shuffleFinished shuffle finish time
2468 	     * @param sortFinished sort finish time
2469 	     * @param finishTime finish time of task
2470 	     * @param hostName host name where task attempt executed
2471 	     * @param taskType Whether the attempt is cleanup or setup or reduce
2472 	     * @param stateString the state string of the attempt
2473 	     * @param counter counters of the attempt
2474 	     */
logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, long sortFinished, long finishTime, String hostName, String taskType, String stateString, Counters counter)2475 	    public static void logFinished(TaskAttemptID taskAttemptId,
2476 					   long shuffleFinished,
2477 					   long sortFinished, long finishTime,
2478 					   String hostName, String taskType,
2479 					   String stateString, Counters counter) {
2480         JobID id = taskAttemptId.getJobID();
2481         ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2482 
2483         if (null != writer){
2484           JobHistory.log(writer, RecordTypes.ReduceAttempt,
2485                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
2486                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
2487                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
2488                                      Keys.FINISH_TIME, Keys.HOSTNAME,
2489                                      Keys.STATE_STRING, Keys.COUNTERS},
2490                          new String[]{taskType,
2491                                       taskAttemptId.getTaskID().toString(),
2492                                       taskAttemptId.toString(),
2493                                       Values.SUCCESS.name(),
2494                                       String.valueOf(shuffleFinished),
2495                                       String.valueOf(sortFinished),
2496                                       String.valueOf(finishTime), hostName,
2497                                       stateString,
2498                                       counter.makeEscapedCompactString()}, id);
2499         }
2500     }
2501 
2502     /**
2503      * Log failed reduce task attempt.
2504      * @param taskAttemptId task attempt id
2505      * @param timestamp time stamp when task failed
2506      * @param hostName host name of the task attempt.
2507      * @param error error message of the task.
2508      * @deprecated Use
2509      * {@link #logFailed(TaskAttemptID, long, String, String, String)}
2510      */
2511     @Deprecated
logFailed(TaskAttemptID taskAttemptId, long timestamp, String hostName, String error)2512     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp,
2513                                  String hostName, String error){
2514       logFailed(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
2515     }
2516 
2517     /**
2518      * Log failed reduce task attempt.
2519      *
2520      * @param taskAttemptId task attempt id
2521      * @param timestamp time stamp when task failed
2522      * @param hostName host name of the task attempt.
2523      * @param error error message of the task.
2524      * @param taskType Whether the attempt is cleanup or setup or reduce
2525      */
logFailed(TaskAttemptID taskAttemptId, long timestamp, String hostName, String error, String taskType)2526     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp,
2527                                  String hostName, String error,
2528                                  String taskType) {
2529       JobID id = taskAttemptId.getJobID();
2530       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2531 
2532       if (null != writer){
2533         JobHistory.log(writer, RecordTypes.ReduceAttempt,
2534                        new Keys[]{  Keys.TASK_TYPE, Keys.TASKID,
2535                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
2536                                     Keys.FINISH_TIME, Keys.HOSTNAME,
2537                                     Keys.ERROR },
2538                        new String[]{ taskType,
2539                                      taskAttemptId.getTaskID().toString(),
2540                                      taskAttemptId.toString(),
2541                                      Values.FAILED.name(),
2542                                      String.valueOf(timestamp), hostName, error }, id);
2543       }
2544     }
2545 
2546     /**
2547      * Log killed reduce task attempt.
2548      * @param taskAttemptId task attempt id
2549      * @param timestamp time stamp when task failed
2550      * @param hostName host name of the task attempt.
2551      * @param error error message of the task.
2552      * @deprecated Use
2553      * {@link #logKilled(TaskAttemptID, long, String, String, String)}
2554      */
2555     @Deprecated
logKilled(TaskAttemptID taskAttemptId, long timestamp, String hostName, String error)2556     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp,
2557                                  String hostName, String error) {
2558       logKilled(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
2559     }
2560 
2561     /**
2562      * Log killed reduce task attempt.
2563      *
2564      * @param taskAttemptId task attempt id
2565      * @param timestamp time stamp when task failed
2566      * @param hostName host name of the task attempt.
2567      * @param error error message of the task.
2568      * @param taskType Whether the attempt is cleanup or setup or reduce
2569     */
logKilled(TaskAttemptID taskAttemptId, long timestamp, String hostName, String error, String taskType)2570     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp,
2571                                  String hostName, String error,
2572                                  String taskType) {
2573       JobID id = taskAttemptId.getJobID();
2574       ArrayList<PrintWriter> writer = fileManager.getWriters(id);
2575 
2576       if (null != writer){
2577         JobHistory.log(writer, RecordTypes.ReduceAttempt,
2578                        new Keys[]{  Keys.TASK_TYPE, Keys.TASKID,
2579                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
2580                                     Keys.FINISH_TIME, Keys.HOSTNAME,
2581                                     Keys.ERROR },
2582                        new String[]{ taskType,
2583                                      taskAttemptId.getTaskID().toString(),
2584                                      taskAttemptId.toString(),
2585                                      Values.KILLED.name(),
2586                                      String.valueOf(timestamp),
2587                                      hostName, error }, id);
2588       }
2589     }
2590   }
2591 
2592   /**
2593    * Callback interface for reading back log events from JobHistory. This interface
2594    * should be implemented and passed to JobHistory.parseHistory()
2595    *
2596    */
2597   public static interface Listener{
2598     /**
2599      * Callback method for history parser.
2600      * @param recType type of record, which is the first entry in the line.
2601      * @param values a map of key-value pairs as thry appear in history.
2602      * @throws IOException
2603      */
handle(RecordTypes recType, Map<Keys, String> values)2604     public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException;
2605   }
2606 
2607   /**
2608    * Returns the time in milliseconds, truncated to the day.
2609    */
directoryTime(String year, String month, String day)2610   static long directoryTime(String year, String month, String day) {
2611     Calendar result = Calendar.getInstance();
2612     result.clear();
2613 
2614     result.set(Calendar.YEAR, Integer.parseInt(year));
2615 
2616     // months are 0-based in Calendar, but people will expect January
2617     // to be month #1 .  Therefore the number is bumped before we make the
2618     // directory name and must be debumped to seek the time.
2619     result.set(Calendar.MONTH, Integer.parseInt(month) - 1);
2620 
2621     result.set(Calendar.DAY_OF_MONTH, Integer.parseInt(day));
2622 
2623     // truncate to day granularity
2624     long timeInMillis = result.getTimeInMillis();
2625     return timeInMillis;
2626   }
2627 
2628   /**
   * Deletes history files older than the configured maximum age (30 days by
   * default). Dated done-directories that fall entirely before the cutoff day
   * are removed as a whole, and entries for deleted files are purged from the
   * in-memory job history file map.
2632    *
2633    */
2634   public static class HistoryCleaner implements Runnable {
2635     static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
2636     static final long DEFAULT_HISTORY_MAX_AGE = 30 * ONE_DAY_IN_MS;
2637     static final long DEFAULT_CLEANUP_FREQUENCY = ONE_DAY_IN_MS;
2638     static long cleanupFrequency = DEFAULT_CLEANUP_FREQUENCY;
2639     static long maxAgeOfHistoryFiles = DEFAULT_HISTORY_MAX_AGE;
2640     private long now;
2641     private static final AtomicBoolean isRunning = new AtomicBoolean(false);
2642     private static long lastRan = 0;
2643 
2644     private static Pattern parseDirectory
2645       = Pattern.compile(".+/([0-9]+)/([0-9]+)/([0-9]+)/[0-9]+/?");
2646 
2647     /**
2648      * Cleans up history data.
2649      */
run()2650     public void run() {
2651       if (isRunning.getAndSet(true)) {
2652         return;
2653       }
2654       now = System.currentTimeMillis();
2655       // clean history only once a day at max
2656       if (lastRan != 0 && (now - lastRan) < cleanupFrequency) {
2657         isRunning.set(false);
2658         return;
2659       }
2660       lastRan = now;
2661       clean(now);
2662     }
2663 
clean(long now)2664     public void clean(long now) {
2665       Set<String> deletedPathnames = new HashSet<String>();
2666 
2667       // XXXXX debug code
2668       boolean printedOneDeletee = false;
2669       boolean printedOneMovedFile = false;
2670 
2671       try {
2672         Path[] datedDirectories
2673           = FileUtil.stat2Paths(localGlobber(DONEDIR_FS, DONE,
2674                                              DONE_BEFORE_SERIAL_TAIL, null));
2675 
2676         // any file with a timestamp earlier than cutoff should be deleted
2677         long cutoff = now - maxAgeOfHistoryFiles;
2678         Calendar cutoffDay = Calendar.getInstance();
2679         cutoffDay.setTimeInMillis(cutoff);
2680         cutoffDay.set(Calendar.HOUR_OF_DAY, 0);
2681         cutoffDay.set(Calendar.MINUTE, 0);
2682         cutoffDay.set(Calendar.SECOND, 0);
2683         cutoffDay.set(Calendar.MILLISECOND, 0);
2684 
2685         // find directories older than the maximum age
2686         for (int i = 0; i < datedDirectories.length; ++i) {
2687           String thisDir = datedDirectories[i].toString();
2688           Matcher pathMatcher = parseDirectory.matcher(thisDir);
2689 
2690           if (pathMatcher.matches()) {
2691             long dirDay = directoryTime(pathMatcher.group(1),
2692                                          pathMatcher.group(2),
2693                                          pathMatcher.group(3));
2694 
2695             if (LOG.isDebugEnabled()) {
2696               LOG.debug("HistoryCleaner.run just parsed " + thisDir
2697                   + " as year/month/day = " + pathMatcher.group(1) + "/"
2698                   + pathMatcher.group(2) + "/" + pathMatcher.group(3));
2699             }
2700 
2701             if (dirDay <= cutoffDay.getTimeInMillis()) {
2702               if (LOG.isDebugEnabled()) {
2703                 Calendar nnow = Calendar.getInstance();
2704                 nnow.setTimeInMillis(now);
2705                 Calendar then = Calendar.getInstance();
2706                 then.setTimeInMillis(dirDay);
2707 
2708                 LOG.debug("HistoryCleaner.run directory: " + thisDir
2709                     + " because its time is " + then + " but it's now " + nnow);
2710               }
2711             }
2712 
2713             // if dirDay is cutoffDay, some files may be old enough and others not
2714             if (dirDay == cutoffDay.getTimeInMillis()) {
2715               // remove old enough files in the directory
2716               FileStatus[] possibleDeletees = DONEDIR_FS.listStatus(datedDirectories[i]);
2717 
2718               for (int j = 0; j < possibleDeletees.length; ++j) {
2719             	  if (possibleDeletees[j].getModificationTime() < now -
2720             	      maxAgeOfHistoryFiles) {
2721             	    Path deletee = possibleDeletees[j].getPath();
2722                   if (LOG.isDebugEnabled() && !printedOneDeletee) {
2723                     LOG.debug("HistoryCleaner.run deletee: "
2724                         + deletee.toString());
2725                     printedOneDeletee = true;
2726                   }
2727 
2728                   DONEDIR_FS.delete(deletee);
2729                   deletedPathnames.add(deletee.toString());
2730             	  }
2731               }
2732             }
2733 
2734             // if the directory is older than cutoffDay, we can flat out
2735             // delete it because all the files in it are old enough
2736             if (dirDay < cutoffDay.getTimeInMillis()) {
2737               synchronized (existingDoneSubdirs) {
2738                 if (!existingDoneSubdirs.contains(datedDirectories[i])) {
2739                   LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
2740                       + datedDirectories[i] + ", but should.");
2741                 }
2742                 DONEDIR_FS.delete(datedDirectories[i], true);
2743                 existingDoneSubdirs.remove(datedDirectories[i]);
2744               }
2745             }
2746           }
2747         }
2748 
2749         //walking over the map to purge entries from jobHistoryFileMap
2750         synchronized (jobHistoryFileMap) {
2751           Iterator<Entry<JobID, MovedFileInfo>> it =
2752             jobHistoryFileMap.entrySet().iterator();
2753           while (it.hasNext()) {
2754             MovedFileInfo info = it.next().getValue();
2755 
2756             if (LOG.isDebugEnabled() && !printedOneMovedFile) {
2757               LOG.debug("HistoryCleaner.run a moved file: " + info.historyFile);
2758               printedOneMovedFile = true;
2759             }
2760 
2761             if (deletedPathnames.contains(info.historyFile)) {
2762               it.remove();
2763             }
2764           }
2765         }
2766       } catch (IOException ie) {
2767         LOG.info("Error cleaning up history directory" +
2768                  StringUtils.stringifyException(ie));
2769       } finally {
2770           isRunning.set(false);
2771       }
2772     }
2773 
getLastRan()2774     static long getLastRan() {
2775       return lastRan;
2776     }
2777   }
2778 
2779   /**
2780    * Return the TaskLogsUrl of a particular TaskAttempt
2781    *
   * @param attempt the task attempt whose task-logs URL is requested
2783    * @return the taskLogsUrl. null if http-port or tracker-name or
2784    *         task-attempt-id are unavailable.
2785    */
getTaskLogsUrl(JobHistory.TaskAttempt attempt)2786   public static String getTaskLogsUrl(JobHistory.TaskAttempt attempt) {
2787     if (attempt.get(Keys.HTTP_PORT).equals("")
2788         || attempt.get(Keys.TRACKER_NAME).equals("")
2789         || attempt.get(Keys.TASK_ATTEMPT_ID).equals("")) {
2790       return null;
2791     }
2792 
2793     String taskTrackerName =
2794       JobInProgress.convertTrackerNameToHostName(
2795         attempt.get(Keys.TRACKER_NAME));
2796     return TaskLogServlet.getTaskLogUrl(taskTrackerName, attempt
2797         .get(Keys.HTTP_PORT), attempt.get(Keys.TASK_ATTEMPT_ID));
2798   }
2799 }
2800