1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred;
20 
21 import java.io.File;
22 import java.io.IOException;
23 import java.text.ParseException;
24 import java.util.ArrayList;
25 import java.util.Calendar;
26 import java.util.List;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Iterator;
30 import java.util.regex.Matcher;
31 import java.util.regex.Pattern;
32 
33 import junit.framework.TestCase;
34 
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FileStatus;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.fs.PathFilter;
40 import org.apache.hadoop.fs.permission.FsPermission;
41 import org.apache.hadoop.hdfs.MiniDFSCluster;
42 import org.apache.hadoop.mapred.JobHistory.*;
43 import org.apache.hadoop.mapred.QueueManager.QueueACL;
44 import org.apache.hadoop.mapreduce.JobACL;
45 import org.apache.hadoop.mapreduce.TaskType;
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.security.UserGroupInformation;
49 import org.apache.hadoop.security.authorize.AccessControlList;
50 
51 /**
52  * Tests the JobHistory files - to catch any changes to JobHistory that can
53  * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer.
54  *
55  * testJobHistoryFile
56  * Run a job that will be succeeded and validate its history file format and
57  * content.
58  *
59  * testJobHistoryUserLogLocation
60  * Run jobs with the given values of hadoop.job.history.user.location as
61  *   (1)null(default case), (2)"none", and (3)some user specified dir.
62  *   Validate user history file location in each case.
63  *
64  * testJobHistoryJobStatus
65  * Run jobs that will be (1) succeeded (2) failed (3) killed.
66  *   Validate job status read from history file in each case.
67  *
68  * Future changes to job history are to be reflected here in this file.
69  */
70 public class TestJobHistory extends TestCase {
   // Logger used by this test class
   private static final Log LOG = LogFactory.getLog(TestJobHistory.class);

  // URI string of the directory named by the "test.build.data" system
  // property (falls back to /tmp), with every space replaced by '+'
  private static String TEST_ROOT_DIR = new File(System.getProperty(
      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');

  // Pattern built from JobHistory.DIGITS; used below to validate time values
  // and http ports read from the history file
  private static final Pattern digitsPattern =
                                     Pattern.compile(JobHistory.DIGITS);

  // hostname like   /default-rack/host1.foo.com OR host1.foo.com
  private static final Pattern hostNamePattern = Pattern.compile(
                                       "(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)");

  // Regex fragment for a dotted IPv4-style address: four groups of one to
  // three digits separated by dots (the numeric range is not checked)
  private static final String IP_ADDR =
                       "\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?";

  // tracker name of the form
  //   tracker_<hostname>:<name>/<ip address>:<digits>
  // where <hostname> follows hostNamePattern (its regex is spliced in via
  // Pattern.toString())
  private static final Pattern trackerNamePattern = Pattern.compile(
                         "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
                         IP_ADDR + ":" + JobHistory.DIGITS);

  // SPLITS value: one or more host names separated by commas
  private static final Pattern splitsPattern = Pattern.compile(
                              hostNamePattern + "(," + hostNamePattern + ")*");

  // Maps each TaskID seen in the history file to the bookkeeping list of its
  // attempt IDs; filled and drained by TestListener while parsing
  private static Map<String, List<String>> taskIDsToAttemptIDs =
                                     new HashMap<String, List<String>>();

  //Each Task End seen from history file is added here
  private static List<String> taskEnds = new ArrayList<String>();

  // List of tasks that appear in history file after JT restart. This is to
  // allow START_TIME=0 for these tasks.
  private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>();

  // List of potential tasks whose start time can be 0 because of JT restart
  private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>();
106 
107   /**
108    * Listener for history log file, it populates JobHistory.JobInfo
109    * object with data from log file and validates the data.
110    */
111   static class TestListener
112                     extends DefaultJobHistoryParser.JobTasksParseListener {
113     int lineNum;//line number of history log file
114     boolean isJobLaunched;
115     boolean isJTRestarted;
116 
TestListener(JobHistory.JobInfo job)117     TestListener(JobHistory.JobInfo job) {
118       super(job);
119       lineNum = 0;
120       isJobLaunched = false;
121       isJTRestarted = false;
122     }
123 
124     // TestListener implementation
handle(RecordTypes recType, Map<Keys, String> values)125     public void handle(RecordTypes recType, Map<Keys, String> values)
126     throws IOException {
127 
128       lineNum++;
129 
130       // Check if the record is of type Meta
131       if (recType == JobHistory.RecordTypes.Meta) {
132         long version = Long.parseLong(values.get(Keys.VERSION));
133         assertTrue("Unexpected job history version ",
134                    (version >= 0 && version <= JobHistory.VERSION));
135       }
136       else if (recType.equals(RecordTypes.Job)) {
137         String jobid = values.get(Keys.JOBID);
138         assertTrue("record type 'Job' is seen without JOBID key" +
139         		" in history file at line " + lineNum, jobid != null);
140         JobID id = JobID.forName(jobid);
141         assertTrue("JobID in history file is in unexpected format " +
142                   "at line " + lineNum, id != null);
143         String time = values.get(Keys.LAUNCH_TIME);
144         if (time != null) {
145           if (isJobLaunched) {
146             // We assume that if we see LAUNCH_TIME again, it is because of JT restart
147             isJTRestarted = true;
148           }
149           else {// job launched first time
150             isJobLaunched = true;
151           }
152         }
153         time = values.get(Keys.FINISH_TIME);
154         if (time != null) {
155           assertTrue ("Job FINISH_TIME is seen in history file at line " +
156                       lineNum + " before LAUNCH_TIME is seen", isJobLaunched);
157         }
158       }
159       else if (recType.equals(RecordTypes.Task)) {
160         String taskid = values.get(Keys.TASKID);
161         assertTrue("record type 'Task' is seen without TASKID key" +
162         		" in history file at line " + lineNum, taskid != null);
163         TaskID id = TaskID.forName(taskid);
164         assertTrue("TaskID in history file is in unexpected format " +
165                   "at line " + lineNum, id != null);
166 
167         String time = values.get(Keys.START_TIME);
168         if (time != null) {
169           List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
170           assertTrue("Duplicate START_TIME seen for task " + taskid +
171                      " in history file at line " + lineNum, attemptIDs == null);
172           attemptIDs = new ArrayList<String>();
173           taskIDsToAttemptIDs.put(taskid, attemptIDs);
174 
175           if (isJTRestarted) {
176             // This maintains a potential ignoreStartTimeTasks list
177             tempIgnoreStartTimeOfTasks.add(taskid);
178           }
179         }
180 
181         time = values.get(Keys.FINISH_TIME);
182         if (time != null) {
183           String s = values.get(Keys.TASK_STATUS);
184           if (s != null) {
185             List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
186             assertTrue ("Task FINISH_TIME is seen in history file at line " +
187                     lineNum + " before START_TIME is seen", attemptIDs != null);
188 
189             // Check if all the attemptIDs of this task are finished
190             assertTrue("TaskId " + taskid + " is finished at line " +
191                        lineNum + " but its attemptID is not finished.",
192                        (attemptIDs.size() <= 1));
193 
194             // Check if at least 1 attempt of this task is seen
195             assertTrue("TaskId " + taskid + " is finished at line " +
196                        lineNum + " but no attemptID is seen before this.",
197                        attemptIDs.size() == 1);
198 
199             if (s.equals("KILLED") || s.equals("FAILED")) {
200               // Task End with KILLED/FAILED status in history file is
201               // considered as TaskEnd, TaskStart. This is useful in checking
202               // the order of history lines.
203               attemptIDs = new ArrayList<String>();
204               taskIDsToAttemptIDs.put(taskid, attemptIDs);
205             }
206             else {
207               taskEnds.add(taskid);
208             }
209           }
210           else {
211             // This line of history file could be just an update to finish time
212           }
213         }
214       }
215       else if (recType.equals(RecordTypes.MapAttempt) ||
216                  recType.equals(RecordTypes.ReduceAttempt)) {
217         String taskid =  values.get(Keys.TASKID);
218         assertTrue("record type " + recType + " is seen without TASKID key" +
219         		" in history file at line " + lineNum, taskid != null);
220 
221         String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
222         TaskAttemptID id = TaskAttemptID.forName(attemptId);
223         assertTrue("AttemptID in history file is in unexpected format " +
224                    "at line " + lineNum, id != null);
225 
226         String time = values.get(Keys.START_TIME);
227         if (time != null) {
228           List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
229           assertTrue ("TaskAttempt is seen in history file at line " + lineNum +
230                       " before Task is seen", attemptIDs != null);
231           assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " +
232                       "file at line " + lineNum, attemptIDs.remove(attemptId));
233 
234           if (attemptIDs.isEmpty()) {
235             //just a boolean whether any attempt is seen or not
236             attemptIDs.add("firstAttemptIsSeen");
237           }
238           attemptIDs.add(attemptId);
239 
240           if (tempIgnoreStartTimeOfTasks.contains(taskid) &&
241               (id.getId() < 1000)) {
242             // If Task line of this attempt is seen in history file after
243             // JT restart and if this attempt is < 1000(i.e. attempt is noti
244             // started after JT restart) - assuming single JT restart happened
245             ignoreStartTimeOfTasks.add(taskid);
246           }
247         }
248 
249         time = values.get(Keys.FINISH_TIME);
250         if (time != null) {
251           List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
252           assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
253                       + lineNum + " before Task is seen", attemptIDs != null);
254 
255           assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
256                       + lineNum + " before TaskAttempt START_TIME is seen",
257                       attemptIDs.remove(attemptId));
258         }
259       }
260       super.handle(recType, values);
261     }
262   }
263 
264   // Check if the time is in the expected format
isTimeValid(String time)265   private static boolean isTimeValid(String time) {
266     Matcher m = digitsPattern.matcher(time);
267     return m.matches() && (Long.parseLong(time) > 0);
268   }
269 
areTimesInOrder(String time1, String time2)270   private static boolean areTimesInOrder(String time1, String time2) {
271     return (Long.parseLong(time1) <= Long.parseLong(time2));
272   }
273 
274   // Validate Format of Job Level Keys, Values read from history file
validateJobLevelKeyValuesFormat(Map<Keys, String> values, String status)275   private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values,
276                                                       String status) {
277     String time = values.get(Keys.SUBMIT_TIME);
278     assertTrue("Job SUBMIT_TIME is in unexpected format:" + time +
279                " in history file", isTimeValid(time));
280 
281     time = values.get(Keys.LAUNCH_TIME);
282     assertTrue("Job LAUNCH_TIME is in unexpected format:" + time +
283                " in history file", isTimeValid(time));
284 
285     String time1 = values.get(Keys.FINISH_TIME);
286     assertTrue("Job FINISH_TIME is in unexpected format:" + time1 +
287                " in history file", isTimeValid(time1));
288     assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file",
289                areTimesInOrder(time, time1));
290 
291     String stat = values.get(Keys.JOB_STATUS);
292     assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" +
293                " history file", (status.equals(stat)));
294 
295     String priority = values.get(Keys.JOB_PRIORITY);
296     assertTrue("Unknown priority for the job in history file",
297                (priority.equals("HIGH") ||
298                 priority.equals("LOW")  || priority.equals("NORMAL") ||
299                 priority.equals("VERY_HIGH") || priority.equals("VERY_LOW")));
300   }
301 
302   // Validate Format of Task Level Keys, Values read from history file
validateTaskLevelKeyValuesFormat(JobHistory.JobInfo job, boolean splitsCanBeEmpty)303   private static void validateTaskLevelKeyValuesFormat(JobHistory.JobInfo job,
304                                   boolean splitsCanBeEmpty) {
305     Map<String, JobHistory.Task> tasks = job.getAllTasks();
306 
307     // validate info of each task
308     for (JobHistory.Task task : tasks.values()) {
309 
310       String tid = task.get(Keys.TASKID);
311       String time = task.get(Keys.START_TIME);
312       // We allow START_TIME=0 for tasks seen in history after JT restart
313       if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) {
314         assertTrue("Task START_TIME of " + tid + " is in unexpected format:" +
315                  time + " in history file", isTimeValid(time));
316       }
317 
318       String time1 = task.get(Keys.FINISH_TIME);
319       assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" +
320                  time1 + " in history file", isTimeValid(time1));
321       assertTrue("Task FINISH_TIME is < START_TIME in history file",
322                  areTimesInOrder(time, time1));
323 
324       // Make sure that the Task type exists and it is valid
325       String type = task.get(Keys.TASK_TYPE);
326       assertTrue("Unknown Task type \"" + type + "\" is seen in " +
327                  "history file for task " + tid,
328                  (type.equals("MAP") || type.equals("REDUCE") ||
329                   type.equals("SETUP") || type.equals("CLEANUP")));
330 
331       if (type.equals("MAP")) {
332         String splits = task.get(Keys.SPLITS);
333         //order in the condition OR check is important here
334         if (!splitsCanBeEmpty || splits.length() != 0) {
335           Matcher m = splitsPattern.matcher(splits);
336           assertTrue("Unexpected format of SPLITS \"" + splits + "\" is seen" +
337                      " in history file for task " + tid, m.matches());
338         }
339       }
340 
341       // Validate task status
342       String status = task.get(Keys.TASK_STATUS);
343       assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
344                  " history file for task " + tid, (status.equals("SUCCESS") ||
345                  status.equals("FAILED") || status.equals("KILLED")));
346     }
347   }
348 
349   // Validate foramt of Task Attempt Level Keys, Values read from history file
validateTaskAttemptLevelKeyValuesFormat(JobHistory.JobInfo job)350   private static void validateTaskAttemptLevelKeyValuesFormat(JobHistory.JobInfo job) {
351     Map<String, JobHistory.Task> tasks = job.getAllTasks();
352 
353     // For each task
354     for (JobHistory.Task task : tasks.values()) {
355       // validate info of each attempt
356       for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
357 
358         String id = attempt.get(Keys.TASK_ATTEMPT_ID);
359         String time = attempt.get(Keys.START_TIME);
360         assertTrue("START_TIME of task attempt " + id +
361                    " is in unexpected format:" + time +
362                    " in history file", isTimeValid(time));
363 
364         String time1 = attempt.get(Keys.FINISH_TIME);
365         assertTrue("FINISH_TIME of task attempt " + id +
366                    " is in unexpected format:" + time1 +
367                    " in history file", isTimeValid(time1));
368         assertTrue("Task FINISH_TIME is < START_TIME in history file",
369                    areTimesInOrder(time, time1));
370 
371         // Make sure that the Task type exists and it is valid
372         String type = attempt.get(Keys.TASK_TYPE);
373         assertTrue("Unknown Task type \"" + type + "\" is seen in " +
374                    "history file for task attempt " + id,
375                    (type.equals("MAP") || type.equals("REDUCE") ||
376                     type.equals("SETUP") || type.equals("CLEANUP")));
377 
378         // Validate task status
379         String status = attempt.get(Keys.TASK_STATUS);
380         assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
381                    " history file for task attempt " + id,
382                    (status.equals("SUCCESS") || status.equals("FAILED") ||
383                     status.equals("KILLED")));
384 
385         // Validate task Avataar
386         String avataar = attempt.get(Keys.AVATAAR);
387         assertTrue("Unexpected LOCALITY \"" + avataar + "\" is seen in " +
388             " history file for task attempt " + id,
389             (avataar.equals("VIRGIN") || avataar.equals("SPECULATIVE"))
390         );
391 
392         // Map Task Attempts should have valid LOCALITY
393         if (type.equals("MAP")) {
394           String locality = attempt.get(Keys.LOCALITY);
395           assertTrue("Unexpected LOCALITY \"" + locality + "\" is seen in " +
396               " history file for task attempt " + id,
397               (locality.equals("NODE_LOCAL") || locality.equals("GROUP_LOCAL") ||
398                   locality.equals("RACK_LOCAL") || locality.equals("OFF_SWITCH"))
399           );
400         }
401 
402         // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
403         // SORT_FINISHED time
404         if (type.equals("REDUCE") && status.equals("SUCCESS")) {
405           time1 = attempt.get(Keys.SHUFFLE_FINISHED);
406           assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
407                      " is in unexpected format:" + time1 +
408                      " in history file", isTimeValid(time1));
409           assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " +
410                      "in history file", areTimesInOrder(time, time1));
411           time = attempt.get(Keys.SORT_FINISHED);
412           assertTrue("SORT_FINISHED of task attempt " + id +
413                      " is in unexpected format:" + time +
414                      " in history file", isTimeValid(time));
415           assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
416                      " in history file", areTimesInOrder(time1, time));
417         }
418 
419         // check if hostname is valid
420         String hostname = attempt.get(Keys.HOSTNAME);
421         Matcher m = hostNamePattern.matcher(hostname);
422         assertTrue("Unexpected Host name of task attempt " + id, m.matches());
423 
424         // check if trackername is valid
425         String trackerName = attempt.get(Keys.TRACKER_NAME);
426         m = trackerNamePattern.matcher(trackerName);
427         assertTrue("Unexpected tracker name of task attempt " + id,
428                    m.matches());
429 
430         if (!status.equals("KILLED")) {
431           // check if http port is valid
432           String httpPort = attempt.get(Keys.HTTP_PORT);
433           m = digitsPattern.matcher(httpPort);
434           assertTrue("Unexpected http port of task attempt " + id, m.matches());
435         }
436 
437         // check if counters are parsable
438         String counters = attempt.get(Keys.COUNTERS);
439         try {
440           Counters readCounters = Counters.fromEscapedCompactString(counters);
441           assertTrue("Counters of task attempt " + id + " are not parsable",
442                      readCounters != null);
443         } catch (ParseException pe) {
444           LOG.warn("While trying to parse counters of task attempt " + id +
445                    ", " + pe);
446         }
447       }
448     }
449   }
450 
451   /**
452    * Returns the conf file name in the same
453    * @param path path of the jobhistory file
454    * @param running whether the job is running or completed
455    */
getPathForConf(Path path)456   private static Path getPathForConf(Path path) {
457     return JobHistory.confPathFromLogFilePath(path);
458   }
459 
460   /**
461    *  Validates the format of contents of history file
462    *  (1) history file exists and in correct location
463    *  (2) Verify if the history file is parsable
464    *  (3) Validate the contents of history file
465    *     (a) Format of all TIMEs are checked against a regex
466    *     (b) validate legality/format of job level key, values
467    *     (c) validate legality/format of task level key, values
468    *     (d) validate legality/format of attempt level key, values
469    *     (e) check if all the TaskAttempts, Tasks started are finished.
470    *         Check finish of each TaskAttemptID against its start to make sure
471    *         that all TaskAttempts, Tasks started are indeed finished and the
472    *         history log lines are in the proper order.
473    *         We want to catch ordering of history lines like
474    *            Task START
475    *            Attempt START
476    *            Task FINISH
477    *            Attempt FINISH
478    *         (speculative execution is turned off for this).
479    * @param id job id
480    * @param conf job conf
481    */
validateJobHistoryFileFormat(JobID id, JobConf conf, String status, boolean splitsCanBeEmpty)482   static void validateJobHistoryFileFormat(JobID id, JobConf conf,
483                  String status, boolean splitsCanBeEmpty) throws IOException  {
484 
485     // Get the history file name
486     Path dir = JobHistory.getCompletedJobHistoryLocation();
487     String logFileName = getDoneFile(conf, id, dir);
488 
489     // Framework history log file location
490     Path logFile = new Path(dir, logFileName);
491     FileSystem fileSys = logFile.getFileSystem(conf);
492 
493     // Check if the history file exists
494     assertTrue("History file does not exist", fileSys.exists(logFile));
495 
496     // Check that the log file name includes a directory level for the version number
497     assertTrue("History filename does not include a directory level "
498                  + "for the version number.",
499                logFile.toString()
500                  .contains("/"
501                            + JobHistory.DONE_DIRECTORY_FORMAT_DIRNAME
502                            + "/"));
503 
504     // check if the history file is parsable
505     String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
506     		                                   logFileName).split("_");
507 
508     String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
509     JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
510 
511     TestListener l = new TestListener(jobInfo);
512     JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
513 
514 
515     // validate format of job level key, values
516     validateJobLevelKeyValuesFormat(jobInfo.getValues(), status);
517 
518     // validate format of task level key, values
519     validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty);
520 
521     // validate format of attempt level key, values
522     validateTaskAttemptLevelKeyValuesFormat(jobInfo);
523 
524     // check if all the TaskAttempts, Tasks started are finished for
525     // successful jobs
526     if (status.equals("SUCCESS")) {
527       // Make sure that the lists in taskIDsToAttemptIDs are empty.
528       for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
529         String taskid = it.next();
530         assertTrue("There are some Tasks which are not finished in history " +
531                    "file.", taskEnds.contains(taskid));
532         List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
533         if(attemptIDs != null) {
534           assertTrue("Unexpected. TaskID " + taskid + " has task attempt(s)" +
535                      " that are not finished.", (attemptIDs.size() == 1));
536         }
537       }
538     }
539   }
540 
541   // Validate Job Level Keys, Values read from history file by
542   // comparing them with the actual values from JT.
validateJobLevelKeyValues(MiniMRCluster mr, RunningJob job, JobHistory.JobInfo jobInfo, JobConf conf)543   private static void validateJobLevelKeyValues(MiniMRCluster mr,
544           RunningJob job, JobHistory.JobInfo jobInfo, JobConf conf) throws IOException  {
545 
546     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
547     JobInProgress jip = jt.getJob(job.getID());
548 
549     Map<Keys, String> values = jobInfo.getValues();
550 
551     assertTrue("SUBMIT_TIME of job obtained from history file did not " +
552                "match the expected value", jip.getStartTime() ==
553                Long.parseLong(values.get(Keys.SUBMIT_TIME)));
554 
555     assertTrue("LAUNCH_TIME of job obtained from history file did not " +
556                "match the expected value", jip.getLaunchTime() ==
557                Long.parseLong(values.get(Keys.LAUNCH_TIME)));
558 
559     assertTrue("FINISH_TIME of job obtained from history file did not " +
560                "match the expected value", jip.getFinishTime() ==
561                Long.parseLong(values.get(Keys.FINISH_TIME)));
562 
563     assertTrue("Job Status of job obtained from history file did not " +
564                "match the expected value",
565                values.get(Keys.JOB_STATUS).equals("SUCCESS"));
566 
567     assertTrue("Job Priority of job obtained from history file did not " +
568                "match the expected value", jip.getPriority().toString().equals(
569                values.get(Keys.JOB_PRIORITY)));
570 
571     assertTrue("Job Name of job obtained from history file did not " +
572                "match the expected value", JobHistory.JobInfo.getJobName(conf).equals(
573                values.get(Keys.JOBNAME)));
574 
575     assertTrue("User Name of job obtained from history file did not " +
576                "match the expected value", JobHistory.JobInfo.getUserName(conf).equals(
577                values.get(Keys.USER)));
578 
579     // Validate job counters
580     Counters c = new Counters();
581     jip.getCounters(c);
582     assertTrue("Counters of job obtained from history file did not " +
583                "match the expected value",
584                c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS)));
585     Counters m = new Counters();
586     jip.getMapCounters(m);
587     assertTrue("Map Counters of job obtained from history file did not " +
588                "match the expected value", m.makeEscapedCompactString().
589                equals(values.get(Keys.MAP_COUNTERS)));
590     Counters r = new Counters();
591     jip.getReduceCounters(r);
592     assertTrue("Reduce Counters of job obtained from history file did not " +
593                "match the expected value", r.makeEscapedCompactString().
594                equals(values.get(Keys.REDUCE_COUNTERS)));
595 
596     // Validate number of total maps, total reduces, finished maps,
597     // finished reduces, failed maps, failed recudes
598     String totalMaps = values.get(Keys.TOTAL_MAPS);
599     assertTrue("Unexpected number of total maps in history file",
600                Integer.parseInt(totalMaps) == jip.desiredMaps());
601 
602     String totalReduces = values.get(Keys.TOTAL_REDUCES);
603     assertTrue("Unexpected number of total reduces in history file",
604                Integer.parseInt(totalReduces) == jip.desiredReduces());
605 
606     String finMaps = values.get(Keys.FINISHED_MAPS);
607     assertTrue("Unexpected number of finished maps in history file",
608                Integer.parseInt(finMaps) == jip.finishedMaps());
609 
610     String finReduces = values.get(Keys.FINISHED_REDUCES);
611     assertTrue("Unexpected number of finished reduces in history file",
612                Integer.parseInt(finReduces) == jip.finishedReduces());
613 
614     String failedMaps = values.get(Keys.FAILED_MAPS);
615     assertTrue("Unexpected number of failed maps in history file",
616                Integer.parseInt(failedMaps) == jip.failedMapTasks);
617 
618     String failedReduces = values.get(Keys.FAILED_REDUCES);
619     assertTrue("Unexpected number of failed reduces in history file",
620                Integer.parseInt(failedReduces) == jip.failedReduceTasks);
621   }
622 
623   // Validate Task Level Keys, Values read from history file by
624   // comparing them with the actual values from JT.
validateTaskLevelKeyValues(MiniMRCluster mr, RunningJob job, JobHistory.JobInfo jobInfo)625   private static void validateTaskLevelKeyValues(MiniMRCluster mr,
626                       RunningJob job, JobHistory.JobInfo jobInfo) throws IOException  {
627 
628     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
629     JobInProgress jip = jt.getJob(job.getID());
630 
631     // Get the 1st map, 1st reduce, cleanup & setup taskIDs and
632     // validate their history info
633     TaskID mapTaskId = new TaskID(job.getID(), true, 0);
634     TaskID reduceTaskId = new TaskID(job.getID(), false, 0);
635 
636     TaskInProgress cleanups[] = jip.getTasks(TaskType.JOB_CLEANUP);
637     TaskID cleanupTaskId;
638     if (cleanups[0].isComplete()) {
639       cleanupTaskId = cleanups[0].getTIPId();
640     }
641     else {
642       cleanupTaskId = cleanups[1].getTIPId();
643     }
644 
645     TaskInProgress setups[] = jip.getTasks(TaskType.JOB_SETUP);
646     TaskID setupTaskId;
647     if (setups[0].isComplete()) {
648       setupTaskId = setups[0].getTIPId();
649     }
650     else {
651       setupTaskId = setups[1].getTIPId();
652     }
653 
654     Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
655 
656     // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce)
657     for (JobHistory.Task task : tasks.values()) {
658 
659       String tid = task.get(Keys.TASKID);
660       if (tid.equals(mapTaskId.toString()) ||
661           tid.equals(reduceTaskId.toString()) ||
662           tid.equals(cleanupTaskId.toString()) ||
663           tid.equals(setupTaskId.toString())) {
664 
665         TaskID taskId = null;
666         if (tid.equals(mapTaskId.toString())) {
667           taskId = mapTaskId;
668         }
669         else if (tid.equals(reduceTaskId.toString())) {
670           taskId = reduceTaskId;
671         }
672         else if (tid.equals(cleanupTaskId.toString())) {
673           taskId = cleanupTaskId;
674         }
675         else if (tid.equals(setupTaskId.toString())) {
676           taskId = setupTaskId;
677         }
678         TaskInProgress tip = jip.getTaskInProgress(taskId);
679         assertTrue("START_TIME of Task " + tid + " obtained from history " +
680              "file did not match the expected value", tip.getExecStartTime() ==
681              Long.parseLong(task.get(Keys.START_TIME)));
682 
683         assertTrue("FINISH_TIME of Task " + tid + " obtained from history " +
684              "file did not match the expected value", tip.getExecFinishTime() ==
685              Long.parseLong(task.get(Keys.FINISH_TIME)));
686 
687         if (taskId == mapTaskId) {//check splits only for map task
688           assertTrue("Splits of Task " + tid + " obtained from history file " +
689                      " did not match the expected value",
690                      tip.getSplitNodes().equals(task.get(Keys.SPLITS)));
691         }
692 
693         TaskAttemptID attemptId = tip.getSuccessfulTaskid();
694         TaskStatus ts = tip.getTaskStatus(attemptId);
695 
696         // Validate task counters
697         Counters c = ts.getCounters();
698         assertTrue("Counters of Task " + tid + " obtained from history file " +
699                    " did not match the expected value",
700                   c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS)));
701       }
702     }
703   }
704 
705   // Validate Task Attempt Level Keys, Values read from history file by
706   // comparing them with the actual values from JT.
validateTaskAttemptLevelKeyValues(MiniMRCluster mr, RunningJob job, JobHistory.JobInfo jobInfo)707   private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr,
708                       RunningJob job, JobHistory.JobInfo jobInfo) throws IOException  {
709 
710     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
711     JobInProgress jip = jt.getJob(job.getID());
712 
713     Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
714 
715     // For each task
716     for (JobHistory.Task task : tasks.values()) {
717       // validate info of each attempt
718       for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
719 
720         String idStr = attempt.get(Keys.TASK_ATTEMPT_ID);
721         TaskAttemptID attemptId = TaskAttemptID.forName(idStr);
722         TaskID tid = attemptId.getTaskID();
723 
724         // Validate task id
725         assertTrue("Task id of Task Attempt " + idStr + " obtained from " +
726                    "history file did not match the expected value",
727                    tid.toString().equals(attempt.get(Keys.TASKID)));
728 
729         TaskInProgress tip = jip.getTaskInProgress(tid);
730         TaskStatus ts = tip.getTaskStatus(attemptId);
731 
732         // Validate task attempt start time
733         assertTrue("START_TIME of Task attempt " + idStr + " obtained from " +
734                    "history file did not match the expected value",
735             ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME)));
736 
737         // Validate task attempt finish time
738         assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " +
739                    "history file did not match the expected value",
740             ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));
741 
742 
743         TaskTrackerStatus ttStatus =
744           jt.getTaskTrackerStatus(ts.getTaskTracker());
745 
746         if (ttStatus != null) {
747           assertTrue("http port of task attempt " + idStr + " obtained from " +
748                      "history file did not match the expected value",
749                      ttStatus.getHttpPort() ==
750                      Integer.parseInt(attempt.get(Keys.HTTP_PORT)));
751 
752           if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
753             String ttHostname = jt.getNode(ttStatus.getHost()).toString();
754 
755             // check if hostname is valid
756             assertTrue("Host name of task attempt " + idStr + " obtained from" +
757                        " history file did not match the expected value",
758                        ttHostname.equals(attempt.get(Keys.HOSTNAME)));
759           }
760         }
761         if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
762           // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of
763           // Reduce Task Attempts
764           if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) {
765             assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr +
766                      " obtained from history file did not match the expected" +
767                      " value", ts.getShuffleFinishTime() ==
768                      Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED)));
769             assertTrue("SORT_FINISHED time of task attempt " + idStr +
770                      " obtained from history file did not match the expected" +
771                      " value", ts.getSortFinishTime() ==
772                      Long.parseLong(attempt.get(Keys.SORT_FINISHED)));
773           }
774 
775           //Validate task counters
776           Counters c = ts.getCounters();
777           assertTrue("Counters of Task Attempt " + idStr + " obtained from " +
778                      "history file did not match the expected value",
779                c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS)));
780         }
781 
782         // check if tracker name is valid
783         assertTrue("Tracker name of task attempt " + idStr + " obtained from " +
784                    "history file did not match the expected value",
785                    ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME)));
786       }
787     }
788   }
789 
790   /**
791    * Checks if the history file content is as expected comparing with the
792    * actual values obtained from JT.
793    * Job Level, Task Level and Task Attempt Level Keys, Values are validated.
794    * @param job RunningJob object of the job whose history is to be validated
795    * @param conf job conf
796    */
validateJobHistoryFileContent(MiniMRCluster mr, RunningJob job, JobConf conf)797   static void validateJobHistoryFileContent(MiniMRCluster mr,
798                               RunningJob job, JobConf conf) throws IOException  {
799 
800     JobID id = job.getID();
801     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
802     // Get the history file name
803     String logFileName = getDoneFile(conf, id, doneDir);
804 
805     // Framework history log file location
806     Path logFile = new Path(doneDir, logFileName);
807     FileSystem fileSys = logFile.getFileSystem(conf);
808 
809     // Check if the history file exists
810     assertTrue("History file does not exist", fileSys.exists(logFile));
811 
812 
813     // check if the history file is parsable
814     String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
815     		                                   logFileName).split("_");
816 
817     String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
818     JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
819 
820     DefaultJobHistoryParser.JobTasksParseListener l =
821                    new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
822     JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
823 
824     // Now the history file contents are available in jobInfo. Let us compare
825     // them with the actual values from JT.
826     validateJobLevelKeyValues(mr, job, jobInfo, conf);
827     validateTaskLevelKeyValues(mr, job, jobInfo);
828     validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
829 
830     // Also JobACLs should be correct
831     if (mr.getJobTrackerRunner().getJobTracker().areACLsEnabled()) {
832       AccessControlList acl = new AccessControlList(
833           conf.get(JobACL.VIEW_JOB.getAclName(), " "));
834       assertTrue(acl.toString().equals(
835           jobInfo.getJobACLs().get(JobACL.VIEW_JOB).toString()));
836       acl = new AccessControlList(
837           conf.get(JobACL.MODIFY_JOB.getAclName(), " "));
838       assertTrue(acl.toString().equals(
839           jobInfo.getJobACLs().get(JobACL.MODIFY_JOB).toString()));
840     }
841 
842     // Validate the job queue name
843     assertTrue(jobInfo.getJobQueue().equals(conf.getQueueName()));
844 
845     // Validate the workflow properties
846     assertTrue(jobInfo.get(Keys.WORKFLOW_ID).equals(
847         conf.get(JobConf.WORKFLOW_ID, "")));
848     assertTrue(jobInfo.get(Keys.WORKFLOW_NAME).equals(
849         conf.get(JobConf.WORKFLOW_NAME, "")));
850     assertTrue(jobInfo.get(Keys.WORKFLOW_NODE_NAME).equals(
851         conf.get(JobConf.WORKFLOW_NODE_NAME, "")));
852     assertTrue(jobInfo.get(Keys.WORKFLOW_ADJACENCIES).equals(
853         JobHistory.JobInfo.getWorkflowAdjacencies(conf)));
854     assertTrue(jobInfo.get(Keys.WORKFLOW_TAGS).equals(
855         conf.get(JobConf.WORKFLOW_TAGS, "")));
856   }
857 
testDoneFolderOnHDFS()858   public void testDoneFolderOnHDFS() throws IOException {
859     MiniMRCluster mr = null;
860     try {
861       JobConf conf = new JobConf();
862       // keep for less time
863       conf.setLong("mapred.jobtracker.retirejob.check", 1000);
864       conf.setLong("mapred.jobtracker.retirejob.interval", 100000);
865 
866       //set the done folder location
867       String doneFolder = "history_done";
868       conf.set("mapred.job.tracker.history.completed.location", doneFolder);
869 
870       MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
871       mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
872           3, null, null, conf);
873 
874       // run the TCs
875       conf = mr.createJobConf();
876 
877       FileSystem fs = FileSystem.get(conf);
878       // clean up
879       fs.delete(new Path("succeed"), true);
880 
881       Path inDir = new Path("succeed/input");
882       Path outDir = new Path("succeed/output");
883 
884       //Disable speculative execution
885       conf.setSpeculativeExecution(false);
886 
887       // Make sure that the job is not removed from memory until we do finish
888       // the validation of history file content
889       conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
890       conf.set("user.name", UserGroupInformation.getCurrentUser().getUserName());
891       // Run a job that will be succeeded and validate its history file
892       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
893 
894       Path doneDir = JobHistory.getCompletedJobHistoryLocation();
895       assertEquals("History DONE folder not correct",
896           doneFolder, doneDir.getName());
897       JobID id = job.getID();
898       String logFileName = getDoneFile(conf, id, doneDir);
899       assertNotNull(logFileName);
900       System.err.println("testDoneFolderOnHDFS -- seeking " + logFileName);
901       // Framework history log file location
902       Path logFile = new Path(doneDir, logFileName);
903       FileSystem fileSys = logFile.getFileSystem(conf);
904 
905       // Check if the history file exists
906       assertTrue("History file does not exist", fileSys.exists(logFile));
907 
908       // check if the corresponding conf file exists
909       Path confFile = getPathForConf(logFile);
910       assertTrue("Config for completed jobs doesnt exist: " + confFile,
911                  fileSys.exists(confFile));
912 
913       // check if the file exists under a done folder
914       assertTrue("Completed job config doesnt exist under the done folder",
915                  confFile.toString().startsWith(doneDir.toString()));
916 
917       // check if the file exists in a done folder
918       assertTrue("Completed jobs doesnt exist under the done folder",
919                  logFile.toString().startsWith(doneDir.toString()));
920 
921       assertTrue("Completed job and config file aren't in the same directory",
922                  confFile.getParent().toString().equals(logFile.getParent().toString()));
923 
924       // Test that all of the ancestors of the log file have the same
925       //   permissions as the done directory
926 
927       Path cursor = logFile.getParent();
928 
929       Path doneParent = doneDir.getParent();
930 
931       FsPermission donePermission = getStatus(fileSys, doneDir).getPermission();
932 
933       System.err.println("testDoneFolderOnHDFS: done dir permission = "
934                          + donePermission);
935 
936       while (!cursor.equals(doneParent)) {
937         FileStatus cursorStatus = getStatus(fileSys, cursor);
938         FsPermission cursorPermission = cursorStatus.getPermission();
939 
940         assertEquals("testDoneFolderOnHDFS: A done directory descendant, "
941                      + cursor
942                      + " does not have the same permisison as the done directory, "
943                      + doneDir,
944                      donePermission,
945                      cursorPermission);
946 
947         cursor = cursor.getParent();
948       }
949 
950       // check if the job file is removed from the history location
951       Path runningJobsHistoryFolder = logFile.getParent().getParent();
952       Path runningJobHistoryFilename =
953         new Path(runningJobsHistoryFolder, logFile.getName());
954       Path runningJobConfFilename =
955         new Path(runningJobsHistoryFolder, confFile.getName());
956       assertFalse("History file not deleted from the running folder",
957                   fileSys.exists(runningJobHistoryFilename));
958       assertFalse("Config for completed jobs not deleted from running folder",
959                   fileSys.exists(runningJobConfFilename));
960 
961       validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
962       validateJobHistoryFileContent(mr, job, conf);
963 
964       // get the job conf filename
965     } finally {
966       if (mr != null) {
967         cleanupLocalFiles(mr);
968         mr.shutdown();
969       }
970     }
971   }
972 
getStatus(FileSystem fs, final Path path)973   private static FileStatus getStatus(FileSystem fs, final Path path) {
974     Path pathParent = path.getParent();
975 
976     try {
977       FileStatus[] statuses
978         = fs.listStatus(pathParent,
979                         new PathFilter() {
980                           @Override
981                             public boolean accept(Path filterPath) {
982                             return filterPath.getName().equals(path.getName());
983                           }
984                         }
985                         );
986 
987       return statuses[0];
988     } catch (IOException e) {
989       return null;
990     }
991   }
992 
993   /** Run a job that will be succeeded and validate its history file format
994    *  and its content.
995    */
testJobHistoryFile()996   public void testJobHistoryFile() throws IOException {
997     MiniMRCluster mr = null;
998     try {
999       JobConf conf = new JobConf();
1000       // keep for less time
1001       conf.setLong("mapred.jobtracker.retirejob.check", 1000);
1002       conf.setLong("mapred.jobtracker.retirejob.interval", 100000);
1003 
1004       //set the done folder location
1005       String doneFolder = TEST_ROOT_DIR + "history_done";
1006       conf.set("mapred.job.tracker.history.completed.location", doneFolder);
1007 
1008       // Enable ACLs so that they are logged to history
1009       conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
1010       // no queue admins for default queue
1011       conf.set(QueueManager.toFullPropertyName(
1012           "default", QueueACL.ADMINISTER_JOBS.getAclName()), " ");
1013 
1014       // set workflow properties
1015       conf.set(JobConf.WORKFLOW_ID, "workflowId1");
1016       conf.set(JobConf.WORKFLOW_NAME, "workflowName1");
1017       String workflowNodeName = "A";
1018       conf.set(JobConf.WORKFLOW_NODE_NAME, workflowNodeName);
1019       conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + workflowNodeName,
1020           "BC");
1021       conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + workflowNodeName,
1022           "DEF");
1023       conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "DEF", "G");
1024       conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "Z",
1025           workflowNodeName);
1026       conf.set(JobConf.WORKFLOW_TAGS, "tag1,tag2");
1027 
1028       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
1029 
1030       // run the TCs
1031       conf = mr.createJobConf();
1032 
1033       FileSystem fs = FileSystem.get(conf);
1034       // clean up
1035       fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
1036 
1037       Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input");
1038       Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output");
1039 
1040       //Disable speculative execution
1041       conf.setSpeculativeExecution(false);
1042       conf.set(JobACL.VIEW_JOB.getAclName(), "user1,user2 group1,group2");
1043       conf.set(JobACL.MODIFY_JOB.getAclName(), "user3,user4 group3,group4");
1044 
1045       // Make sure that the job is not removed from memory until we do finish
1046       // the validation of history file content
1047       conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
1048       conf.set("user.name", UserGroupInformation.getCurrentUser().getUserName());
1049       // Run a job that will be succeeded and validate its history file
1050       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
1051 
1052       Path doneDir = JobHistory.getCompletedJobHistoryLocation();
1053       assertEquals("History DONE folder not correct",
1054           doneFolder, doneDir.toString());
1055       JobID id = job.getID();
1056       String logFileName = getDoneFile(conf, id, doneDir);
1057 
1058       // Framework history log file location
1059       Path logFile = new Path(doneDir, logFileName);
1060       FileSystem fileSys = logFile.getFileSystem(conf);
1061 
1062       // Check if the history file exists
1063       System.err.println("testJobHistoryFile -- seeking " + logFile);
1064       assertTrue("History file does not exist", fileSys.exists(logFile));
1065 
1066       // check if the corresponding conf file exists
1067       Path confFile = getPathForConf(logFile);
1068       assertTrue("Config for completed jobs doesnt exist: " + confFile,
1069                  fileSys.exists(confFile));
1070 
1071       // check if the file exists in a done folder
1072       assertTrue("Completed job config doesnt exist under the done folder",
1073                  confFile.toString().startsWith(doneDir.toString()));
1074 
1075       // check if the file exists in a done folder
1076       assertTrue("Completed jobs doesnt exist in the done folder",
1077                  logFile.toString().startsWith(doneDir.toString()));
1078 
1079       assertTrue("Completed job and config file aren't in the same directory",
1080                  confFile.getParent().toString().equals(logFile.getParent().toString()));
1081 
1082 
1083       // check if the job file is removed from the history location
1084       Path runningJobsHistoryFolder = logFile.getParent().getParent();
1085       Path runningJobHistoryFilename =
1086         new Path(runningJobsHistoryFolder, logFile.getName());
1087       Path runningJobConfFilename =
1088         new Path(runningJobsHistoryFolder, confFile.getName());
1089       assertFalse("History file not deleted from the running folder",
1090                   fileSys.exists(runningJobHistoryFilename));
1091       assertFalse("Config for completed jobs not deleted from running folder",
1092                   fileSys.exists(runningJobConfFilename));
1093 
1094       validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
1095       validateJobHistoryFileContent(mr, job, conf);
1096 
1097       // get the job conf filename
1098       String name = JobHistory.JobInfo.getLocalJobFilePath(job.getID());
1099       File file = new File(name);
1100 
1101       // check if the file get deleted
1102       while (file.exists()) {
1103         LOG.info("Waiting for " + file + " to be deleted");
1104         UtilsForTests.waitFor(100);
1105       }
1106     } finally {
1107       if (mr != null) {
1108         cleanupLocalFiles(mr);
1109         mr.shutdown();
1110       }
1111     }
1112   }
1113 
1114   //Returns the file in the done folder
1115   //Waits for sometime to get the file moved to done
getDoneFile(JobConf conf, JobID id, Path doneDir)1116   static String getDoneFile(JobConf conf, JobID id,
1117       Path doneDir) throws IOException {
1118     String name = null;
1119     for (int i = 0; name == null && i < 20; i++) {
1120       name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id);
1121       UtilsForTests.waitFor(1000);
1122     }
1123     return name;
1124   }
1125   // Returns the output path where user history log file is written to with
1126   // default configuration setting for hadoop.job.history.user.location
getLogLocationInOutputPath(String logFileName, JobConf conf)1127   private static Path getLogLocationInOutputPath
1128          (String logFileName, JobConf conf) {
1129     JobConf jobConf = new JobConf(true);//default JobConf
1130     FileOutputFormat.setOutputPath(jobConf,
1131                      FileOutputFormat.getOutputPath(conf));
1132 
1133     Path result = JobHistory.JobInfo.getJobHistoryLogLocationForUser
1134       (logFileName, jobConf);
1135     return result;
1136   }
1137 
coreLogLocation(String subdirLogLocation)1138   static private String coreLogLocation(String subdirLogLocation) {
1139     return subdirLogLocation.substring
1140       (subdirLogLocation.lastIndexOf(Path.SEPARATOR_CHAR) + 1);
1141   }
1142 
1143   /**
1144    * Checks if the user history file exists in the correct dir
1145    * @param id job id
1146    * @param conf job conf
1147    */
validateJobHistoryUserLogLocation(JobID id, JobConf conf)1148   private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf)
1149           throws IOException  {
1150     // Get the history file name
1151     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
1152     String logFileName = getDoneFile(conf, id, doneDir);
1153 
1154     // User history log file location
1155     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
1156                                                      coreLogLocation(logFileName), conf);
1157 
1158     if(logFile == null) {
1159       // get the output path where history file is written to when
1160       // hadoop.job.history.user.location is not set
1161 
1162       logFile = getLogLocationInOutputPath(coreLogLocation(logFileName), conf);
1163     }
1164 
1165     FileSystem fileSys = null;
1166     fileSys = logFile.getFileSystem(conf);
1167 
1168     // Check if the user history file exists in the correct dir
1169     if (conf.get("hadoop.job.history.user.location") == null) {
1170       assertTrue("User log file " + logFile + " does not exist",
1171                  fileSys.exists(logFile));
1172     }
1173     else if ("none".equals(conf.get("hadoop.job.history.user.location"))) {
1174       // history file should not exist in the output path
1175       assertFalse("Unexpected. User log file exists in output dir when " +
1176                  "hadoop.job.history.user.location is set to \"none\"",
1177                  fileSys.exists(logFile));
1178     }
1179     else {
1180       //hadoop.job.history.user.location is set to a specific location.
1181       // User log file should exist in that location
1182       assertTrue("User log file " + logFile + " does not exist",
1183                  fileSys.exists(logFile));
1184 
1185       // User log file should not exist in output path.
1186 
1187       // get the output path where history file is written to when
1188       // hadoop.job.history.user.location is not set
1189       Path logFile1 = getLogLocationInOutputPath(logFileName, conf);
1190 
1191       if (logFile != logFile1) {
1192         fileSys = logFile1.getFileSystem(conf);
1193         assertFalse("Unexpected. User log file exists in output dir when " +
1194               "hadoop.job.history.user.location is set to a different location",
1195               fileSys.exists(logFile1));
1196       }
1197     }
1198   }
1199 
1200   // Validate user history file location for the given values of
1201   // hadoop.job.history.user.location as
1202   // (1)null(default case), (2)"none", and (3)some user specified dir.
testJobHistoryUserLogLocation()1203   public void testJobHistoryUserLogLocation() throws IOException {
1204     MiniMRCluster mr = null;
1205     try {
1206       mr = new MiniMRCluster(2, "file:///", 3);
1207 
1208       // run the TCs
1209       JobConf conf = mr.createJobConf();
1210 
1211       FileSystem fs = FileSystem.get(conf);
1212       // clean up
1213       fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
1214 
1215       Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1");
1216       Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1");
1217       conf.set("user.name", UserGroupInformation.getCurrentUser().getUserName());
1218       // validate for the case of null(default)
1219       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
1220       validateJobHistoryUserLogLocation(job.getID(), conf);
1221 
1222       inDir = new Path(TEST_ROOT_DIR + "/succeed/input2");
1223       outDir = new Path(TEST_ROOT_DIR + "/succeed/output2");
1224       // validate for the case of "none"
1225       conf.set("hadoop.job.history.user.location", "none");
1226       job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
1227       validateJobHistoryUserLogLocation(job.getID(), conf);
1228 
1229       inDir = new Path(TEST_ROOT_DIR + "/succeed/input3");
1230       outDir = new Path(TEST_ROOT_DIR + "/succeed/output3");
1231       // validate for the case of any dir
1232       conf.set("hadoop.job.history.user.location", TEST_ROOT_DIR + "/succeed");
1233       job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
1234       validateJobHistoryUserLogLocation(job.getID(), conf);
1235 
1236     } finally {
1237       if (mr != null) {
1238         cleanupLocalFiles(mr);
1239         mr.shutdown();
1240       }
1241     }
1242   }
1243 
cleanupLocalFiles(MiniMRCluster mr)1244   private void cleanupLocalFiles(MiniMRCluster mr)
1245   throws IOException {
1246     Configuration conf = mr.createJobConf();
1247     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
1248     Path sysDir = new Path(jt.getSystemDir());
1249     FileSystem fs = sysDir.getFileSystem(conf);
1250     fs.delete(sysDir, true);
1251     Path jobHistoryDir = JobHistory.getJobHistoryLocation();
1252     fs = jobHistoryDir.getFileSystem(conf);
1253     fs.delete(jobHistoryDir, true);
1254   }
1255 
1256   /**
1257    * Checks if the history file has expected job status
1258    * @param id job id
1259    * @param conf job conf
1260    */
validateJobHistoryJobStatus(JobID id, JobConf conf, String status)1261   private static void validateJobHistoryJobStatus(JobID id, JobConf conf,
1262           String status) throws IOException  {
1263 
1264     // Get the history file name
1265     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
1266     String logFileName = getDoneFile(conf, id, doneDir);
1267 
1268     // Framework history log file location
1269     Path logFile = new Path(doneDir, logFileName);
1270     FileSystem fileSys = logFile.getFileSystem(conf);
1271 
1272     // Check if the history file exists
1273     System.err.println("validateJobHistoryJobStatus -- seeking " + logFile);
1274     assertTrue("History file does not exist", fileSys.exists(logFile));
1275 
1276     // check history file permission
1277     assertTrue("History file permissions does not match",
1278     fileSys.getFileStatus(logFile).getPermission().equals(
1279        new FsPermission(JobHistory.HISTORY_FILE_PERMISSION)));
1280 
1281     // check if the history file is parsable
1282     String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
1283     		                                   logFileName).split("_");
1284 
1285     String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
1286     JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
1287 
1288     DefaultJobHistoryParser.JobTasksParseListener l =
1289                   new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
1290     JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
1291 
1292     assertTrue("Job Status read from job history file is not the expected" +
1293          " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS)));
1294   }
1295 
1296   // run jobs that will be (1) succeeded (2) failed (3) killed
1297   // and validate job status read from history file in each case
testJobHistoryJobStatus()1298   public void testJobHistoryJobStatus() throws IOException {
1299     MiniMRCluster mr = null;
1300     try {
1301       mr = new MiniMRCluster(2, "file:///", 3);
1302 
1303       // run the TCs
1304       JobConf conf = mr.createJobConf();
1305 
1306       FileSystem fs = FileSystem.get(conf);
1307       // clean up
1308       fs.delete(new Path(TEST_ROOT_DIR + "/succeedfailkilljob"), true);
1309 
1310       Path inDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/input");
1311       Path outDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/output");
1312       conf.set("user.name", UserGroupInformation.getCurrentUser().getUserName());
1313       // Run a job that will be succeeded and validate its job status
1314       // existing in history file
1315       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
1316       validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS");
1317       long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan();
1318       assertTrue(historyCleanerRanAt != 0);
1319 
1320       // Run a job that will be failed and validate its job status
1321       // existing in history file
1322       job = UtilsForTests.runJobFail(conf, inDir, outDir);
1323       validateJobHistoryJobStatus(job.getID(), conf, "FAILED");
1324       assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
1325 
1326       // Run a job that will be killed and validate its job status
1327       // existing in history file
1328       job = UtilsForTests.runJobKill(conf, inDir, outDir);
1329       validateJobHistoryJobStatus(job.getID(), conf, "KILLED");
1330       assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
1331 
1332     } finally {
1333       if (mr != null) {
1334         cleanupLocalFiles(mr);
1335         mr.shutdown();
1336       }
1337     }
1338   }
1339 
testJobHistoryCleaner()1340   public void testJobHistoryCleaner() throws Exception {
1341     JobConf conf = new JobConf();
1342     FileSystem fs = FileSystem.get(conf);
1343     JobHistory.DONEDIR_FS = fs;
1344     JobHistory.DONE = new Path(TEST_ROOT_DIR + "/done");
1345     Path histDirOld = new Path(JobHistory.DONE, "version-1/jtinstid/2013/02/05/000000/");
1346     Path histDirOnLine = new Path(JobHistory.DONE, "version-1/jtinstid/2013/02/06/000000/");
1347     final int dayMillis = 1000 * 60 * 60 * 24;
1348 
1349     try {
1350       Calendar runTime = Calendar.getInstance();
1351       runTime.clear();
1352       runTime.set(2013, 1, 8, 12, 0);
1353       long runTimeMillis = runTime.getTimeInMillis();
1354 
1355       fs.mkdirs(histDirOld);
1356       fs.mkdirs(histDirOnLine);
1357       Path histFileOldDir = new Path(histDirOld, "jobfile1.txt");
1358       Path histFileOnLineDir = new Path(histDirOnLine, "jobfile1.txt");
1359       Path histFileDontDelete = new Path(histDirOnLine, "jobfile2.txt");
1360       fs.create(histFileOldDir).close();
1361       fs.create(histFileOnLineDir).close();
1362       fs.create(histFileDontDelete).close();
1363       new File(histFileOnLineDir.toUri()).setLastModified(
1364           runTimeMillis - dayMillis * 5 / 2);
1365       new File(histFileDontDelete.toUri()).setLastModified(
1366           runTimeMillis - dayMillis * 3 / 2);
1367 
1368       HistoryCleaner.maxAgeOfHistoryFiles = dayMillis * 2; // two days
1369       HistoryCleaner historyCleaner = new HistoryCleaner();
1370 
1371       historyCleaner.clean(runTimeMillis);
1372 
1373       assertFalse(fs.exists(histDirOld));
1374       assertTrue(fs.exists(histDirOnLine));
1375       assertFalse(fs.exists(histFileOldDir));
1376       assertFalse(fs.exists(histFileOnLineDir));
1377       assertTrue(fs.exists(histFileDontDelete));
1378     } finally {
1379       fs.delete(JobHistory.DONE, true);
1380     }
1381   }
1382 }
1383